daemon/grpc_local_impl/hook_events.rs
1// SPDX-License-Identifier: Apache-2.0
2//! In-process hook-event broker for the local-mode daemon.
3//!
4//! The hosted variant rides Postgres NOTIFY (see the `server` crate's
5//! `events` module). Local mode is single-process, single-user, no Postgres — so we
6//! model the same fan-out shape with a `tokio::sync::broadcast`
7//! channel for emit→subscribe and a per-correlator `oneshot` for
8//! response routing.
9//!
10//! Lifecycle of a single event:
11//!
12//! 1. The capture/merge code path (a future workstream consumer)
13//! calls [`HookEventBroadcaster::emit`] with a JSON payload. The
14//! broker mints a `hook_event_id`, registers a fresh response
15//! slot, and broadcasts the event to every live subscriber.
16//! 2. Each subscriber (a `SubscribeHookEvents` server-stream) picks
17//! up the event from its `mpsc::Receiver` and forwards it to the
18//! hook process.
19//! 3. The hook reads the event, computes its reply, and the local
20//! daemon delivers it via `RespondToHook`. The handler routes the
21//! reply through [`HookEventBroadcaster::deliver_response`].
22//! 4. The original emit caller awaits the reply via
23//! [`HookEventBroadcaster::await_response`], with a timeout so a
24//! crashed hook can't wedge the operation.
25//!
26//! Out-of-scope here:
27//! - Multiple hooks racing to reply: the first reply wins; the
28//! second is reported as `accepted=false` to its caller. The
29//! wire shape doesn't try to fan replies in.
30//! - Persisting in-flight events across daemon restart: the local
31//! daemon is meant to be the same lifetime as the agent loop,
32//! so a crash drops every in-flight reply. Hooks see the stream
33//! close and the emit caller times out.
34
35use std::{
36 collections::HashMap,
37 sync::{Arc, Mutex},
38 time::Duration,
39};
40
41use grpc::heddle::v1::HookEvent as ProtoHookEvent;
42use objects::object::OperationId;
43use prost_types::Timestamp;
44use serde::{Deserialize, Serialize};
45use tokio::sync::{broadcast, mpsc, oneshot};
46
/// Number of events the in-process broadcast channel retains for its
/// subscribers.
///
/// A subscriber that falls further behind than this loses the oldest
/// events: its next `recv` reports `Lagged`, and the forwarding task in
/// `HookEventBroadcaster::subscribe` reacts by ending the stream so the
/// hook can re-subscribe instead of silently skipping events.
const BROADCAST_CAPACITY: usize = 256;
53
54/// Typed hook response decoded from `RespondToHook`. The universal
55/// veto channel is `abort`; per-event extension fields ride on `extra`
56/// so per-event handlers can pull `extra_signals`, `veto`, etc.
57/// without the universal type having to know every shape.
58///
59/// This type's home will move to `crates/repo/src/hooks.rs` so the
60/// CLI hook runner can decode the same shape from stdout. Until that
61/// lands, the broker carries its own definition; the wire format on
62/// `RespondToHookRequest` decodes into this type and the emit-side
63/// awaits it. Field names match the spec verbatim so the eventual
64/// move to `repo::hooks` is a one-line `pub use`.
65#[derive(Debug, Clone, Default, Serialize, Deserialize)]
66pub struct HookResponse {
67 #[serde(default)]
68 pub abort: String,
69 #[serde(flatten, default)]
70 pub extra: serde_json::Value,
71}
72
73/// Per-event metadata routed from `emit` to `await_response`. The
74/// channel is single-shot — once a reply lands, the slot is removed.
75struct ResponseSlot {
76 sender: oneshot::Sender<HookResponse>,
77}
78
79/// In-process pub/sub broker for hook events. Lives on
80/// [`GrpcLocalService`](super::GrpcLocalService) so every handler
81/// shares the same broker and a `subscribe_hook_events` stream and a
82/// `respond_to_hook` reply meet on the same correlator.
83#[derive(Clone)]
84pub struct HookEventBroadcaster {
85 inner: Arc<HookEventBroadcasterInner>,
86}
87
88struct HookEventBroadcasterInner {
89 /// Broadcast sender. Fan-out shape so every subscriber gets its
90 /// own backpressure rather than blocking the emitter.
91 sender: broadcast::Sender<ProtoHookEvent>,
92 /// Pending response slots keyed by `hook_event_id`. Mutex is
93 /// fine here — every operation is short and the contention is low
94 /// (one entry per in-flight emit).
95 pending: Mutex<HashMap<String, ResponseSlot>>,
96}
97
98impl Default for HookEventBroadcaster {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104impl HookEventBroadcaster {
105 pub fn new() -> Self {
106 let (sender, _) = broadcast::channel(BROADCAST_CAPACITY);
107 Self {
108 inner: Arc::new(HookEventBroadcasterInner {
109 sender,
110 pending: Mutex::new(HashMap::new()),
111 }),
112 }
113 }
114
115 /// Subscribe a fresh stream. Each call returns its own
116 /// [`mpsc::Receiver`] backed by a forwarding task that drains the
117 /// underlying `broadcast::Receiver`. The mpsc shape lets us close
118 /// the stream cleanly when the subscriber drops, and lets the
119 /// `Lagged` error close the stream rather than panicking.
120 pub fn subscribe(&self) -> mpsc::Receiver<ProtoHookEvent> {
121 let mut rx = self.inner.sender.subscribe();
122 // Buffer one event ahead of the consumer; broadcast handles
123 // the actual fan-out backlog so the mpsc only needs a small
124 // shock-absorber capacity.
125 let (tx, out_rx) = mpsc::channel(16);
126 tokio::spawn(async move {
127 loop {
128 match rx.recv().await {
129 Ok(event) => {
130 if tx.send(event).await.is_err() {
131 break;
132 }
133 }
134 Err(broadcast::error::RecvError::Lagged(_)) => {
135 // Subscriber fell behind. Drop the stream so
136 // the hook reconnects rather than silently
137 // missing events — the alternative (skip and
138 // continue) makes silent veto loss possible.
139 break;
140 }
141 Err(broadcast::error::RecvError::Closed) => break,
142 }
143 }
144 });
145 out_rx
146 }
147
148 /// Emit a fresh hook event. Returns the `hook_event_id` the
149 /// caller should pass to [`Self::await_response`]. The id is a
150 /// stringified UUIDv4 so it survives JSON round-trips intact.
151 ///
152 /// `payload_json` is delivered verbatim — schema validation lives
153 /// in the catalog (see `GetHookEventSchema`) and is the caller's
154 /// responsibility for now.
155 pub fn emit(&self, event_name: impl Into<String>, payload_json: impl Into<String>) -> String {
156 let hook_event_id = OperationId::new().to_string();
157 let now = std::time::SystemTime::now()
158 .duration_since(std::time::UNIX_EPOCH)
159 .unwrap_or_default();
160 let event = ProtoHookEvent {
161 hook_event_id: hook_event_id.clone(),
162 event_name: event_name.into(),
163 payload_json: payload_json.into(),
164 emitted_at: Some(Timestamp {
165 seconds: now.as_secs() as i64,
166 nanos: now.subsec_nanos() as i32,
167 }),
168 };
169 // Best-effort send: if there are no subscribers the broadcast
170 // returns an error which we deliberately swallow. The emit
171 // caller's `await_response` will time out — that's the
172 // documented "no hook installed" path.
173 let _ = self.inner.sender.send(event);
174 hook_event_id
175 }
176
177 /// Register a single-shot response slot for `hook_event_id` and
178 /// emit at the same time. Returns the id and a future that
179 /// resolves to the hook's reply (or times out).
180 ///
181 /// Use this from the capture/merge code paths when you both want
182 /// to fire the event and wait for the reply atomically.
183 pub fn emit_and_wait(
184 &self,
185 event_name: impl Into<String>,
186 payload_json: impl Into<String>,
187 timeout: Duration,
188 ) -> (String, EmitWaiter) {
189 let (sender, receiver) = oneshot::channel();
190 let event_name = event_name.into();
191 let payload_json = payload_json.into();
192 let hook_event_id = OperationId::new().to_string();
193 // Reserve the slot before we broadcast so the response can't
194 // race ahead of the registration.
195 {
196 let mut pending = self
197 .inner
198 .pending
199 .lock()
200 .expect("hook broker pending map poisoned");
201 pending.insert(hook_event_id.clone(), ResponseSlot { sender });
202 }
203 let now = std::time::SystemTime::now()
204 .duration_since(std::time::UNIX_EPOCH)
205 .unwrap_or_default();
206 let event = ProtoHookEvent {
207 hook_event_id: hook_event_id.clone(),
208 event_name,
209 payload_json,
210 emitted_at: Some(Timestamp {
211 seconds: now.as_secs() as i64,
212 nanos: now.subsec_nanos() as i32,
213 }),
214 };
215 let _ = self.inner.sender.send(event);
216 let waiter = EmitWaiter {
217 broker: self.clone(),
218 hook_event_id: hook_event_id.clone(),
219 receiver,
220 timeout,
221 };
222 (hook_event_id, waiter)
223 }
224
225 /// Await a reply for `hook_event_id` with a deadline. Returns
226 /// `None` when the deadline elapses before a hook responds (the
227 /// emit caller's "no hook acted on this in time" branch).
228 ///
229 /// The slot must have been reserved via [`Self::emit_and_wait`]
230 /// — calling this for a never-registered id resolves to `None`
231 /// immediately.
232 pub async fn await_response(
233 &self,
234 hook_event_id: &str,
235 timeout: Duration,
236 ) -> Option<HookResponse> {
237 // Reservation is the responsibility of `emit_and_wait`. If
238 // a caller wants to register, then emit, then wait separately
239 // they can use this directly — but they must have called
240 // `register_pending` first (kept private to avoid mis-use).
241 let receiver = {
242 let mut pending = self
243 .inner
244 .pending
245 .lock()
246 .expect("hook broker pending map poisoned");
247 pending.remove(hook_event_id).map(|slot| slot.sender)
248 };
249 // If no slot exists, fall back to creating one on the fly so
250 // callers that didn't use `emit_and_wait` still work. We
251 // re-insert and then take a fresh receiver.
252 let receiver = match receiver {
253 Some(_already_taken) => {
254 // The sender is consumed — there's no way to await on
255 // it here without rebuilding the slot. Fall through
256 // to the fresh-slot path.
257 let (sender, receiver) = oneshot::channel();
258 let mut pending = self
259 .inner
260 .pending
261 .lock()
262 .expect("hook broker pending map poisoned");
263 pending.insert(hook_event_id.to_string(), ResponseSlot { sender });
264 receiver
265 }
266 None => {
267 let (sender, receiver) = oneshot::channel();
268 let mut pending = self
269 .inner
270 .pending
271 .lock()
272 .expect("hook broker pending map poisoned");
273 pending.insert(hook_event_id.to_string(), ResponseSlot { sender });
274 receiver
275 }
276 };
277 match tokio::time::timeout(timeout, receiver).await {
278 Ok(Ok(response)) => Some(response),
279 Ok(Err(_canceled)) => None,
280 Err(_elapsed) => {
281 // Drop the slot so a late reply doesn't pile up
282 // memory. The `RespondToHook` handler will report
283 // `accepted=false` for late deliveries.
284 let mut pending = self
285 .inner
286 .pending
287 .lock()
288 .expect("hook broker pending map poisoned");
289 pending.remove(hook_event_id);
290 None
291 }
292 }
293 }
294
295 /// Deliver a hook reply to the in-flight emit waiting on
296 /// `hook_event_id`. Called by the `RespondToHook` handler.
297 /// Returns `true` when the reply was delivered (a waiter was
298 /// present); `false` when no waiter is registered (timed out, or
299 /// already replied).
300 pub fn deliver_response(&self, hook_event_id: &str, response: HookResponse) -> bool {
301 let slot = {
302 let mut pending = self
303 .inner
304 .pending
305 .lock()
306 .expect("hook broker pending map poisoned");
307 pending.remove(hook_event_id)
308 };
309 match slot {
310 Some(slot) => slot.sender.send(response).is_ok(),
311 None => false,
312 }
313 }
314
315 /// Number of subscribers currently attached. Useful for tests.
316 #[cfg(test)]
317 fn subscriber_count(&self) -> usize {
318 self.inner.sender.receiver_count()
319 }
320}
321
322/// Future returned by [`HookEventBroadcaster::emit_and_wait`]. Holds
323/// the receiver plus a hook back to the broker so it can clean up the
324/// pending slot if the future is dropped before the reply lands.
325pub struct EmitWaiter {
326 broker: HookEventBroadcaster,
327 hook_event_id: String,
328 receiver: oneshot::Receiver<HookResponse>,
329 timeout: Duration,
330}
331
332impl EmitWaiter {
333 /// Resolve the waiter, returning `Some` on a fresh reply and
334 /// `None` on timeout or hook crash. Drops the broker's pending
335 /// slot in either path.
336 pub async fn wait(self) -> Option<HookResponse> {
337 let EmitWaiter {
338 broker,
339 hook_event_id,
340 receiver,
341 timeout,
342 } = self;
343 match tokio::time::timeout(timeout, receiver).await {
344 Ok(Ok(response)) => Some(response),
345 Ok(Err(_canceled)) => {
346 broker
347 .inner
348 .pending
349 .lock()
350 .expect("hook broker pending map poisoned")
351 .remove(&hook_event_id);
352 None
353 }
354 Err(_elapsed) => {
355 broker
356 .inner
357 .pending
358 .lock()
359 .expect("hook broker pending map poisoned")
360 .remove(&hook_event_id);
361 None
362 }
363 }
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// An emitted event reaches a subscriber with id, name and payload intact.
    #[tokio::test]
    async fn emit_round_trips_to_subscriber() {
        let broker = HookEventBroadcaster::new();
        let mut stream = broker.subscribe();
        // `subscribe` has already registered the broadcast receiver; the
        // yield just lets the spawned forwarding task start running.
        tokio::task::yield_now().await;

        let emitted_id = broker.emit("pre_capture", "{\"thread\":\"t1\"}");

        let received = stream.recv().await.expect("event");
        assert_eq!(received.hook_event_id, emitted_id);
        assert_eq!(received.event_name, "pre_capture");
        assert!(received.payload_json.contains("t1"));
    }

    /// A reply delivered while the waiter is pending is handed to the waiter.
    #[tokio::test]
    async fn await_response_returns_delivered_reply() {
        let broker = HookEventBroadcaster::new();
        let _sub = broker.subscribe();
        tokio::task::yield_now().await;

        let (id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_secs(1));

        // Play the hook: answer shortly after the event was emitted.
        let responder = broker.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let reply = HookResponse {
                abort: "veto".into(),
                extra: serde_json::Value::Null,
            };
            let _ = responder.deliver_response(&id, reply);
        });

        let reply = waiter.wait().await.expect("response");
        assert_eq!(reply.abort, "veto");
    }

    /// With nobody replying, the waiter gives up once its timeout elapses.
    #[tokio::test]
    async fn await_response_times_out_with_no_reply() {
        let broker = HookEventBroadcaster::new();
        let _sub = broker.subscribe();

        let (_id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_millis(20));

        assert!(waiter.wait().await.is_none());
    }

    /// A reply for an id that has no registered slot is reported as not accepted.
    #[tokio::test]
    async fn deliver_to_unknown_id_returns_false() {
        let broker = HookEventBroadcaster::new();

        let accepted = broker.deliver_response("no-such-id", HookResponse::default());

        assert!(!accepted);
    }

    /// Every subscriber receives its own copy of the same event.
    #[tokio::test]
    async fn subscribers_are_independent() {
        let broker = HookEventBroadcaster::new();
        let mut first = broker.subscribe();
        let mut second = broker.subscribe();
        tokio::task::yield_now().await;
        assert_eq!(broker.subscriber_count(), 2);

        broker.emit("post_capture", "{}");

        let seen_by_first = first.recv().await.expect("a");
        let seen_by_second = second.recv().await.expect("b");
        assert_eq!(seen_by_first.hook_event_id, seen_by_second.hook_event_id);
    }
}
435}