Skip to main content

daemon/grpc_local_impl/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local-mode gRPC services for `heddle agent serve`.
3//!
4//! These services implement the gRPC contract over a single local
5//! [`Repository`]. They are distinct from `grpc_hosted_impl/` because they
6//! - don't require Postgres, Biscuit auth, or the multi-tenant registry,
7//! - are reachable over a Unix-domain socket from the same user,
8//! - share the dedup/idempotency middleware with the hosted variant via
9//!   [`repo::operation_dedup::OperationDedupStore`].
10//!
11//! Each service has its own file. The shared scaffolding (the
12//! [`GrpcLocalService`] struct, idempotency helpers) lives here.
13
14mod discussion;
15mod hook;
16mod hook_events;
17mod operation_log_query;
18mod signal;
19mod state_review;
20mod timeline;
21mod transaction;
22
23use std::sync::Arc;
24
25pub use discussion::LocalDiscussionService;
26pub use hook::LocalHookService;
27pub use hook_events::{EmitWaiter, HookEventBroadcaster, HookResponse};
28pub use operation_log_query::LocalOperationLogQueryService;
29use repo::{
30    Repository,
31    operation_dedup::{OperationDedupStore, reserve_operation_id_eager},
32};
33pub use signal::LocalSignalService;
34pub use state_review::LocalStateReviewService;
35pub use timeline::LocalTimelineService;
36pub use transaction::LocalTransactionService;
37
/// Shared state for the local gRPC services. Handlers borrow the repository
/// for the duration of a single RPC; the dedup store is consulted on every
/// state-changing call.
///
/// The struct derives `Clone`. `repo` and `dedup` are `Arc` handles, so a
/// clone refers to the same repository and dedup store as the original.
/// NOTE(review): `hook_events` is cloned through `HookEventBroadcaster`'s own
/// `Clone` impl — presumably clones share one broker; confirm in
/// `hook_events.rs`.
#[derive(Clone)]
pub struct GrpcLocalService {
    /// The local repository the services operate on.
    pub(super) repo: Arc<Repository>,
    /// Operation dedup store backing the idempotency flow for
    /// state-changing calls.
    pub(super) dedup: Arc<OperationDedupStore>,
    /// In-process hook-event broker. Lives here so every
    /// handler — `subscribe_hook_events` (subscribe side) and
    /// `respond_to_hook` (reply side) — meets on the same broker
    /// instance. The capture/merge code paths will eventually borrow
    /// this through [`Self::hook_events`] to fire events.
    pub(super) hook_events: HookEventBroadcaster,
}
52
53impl GrpcLocalService {
54    pub fn new(repo: Arc<Repository>, dedup: Arc<OperationDedupStore>) -> Self {
55        Self {
56            repo,
57            dedup,
58            hook_events: HookEventBroadcaster::new(),
59        }
60    }
61
62    pub fn repo(&self) -> &Repository {
63        &self.repo
64    }
65
66    pub fn dedup(&self) -> &OperationDedupStore {
67        &self.dedup
68    }
69
70    /// Borrow the in-process hook event broker. The capture/merge
71    /// emit sites use this to fire events; the `SubscribeHookEvents`
72    /// and `RespondToHook` handlers in `hook.rs` use it to wire
73    /// streams and responses to the same correlator id.
74    pub fn hook_events(&self) -> &HookEventBroadcaster {
75        &self.hook_events
76    }
77}
78
79/// Idempotency wrapper. Centralises the `check → execute → record` pattern
80/// so every state-changing handler folds the same dedup-store flow.
81///
82/// `client_operation_id` may be empty (caller didn't supply one) — in that
83/// case we don't dedup at all and just execute. When supplied, the body
84/// must be a deterministic byte representation of the request (typically
85/// the protobuf-encoded request).
86pub(super) async fn with_idempotency<F, Fut, T>(
87    service: &GrpcLocalService,
88    client_operation_id: &str,
89    verb: &'static str,
90    request_body: &[u8],
91    execute: F,
92) -> Result<T, tonic::Status>
93where
94    F: FnOnce() -> Fut,
95    Fut: std::future::Future<Output = Result<T, tonic::Status>>,
96    T: prost::Message + Default,
97{
98    use objects::object::OperationId;
99    use repo::operation_dedup::{DedupOutcome, hash_request_body};
100
101    if client_operation_id.is_empty() {
102        return execute().await;
103    }
104    let op_id: OperationId = client_operation_id.parse().map_err(|err| {
105        tonic::Status::invalid_argument(format!("invalid client_operation_id: {err}"))
106    })?;
107    let hash = hash_request_body(request_body);
108    // The eager reservation atomically claims the (op_id, verb) slot before
109    // we run the mutation. Two concurrent retries with the same operation_id
110    // can no longer both observe "Fresh" and both apply side effects: the
111    // second sees `InFlight` and surfaces a transient `Aborted` to the client.
112    let dedup = Arc::clone(&service.dedup);
113    let outcome = reserve_operation_id_eager(service.repo(), Arc::clone(&dedup), op_id, verb, hash)
114        .map_err(|err| tonic::Status::internal(format!("dedup reserve failed: {err}")))?;
115    match outcome {
116        DedupOutcome::Replay { response } => T::decode(response.as_slice())
117            .map_err(|err| tonic::Status::internal(format!("decode replay failed: {err}"))),
118        DedupOutcome::Conflict => Err(tonic::Status::failed_precondition(
119            "client_operation_id reused with a different request body",
120        )),
121        DedupOutcome::InFlight => Err(tonic::Status::aborted(
122            "client_operation_id is in flight from another caller; retry once it completes",
123        )),
124        DedupOutcome::Reserved => {
125            // Reservation is held until we either record (success) or
126            // cancel (failure). Without the cancel-on-error path, a failed
127            // execution would leave a permanent tombstone that all retries
128            // would see as `Conflict`/`InFlight` until compaction.
129            match execute().await {
130                Ok(result) => {
131                    let encoded = result.encode_to_vec();
132                    dedup.record(op_id, verb, hash, encoded).map_err(|err| {
133                        tonic::Status::internal(format!("dedup record failed: {err}"))
134                    })?;
135                    Ok(result)
136                }
137                Err(status) => {
138                    // Best-effort: if cancel itself fails (disk error etc.)
139                    // we still want to surface the original status to the
140                    // caller. Compaction will eventually clean a stranded
141                    // reservation up.
142                    let _ = dedup.cancel(op_id, verb);
143                    Err(status)
144                }
145            }
146        }
147    }
148}
149
150/// Helper for translating a [`HeddleError`](objects::error::HeddleError) into
151/// a [`tonic::Status`] with consistent codes across the local services.
152pub(super) fn to_status(err: objects::error::HeddleError) -> tonic::Status {
153    use objects::error::HeddleError;
154    match err {
155        HeddleError::NotFound(msg) => tonic::Status::not_found(msg),
156        HeddleError::StateNotFound(id) => tonic::Status::not_found(format!("state {id} not found")),
157        HeddleError::RepositoryNotFound(path) => {
158            tonic::Status::not_found(format!("repository not found at {}", path.display()))
159        }
160        HeddleError::InvalidObject(msg) => tonic::Status::invalid_argument(msg),
161        HeddleError::Conflict(msg) => tonic::Status::failed_precondition(msg),
162        HeddleError::Io(io) => tonic::Status::internal(format!("io error: {io}")),
163        other => tonic::Status::internal(other.to_string()),
164    }
165}
166
#[cfg(test)]
mod tests {
    //! End-to-end tests for [`with_idempotency`] that exercise the
    //! `Reserved` / `InFlight` / `Replay` outcomes, plus the
    //! cancel-on-failure path, through the same wrapper every gRPC handler
    //! calls.

