Skip to main content

mesa_dev/client/
change.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use tonic::service::interceptor::InterceptedService;
4use tonic::transport::Channel;
5
6use crate::grpc::{
7    self, control_service_client::ControlServiceClient, data_service_client::DataServiceClient,
8};
9
10/// Interceptor that attaches a bearer token to every gRPC request.
11#[derive(Clone, Debug)]
12struct AuthInterceptor {
13    token: tonic::metadata::MetadataValue<tonic::metadata::Ascii>,
14}
15
16impl tonic::service::Interceptor for AuthInterceptor {
17    fn call(
18        &mut self,
19        mut request: tonic::Request<()>,
20    ) -> Result<tonic::Request<()>, tonic::Status> {
21        request
22            .metadata_mut()
23            .insert("authorization", self.token.clone());
24        Ok(request)
25    }
26}
27
28/// Client for change-based file operations (`/{org}/{repo}/changes`).
29///
30/// Uses the gRPC `ControlService` to create changes, apply file operations
31/// (create, modify, delete, move), and snapshot changes into commits.
32///
33/// # Example
34///
35/// ```rust,no_run
36/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
37/// use mesa_dev::MesaClient;
38///
39/// let client = MesaClient::builder()
40///     .build()?;
41///
42/// let org = client.org("my-org");
43/// let repo = org.repos().at("my-repo");
44/// let change_client = repo.change().await?;
45///
46/// // Create a change based on main branch
47/// let change = change_client.create_from_ref("refs/heads/main").await?;
48/// let change_id = change.id.unwrap();
49///
50/// // Apply file operations
51/// change_client.create_file(
52///     &change_id,
53///     "hello.txt",
54///     b"Hello, world!",
55///     None,
56/// ).await?;
57///
58/// // Snapshot to a commit
59/// change_client.snapshot(&change_id, "Add hello.txt").await?;
60/// # Ok(())
61/// # }
62/// ```
63#[derive(Debug)]
64pub struct ChangeClient {
65    client: ControlServiceClient<InterceptedService<Channel, AuthInterceptor>>,
66    data_client: DataServiceClient<InterceptedService<Channel, AuthInterceptor>>,
67    repo_id: String,
68    /// Tracks the last applied op id for strict sequencing.
69    /// Starts at 0 for a fresh change and is updated after each `apply_ops`.
70    last_op_id: AtomicU64,
71}
72
73impl ChangeClient {
74    /// Build a `ChangeClient` from a gRPC channel, bearer token, and repo UUID.
75    pub(super) fn new(channel: &Channel, bearer_token: Option<&str>, repo_id: String) -> Self {
76        let auth_value = bearer_token
77            .map(|t| format!("Bearer {t}"))
78            .unwrap_or_default();
79
80        // MetadataValue::try_from can only fail on non-ASCII, which won't
81        // happen for bearer tokens.
82        let token = auth_value
83            .try_into()
84            .unwrap_or_else(|_| tonic::metadata::MetadataValue::from_static(""));
85
86        let interceptor = AuthInterceptor { token };
87
88        Self {
89            client: ControlServiceClient::with_interceptor(channel.clone(), interceptor.clone()),
90            data_client: DataServiceClient::with_interceptor(channel.clone(), interceptor),
91            repo_id,
92            last_op_id: AtomicU64::new(0),
93        }
94    }
95
96    /// Create a new change based on a ref name (e.g. `"refs/heads/main"`).
97    ///
98    /// # Errors
99    ///
100    /// Returns a gRPC error if the request fails.
101    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id), err(Debug))]
102    pub async fn create_from_ref(
103        &self,
104        base_ref_name: &str,
105    ) -> Result<grpc::Change, tonic::Status> {
106        let resp = self
107            .client
108            .clone()
109            .create_change(grpc::CreateChangeRequest {
110                repo_id: self.repo_id.clone(),
111                base: Some(grpc::create_change_request::Base::BaseRefName(
112                    base_ref_name.to_owned(),
113                )),
114            })
115            .await?;
116
117        resp.into_inner()
118            .change
119            .ok_or_else(|| tonic::Status::internal("server returned empty change"))
120    }
121
122    /// Create a new change based on a commit OID.
123    ///
124    /// # Errors
125    ///
126    /// Returns a gRPC error if the request fails.
127    #[tracing::instrument(skip(self, commit_oid), fields(repo_id = %self.repo_id), err(Debug))]
128    pub async fn create_from_commit(
129        &self,
130        commit_oid: &[u8],
131    ) -> Result<grpc::Change, tonic::Status> {
132        let resp = self
133            .client
134            .clone()
135            .create_change(grpc::CreateChangeRequest {
136                repo_id: self.repo_id.clone(),
137                base: Some(grpc::create_change_request::Base::BaseCommit(
138                    grpc::CommitOid {
139                        value: commit_oid.to_vec(),
140                    },
141                )),
142            })
143            .await?;
144
145        resp.into_inner()
146            .change
147            .ok_or_else(|| tonic::Status::internal("server returned empty change"))
148    }
149
150    /// Create a file in the working copy of a change.
151    ///
152    /// Uses inline data for the file content. For large files, use
153    /// [`apply_ops`](Self::apply_ops) with an `OpCreateFile` referencing a pre-uploaded blob OID.
154    ///
155    /// `mode` defaults to `0o100644` (regular file) if `None`.
156    ///
157    /// # Errors
158    ///
159    /// Returns a gRPC error if the request fails.
160    #[tracing::instrument(skip(self, content), fields(repo_id = %self.repo_id, path), err(Debug))]
161    pub async fn create_file(
162        &self,
163        change_id: &grpc::ChangeId,
164        path: &str,
165        content: &[u8],
166        mode: Option<u32>,
167    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
168        let op = grpc::Op {
169            op: Some(grpc::op::Op::CreateFile(grpc::OpCreateFile {
170                path: path.to_owned(),
171                mode: mode.unwrap_or(0o100_644),
172                content: Some(grpc::op_create_file::Content::InlineData(content.to_vec())),
173            })),
174        };
175        self.apply_ops(change_id, vec![op]).await
176    }
177
178    /// Modify an existing file in the working copy of a change.
179    ///
180    /// Replaces the file content with `content` (inline data).
181    ///
182    /// # Errors
183    ///
184    /// Returns a gRPC error if the request fails.
185    #[tracing::instrument(skip(self, content), fields(repo_id = %self.repo_id, path), err(Debug))]
186    pub async fn modify_file(
187        &self,
188        change_id: &grpc::ChangeId,
189        path: &str,
190        content: &[u8],
191    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
192        let op = grpc::Op {
193            op: Some(grpc::op::Op::ModifyFile(grpc::OpModifyFile {
194                path: path.to_owned(),
195                content: Some(grpc::op_modify_file::Content::InlineData(content.to_vec())),
196            })),
197        };
198        self.apply_ops(change_id, vec![op]).await
199    }
200
201    /// Delete a path from the working copy of a change.
202    ///
203    /// Set `recursive` to `true` to delete a directory and all its contents.
204    ///
205    /// # Errors
206    ///
207    /// Returns a gRPC error if the request fails.
208    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, path), err(Debug))]
209    pub async fn delete_path(
210        &self,
211        change_id: &grpc::ChangeId,
212        path: &str,
213        recursive: bool,
214    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
215        let op = grpc::Op {
216            op: Some(grpc::op::Op::DeletePath(grpc::OpDeletePath {
217                path: path.to_owned(),
218                recursive,
219            })),
220        };
221        self.apply_ops(change_id, vec![op]).await
222    }
223
224    /// Move (rename) a path in the working copy of a change.
225    ///
226    /// # Errors
227    ///
228    /// Returns a gRPC error if the request fails.
229    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, from_path, to_path), err(Debug))]
230    pub async fn move_path(
231        &self,
232        change_id: &grpc::ChangeId,
233        from_path: &str,
234        to_path: &str,
235    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
236        let op = grpc::Op {
237            op: Some(grpc::op::Op::MovePath(grpc::OpMovePath {
238                from_path: from_path.to_owned(),
239                to_path: to_path.to_owned(),
240            })),
241        };
242        self.apply_ops(change_id, vec![op]).await
243    }
244
245    /// Apply a batch of operations to a change in a single RPC.
246    ///
247    /// This is the low-level method underlying [`create_file`](Self::create_file),
248    /// [`modify_file`](Self::modify_file), [`delete_path`](Self::delete_path), and
249    /// [`move_path`](Self::move_path). Use it when you need to apply multiple
250    /// operations atomically.
251    ///
252    /// # Errors
253    ///
254    /// Returns a gRPC error if the request fails.
255    #[tracing::instrument(skip(self, ops), fields(repo_id = %self.repo_id, op_count = ops.len()), err(Debug))]
256    pub async fn apply_ops(
257        &self,
258        change_id: &grpc::ChangeId,
259        ops: Vec<grpc::Op>,
260    ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
261        let request = grpc::ApplyOpsRequest {
262            repo_id: self.repo_id.clone(),
263            change_id: Some(change_id.clone()),
264            expected_last_op_id: Some(self.last_op_id.load(Ordering::Relaxed)),
265            idempotency_key: String::new(),
266            ops,
267        };
268
269        let resp = self
270            .client
271            .clone()
272            .apply_ops(tokio_stream::once(request))
273            .await?;
274
275        let inner = resp.into_inner();
276        self.last_op_id.store(inner.last_op_id, Ordering::Relaxed);
277        Ok(inner)
278    }
279
280    /// Snapshot the current working-copy state of a change into a commit.
281    ///
282    /// # Errors
283    ///
284    /// Returns a gRPC error if the request fails.
285    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id), err(Debug))]
286    pub async fn snapshot(
287        &self,
288        change_id: &grpc::ChangeId,
289        message: &str,
290    ) -> Result<grpc::SnapshotChangeResponse, tonic::Status> {
291        let resp = self
292            .client
293            .clone()
294            .snapshot_change(grpc::SnapshotChangeRequest {
295                repo_id: self.repo_id.clone(),
296                change_id: Some(change_id.clone()),
297                expected_last_op_id: Some(self.last_op_id.load(Ordering::Relaxed)),
298                message: message.to_owned(),
299                idempotency_key: String::new(),
300            })
301            .await?;
302
303        Ok(resp.into_inner())
304    }
305
306    /// Move (update) a bookmark to point at a new commit OID.
307    ///
308    /// Use this after [`snapshot`](Self::snapshot) to make the new commit visible
309    /// via the REST content API by updating the ref the bookmark maps to.
310    ///
311    /// `expected_update_seq` is the optimistic concurrency token from a prior
312    /// bookmark response (use `0` if unknown / first update).
313    ///
314    /// # Errors
315    ///
316    /// Returns a gRPC error if the request fails.
317    #[tracing::instrument(skip(self, commit_oid), fields(repo_id = %self.repo_id, bookmark_name), err(Debug))]
318    pub async fn move_bookmark(
319        &self,
320        bookmark_name: &str,
321        commit_oid: &[u8],
322        expected_update_seq: u64,
323    ) -> Result<grpc::MoveBookmarkResponse, tonic::Status> {
324        let resp = self
325            .client
326            .clone()
327            .move_bookmark(grpc::MoveBookmarkRequest {
328                repo_id: self.repo_id.clone(),
329                bookmark_name: bookmark_name.to_owned(),
330                expected_update_seq,
331                new_commit_oid: Some(grpc::CommitOid {
332                    value: commit_oid.to_vec(),
333                }),
334            })
335            .await?;
336
337        Ok(resp.into_inner())
338    }
339
340    /// Resolve a ref name to its current target OID and metadata.
341    ///
342    /// Returns a [`RefInfo`](grpc::RefInfo) containing the ref's `update_seq`,
343    /// which is needed as the optimistic-concurrency token for
344    /// [`move_bookmark`](Self::move_bookmark).
345    ///
346    /// # Errors
347    ///
348    /// Returns a gRPC error if the request fails.
349    #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, ref_name), err(Debug))]
350    pub async fn resolve_ref(&self, ref_name: &str) -> Result<grpc::RefInfo, tonic::Status> {
351        let resp = self
352            .data_client
353            .clone()
354            .resolve_ref(grpc::ResolveRefRequest {
355                repo_id: self.repo_id.clone(),
356                ref_name: ref_name.to_owned(),
357            })
358            .await?;
359
360        resp.into_inner()
361            .r#ref
362            .ok_or_else(|| tonic::Status::internal("server returned empty ref"))
363    }
364}