1use std::sync::atomic::{AtomicU64, Ordering};
2
3use tonic::service::interceptor::InterceptedService;
4use tonic::transport::Channel;
5
6use crate::grpc::{
7 self, control_service_client::ControlServiceClient, data_service_client::DataServiceClient,
8};
9
10#[derive(Clone, Debug)]
12struct AuthInterceptor {
13 token: tonic::metadata::MetadataValue<tonic::metadata::Ascii>,
14}
15
16impl tonic::service::Interceptor for AuthInterceptor {
17 fn call(
18 &mut self,
19 mut request: tonic::Request<()>,
20 ) -> Result<tonic::Request<()>, tonic::Status> {
21 request
22 .metadata_mut()
23 .insert("authorization", self.token.clone());
24 Ok(request)
25 }
26}
27
28#[derive(Debug)]
64pub struct ChangeClient {
65 client: ControlServiceClient<InterceptedService<Channel, AuthInterceptor>>,
66 data_client: DataServiceClient<InterceptedService<Channel, AuthInterceptor>>,
67 repo_id: String,
68 last_op_id: AtomicU64,
71}
72
73impl ChangeClient {
74 pub(super) fn new(channel: &Channel, bearer_token: Option<&str>, repo_id: String) -> Self {
76 let auth_value = bearer_token
77 .map(|t| format!("Bearer {t}"))
78 .unwrap_or_default();
79
80 let token = auth_value
83 .try_into()
84 .unwrap_or_else(|_| tonic::metadata::MetadataValue::from_static(""));
85
86 let interceptor = AuthInterceptor { token };
87
88 Self {
89 client: ControlServiceClient::with_interceptor(channel.clone(), interceptor.clone()),
90 data_client: DataServiceClient::with_interceptor(channel.clone(), interceptor),
91 repo_id,
92 last_op_id: AtomicU64::new(0),
93 }
94 }
95
96 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id), err(Debug))]
102 pub async fn create_from_ref(
103 &self,
104 base_ref_name: &str,
105 ) -> Result<grpc::Change, tonic::Status> {
106 let resp = self
107 .client
108 .clone()
109 .create_change(grpc::CreateChangeRequest {
110 repo_id: self.repo_id.clone(),
111 base: Some(grpc::create_change_request::Base::BaseRefName(
112 base_ref_name.to_owned(),
113 )),
114 })
115 .await?;
116
117 resp.into_inner()
118 .change
119 .ok_or_else(|| tonic::Status::internal("server returned empty change"))
120 }
121
122 #[tracing::instrument(skip(self, commit_oid), fields(repo_id = %self.repo_id), err(Debug))]
128 pub async fn create_from_commit(
129 &self,
130 commit_oid: &[u8],
131 ) -> Result<grpc::Change, tonic::Status> {
132 let resp = self
133 .client
134 .clone()
135 .create_change(grpc::CreateChangeRequest {
136 repo_id: self.repo_id.clone(),
137 base: Some(grpc::create_change_request::Base::BaseCommit(
138 grpc::CommitOid {
139 value: commit_oid.to_vec(),
140 },
141 )),
142 })
143 .await?;
144
145 resp.into_inner()
146 .change
147 .ok_or_else(|| tonic::Status::internal("server returned empty change"))
148 }
149
150 #[tracing::instrument(skip(self, content), fields(repo_id = %self.repo_id, path), err(Debug))]
161 pub async fn create_file(
162 &self,
163 change_id: &grpc::ChangeId,
164 path: &str,
165 content: &[u8],
166 mode: Option<u32>,
167 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
168 let op = grpc::Op {
169 op: Some(grpc::op::Op::CreateFile(grpc::OpCreateFile {
170 path: path.to_owned(),
171 mode: mode.unwrap_or(0o100_644),
172 content: Some(grpc::op_create_file::Content::InlineData(content.to_vec())),
173 })),
174 };
175 self.apply_ops(change_id, vec![op]).await
176 }
177
178 #[tracing::instrument(skip(self, content), fields(repo_id = %self.repo_id, path), err(Debug))]
186 pub async fn modify_file(
187 &self,
188 change_id: &grpc::ChangeId,
189 path: &str,
190 content: &[u8],
191 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
192 let op = grpc::Op {
193 op: Some(grpc::op::Op::ModifyFile(grpc::OpModifyFile {
194 path: path.to_owned(),
195 content: Some(grpc::op_modify_file::Content::InlineData(content.to_vec())),
196 })),
197 };
198 self.apply_ops(change_id, vec![op]).await
199 }
200
201 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, path), err(Debug))]
209 pub async fn delete_path(
210 &self,
211 change_id: &grpc::ChangeId,
212 path: &str,
213 recursive: bool,
214 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
215 let op = grpc::Op {
216 op: Some(grpc::op::Op::DeletePath(grpc::OpDeletePath {
217 path: path.to_owned(),
218 recursive,
219 })),
220 };
221 self.apply_ops(change_id, vec![op]).await
222 }
223
224 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, from_path, to_path), err(Debug))]
230 pub async fn move_path(
231 &self,
232 change_id: &grpc::ChangeId,
233 from_path: &str,
234 to_path: &str,
235 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
236 let op = grpc::Op {
237 op: Some(grpc::op::Op::MovePath(grpc::OpMovePath {
238 from_path: from_path.to_owned(),
239 to_path: to_path.to_owned(),
240 })),
241 };
242 self.apply_ops(change_id, vec![op]).await
243 }
244
245 #[tracing::instrument(skip(self, ops), fields(repo_id = %self.repo_id, op_count = ops.len()), err(Debug))]
256 pub async fn apply_ops(
257 &self,
258 change_id: &grpc::ChangeId,
259 ops: Vec<grpc::Op>,
260 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
261 let request = grpc::ApplyOpsRequest {
262 repo_id: self.repo_id.clone(),
263 change_id: Some(change_id.clone()),
264 expected_last_op_id: Some(self.last_op_id.load(Ordering::Relaxed)),
265 idempotency_key: String::new(),
266 ops,
267 };
268
269 let resp = self
270 .client
271 .clone()
272 .apply_ops(tokio_stream::once(request))
273 .await?;
274
275 let inner = resp.into_inner();
276 self.last_op_id.store(inner.last_op_id, Ordering::Relaxed);
277 Ok(inner)
278 }
279
280 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id), err(Debug))]
286 pub async fn snapshot(
287 &self,
288 change_id: &grpc::ChangeId,
289 message: &str,
290 ) -> Result<grpc::SnapshotChangeResponse, tonic::Status> {
291 let resp = self
292 .client
293 .clone()
294 .snapshot_change(grpc::SnapshotChangeRequest {
295 repo_id: self.repo_id.clone(),
296 change_id: Some(change_id.clone()),
297 expected_last_op_id: Some(self.last_op_id.load(Ordering::Relaxed)),
298 message: message.to_owned(),
299 idempotency_key: String::new(),
300 })
301 .await?;
302
303 Ok(resp.into_inner())
304 }
305
306 #[tracing::instrument(skip(self, commit_oid), fields(repo_id = %self.repo_id, bookmark_name), err(Debug))]
318 pub async fn move_bookmark(
319 &self,
320 bookmark_name: &str,
321 commit_oid: &[u8],
322 expected_update_seq: u64,
323 ) -> Result<grpc::MoveBookmarkResponse, tonic::Status> {
324 let resp = self
325 .client
326 .clone()
327 .move_bookmark(grpc::MoveBookmarkRequest {
328 repo_id: self.repo_id.clone(),
329 bookmark_name: bookmark_name.to_owned(),
330 expected_update_seq,
331 new_commit_oid: Some(grpc::CommitOid {
332 value: commit_oid.to_vec(),
333 }),
334 })
335 .await?;
336
337 Ok(resp.into_inner())
338 }
339
340 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, ref_name), err(Debug))]
350 pub async fn resolve_ref(&self, ref_name: &str) -> Result<grpc::RefInfo, tonic::Status> {
351 let resp = self
352 .data_client
353 .clone()
354 .resolve_ref(grpc::ResolveRefRequest {
355 repo_id: self.repo_id.clone(),
356 ref_name: ref_name.to_owned(),
357 })
358 .await?;
359
360 resp.into_inner()
361 .r#ref
362 .ok_or_else(|| tonic::Status::internal("server returned empty ref"))
363 }
364}