    use std::sync::Arc;

    use grpc::heddle::v1::UpdateRefResponse;
    use objects::object::OperationId;
    use repo::{Repository, operation_dedup::OperationDedupStore};
    use tempfile::TempDir;
    use tokio::sync::oneshot;

    use super::{GrpcLocalService, with_idempotency};

    /// Build a service over a freshly initialised repository in a temp
    /// directory. The `TempDir` is returned so the caller keeps it alive
    /// for the duration of the test.
    fn make_service() -> (TempDir, GrpcLocalService) {
        let temp = TempDir::new().unwrap();
        let repo = Arc::new(Repository::init_default(temp.path()).unwrap());
        let store = Arc::new(OperationDedupStore::open(repo.heddle_dir()).unwrap());
        (temp, GrpcLocalService::new(repo, store))
    }

    /// A distinguishable prost response payload for the idempotency-flow
    /// tests: the carried marker rides in `old_value` so a replayed decode
    /// can be asserted against the originally-recorded value.
    fn marker_response(marker: &str) -> UpdateRefResponse {
        UpdateRefResponse {
            success: true,
            old_value: marker.to_string(),
            error: String::new(),
        }
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn replays_recorded_response() {
        let (_t, service) = make_service();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        // First call executes and records.
        let first = with_idempotency(&service, &op_id, "verb", body, || async {
            Ok::<UpdateRefResponse, tonic::Status>(marker_response("42"))
        })
        .await
        .unwrap();
        assert_eq!(first.old_value, "42");

        // Second call must replay without re-executing — proven by the
        // execute closure panicking if invoked.
        let second = with_idempotency(&service, &op_id, "verb", body, || async {
            // The `Ok(..)` wrapper only pins the closure's output type; the
            // `panic!` inside makes it unreachable by design.
            #[allow(unreachable_code)]
            Ok::<UpdateRefResponse, tonic::Status>(panic!("execute must not be called on replay"))
        })
        .await
        .unwrap();
        assert_eq!(second.old_value, "42");
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn concurrent_calls_with_same_op_id_run_execute_only_once() {
        // The original race window: caller A enters with `Fresh`, awaits
        // execute(), and caller B enters with `Fresh` before A records.
        // Both used to apply side effects. With reservation, B must see
        // `InFlight` and surface `Aborted`.

        let (_t, service) = make_service();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        // Two gates:
        // - `started` fires from inside A's execute. The wrapper only calls
        //   execute after it has claimed the reservation, so once this
        //   fires the pending entry is guaranteed to exist — no sleeping or
        //   guessing at scheduler timing.
        // - `release` keeps A parked inside execute until B has been
        //   observed.
        let (started_tx, started_rx) = oneshot::channel::<()>();
        let (release_tx, release_rx) = oneshot::channel::<()>();
        let service_a = service.clone();
        let op_a = op_id.clone();
        let a_handle = tokio::spawn(async move {
            with_idempotency(&service_a, &op_a, "verb", body, || async move {
                started_tx
                    .send(())
                    .expect("test body dropped the start receiver");
                release_rx.await.expect("recv gate");
                Ok::<UpdateRefResponse, tonic::Status>(marker_response("7"))
            })
            .await
        });

        // If A bails out before reaching execute, the sender is dropped and
        // this fails loudly instead of hanging.
        started_rx
            .await
            .expect("A never reached execute, so no reservation was taken");

        let service_b = service.clone();
        let op_b = op_id.clone();
        let b_result: Result<UpdateRefResponse, tonic::Status> =
            with_idempotency(&service_b, &op_b, "verb", body, || async {
                panic!("B's execute must not run while A holds the reservation");
            })
            .await;

        // B sees the in-flight reservation and aborts.
        let err = b_result.expect_err("B should be aborted");
        assert_eq!(err.code(), tonic::Code::Aborted);

        // Now release A.
        release_tx.send(()).unwrap();
        let a_result = a_handle.await.unwrap().unwrap();
        assert_eq!(a_result.old_value, "7");

        // After A finishes, the entry is finalised: a third call with the
        // same body replays.
        let third = with_idempotency(&service, &op_id, "verb", body, || async {
            // The `Ok(..)` wrapper only pins the closure's output type; the
            // `panic!` inside makes it unreachable by design.
            #[allow(unreachable_code)]
            Ok::<UpdateRefResponse, tonic::Status>(panic!("execute must not run on replay"))
        })
        .await
        .unwrap();
        assert_eq!(third.old_value, "7");
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn cancels_reservation_on_execute_failure() {
        // If execute returns Err, the reservation must be released so a
        // retry isn't permanently blocked. Without `cancel`, a transient
        // failure during the first attempt would leave the slot held and
        // every subsequent retry would see Conflict/InFlight until
        // compaction.

        let (_t, service) = make_service();
        let op_id = OperationId::new().to_string();
        let body = b"req";

        let first: Result<UpdateRefResponse, tonic::Status> =
            with_idempotency(&service, &op_id, "verb", body, || async {
                Err(tonic::Status::internal("transient"))
            })
            .await;
        assert!(first.is_err());

        // Retry must succeed — the reservation was released.
        let second = with_idempotency(&service, &op_id, "verb", body, || async {
            Ok::<UpdateRefResponse, tonic::Status>(marker_response("11"))
        })
        .await
        .unwrap();
        assert_eq!(second.old_value, "11");
    }
}