1mod discussion;
15mod hook;
16mod hook_events;
17mod operation_log_query;
18mod signal;
19mod state_review;
20mod timeline;
21mod transaction;
22
23use std::sync::Arc;
24
25pub use discussion::LocalDiscussionService;
26pub use hook::LocalHookService;
27pub use hook_events::{EmitWaiter, HookEventBroadcaster, HookResponse};
28pub use operation_log_query::LocalOperationLogQueryService;
29use repo::{
30 Repository,
31 operation_dedup::{OperationDedupStore, reserve_operation_id_eager},
32};
33pub use signal::LocalSignalService;
34pub use state_review::LocalStateReviewService;
35pub use timeline::LocalTimelineService;
36pub use transaction::LocalTransactionService;
37
38#[derive(Clone)]
42pub struct GrpcLocalService {
43 pub(super) repo: Arc<Repository>,
44 pub(super) dedup: Arc<OperationDedupStore>,
45 pub(super) hook_events: HookEventBroadcaster,
51}
52
53impl GrpcLocalService {
54 pub fn new(repo: Arc<Repository>, dedup: Arc<OperationDedupStore>) -> Self {
55 Self {
56 repo,
57 dedup,
58 hook_events: HookEventBroadcaster::new(),
59 }
60 }
61
62 pub fn repo(&self) -> &Repository {
63 &self.repo
64 }
65
66 pub fn dedup(&self) -> &OperationDedupStore {
67 &self.dedup
68 }
69
70 pub fn hook_events(&self) -> &HookEventBroadcaster {
75 &self.hook_events
76 }
77}
78
79pub(super) async fn with_idempotency<F, Fut, T>(
87 service: &GrpcLocalService,
88 client_operation_id: &str,
89 verb: &'static str,
90 request_body: &[u8],
91 execute: F,
92) -> Result<T, tonic::Status>
93where
94 F: FnOnce() -> Fut,
95 Fut: std::future::Future<Output = Result<T, tonic::Status>>,
96 T: prost::Message + Default,
97{
98 use objects::object::OperationId;
99 use repo::operation_dedup::{DedupOutcome, hash_request_body};
100
101 if client_operation_id.is_empty() {
102 return execute().await;
103 }
104 let op_id: OperationId = client_operation_id.parse().map_err(|err| {
105 tonic::Status::invalid_argument(format!("invalid client_operation_id: {err}"))
106 })?;
107 let hash = hash_request_body(request_body);
108 let dedup = Arc::clone(&service.dedup);
113 let outcome = reserve_operation_id_eager(service.repo(), Arc::clone(&dedup), op_id, verb, hash)
114 .map_err(|err| tonic::Status::internal(format!("dedup reserve failed: {err}")))?;
115 match outcome {
116 DedupOutcome::Replay { response } => T::decode(response.as_slice())
117 .map_err(|err| tonic::Status::internal(format!("decode replay failed: {err}"))),
118 DedupOutcome::Conflict => Err(tonic::Status::failed_precondition(
119 "client_operation_id reused with a different request body",
120 )),
121 DedupOutcome::InFlight => Err(tonic::Status::aborted(
122 "client_operation_id is in flight from another caller; retry once it completes",
123 )),
124 DedupOutcome::Reserved => {
125 match execute().await {
130 Ok(result) => {
131 let encoded = result.encode_to_vec();
132 dedup.record(op_id, verb, hash, encoded).map_err(|err| {
133 tonic::Status::internal(format!("dedup record failed: {err}"))
134 })?;
135 Ok(result)
136 }
137 Err(status) => {
138 let _ = dedup.cancel(op_id, verb);
143 Err(status)
144 }
145 }
146 }
147 }
148}
149
150pub(super) fn to_status(err: objects::error::HeddleError) -> tonic::Status {
153 use objects::error::HeddleError;
154 match err {
155 HeddleError::NotFound(msg) => tonic::Status::not_found(msg),
156 HeddleError::StateNotFound(id) => tonic::Status::not_found(format!("state {id} not found")),
157 HeddleError::RepositoryNotFound(path) => {
158 tonic::Status::not_found(format!("repository not found at {}", path.display()))
159 }
160 HeddleError::InvalidObject(msg) => tonic::Status::invalid_argument(msg),
161 HeddleError::Conflict(msg) => tonic::Status::failed_precondition(msg),
162 HeddleError::Io(io) => tonic::Status::internal(format!("io error: {io}")),
163 other => tonic::Status::internal(other.to_string()),
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use std::{sync::Arc, time::Duration};
174
175 use grpc::heddle::v1::UpdateRefResponse;
176 use objects::object::OperationId;
177 use repo::{Repository, operation_dedup::OperationDedupStore};
178 use tempfile::TempDir;
179 use tokio::sync::oneshot;
180
181 use super::{GrpcLocalService, with_idempotency};
182
183 fn make_service() -> (TempDir, GrpcLocalService) {
184 let temp = TempDir::new().unwrap();
185 let repo = Arc::new(Repository::init_default(temp.path()).unwrap());
186 let store = Arc::new(OperationDedupStore::open(repo.heddle_dir()).unwrap());
187 (temp, GrpcLocalService::new(repo, store))
188 }
189
190 fn marker_response(marker: &str) -> UpdateRefResponse {
194 UpdateRefResponse {
195 success: true,
196 old_value: marker.to_string(),
197 error: String::new(),
198 old_revision_address: marker.to_string(),
199 }
200 }
201
202 #[tokio::test]
203 #[serial_test::serial(process_global)]
204 async fn replays_recorded_response() {
205 let (_t, service) = make_service();
206 let op_id = OperationId::new().to_string();
207 let body = b"req";
208
209 let first = with_idempotency(&service, &op_id, "verb", body, || async {
211 Ok::<UpdateRefResponse, tonic::Status>(marker_response("42"))
212 })
213 .await
214 .unwrap();
215 assert_eq!(first.old_value, "42");
216
217 let second = with_idempotency(&service, &op_id, "verb", body, || async {
220 #[allow(unreachable_code)]
221 Ok::<UpdateRefResponse, tonic::Status>(panic!("execute must not be called on replay"))
222 })
223 .await
224 .unwrap();
225 assert_eq!(second.old_value, "42");
226 }
227
228 #[tokio::test]
229 #[serial_test::serial(process_global)]
230 async fn concurrent_calls_with_same_op_id_run_execute_only_once() {
231 let (_t, service) = make_service();
237 let op_id = OperationId::new().to_string();
238 let body = b"req";
239
240 let (tx, rx) = oneshot::channel::<()>();
243 let service_a = service.clone();
244 let op_a = op_id.clone();
245 let a_handle = tokio::spawn(async move {
246 with_idempotency(&service_a, &op_a, "verb", body, || async move {
247 rx.await.expect("recv gate");
248 Ok::<UpdateRefResponse, tonic::Status>(marker_response("7"))
249 })
250 .await
251 });
252
253 tokio::time::sleep(Duration::from_millis(50)).await;
257
258 let service_b = service.clone();
259 let op_b = op_id.clone();
260 let b_result: Result<UpdateRefResponse, tonic::Status> =
261 with_idempotency(&service_b, &op_b, "verb", body, || async {
262 panic!("B's execute must not run while A holds the reservation");
263 })
264 .await;
265
266 let err = b_result.expect_err("B should be aborted");
268 assert_eq!(err.code(), tonic::Code::Aborted);
269
270 tx.send(()).unwrap();
272 let a_result = a_handle.await.unwrap().unwrap();
273 assert_eq!(a_result.old_value, "7");
274
275 let third = with_idempotency(&service, &op_id, "verb", body, || async {
278 #[allow(unreachable_code)]
279 Ok::<UpdateRefResponse, tonic::Status>(panic!("execute must not run on replay"))
280 })
281 .await
282 .unwrap();
283 assert_eq!(third.old_value, "7");
284 }
285
286 #[tokio::test]
287 #[serial_test::serial(process_global)]
288 async fn cancels_reservation_on_execute_failure() {
289 let (_t, service) = make_service();
296 let op_id = OperationId::new().to_string();
297 let body = b"req";
298
299 let first: Result<UpdateRefResponse, tonic::Status> =
300 with_idempotency(&service, &op_id, "verb", body, || async {
301 Err(tonic::Status::internal("transient"))
302 })
303 .await;
304 assert!(first.is_err());
305
306 let second = with_idempotency(&service, &op_id, "verb", body, || async {
308 Ok::<UpdateRefResponse, tonic::Status>(marker_response("11"))
309 })
310 .await
311 .unwrap();
312 assert_eq!(second.old_value, "11");
313 }
314}