Skip to main content

daemon/grpc_local_impl/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local-mode gRPC services for `heddle agent serve`.
3//!
//! These services implement the gRPC contract over a single local
//! [`Repository`]. Unlike the services in `grpc_hosted_impl/`, they
//! - don't require Postgres, Biscuit auth, or the multi-tenant registry, and
//! - are reachable over a Unix-domain socket by the same user.
//!
//! They do share the dedup/idempotency middleware with the hosted variant via
//! [`repo::operation_dedup::OperationDedupStore`].
10//!
11//! Each service has its own file. The shared scaffolding (the
12//! [`GrpcLocalService`] struct, idempotency helpers) lives here.
13
14mod discussion;
15mod hook;
16mod hook_events;
17mod operation_log_query;
18mod signal;
19mod state_review;
20mod transaction;
21
22use std::sync::Arc;
23
24pub use discussion::LocalDiscussionService;
25pub use hook::LocalHookService;
26pub use hook_events::{EmitWaiter, HookEventBroadcaster, HookResponse};
27pub use operation_log_query::LocalOperationLogQueryService;
28use repo::{Repository, operation_dedup::OperationDedupStore};
29pub use signal::LocalSignalService;
30pub use state_review::LocalStateReviewService;
31pub use transaction::LocalTransactionService;
32
33/// Shared state for the local gRPC services. Handlers borrow the repository
34/// for the duration of a single RPC; the dedup store is consulted on every
35/// state-changing call.
36#[derive(Clone)]
37pub struct GrpcLocalService {
38    pub(super) repo: Arc<Repository>,
39    pub(super) dedup: Arc<OperationDedupStore>,
40    /// In-process hook-event broker. Lives here so every
41    /// handler — `subscribe_hook_events` (subscribe side) and
42    /// `respond_to_hook` (reply side) — meets on the same broker
43    /// instance. The capture/merge code paths will eventually borrow
44    /// this through [`Self::hook_events`] to fire events.
45    pub(super) hook_events: HookEventBroadcaster,
46}
47
48impl GrpcLocalService {
49    pub fn new(repo: Arc<Repository>, dedup: Arc<OperationDedupStore>) -> Self {
50        Self {
51            repo,
52            dedup,
53            hook_events: HookEventBroadcaster::new(),
54        }
55    }
56
57    pub fn repo(&self) -> &Repository {
58        &self.repo
59    }
60
61    pub fn dedup(&self) -> &OperationDedupStore {
62        &self.dedup
63    }
64
65    /// Borrow the in-process hook event broker. The capture/merge
66    /// emit sites use this to fire events; the `SubscribeHookEvents`
67    /// and `RespondToHook` handlers in `hook.rs` use it to wire
68    /// streams and responses to the same correlator id.
69    pub fn hook_events(&self) -> &HookEventBroadcaster {
70        &self.hook_events
71    }
72}
73
74/// Idempotency wrapper. Centralises the `check → execute → record` pattern
75/// so every state-changing handler folds the same dedup-store flow.
76///
77/// `client_operation_id` may be empty (caller didn't supply one) — in that
78/// case we don't dedup at all and just execute. When supplied, the body
79/// must be a deterministic byte representation of the request (typically
80/// the protobuf-encoded request).
81pub(super) async fn with_idempotency<F, Fut, T>(
82    dedup: &OperationDedupStore,
83    client_operation_id: &str,
84    verb: &'static str,
85    request_body: &[u8],
86    encode_response: impl FnOnce(&T) -> Vec<u8>,
87    decode_response: impl FnOnce(Vec<u8>) -> Result<T, tonic::Status>,
88    execute: F,
89) -> Result<T, tonic::Status>
90where
91    F: FnOnce() -> Fut,
92    Fut: std::future::Future<Output = Result<T, tonic::Status>>,
93{
94    use objects::object::OperationId;
95    use repo::operation_dedup::{DedupOutcome, hash_request_body};
96
97    if client_operation_id.is_empty() {
98        return execute().await;
99    }
100    let op_id: OperationId = client_operation_id.parse().map_err(|err| {
101        tonic::Status::invalid_argument(format!("invalid client_operation_id: {err}"))
102    })?;
103    let hash = hash_request_body(request_body);
104    // `reserve` atomically claims the (op_id, verb) slot before we run the
105    // mutation. Two concurrent retries with the same operation_id can no
106    // longer both observe "Fresh" and both apply side effects: the second
107    // sees `InFlight` and surfaces a transient `Aborted` to the client.
108    let outcome = dedup
109        .reserve(op_id, verb, hash)
110        .map_err(|err| tonic::Status::internal(format!("dedup reserve failed: {err}")))?;
111    match outcome {
112        DedupOutcome::Replay { response } => decode_response(response),
113        DedupOutcome::Conflict => Err(tonic::Status::failed_precondition(
114            "client_operation_id reused with a different request body",
115        )),
116        DedupOutcome::InFlight => Err(tonic::Status::aborted(
117            "client_operation_id is in flight from another caller; retry once it completes",
118        )),
119        DedupOutcome::Reserved => {
120            // Reservation is held until we either record (success) or
121            // cancel (failure). Without the cancel-on-error path, a failed
122            // execution would leave a permanent tombstone that all retries
123            // would see as `Conflict`/`InFlight` until compaction.
124            match execute().await {
125                Ok(result) => {
126                    let encoded = encode_response(&result);
127                    dedup.record(op_id, verb, hash, encoded).map_err(|err| {
128                        tonic::Status::internal(format!("dedup record failed: {err}"))
129                    })?;
130                    Ok(result)
131                }
132                Err(status) => {
133                    // Best-effort: if cancel itself fails (disk error etc.)
134                    // we still want to surface the original status to the
135                    // caller. Compaction will eventually clean a stranded
136                    // reservation up.
137                    let _ = dedup.cancel(op_id, verb);
138                    Err(status)
139                }
140            }
141        }
142    }
143}
144
145/// Helper for translating a [`HeddleError`](objects::error::HeddleError) into
146/// a [`tonic::Status`] with consistent codes across the local services.
147pub(super) fn to_status(err: objects::error::HeddleError) -> tonic::Status {
148    use objects::error::HeddleError;
149    match err {
150        HeddleError::NotFound(msg) => tonic::Status::not_found(msg),
151        HeddleError::StateNotFound(id) => tonic::Status::not_found(format!("state {id} not found")),
152        HeddleError::RepositoryNotFound(path) => {
153            tonic::Status::not_found(format!("repository not found at {}", path.display()))
154        }
155        HeddleError::InvalidObject(msg) => tonic::Status::invalid_argument(msg),
156        HeddleError::Conflict(msg) => tonic::Status::failed_precondition(msg),
157        HeddleError::Io(io) => tonic::Status::internal(format!("io error: {io}")),
158        other => tonic::Status::internal(other.to_string()),
159    }
160}
161
#[cfg(test)]
mod tests {
    //! End-to-end tests for [`with_idempotency`]. They drive the `Reserved`,
    //! `InFlight`, `Replay` and `Conflict` outcomes, plus the no-id bypass,
    //! through the same wrapper every gRPC handler calls.

