1mod discussion;
15mod hook;
16mod hook_events;
17mod operation_log_query;
18mod signal;
19mod state_review;
20mod transaction;
21
22use std::sync::Arc;
23
24pub use discussion::LocalDiscussionService;
25pub use hook::LocalHookService;
26pub use hook_events::{EmitWaiter, HookEventBroadcaster, HookResponse};
27pub use operation_log_query::LocalOperationLogQueryService;
28use repo::{Repository, operation_dedup::OperationDedupStore};
29pub use signal::LocalSignalService;
30pub use state_review::LocalStateReviewService;
31pub use transaction::LocalTransactionService;
32
33#[derive(Clone)]
37pub struct GrpcLocalService {
38 pub(super) repo: Arc<Repository>,
39 pub(super) dedup: Arc<OperationDedupStore>,
40 pub(super) hook_events: HookEventBroadcaster,
46}
47
48impl GrpcLocalService {
49 pub fn new(repo: Arc<Repository>, dedup: Arc<OperationDedupStore>) -> Self {
50 Self {
51 repo,
52 dedup,
53 hook_events: HookEventBroadcaster::new(),
54 }
55 }
56
57 pub fn repo(&self) -> &Repository {
58 &self.repo
59 }
60
61 pub fn dedup(&self) -> &OperationDedupStore {
62 &self.dedup
63 }
64
65 pub fn hook_events(&self) -> &HookEventBroadcaster {
70 &self.hook_events
71 }
72}
73
74pub(super) async fn with_idempotency<F, Fut, T>(
82 dedup: &OperationDedupStore,
83 client_operation_id: &str,
84 verb: &'static str,
85 request_body: &[u8],
86 encode_response: impl FnOnce(&T) -> Vec<u8>,
87 decode_response: impl FnOnce(Vec<u8>) -> Result<T, tonic::Status>,
88 execute: F,
89) -> Result<T, tonic::Status>
90where
91 F: FnOnce() -> Fut,
92 Fut: std::future::Future<Output = Result<T, tonic::Status>>,
93{
94 use objects::object::OperationId;
95 use repo::operation_dedup::{DedupOutcome, hash_request_body};
96
97 if client_operation_id.is_empty() {
98 return execute().await;
99 }
100 let op_id: OperationId = client_operation_id.parse().map_err(|err| {
101 tonic::Status::invalid_argument(format!("invalid client_operation_id: {err}"))
102 })?;
103 let hash = hash_request_body(request_body);
104 let outcome = dedup
109 .reserve(op_id, verb, hash)
110 .map_err(|err| tonic::Status::internal(format!("dedup reserve failed: {err}")))?;
111 match outcome {
112 DedupOutcome::Replay { response } => decode_response(response),
113 DedupOutcome::Conflict => Err(tonic::Status::failed_precondition(
114 "client_operation_id reused with a different request body",
115 )),
116 DedupOutcome::InFlight => Err(tonic::Status::aborted(
117 "client_operation_id is in flight from another caller; retry once it completes",
118 )),
119 DedupOutcome::Reserved => {
120 match execute().await {
125 Ok(result) => {
126 let encoded = encode_response(&result);
127 dedup.record(op_id, verb, hash, encoded).map_err(|err| {
128 tonic::Status::internal(format!("dedup record failed: {err}"))
129 })?;
130 Ok(result)
131 }
132 Err(status) => {
133 let _ = dedup.cancel(op_id, verb);
138 Err(status)
139 }
140 }
141 }
142 }
143}
144
145pub(super) fn to_status(err: objects::error::HeddleError) -> tonic::Status {
148 use objects::error::HeddleError;
149 match err {
150 HeddleError::NotFound(msg) => tonic::Status::not_found(msg),
151 HeddleError::StateNotFound(id) => tonic::Status::not_found(format!("state {id} not found")),
152 HeddleError::RepositoryNotFound(path) => {
153 tonic::Status::not_found(format!("repository not found at {}", path.display()))
154 }
155 HeddleError::InvalidObject(msg) => tonic::Status::invalid_argument(msg),
156 HeddleError::Conflict(msg) => tonic::Status::failed_precondition(msg),
157 HeddleError::Io(io) => tonic::Status::internal(format!("io error: {io}")),
158 other => tonic::Status::internal(other.to_string()),
159 }
160}
161
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use objects::object::OperationId;
    use repo::operation_dedup::OperationDedupStore;
    use tempfile::TempDir;
    use tokio::sync::oneshot;

    use super::with_idempotency;

    /// Opens a dedup store under a `.heddle` directory inside a fresh temp dir.
    ///
    /// The `TempDir` is returned alongside the store so the caller keeps the
    /// directory alive for as long as the store is in use.
    fn make_store() -> (TempDir, Arc<OperationDedupStore>) {
        let temp = TempDir::new().unwrap();
        let heddle = temp.path().join(".heddle");
        std::fs::create_dir_all(&heddle).unwrap();
        let store = OperationDedupStore::open(&heddle).unwrap();
        (temp, Arc::new(store))
    }

    /// Response encoder shared by the tests: big-endian bytes of an `i32`.
    fn encode_i32(value: &i32) -> Vec<u8> {
        value.to_be_bytes().to_vec()
    }

    /// Inverse of [`encode_i32`]; panics if the payload is not exactly 4 bytes.
    fn decode_i32(bytes: Vec<u8>) -> Result<i32, tonic::Status> {
        Ok(i32::from_be_bytes(
            bytes.as_slice().try_into().expect("4 bytes"),
        ))
    }

    /// A second call with the same id and body returns the recorded response
    /// without invoking `execute`.
    #[tokio::test]
    async fn replays_recorded_response() {
        let (_t, store) = make_store();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        let first: i32 = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async { Ok::<i32, tonic::Status>(42) },
        )
        .await
        .unwrap();
        assert_eq!(first, 42);

        let second: i32 = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async {
                panic!("execute must not be called on replay");
            },
        )
        .await
        .unwrap();
        assert_eq!(second, 42);
    }

    /// While caller A is inside `execute`, caller B with the same id is
    /// rejected with `Aborted`; once A finishes, a third call replays A's result.
    #[tokio::test]
    async fn concurrent_calls_with_same_op_id_run_execute_only_once() {
        let (_t, store) = make_store();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        // `started` tells the test that A is inside `execute` (and therefore
        // already holds the reservation); `gate` keeps A parked there until
        // B has been observed. This replaces a wall-clock sleep, so ordering
        // does not depend on scheduler timing.
        let (started_tx, started_rx) = oneshot::channel::<()>();
        let (gate_tx, gate_rx) = oneshot::channel::<()>();

        let store_a = Arc::clone(&store);
        let op_a = op_id.clone();
        let a_handle = tokio::spawn(async move {
            with_idempotency(
                &store_a,
                &op_a,
                "verb",
                body,
                encode_i32,
                decode_i32,
                || async move {
                    started_tx.send(()).expect("signal start");
                    gate_rx.await.expect("recv gate");
                    Ok::<i32, tonic::Status>(7)
                },
            )
            .await
        });

        // If A fails before reaching `execute`, the sender is dropped and this
        // fails the test instead of hanging.
        started_rx.await.expect("A should reach execute");

        let b_result = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async {
                panic!("B's execute must not run while A holds the reservation");
            },
        )
        .await;

        let err = b_result.expect_err("B should be aborted");
        assert_eq!(err.code(), tonic::Code::Aborted);

        // Release A and let it record its response.
        gate_tx.send(()).unwrap();
        let a_result = a_handle.await.unwrap().unwrap();
        assert_eq!(a_result, 7);

        let third = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async {
                panic!("execute must not run on replay");
            },
        )
        .await
        .unwrap();
        assert_eq!(third, 7);
    }

    /// A failed `execute` releases the reservation, so a retry with the same
    /// id runs `execute` again instead of being rejected.
    #[tokio::test]
    async fn cancels_reservation_on_execute_failure() {
        let (_t, store) = make_store();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        let first = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async { Err(tonic::Status::internal("transient")) },
        )
        .await;
        assert!(first.is_err());

        let second = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async { Ok(11) },
        )
        .await
        .unwrap();
        assert_eq!(second, 11);
    }
}