1use std::sync::atomic::{AtomicU64, Ordering};
2
3use tonic::service::interceptor::InterceptedService;
4use tonic::transport::Channel;
5
6use crate::grpc::{self, control_service_client::ControlServiceClient};
7
8#[derive(Clone, Debug)]
10struct AuthInterceptor {
11 token: tonic::metadata::MetadataValue<tonic::metadata::Ascii>,
12}
13
14impl tonic::service::Interceptor for AuthInterceptor {
15 fn call(
16 &mut self,
17 mut request: tonic::Request<()>,
18 ) -> Result<tonic::Request<()>, tonic::Status> {
19 request
20 .metadata_mut()
21 .insert("authorization", self.token.clone());
22 Ok(request)
23 }
24}
25
26#[derive(Debug)]
62pub struct ChangeClient {
63 client: ControlServiceClient<InterceptedService<Channel, AuthInterceptor>>,
64 repo_id: String,
65 last_op_id: AtomicU64,
68}
69
70impl ChangeClient {
71 pub(super) fn new(channel: &Channel, bearer_token: Option<&str>, repo_id: String) -> Self {
73 let auth_value = bearer_token
74 .map(|t| format!("Bearer {t}"))
75 .unwrap_or_default();
76
77 let token = auth_value
80 .try_into()
81 .unwrap_or_else(|_| tonic::metadata::MetadataValue::from_static(""));
82
83 let interceptor = AuthInterceptor { token };
84
85 Self {
86 client: ControlServiceClient::with_interceptor(channel.clone(), interceptor),
87 repo_id,
88 last_op_id: AtomicU64::new(0),
89 }
90 }
91
92 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id), err(Debug))]
98 pub async fn create_from_ref(
99 &self,
100 base_ref_name: &str,
101 ) -> Result<grpc::Change, tonic::Status> {
102 let resp = self
103 .client
104 .clone()
105 .create_change(grpc::CreateChangeRequest {
106 repo_id: self.repo_id.clone(),
107 base: Some(grpc::create_change_request::Base::BaseRefName(
108 base_ref_name.to_owned(),
109 )),
110 })
111 .await?;
112
113 resp.into_inner()
114 .change
115 .ok_or_else(|| tonic::Status::internal("server returned empty change"))
116 }
117
118 #[tracing::instrument(skip(self, commit_oid), fields(repo_id = %self.repo_id), err(Debug))]
124 pub async fn create_from_commit(
125 &self,
126 commit_oid: &[u8],
127 ) -> Result<grpc::Change, tonic::Status> {
128 let resp = self
129 .client
130 .clone()
131 .create_change(grpc::CreateChangeRequest {
132 repo_id: self.repo_id.clone(),
133 base: Some(grpc::create_change_request::Base::BaseCommit(
134 grpc::CommitOid {
135 value: commit_oid.to_vec(),
136 },
137 )),
138 })
139 .await?;
140
141 resp.into_inner()
142 .change
143 .ok_or_else(|| tonic::Status::internal("server returned empty change"))
144 }
145
146 #[tracing::instrument(skip(self, content), fields(repo_id = %self.repo_id, path), err(Debug))]
157 pub async fn create_file(
158 &self,
159 change_id: &grpc::ChangeId,
160 path: &str,
161 content: &[u8],
162 mode: Option<u32>,
163 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
164 let op = grpc::Op {
165 op: Some(grpc::op::Op::CreateFile(grpc::OpCreateFile {
166 path: path.to_owned(),
167 mode: mode.unwrap_or(0o100_644),
168 content: Some(grpc::op_create_file::Content::InlineData(content.to_vec())),
169 })),
170 };
171 self.apply_ops(change_id, vec![op]).await
172 }
173
174 #[tracing::instrument(skip(self, content), fields(repo_id = %self.repo_id, path), err(Debug))]
182 pub async fn modify_file(
183 &self,
184 change_id: &grpc::ChangeId,
185 path: &str,
186 content: &[u8],
187 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
188 let op = grpc::Op {
189 op: Some(grpc::op::Op::ModifyFile(grpc::OpModifyFile {
190 path: path.to_owned(),
191 content: Some(grpc::op_modify_file::Content::InlineData(content.to_vec())),
192 })),
193 };
194 self.apply_ops(change_id, vec![op]).await
195 }
196
197 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, path), err(Debug))]
205 pub async fn delete_path(
206 &self,
207 change_id: &grpc::ChangeId,
208 path: &str,
209 recursive: bool,
210 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
211 let op = grpc::Op {
212 op: Some(grpc::op::Op::DeletePath(grpc::OpDeletePath {
213 path: path.to_owned(),
214 recursive,
215 })),
216 };
217 self.apply_ops(change_id, vec![op]).await
218 }
219
220 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id, from_path, to_path), err(Debug))]
226 pub async fn move_path(
227 &self,
228 change_id: &grpc::ChangeId,
229 from_path: &str,
230 to_path: &str,
231 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
232 let op = grpc::Op {
233 op: Some(grpc::op::Op::MovePath(grpc::OpMovePath {
234 from_path: from_path.to_owned(),
235 to_path: to_path.to_owned(),
236 })),
237 };
238 self.apply_ops(change_id, vec![op]).await
239 }
240
241 #[tracing::instrument(skip(self, ops), fields(repo_id = %self.repo_id, op_count = ops.len()), err(Debug))]
252 pub async fn apply_ops(
253 &self,
254 change_id: &grpc::ChangeId,
255 ops: Vec<grpc::Op>,
256 ) -> Result<grpc::ApplyOpsResponse, tonic::Status> {
257 let request = grpc::ApplyOpsRequest {
258 repo_id: self.repo_id.clone(),
259 change_id: Some(change_id.clone()),
260 expected_last_op_id: Some(self.last_op_id.load(Ordering::Relaxed)),
261 idempotency_key: String::new(),
262 ops,
263 };
264
265 let resp = self
266 .client
267 .clone()
268 .apply_ops(tokio_stream::once(request))
269 .await?;
270
271 let inner = resp.into_inner();
272 self.last_op_id.store(inner.last_op_id, Ordering::Relaxed);
273 Ok(inner)
274 }
275
276 #[tracing::instrument(skip(self), fields(repo_id = %self.repo_id), err(Debug))]
282 pub async fn snapshot(
283 &self,
284 change_id: &grpc::ChangeId,
285 message: &str,
286 ) -> Result<grpc::SnapshotChangeResponse, tonic::Status> {
287 let resp = self
288 .client
289 .clone()
290 .snapshot_change(grpc::SnapshotChangeRequest {
291 repo_id: self.repo_id.clone(),
292 change_id: Some(change_id.clone()),
293 expected_last_op_id: Some(self.last_op_id.load(Ordering::Relaxed)),
294 message: message.to_owned(),
295 idempotency_key: String::new(),
296 })
297 .await?;
298
299 Ok(resp.into_inner())
300 }
301}