    use std::sync::Arc;

    use objects::object::OperationId;
    use repo::operation_dedup::OperationDedupStore;
    use tempfile::TempDir;
    use tokio::sync::oneshot;

    use super::with_idempotency;

    /// Opens a dedup store under a fresh `.heddle` directory. The `TempDir` is
    /// returned so the caller keeps the directory alive; it is deleted on drop.
    fn make_store() -> (TempDir, Arc<OperationDedupStore>) {
        let temp = TempDir::new().unwrap();
        let heddle = temp.path().join(".heddle");
        std::fs::create_dir_all(&heddle).unwrap();
        let store = OperationDedupStore::open(&heddle).unwrap();
        (temp, Arc::new(store))
    }

    /// Response encoder shared by every test: a big-endian `i32`.
    fn encode_i32(value: &i32) -> Vec<u8> {
        value.to_be_bytes().to_vec()
    }

    /// Inverse of [`encode_i32`]. Panics on a malformed record, since that
    /// would mean the store replayed bytes it was never given.
    fn decode_i32(bytes: Vec<u8>) -> Result<i32, tonic::Status> {
        let raw: [u8; 4] = bytes.as_slice().try_into().expect("4 bytes");
        Ok(i32::from_be_bytes(raw))
    }

    #[tokio::test]
    async fn replays_recorded_response() {
        let (_t, store) = make_store();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        // The first call executes and records.
        let first = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async { Ok(42) },
        )
        .await
        .unwrap();
        assert_eq!(first, 42);

