Skip to main content

daemon/grpc_local_impl/
hook_events.rs

1// SPDX-License-Identifier: Apache-2.0
2//! In-process hook-event broker for the local-mode daemon.
3//!
4//! The hosted variant rides Postgres NOTIFY (see the `server` crate's
5//! `events` module). Local mode is single-process, single-user, no Postgres — so we
6//! model the same fan-out shape with a `tokio::sync::broadcast`
7//! channel for emit→subscribe and a per-correlator `oneshot` for
8//! response routing.
9//!
10//! Lifecycle of a single event:
11//!
12//! 1. The capture/merge code path (a future workstream consumer)
13//!    calls [`HookEventBroadcaster::emit`] with a JSON payload. The
14//!    broker mints a `hook_event_id`, registers a fresh response
15//!    slot, and broadcasts the event to every live subscriber.
16//! 2. Each subscriber (a `SubscribeHookEvents` server-stream) picks
17//!    up the event from its `mpsc::Receiver` and forwards it to the
18//!    hook process.
19//! 3. The hook reads the event, computes its reply, and the local
20//!    daemon delivers it via `RespondToHook`. The handler routes the
21//!    reply through [`HookEventBroadcaster::deliver_response`].
22//! 4. The original emit caller awaits the reply via
23//!    [`HookEventBroadcaster::await_response`], with a timeout so a
24//!    crashed hook can't wedge the operation.
25//!
26//! Out-of-scope here:
27//!   - Multiple hooks racing to reply: the first reply wins; the
28//!     second is reported as `accepted=false` to its caller. The
29//!     wire shape doesn't try to fan replies in.
30//!   - Persisting in-flight events across daemon restart: the local
31//!     daemon is meant to be the same lifetime as the agent loop,
32//!     so a crash drops every in-flight reply. Hooks see the stream
33//!     close and the emit caller times out.
34
35use std::{
36    collections::HashMap,
37    sync::{Arc, Mutex},
38    time::Duration,
39};
40
41use grpc::heddle::v1::HookEvent as ProtoHookEvent;
42use objects::object::OperationId;
43use prost_types::Timestamp;
44use serde::{Deserialize, Serialize};
45use tokio::sync::{broadcast, mpsc, oneshot};
46
/// Capacity passed to `broadcast::channel` when the broker is built.
/// A subscriber that falls more than this many events behind loses
/// the oldest events and sees a `Lagged` error from its `recv`; the
/// forwarding task in [`HookEventBroadcaster::subscribe`] then stops,
/// which closes that subscriber's stream so the hook can
/// re-subscribe.
const BROADCAST_CAPACITY: usize = 256;
53
/// Typed hook response decoded from `RespondToHook`. The universal
/// veto channel is `abort`; per-event extension fields ride on `extra`
/// so per-event handlers can pull `extra_signals`, `veto`, etc.
/// without the universal type having to know every shape.
///
/// This type's home will move to `crates/repo/src/hooks.rs` so the
/// CLI hook runner can decode the same shape from stdout. Until that
/// lands, the broker carries its own definition; the wire format on
/// `RespondToHookRequest` decodes into this type and the emit-side
/// awaits it. Field names match the spec verbatim so the eventual
/// move to `repo::hooks` is a one-line `pub use`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct HookResponse {
    /// Universal veto channel. `#[serde(default)]` makes a reply that
    /// omits the field decode to the empty string.
    /// NOTE(review): presumably an empty string means "do not abort"
    /// — confirm against the consumers of this type.
    #[serde(default)]
    pub abort: String,
    /// Per-event extension fields. `#[serde(flatten)]` places these
    /// keys at the same JSON level as `abort` instead of nesting them
    /// under an `extra` key.
    /// NOTE(review): `Default` yields `Value::Null` here — confirm
    /// that serializing a flattened `Null` behaves as intended with
    /// the serde version in use.
    #[serde(flatten, default)]
    pub extra: serde_json::Value,
}
72
/// Pending-reply slot for one in-flight hook event, stored in the
/// broker's `pending` map under the event's `hook_event_id`. Slots
/// are created by `emit_and_wait` and `await_response` (plain `emit`
/// does not register one) and are removed by `deliver_response` or
/// by a waiter that times out.
struct ResponseSlot {
    /// Sending half of the single-shot reply channel. The matching
    /// receiver is held by whoever is awaiting the reply.
    sender: oneshot::Sender<HookResponse>,
}
78
/// In-process pub/sub broker for hook events. Lives on
/// [`GrpcLocalService`](super::GrpcLocalService) so every handler
/// shares the same broker and a `subscribe_hook_events` stream and a
/// `respond_to_hook` reply meet on the same correlator.
///
/// All state sits behind an `Arc`, so the derived `Clone` produces
/// another handle to the same channel and pending-reply map rather
/// than an independent broker.
#[derive(Clone)]
pub struct HookEventBroadcaster {
    /// Shared state; every clone points at the same instance.
    inner: Arc<HookEventBroadcasterInner>,
}
87
/// State shared by every clone of [`HookEventBroadcaster`].
struct HookEventBroadcasterInner {
    /// Broadcast sender. Fan-out shape so every subscriber gets its
    /// own backpressure rather than blocking the emitter.
    sender: broadcast::Sender<ProtoHookEvent>,
    /// Pending response slots keyed by `hook_event_id`. A blocking
    /// `std::sync::Mutex` is fine here — every operation is a single
    /// short map access, the guard is never held across an `.await`
    /// in this file, and the contention is low (one entry per
    /// in-flight emit that is waiting for a reply).
    pending: Mutex<HashMap<String, ResponseSlot>>,
}
97
98impl Default for HookEventBroadcaster {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104impl HookEventBroadcaster {
105    pub fn new() -> Self {
106        let (sender, _) = broadcast::channel(BROADCAST_CAPACITY);
107        Self {
108            inner: Arc::new(HookEventBroadcasterInner {
109                sender,
110                pending: Mutex::new(HashMap::new()),
111            }),
112        }
113    }
114
115    /// Subscribe a fresh stream. Each call returns its own
116    /// [`mpsc::Receiver`] backed by a forwarding task that drains the
117    /// underlying `broadcast::Receiver`. The mpsc shape lets us close
118    /// the stream cleanly when the subscriber drops, and lets the
119    /// `Lagged` error close the stream rather than panicking.
120    pub fn subscribe(&self) -> mpsc::Receiver<ProtoHookEvent> {
121        let mut rx = self.inner.sender.subscribe();
122        // Buffer one event ahead of the consumer; broadcast handles
123        // the actual fan-out backlog so the mpsc only needs a small
124        // shock-absorber capacity.
125        let (tx, out_rx) = mpsc::channel(16);
126        tokio::spawn(async move {
127            loop {
128                match rx.recv().await {
129                    Ok(event) => {
130                        if tx.send(event).await.is_err() {
131                            break;
132                        }
133                    }
134                    Err(broadcast::error::RecvError::Lagged(_)) => {
135                        // Subscriber fell behind. Drop the stream so
136                        // the hook reconnects rather than silently
137                        // missing events — the alternative (skip and
138                        // continue) makes silent veto loss possible.
139                        break;
140                    }
141                    Err(broadcast::error::RecvError::Closed) => break,
142                }
143            }
144        });
145        out_rx
146    }
147
148    /// Emit a fresh hook event. Returns the `hook_event_id` the
149    /// caller should pass to [`Self::await_response`]. The id is a
150    /// stringified UUIDv4 so it survives JSON round-trips intact.
151    ///
152    /// `payload_json` is delivered verbatim — schema validation lives
153    /// in the catalog (see `GetHookEventSchema`) and is the caller's
154    /// responsibility for now.
155    pub fn emit(&self, event_name: impl Into<String>, payload_json: impl Into<String>) -> String {
156        let hook_event_id = OperationId::new().to_string();
157        let now = std::time::SystemTime::now()
158            .duration_since(std::time::UNIX_EPOCH)
159            .unwrap_or_default();
160        let event = ProtoHookEvent {
161            hook_event_id: hook_event_id.clone(),
162            event_name: event_name.into(),
163            payload_json: payload_json.into(),
164            emitted_at: Some(Timestamp {
165                seconds: now.as_secs() as i64,
166                nanos: now.subsec_nanos() as i32,
167            }),
168        };
169        // Best-effort send: if there are no subscribers the broadcast
170        // returns an error which we deliberately swallow. The emit
171        // caller's `await_response` will time out — that's the
172        // documented "no hook installed" path.
173        let _ = self.inner.sender.send(event);
174        hook_event_id
175    }
176
177    /// Register a single-shot response slot for `hook_event_id` and
178    /// emit at the same time. Returns the id and a future that
179    /// resolves to the hook's reply (or times out).
180    ///
181    /// Use this from the capture/merge code paths when you both want
182    /// to fire the event and wait for the reply atomically.
183    pub fn emit_and_wait(
184        &self,
185        event_name: impl Into<String>,
186        payload_json: impl Into<String>,
187        timeout: Duration,
188    ) -> (String, EmitWaiter) {
189        let (sender, receiver) = oneshot::channel();
190        let event_name = event_name.into();
191        let payload_json = payload_json.into();
192        let hook_event_id = OperationId::new().to_string();
193        // Reserve the slot before we broadcast so the response can't
194        // race ahead of the registration.
195        {
196            let mut pending = self
197                .inner
198                .pending
199                .lock()
200                .expect("hook broker pending map poisoned");
201            pending.insert(hook_event_id.clone(), ResponseSlot { sender });
202        }
203        let now = std::time::SystemTime::now()
204            .duration_since(std::time::UNIX_EPOCH)
205            .unwrap_or_default();
206        let event = ProtoHookEvent {
207            hook_event_id: hook_event_id.clone(),
208            event_name,
209            payload_json,
210            emitted_at: Some(Timestamp {
211                seconds: now.as_secs() as i64,
212                nanos: now.subsec_nanos() as i32,
213            }),
214        };
215        let _ = self.inner.sender.send(event);
216        let waiter = EmitWaiter {
217            broker: self.clone(),
218            hook_event_id: hook_event_id.clone(),
219            receiver,
220            timeout,
221        };
222        (hook_event_id, waiter)
223    }
224
225    /// Await a reply for `hook_event_id` with a deadline. Returns
226    /// `None` when the deadline elapses before a hook responds (the
227    /// emit caller's "no hook acted on this in time" branch).
228    ///
229    /// The slot must have been reserved via [`Self::emit_and_wait`]
230    /// — calling this for a never-registered id resolves to `None`
231    /// immediately.
232    pub async fn await_response(
233        &self,
234        hook_event_id: &str,
235        timeout: Duration,
236    ) -> Option<HookResponse> {
237        // Reservation is the responsibility of `emit_and_wait`. If
238        // a caller wants to register, then emit, then wait separately
239        // they can use this directly — but they must have called
240        // `register_pending` first (kept private to avoid mis-use).
241        let receiver = {
242            let mut pending = self
243                .inner
244                .pending
245                .lock()
246                .expect("hook broker pending map poisoned");
247            pending.remove(hook_event_id).map(|slot| slot.sender)
248        };
249        // If no slot exists, fall back to creating one on the fly so
250        // callers that didn't use `emit_and_wait` still work. We
251        // re-insert and then take a fresh receiver.
252        let receiver = match receiver {
253            Some(_already_taken) => {
254                // The sender is consumed — there's no way to await on
255                // it here without rebuilding the slot. Fall through
256                // to the fresh-slot path.
257                let (sender, receiver) = oneshot::channel();
258                let mut pending = self
259                    .inner
260                    .pending
261                    .lock()
262                    .expect("hook broker pending map poisoned");
263                pending.insert(hook_event_id.to_string(), ResponseSlot { sender });
264                receiver
265            }
266            None => {
267                let (sender, receiver) = oneshot::channel();
268                let mut pending = self
269                    .inner
270                    .pending
271                    .lock()
272                    .expect("hook broker pending map poisoned");
273                pending.insert(hook_event_id.to_string(), ResponseSlot { sender });
274                receiver
275            }
276        };
277        match tokio::time::timeout(timeout, receiver).await {
278            Ok(Ok(response)) => Some(response),
279            Ok(Err(_canceled)) => None,
280            Err(_elapsed) => {
281                // Drop the slot so a late reply doesn't pile up
282                // memory. The `RespondToHook` handler will report
283                // `accepted=false` for late deliveries.
284                let mut pending = self
285                    .inner
286                    .pending
287                    .lock()
288                    .expect("hook broker pending map poisoned");
289                pending.remove(hook_event_id);
290                None
291            }
292        }
293    }
294
295    /// Deliver a hook reply to the in-flight emit waiting on
296    /// `hook_event_id`. Called by the `RespondToHook` handler.
297    /// Returns `true` when the reply was delivered (a waiter was
298    /// present); `false` when no waiter is registered (timed out, or
299    /// already replied).
300    pub fn deliver_response(&self, hook_event_id: &str, response: HookResponse) -> bool {
301        let slot = {
302            let mut pending = self
303                .inner
304                .pending
305                .lock()
306                .expect("hook broker pending map poisoned");
307            pending.remove(hook_event_id)
308        };
309        match slot {
310            Some(slot) => slot.sender.send(response).is_ok(),
311            None => false,
312        }
313    }
314
315    /// Number of subscribers currently attached. Useful for tests.
316    #[cfg(test)]
317    fn subscriber_count(&self) -> usize {
318        self.inner.sender.receiver_count()
319    }
320}
321
/// Handle returned by [`HookEventBroadcaster::emit_and_wait`]. It is
/// resolved by calling [`EmitWaiter::wait`]; it holds the reply
/// receiver plus a handle back to the broker so `wait` can remove the
/// pending slot when no reply arrives.
///
/// NOTE(review): no `Drop` impl is visible in this file, so a waiter
/// that is dropped without `wait` running to completion leaves its
/// slot in the broker's pending map until a reply is delivered for
/// that id — confirm whether cleanup-on-drop was intended.
pub struct EmitWaiter {
    /// Broker whose pending map holds this waiter's slot.
    broker: HookEventBroadcaster,
    /// Correlator under which the slot was registered.
    hook_event_id: String,
    /// Receiving half of the single-shot reply channel.
    receiver: oneshot::Receiver<HookResponse>,
    /// Upper bound on how long `wait` blocks for a reply.
    timeout: Duration,
}
331
332impl EmitWaiter {
333    /// Resolve the waiter, returning `Some` on a fresh reply and
334    /// `None` on timeout or hook crash. Drops the broker's pending
335    /// slot in either path.
336    pub async fn wait(self) -> Option<HookResponse> {
337        let EmitWaiter {
338            broker,
339            hook_event_id,
340            receiver,
341            timeout,
342        } = self;
343        match tokio::time::timeout(timeout, receiver).await {
344            Ok(Ok(response)) => Some(response),
345            Ok(Err(_canceled)) => {
346                broker
347                    .inner
348                    .pending
349                    .lock()
350                    .expect("hook broker pending map poisoned")
351                    .remove(&hook_event_id);
352                None
353            }
354            Err(_elapsed) => {
355                broker
356                    .inner
357                    .pending
358                    .lock()
359                    .expect("hook broker pending map poisoned")
360                    .remove(&hook_event_id);
361                None
362            }
363        }
364    }
365}
366
// Unit tests for the broker. They run on the Tokio test runtime
// because `subscribe` spawns a forwarding task.
#[cfg(test)]
mod tests {
    use super::*;

    /// An emitted event reaches a subscriber with the id, name and
    /// payload the emitter supplied.
    #[tokio::test]
    async fn emit_round_trips_to_subscriber() {
        let broker = HookEventBroadcaster::new();
        let mut sub = broker.subscribe();
        // `subscribe` creates the broadcast receiver before it
        // returns; this yield only gives the spawned forwarding task
        // a chance to start polling before the first emit.
        tokio::task::yield_now().await;
        let id = broker.emit("pre_capture", "{\"thread\":\"t1\"}");
        let event = sub.recv().await.expect("event");
        assert_eq!(event.hook_event_id, id);
        assert_eq!(event.event_name, "pre_capture");
        assert!(event.payload_json.contains("t1"));
    }

    /// A reply delivered through `deliver_response` resolves the
    /// waiter returned by `emit_and_wait`. (Despite the name, this
    /// exercises `EmitWaiter::wait`, not `await_response`.)
    #[tokio::test]
    async fn await_response_returns_delivered_reply() {
        let broker = HookEventBroadcaster::new();
        let _sub = broker.subscribe();
        tokio::task::yield_now().await;
        let (id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_secs(1));
        let id_for_reply = id.clone();
        let broker_clone = broker.clone();
        // Reply from a separate task after a short delay, well inside
        // the one-second timeout given to the waiter.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let _ = broker_clone.deliver_response(
                &id_for_reply,
                HookResponse {
                    abort: "veto".into(),
                    extra: serde_json::Value::Null,
                },
            );
        });
        let response = waiter.wait().await.expect("response");
        assert_eq!(response.abort, "veto");
    }

    /// With nobody replying, the waiter resolves to `None` once its
    /// timeout elapses. (Exercises `EmitWaiter::wait`.)
    #[tokio::test]
    async fn await_response_times_out_with_no_reply() {
        let broker = HookEventBroadcaster::new();
        let _sub = broker.subscribe();
        let (_id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_millis(20));
        let response = waiter.wait().await;
        assert!(response.is_none());
    }

    /// Delivering to an id with no registered slot is rejected.
    #[tokio::test]
    async fn deliver_to_unknown_id_returns_false() {
        let broker = HookEventBroadcaster::new();
        let accepted = broker.deliver_response("no-such-id", HookResponse::default());
        assert!(!accepted);
    }

    /// Two subscribers each receive their own copy of the same event.
    #[tokio::test]
    async fn subscribers_are_independent() {
        let broker = HookEventBroadcaster::new();
        let mut a = broker.subscribe();
        let mut b = broker.subscribe();
        tokio::task::yield_now().await;
        // Each forwarding task owns one broadcast receiver.
        assert_eq!(broker.subscriber_count(), 2);
        broker.emit("post_capture", "{}");
        let event_a = a.recv().await.expect("a");
        let event_b = b.recv().await.expect("b");
        assert_eq!(event_a.hook_event_id, event_b.hook_event_id);
    }
}
435}