// mesa_dev/client/change.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use tonic::service::interceptor::InterceptedService;
4use tonic::transport::Channel;
5
6use crate::grpc::{self, control_service_client::ControlServiceClient};
7
8/// Interceptor that attaches a bearer token to every gRPC request.
9#[derive(Clone, Debug)]
10struct AuthInterceptor {
11    token: tonic::metadata::MetadataValue<tonic::metadata::Ascii>,
12}
13
14impl tonic::service::Interceptor for AuthInterceptor {
15    fn call(
16        &mut self,
17        mut request: tonic::Request<()>,
18    ) -> Result<tonic::Request<()>, tonic::Status> {
19        request
20            .metadata_mut()
21            .insert("authorization", self.token.clone());
22        Ok(request)
23    }
24}
25
/// Client for change-based file operations (`/{org}/{repo}/changes`).
///
/// Uses the gRPC `ControlService` to create changes, apply file operations
/// (create, modify, delete, move), and snapshot changes into commits.
///
/// Each client keeps a single op-sequence counter that is sent as
/// `expected_last_op_id` with every `apply_ops` and `snapshot` call.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use mesa_dev::MesaClient;
///
/// let client = MesaClient::builder()
///     .build()?;
///
/// let org = client.org("my-org");
/// let repo = org.repos().at("my-repo");
/// let change_client = repo.change().await?;
///
/// // Create a change based on main branch
/// let change = change_client.create_from_ref("refs/heads/main").await?;
/// let change_id = change.id.unwrap();
///
/// // Apply file operations
/// change_client.create_file(
///     &change_id,
///     "hello.txt",
///     b"Hello, world!",
///     None,
/// ).await?;
///
/// // Snapshot to a commit
/// change_client.snapshot(&change_id, "Add hello.txt").await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct ChangeClient {
    /// gRPC stub for `ControlService`; every call passes through
    /// `AuthInterceptor`. Cloned per request because the stub's methods take
    /// `&mut self`.
    client: ControlServiceClient<InterceptedService<Channel, AuthInterceptor>>,
    /// Repository identifier copied into the `repo_id` field of each request.
    repo_id: String,
    /// Tracks the last applied op id for strict sequencing.
    /// Starts at 0 for a fresh change and is updated after each `apply_ops`.
    ///
    /// NOTE(review): this is one counter per client. It is not keyed by
    /// change id and nothing in this file resets it when a new change is
    /// created, so reusing one client for a second change presumably sends a
    /// stale `expected_last_op_id` — confirm against the server's sequencing
    /// rules.
    last_op_id: AtomicU64,
}
69
70impl ChangeClient {
71    /// Build a `ChangeClient` from a gRPC channel, bearer token, and repo UUID.
72    pub(super) fn new(channel: &Channel, bearer_token: Option<&str>, repo_id: String) -> Self {
73        let auth_value = bearer_token
74            .map(|t| format!("Bearer {t}"))
75            .unwrap_or_default();
76
77        // MetadataValue::try_from can only fail on non-ASCII, which won't
78        // happen for bearer tokens.
79        let token = auth_value
80            .try_into()
81            .unwrap_or_else(|_| tonic::metadata::MetadataValue::from_static(""));
82
83        let interceptor = AuthInterceptor { token };
84
85        Self {
86            client: ControlServiceClient::with_interceptor(channel.clone(), interceptor),
87            repo_id,
88            last_op_id: AtomicU64::new(0),
89        }
90    }
91
92    /// Create a new change based on a ref name (e.g. `"refs/heads/main"`).
93    ///
94    /// # Errors
95    ///
96    /// Returns a gRPC error if the request fails.
97    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id), err(Debug))]
98    pub async fn create_from_ref(
99        &self,
100        base_ref_name: &str,
101    ) -> Result<grpc::Change, tonic::Status> {
102        let resp = self
103            .client
104            .clone()
105            .create_change(grpc::CreateChangeRequest {
106                repo_id: self.repo_id.clone(),
107                base: Some(grpc::create_change_request::Base::BaseRefName(
108                    base_ref_name.to_owned(),
109                )),
110            })
111            .await?;
112
113        resp.into_inner()
114            .change
115            .ok_or_else(|| tonic::Status::internal("server returned empty change"))
116    }
117
118    /// Create a new change based on a commit OID.
119    ///
120    /// # Errors
121    ///
122    /// Returns a gRPC error if the request fails.
123    #[tracing::instrument(skip(self, commit_oid), fields(repo_id = %self.repo_id), err(Debug))]
124    pub async fn create_from_commit(
125        &self,
126        commit_oid: &[u8],
127    ) -> Result<grpc::Change, tonic::Status> {
128        let resp = self
129            .client
130            .clone()
131            .create_change(grpc::CreateChangeRequest {
132                repo_id: self.repo_id.clone(),
133                base: Some(grpc::create_change_request::Base::BaseCommit(
134                    grpc::CommitOid {
135                        value: commit_oid.to_vec(),
136                    },
137                )),
138            })
139            .await?;
140
141        resp.into_inner()
142            .change
143            .ok_or_else(|| tonic::Status::internal("server returned empty change"))
144    }
145
146    /// Create a file in the working copy of a change.
147    ///
148    /// Uses inline data for the file content. For large files, use
149    /// [`apply_ops`](Self::apply_ops) with an `OpCreateFile` referencing a pre-uploaded blob OID.
150    ///
151    /// `mode` defaults to `0o100644` (regular file) if `None`.
152    ///
153    /// # Errors
154    ///
155    /// Returns a gRPC error if the request fails.
156    #[tracing::instrument(skip(self, content), fields(repo_id = %self.repo_id, path), err(Debug))]
157    pub async fn create_file(
158        &self,
159        change_id: &grpc::ChangeId,
160        path: &str,
161        content: &[u8],
162        mode: Option<u32>,
163    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
164        let op = grpc::Op {
165            op: Some(grpc::op::Op::CreateFile(grpc::OpCreateFile {
166                path: path.to_owned(),
167                mode: mode.unwrap_or(0o100_644),
168                content: Some(grpc::op_create_file::Content::InlineData(content.to_vec())),
169            })),
170        };
171        self.apply_ops(change_id, vec![op]).await
172    }
173
174    /// Modify an existing file in the working copy of a change.
175    ///
176    /// Replaces the file content with `content` (inline data).
177    ///
178    /// # Errors
179    ///
180    /// Returns a gRPC error if the request fails.
181    #[tracing::instrument(skip(self, content), fields(repo_id = %self.repo_id, path), err(Debug))]
182    pub async fn modify_file(
183        &self,
184        change_id: &grpc::ChangeId,
185        path: &str,
186        content: &[u8],
187    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
188        let op = grpc::Op {
189            op: Some(grpc::op::Op::ModifyFile(grpc::OpModifyFile {
190                path: path.to_owned(),
191                content: Some(grpc::op_modify_file::Content::InlineData(content.to_vec())),
192            })),
193        };
194        self.apply_ops(change_id, vec![op]).await
195    }
196
197    /// Delete a path from the working copy of a change.
198    ///
199    /// Set `recursive` to `true` to delete a directory and all its contents.
200    ///
201    /// # Errors
202    ///
203    /// Returns a gRPC error if the request fails.
204    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, path), err(Debug))]
205    pub async fn delete_path(
206        &self,
207        change_id: &grpc::ChangeId,
208        path: &str,
209        recursive: bool,
210    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
211        let op = grpc::Op {
212            op: Some(grpc::op::Op::DeletePath(grpc::OpDeletePath {
213                path: path.to_owned(),
214                recursive,
215            })),
216        };
217        self.apply_ops(change_id, vec![op]).await
218    }
219
220    /// Move (rename) a path in the working copy of a change.
221    ///
222    /// # Errors
223    ///
224    /// Returns a gRPC error if the request fails.
225    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, from_path, to_path), err(Debug))]
226    pub async fn move_path(
227        &self,
228        change_id: &grpc::ChangeId,
229        from_path: &str,
230        to_path: &str,
231    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
232        let op = grpc::Op {
233            op: Some(grpc::op::Op::MovePath(grpc::OpMovePath {
234                from_path: from_path.to_owned(),
235                to_path: to_path.to_owned(),
236            })),
237        };
238        self.apply_ops(change_id, vec![op]).await
239    }
240
241    /// Apply a batch of operations to a change in a single RPC.
242    ///
243    /// This is the low-level method underlying [`create_file`](Self::create_file),
244    /// [`modify_file`](Self::modify_file), [`delete_path`](Self::delete_path), and
245    /// [`move_path`](Self::move_path). Use it when you need to apply multiple
246    /// operations atomically.
247    ///
248    /// # Errors
249    ///
250    /// Returns a gRPC error if the request fails.
251    #[tracing::instrument(skip(self, ops), fields(repo_id = %self.repo_id, op_count = ops.len()), err(Debug))]
252    pub async fn apply_ops(
253        &self,
254        change_id: &grpc::ChangeId,
255        ops: Vec<grpc::Op>,
256    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
257        let request = grpc::ApplyOpsRequest {
258            repo_id: self.repo_id.clone(),
259            change_id: Some(change_id.clone()),
260            expected_last_op_id: Some(self.last_op_id.load(Ordering::Relaxed)),
261            idempotency_key: String::new(),
262            ops,
263        };
264
265        let resp = self
266            .client
267            .clone()
268            .apply_ops(tokio_stream::once(request))
269            .await?;
270
271        let inner = resp.into_inner();
272        self.last_op_id.store(inner.last_op_id, Ordering::Relaxed);
273        Ok(inner)
274    }
275
276    /// Snapshot the current working-copy state of a change into a commit.
277    ///
278    /// # Errors
279    ///
280    /// Returns a gRPC error if the request fails.
281    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id), err(Debug))]
282    pub async fn snapshot(
283        &self,
284        change_id: &grpc::ChangeId,
285        message: &str,
286    ) -> Result<grpc::SnapshotChangeResponse, tonic::Status> {
287        let resp = self
288            .client
289            .clone()
290            .snapshot_change(grpc::SnapshotChangeRequest {
291                repo_id: self.repo_id.clone(),
292                change_id: Some(change_id.clone()),
293                expected_last_op_id: Some(self.last_op_id.load(Ordering::Relaxed)),
294                message: message.to_owned(),
295                idempotency_key: String::new(),
296            })
297            .await?;
298
299        Ok(resp.into_inner())
300    }
301}