        // The second call must replay the recorded response. Its execute
        // closure panics if invoked, so a pass proves it never ran.
        let second = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async {
                panic!("execute must not be called on replay");
            },
        )
        .await
        .unwrap();
        assert_eq!(second, 42);
    }

    #[tokio::test]
    async fn concurrent_calls_with_same_op_id_run_execute_only_once() {
        // The original race window: caller A enters with `Fresh`, awaits
        // execute(), and caller B enters with `Fresh` before A records.
        // Both used to apply side effects. With reservation, B must see
        // `InFlight` and surface `Aborted`.

        let (_t, store) = make_store();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        // `started` fires from inside A's execute, which only runs once A's
        // reservation succeeded. `release` lets A finish.
        let (started_tx, started_rx) = oneshot::channel::<()>();
        let (release_tx, release_rx) = oneshot::channel::<()>();
        let store_a = Arc::clone(&store);
        let op_a = op_id.clone();
        let a_handle = tokio::spawn(async move {
            with_idempotency(
                &store_a,
                &op_a,
                "verb",
                body,
                encode_i32,
                decode_i32,
                || async move {
                    started_tx.send(()).expect("test stopped listening");
                    release_rx.await.expect("recv release gate");
                    Ok::<i32, tonic::Status>(7)
                },
            )
            .await
        });

        // Deterministic handshake instead of a sleep: after this, A provably
        // holds the reservation, regardless of runtime flavour or scheduling.
        started_rx.await.expect("A never reached execute");

        let b_result = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async {
                panic!("B's execute must not run while A holds the reservation");
            },
        )
        .await;

        // B sees the in-flight reservation and aborts.
        let err = b_result.expect_err("B should be aborted");
        assert_eq!(err.code(), tonic::Code::Aborted);

        // Now release A.
        release_tx.send(()).unwrap();
        let a_result = a_handle.await.unwrap().unwrap();
        assert_eq!(a_result, 7);

        // After A finishes the entry is finalised, so a third call with the
        // same body replays.
        let third = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async {
                panic!("execute must not run on replay");
            },
        )
        .await
        .unwrap();
        assert_eq!(third, 7);
    }

    #[tokio::test]
    async fn cancels_reservation_on_execute_failure() {
        // If execute returns Err, the reservation must be released so that a
        // retry isn't permanently blocked. Without `cancel`, a transient
        // failure during the first attempt would leave the slot held, and
        // every subsequent retry would see Conflict/InFlight until compaction.

        let (_t, store) = make_store();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        let first = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async { Err(tonic::Status::internal("transient")) },
        )
        .await;
        assert!(first.is_err());

        // The retry must succeed because the reservation was released.
        let second = with_idempotency(
            &store,
            &op_id,
            "verb",
            body,
            encode_i32,
            decode_i32,
            || async { Ok(11) },
        )
        .await
        .unwrap();
        assert_eq!(second, 11);
    }

    #[tokio::test]
    async fn rejects_reused_op_id_with_different_body() {
        // Reusing an operation id for a different request must not replay
        // the first response or run execute; it surfaces FailedPrecondition.
        let (_t, store) = make_store();
        let op_id = OperationId::new().to_string();

        let first = with_idempotency(
            &store,
            &op_id,
            "verb",
            b"req-a",
            encode_i32,
            decode_i32,
            || async { Ok(1) },
        )
        .await
        .unwrap();
        assert_eq!(first, 1);

        let err = with_idempotency(
            &store,
            &op_id,
            "verb",
            b"req-b",
            encode_i32,
            decode_i32,
            || async {
                panic!("execute must not run for a conflicting body");
            },
        )
        .await
        .expect_err("a different body under the same id must conflict");
        assert_eq!(err.code(), tonic::Code::FailedPrecondition);
    }

    #[tokio::test]
    async fn empty_op_id_skips_dedup() {
        // Without a client_operation_id nothing is reserved or recorded, so
        // every call runs execute and a second call must not replay the first.
        let (_t, store) = make_store();

        let first = with_idempotency(
            &store,
            "",
            "verb",
            b"req",
            encode_i32,
            decode_i32,
            || async { Ok(1) },
        )
        .await
        .unwrap();
        assert_eq!(first, 1);

        let second = with_idempotency(
            &store,
            "",
            "verb",
            b"req",
            encode_i32,
            decode_i32,
            || async { Ok(2) },
        )
        .await
        .unwrap();
        assert_eq!(second, 2);
    }
}