Skip to main content

daemon/grpc_local_impl/
hook_events.rs

1// SPDX-License-Identifier: Apache-2.0
2//! In-process hook-event broker for the local-mode daemon.
3//!
4//! The hosted variant rides Postgres NOTIFY (see the `server` crate's
5//! `events` module). Local mode is single-process, single-user, no Postgres — so we
6//! model the same fan-out shape with a `tokio::sync::broadcast`
7//! channel for emit→subscribe and a per-correlator `oneshot` for
8//! response routing.
9//!
10//! Lifecycle of a single event:
11//!
//! 1. The capture/merge code path (a future workstream consumer)
//!    calls [`HookEventBroadcaster::emit_and_wait`] with a JSON
//!    payload. The broker mints a `hook_event_id`, registers a fresh
//!    response slot, and broadcasts the event to every live
//!    subscriber. (Plain [`HookEventBroadcaster::emit`] broadcasts
//!    without reserving a response slot.)
//! 2. Each subscriber (a `SubscribeHookEvents` server-stream) picks
//!    up the event from its `mpsc::Receiver` and forwards it to the
//!    hook process.
//! 3. The hook reads the event and computes its reply, which the local
//!    daemon receives via `RespondToHook`. The handler routes the
//!    reply through [`HookEventBroadcaster::deliver_response`].
//! 4. The original emit caller awaits the reply via the returned
//!    [`EmitWaiter::wait`] (or [`HookEventBroadcaster::await_response`]),
//!    with a timeout so a crashed hook can't wedge the operation.
25//!
26//! Out-of-scope here:
27//!   - Multiple hooks racing to reply: the first reply wins; the
28//!     second is reported as `accepted=false` to its caller. The
29//!     wire shape doesn't try to fan replies in.
30//!   - Persisting in-flight events across daemon restart: the local
31//!     daemon is meant to be the same lifetime as the agent loop,
32//!     so a crash drops every in-flight reply. Hooks see the stream
33//!     close and the emit caller times out.
34
35use std::{
36    collections::HashMap,
37    sync::{Arc, Mutex},
38    time::Duration,
39};
40
41use objects::object::OperationId;
42use prost_types::Timestamp;
43use serde::{Deserialize, Serialize};
44use tokio::sync::{broadcast, mpsc, oneshot};
45
46use grpc::heddle::v1::HookEvent as ProtoHookEvent;
47
/// Capacity of the in-process broadcast channel.
///
/// A subscriber that falls more than this many events behind the newest
/// emitted event observes `RecvError::Lagged`; its forwarding task then
/// closes the stream so the hook re-subscribes instead of silently
/// missing events.
const BROADCAST_CAPACITY: usize = 256;
54
55/// Typed hook response decoded from `RespondToHook`. The universal
56/// veto channel is `abort`; per-event extension fields ride on `extra`
57/// so per-event handlers can pull `extra_signals`, `veto`, etc.
58/// without the universal type having to know every shape.
59///
60/// This type's home will move to `crates/repo/src/hooks.rs` so the
61/// CLI hook runner can decode the same shape from stdout. Until that
62/// lands, the broker carries its own definition; the wire format on
63/// `RespondToHookRequest` decodes into this type and the emit-side
64/// awaits it. Field names match the spec verbatim so the eventual
65/// move to `repo::hooks` is a one-line `pub use`.
66#[derive(Debug, Clone, Default, Serialize, Deserialize)]
67pub struct HookResponse {
68    #[serde(default)]
69    pub abort: String,
70    #[serde(flatten, default)]
71    pub extra: serde_json::Value,
72}
73
74/// Per-event metadata routed from `emit` to `await_response`. The
75/// channel is single-shot — once a reply lands, the slot is removed.
76struct ResponseSlot {
77    sender: oneshot::Sender<HookResponse>,
78}
79
80/// In-process pub/sub broker for hook events. Lives on
81/// [`GrpcLocalService`](super::GrpcLocalService) so every handler
82/// shares the same broker and a `subscribe_hook_events` stream and a
83/// `respond_to_hook` reply meet on the same correlator.
84#[derive(Clone)]
85pub struct HookEventBroadcaster {
86    inner: Arc<HookEventBroadcasterInner>,
87}
88
89struct HookEventBroadcasterInner {
90    /// Broadcast sender. Fan-out shape so every subscriber gets its
91    /// own backpressure rather than blocking the emitter.
92    sender: broadcast::Sender<ProtoHookEvent>,
93    /// Pending response slots keyed by `hook_event_id`. Mutex is
94    /// fine here — every operation is short and the contention is low
95    /// (one entry per in-flight emit).
96    pending: Mutex<HashMap<String, ResponseSlot>>,
97}
98
99impl Default for HookEventBroadcaster {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105impl HookEventBroadcaster {
106    pub fn new() -> Self {
107        let (sender, _) = broadcast::channel(BROADCAST_CAPACITY);
108        Self {
109            inner: Arc::new(HookEventBroadcasterInner {
110                sender,
111                pending: Mutex::new(HashMap::new()),
112            }),
113        }
114    }
115
116    /// Subscribe a fresh stream. Each call returns its own
117    /// [`mpsc::Receiver`] backed by a forwarding task that drains the
118    /// underlying `broadcast::Receiver`. The mpsc shape lets us close
119    /// the stream cleanly when the subscriber drops, and lets the
120    /// `Lagged` error close the stream rather than panicking.
121    pub fn subscribe(&self) -> mpsc::Receiver<ProtoHookEvent> {
122        let mut rx = self.inner.sender.subscribe();
123        // Buffer one event ahead of the consumer; broadcast handles
124        // the actual fan-out backlog so the mpsc only needs a small
125        // shock-absorber capacity.
126        let (tx, out_rx) = mpsc::channel(16);
127        tokio::spawn(async move {
128            loop {
129                match rx.recv().await {
130                    Ok(event) => {
131                        if tx.send(event).await.is_err() {
132                            break;
133                        }
134                    }
135                    Err(broadcast::error::RecvError::Lagged(_)) => {
136                        // Subscriber fell behind. Drop the stream so
137                        // the hook reconnects rather than silently
138                        // missing events — the alternative (skip and
139                        // continue) makes silent veto loss possible.
140                        break;
141                    }
142                    Err(broadcast::error::RecvError::Closed) => break,
143                }
144            }
145        });
146        out_rx
147    }
148
149    /// Emit a fresh hook event. Returns the `hook_event_id` the
150    /// caller should pass to [`Self::await_response`]. The id is a
151    /// stringified UUIDv4 so it survives JSON round-trips intact.
152    ///
153    /// `payload_json` is delivered verbatim — schema validation lives
154    /// in the catalog (see `GetHookEventSchema`) and is the caller's
155    /// responsibility for now.
156    pub fn emit(&self, event_name: impl Into<String>, payload_json: impl Into<String>) -> String {
157        let hook_event_id = OperationId::new().to_string();
158        let now = std::time::SystemTime::now()
159            .duration_since(std::time::UNIX_EPOCH)
160            .unwrap_or_default();
161        let event = ProtoHookEvent {
162            hook_event_id: hook_event_id.clone(),
163            event_name: event_name.into(),
164            payload_json: payload_json.into(),
165            emitted_at: Some(Timestamp {
166                seconds: now.as_secs() as i64,
167                nanos: now.subsec_nanos() as i32,
168            }),
169        };
170        // Best-effort send: if there are no subscribers the broadcast
171        // returns an error which we deliberately swallow. The emit
172        // caller's `await_response` will time out — that's the
173        // documented "no hook installed" path.
174        let _ = self.inner.sender.send(event);
175        hook_event_id
176    }
177
178    /// Register a single-shot response slot for `hook_event_id` and
179    /// emit at the same time. Returns the id and a future that
180    /// resolves to the hook's reply (or times out).
181    ///
182    /// Use this from the capture/merge code paths when you both want
183    /// to fire the event and wait for the reply atomically.
184    pub fn emit_and_wait(
185        &self,
186        event_name: impl Into<String>,
187        payload_json: impl Into<String>,
188        timeout: Duration,
189    ) -> (String, EmitWaiter) {
190        let (sender, receiver) = oneshot::channel();
191        let event_name = event_name.into();
192        let payload_json = payload_json.into();
193        let hook_event_id = OperationId::new().to_string();
194        // Reserve the slot before we broadcast so the response can't
195        // race ahead of the registration.
196        {
197            let mut pending = self
198                .inner
199                .pending
200                .lock()
201                .expect("hook broker pending map poisoned");
202            pending.insert(hook_event_id.clone(), ResponseSlot { sender });
203        }
204        let now = std::time::SystemTime::now()
205            .duration_since(std::time::UNIX_EPOCH)
206            .unwrap_or_default();
207        let event = ProtoHookEvent {
208            hook_event_id: hook_event_id.clone(),
209            event_name,
210            payload_json,
211            emitted_at: Some(Timestamp {
212                seconds: now.as_secs() as i64,
213                nanos: now.subsec_nanos() as i32,
214            }),
215        };
216        let _ = self.inner.sender.send(event);
217        let waiter = EmitWaiter {
218            broker: self.clone(),
219            hook_event_id: hook_event_id.clone(),
220            receiver,
221            timeout,
222        };
223        (hook_event_id, waiter)
224    }
225
226    /// Await a reply for `hook_event_id` with a deadline. Returns
227    /// `None` when the deadline elapses before a hook responds (the
228    /// emit caller's "no hook acted on this in time" branch).
229    ///
230    /// The slot must have been reserved via [`Self::emit_and_wait`]
231    /// — calling this for a never-registered id resolves to `None`
232    /// immediately.
233    pub async fn await_response(
234        &self,
235        hook_event_id: &str,
236        timeout: Duration,
237    ) -> Option<HookResponse> {
238        // Reservation is the responsibility of `emit_and_wait`. If
239        // a caller wants to register, then emit, then wait separately
240        // they can use this directly — but they must have called
241        // `register_pending` first (kept private to avoid mis-use).
242        let receiver = {
243            let mut pending = self
244                .inner
245                .pending
246                .lock()
247                .expect("hook broker pending map poisoned");
248            pending.remove(hook_event_id).map(|slot| slot.sender)
249        };
250        // If no slot exists, fall back to creating one on the fly so
251        // callers that didn't use `emit_and_wait` still work. We
252        // re-insert and then take a fresh receiver.
253        let receiver = match receiver {
254            Some(_already_taken) => {
255                // The sender is consumed — there's no way to await on
256                // it here without rebuilding the slot. Fall through
257                // to the fresh-slot path.
258                let (sender, receiver) = oneshot::channel();
259                let mut pending = self
260                    .inner
261                    .pending
262                    .lock()
263                    .expect("hook broker pending map poisoned");
264                pending.insert(hook_event_id.to_string(), ResponseSlot { sender });
265                receiver
266            }
267            None => {
268                let (sender, receiver) = oneshot::channel();
269                let mut pending = self
270                    .inner
271                    .pending
272                    .lock()
273                    .expect("hook broker pending map poisoned");
274                pending.insert(hook_event_id.to_string(), ResponseSlot { sender });
275                receiver
276            }
277        };
278        match tokio::time::timeout(timeout, receiver).await {
279            Ok(Ok(response)) => Some(response),
280            Ok(Err(_canceled)) => None,
281            Err(_elapsed) => {
282                // Drop the slot so a late reply doesn't pile up
283                // memory. The `RespondToHook` handler will report
284                // `accepted=false` for late deliveries.
285                let mut pending = self
286                    .inner
287                    .pending
288                    .lock()
289                    .expect("hook broker pending map poisoned");
290                pending.remove(hook_event_id);
291                None
292            }
293        }
294    }
295
296    /// Deliver a hook reply to the in-flight emit waiting on
297    /// `hook_event_id`. Called by the `RespondToHook` handler.
298    /// Returns `true` when the reply was delivered (a waiter was
299    /// present); `false` when no waiter is registered (timed out, or
300    /// already replied).
301    pub fn deliver_response(&self, hook_event_id: &str, response: HookResponse) -> bool {
302        let slot = {
303            let mut pending = self
304                .inner
305                .pending
306                .lock()
307                .expect("hook broker pending map poisoned");
308            pending.remove(hook_event_id)
309        };
310        match slot {
311            Some(slot) => slot.sender.send(response).is_ok(),
312            None => false,
313        }
314    }
315
316    /// Number of subscribers currently attached. Useful for tests.
317    #[cfg(test)]
318    fn subscriber_count(&self) -> usize {
319        self.inner.sender.receiver_count()
320    }
321}
322
323/// Future returned by [`HookEventBroadcaster::emit_and_wait`]. Holds
324/// the receiver plus a hook back to the broker so it can clean up the
325/// pending slot if the future is dropped before the reply lands.
326pub struct EmitWaiter {
327    broker: HookEventBroadcaster,
328    hook_event_id: String,
329    receiver: oneshot::Receiver<HookResponse>,
330    timeout: Duration,
331}
332
333impl EmitWaiter {
334    /// Resolve the waiter, returning `Some` on a fresh reply and
335    /// `None` on timeout or hook crash. Drops the broker's pending
336    /// slot in either path.
337    pub async fn wait(self) -> Option<HookResponse> {
338        let EmitWaiter {
339            broker,
340            hook_event_id,
341            receiver,
342            timeout,
343        } = self;
344        match tokio::time::timeout(timeout, receiver).await {
345            Ok(Ok(response)) => Some(response),
346            Ok(Err(_canceled)) => {
347                broker
348                    .inner
349                    .pending
350                    .lock()
351                    .expect("hook broker pending map poisoned")
352                    .remove(&hook_event_id);
353                None
354            }
355            Err(_elapsed) => {
356                broker
357                    .inner
358                    .pending
359                    .lock()
360                    .expect("hook broker pending map poisoned")
361                    .remove(&hook_event_id);
362                None
363            }
364        }
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    /// A reply that carries only an `abort` reason.
    fn reply_with_abort(reason: &str) -> HookResponse {
        HookResponse {
            abort: reason.to_owned(),
            extra: serde_json::Value::Null,
        }
    }

    #[tokio::test]
    async fn emit_round_trips_to_subscriber() {
        let broker = HookEventBroadcaster::new();
        let mut stream = broker.subscribe();
        // Give the forwarding task spawned by `subscribe` a chance to run
        // before the first emit.
        tokio::task::yield_now().await;

        let emitted_id = broker.emit("pre_capture", r#"{"thread":"t1"}"#);
        let received = stream.recv().await.expect("event");

        assert_eq!(received.hook_event_id, emitted_id);
        assert_eq!(received.event_name, "pre_capture");
        assert!(received.payload_json.contains("t1"));
    }

    #[tokio::test]
    async fn await_response_returns_delivered_reply() {
        let broker = HookEventBroadcaster::new();
        let _stream = broker.subscribe();
        tokio::task::yield_now().await;
        let (id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_secs(1));

        let responder = broker.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let _ = responder.deliver_response(&id, reply_with_abort("veto"));
        });

        let reply = waiter.wait().await.expect("response");
        assert_eq!(reply.abort, "veto");
    }

    #[tokio::test]
    async fn await_response_times_out_with_no_reply() {
        let broker = HookEventBroadcaster::new();
        let _stream = broker.subscribe();
        let (_id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_millis(20));

        assert!(waiter.wait().await.is_none());
    }

    #[tokio::test]
    async fn deliver_to_unknown_id_returns_false() {
        let broker = HookEventBroadcaster::new();

        assert!(!broker.deliver_response("no-such-id", HookResponse::default()));
    }

    #[tokio::test]
    async fn subscribers_are_independent() {
        let broker = HookEventBroadcaster::new();
        let (mut first, mut second) = (broker.subscribe(), broker.subscribe());
        tokio::task::yield_now().await;
        assert_eq!(broker.subscriber_count(), 2);

        broker.emit("post_capture", "{}");
        let from_first = first.recv().await.expect("a");
        let from_second = second.recv().await.expect("b");

        assert_eq!(from_first.hook_event_id, from_second.hook_event_id);
    }
}