daemon/grpc_local_impl/hook_events.rs
1// SPDX-License-Identifier: Apache-2.0
2//! In-process hook-event broker for the local-mode daemon.
3//!
4//! The hosted variant rides Postgres NOTIFY (see the `server` crate's
5//! `events` module). Local mode is single-process, single-user, no Postgres — so we
6//! model the same fan-out shape with a `tokio::sync::broadcast`
7//! channel for emit→subscribe and a per-correlator `oneshot` for
8//! response routing.
9//!
10//! Lifecycle of a single event:
11//!
//! 1. The capture/merge code path (a future workstream consumer)
//!    calls [`HookEventBroadcaster::emit_and_wait`] with a JSON
//!    payload. The broker mints a `hook_event_id`, reserves a
//!    single-shot response slot for it, and broadcasts the event to
//!    every live subscriber. (Fire-and-forget events go through
//!    [`HookEventBroadcaster::emit`], which reserves no slot.)
//! 2. Each subscriber (a `SubscribeHookEvents` server-stream) picks
//!    up the event from its `mpsc::Receiver` and forwards it to the
//!    hook process.
//! 3. The hook reads the event, computes its reply, and the local
//!    daemon delivers it via `RespondToHook`. The handler routes the
//!    reply through [`HookEventBroadcaster::deliver_response`].
//! 4. The original caller awaits the reply through the returned
//!    [`EmitWaiter`], with a timeout so a crashed hook can't wedge
//!    the operation.
25//!
26//! Out-of-scope here:
27//! - Multiple hooks racing to reply: the first reply wins; the
28//! second is reported as `accepted=false` to its caller. The
29//! wire shape doesn't try to fan replies in.
//! - Persisting in-flight events across daemon restart: the local
//!   daemon is meant to live exactly as long as the agent loop, so a
//!   crash drops every in-flight reply. Hooks see the stream close
//!   and the emit caller times out.
34
35use std::{
36 collections::HashMap,
37 sync::{Arc, Mutex},
38 time::Duration,
39};
40
41use objects::object::OperationId;
42use prost_types::Timestamp;
43use serde::{Deserialize, Serialize};
44use tokio::sync::{broadcast, mpsc, oneshot};
45
46use grpc::heddle::v1::HookEvent as ProtoHookEvent;
47
/// Capacity of the in-process broadcast channel.
///
/// Tokio's broadcast channel keeps a single fixed-size buffer shared by all
/// subscribers; each subscriber only tracks its own read position. Once a
/// subscriber falls more than this many events behind, the oldest events are
/// overwritten and its next `recv` returns `Lagged`. The forwarding task in
/// `subscribe` then closes that subscriber's stream so the hook re-subscribes
/// instead of silently missing events.
const BROADCAST_CAPACITY: usize = 256;
54
55/// Typed hook response decoded from `RespondToHook`. The universal
56/// veto channel is `abort`; per-event extension fields ride on `extra`
57/// so per-event handlers can pull `extra_signals`, `veto`, etc.
58/// without the universal type having to know every shape.
59///
60/// This type's home will move to `crates/repo/src/hooks.rs` so the
61/// CLI hook runner can decode the same shape from stdout. Until that
62/// lands, the broker carries its own definition; the wire format on
63/// `RespondToHookRequest` decodes into this type and the emit-side
64/// awaits it. Field names match the spec verbatim so the eventual
65/// move to `repo::hooks` is a one-line `pub use`.
66#[derive(Debug, Clone, Default, Serialize, Deserialize)]
67pub struct HookResponse {
68 #[serde(default)]
69 pub abort: String,
70 #[serde(flatten, default)]
71 pub extra: serde_json::Value,
72}
73
74/// Per-event metadata routed from `emit` to `await_response`. The
75/// channel is single-shot — once a reply lands, the slot is removed.
76struct ResponseSlot {
77 sender: oneshot::Sender<HookResponse>,
78}
79
80/// In-process pub/sub broker for hook events. Lives on
81/// [`GrpcLocalService`](super::GrpcLocalService) so every handler
82/// shares the same broker and a `subscribe_hook_events` stream and a
83/// `respond_to_hook` reply meet on the same correlator.
84#[derive(Clone)]
85pub struct HookEventBroadcaster {
86 inner: Arc<HookEventBroadcasterInner>,
87}
88
89struct HookEventBroadcasterInner {
90 /// Broadcast sender. Fan-out shape so every subscriber gets its
91 /// own backpressure rather than blocking the emitter.
92 sender: broadcast::Sender<ProtoHookEvent>,
93 /// Pending response slots keyed by `hook_event_id`. Mutex is
94 /// fine here — every operation is short and the contention is low
95 /// (one entry per in-flight emit).
96 pending: Mutex<HashMap<String, ResponseSlot>>,
97}
98
99impl Default for HookEventBroadcaster {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105impl HookEventBroadcaster {
106 pub fn new() -> Self {
107 let (sender, _) = broadcast::channel(BROADCAST_CAPACITY);
108 Self {
109 inner: Arc::new(HookEventBroadcasterInner {
110 sender,
111 pending: Mutex::new(HashMap::new()),
112 }),
113 }
114 }
115
116 /// Subscribe a fresh stream. Each call returns its own
117 /// [`mpsc::Receiver`] backed by a forwarding task that drains the
118 /// underlying `broadcast::Receiver`. The mpsc shape lets us close
119 /// the stream cleanly when the subscriber drops, and lets the
120 /// `Lagged` error close the stream rather than panicking.
121 pub fn subscribe(&self) -> mpsc::Receiver<ProtoHookEvent> {
122 let mut rx = self.inner.sender.subscribe();
123 // Buffer one event ahead of the consumer; broadcast handles
124 // the actual fan-out backlog so the mpsc only needs a small
125 // shock-absorber capacity.
126 let (tx, out_rx) = mpsc::channel(16);
127 tokio::spawn(async move {
128 loop {
129 match rx.recv().await {
130 Ok(event) => {
131 if tx.send(event).await.is_err() {
132 break;
133 }
134 }
135 Err(broadcast::error::RecvError::Lagged(_)) => {
136 // Subscriber fell behind. Drop the stream so
137 // the hook reconnects rather than silently
138 // missing events — the alternative (skip and
139 // continue) makes silent veto loss possible.
140 break;
141 }
142 Err(broadcast::error::RecvError::Closed) => break,
143 }
144 }
145 });
146 out_rx
147 }
148
149 /// Emit a fresh hook event. Returns the `hook_event_id` the
150 /// caller should pass to [`Self::await_response`]. The id is a
151 /// stringified UUIDv4 so it survives JSON round-trips intact.
152 ///
153 /// `payload_json` is delivered verbatim — schema validation lives
154 /// in the catalog (see `GetHookEventSchema`) and is the caller's
155 /// responsibility for now.
156 pub fn emit(&self, event_name: impl Into<String>, payload_json: impl Into<String>) -> String {
157 let hook_event_id = OperationId::new().to_string();
158 let now = std::time::SystemTime::now()
159 .duration_since(std::time::UNIX_EPOCH)
160 .unwrap_or_default();
161 let event = ProtoHookEvent {
162 hook_event_id: hook_event_id.clone(),
163 event_name: event_name.into(),
164 payload_json: payload_json.into(),
165 emitted_at: Some(Timestamp {
166 seconds: now.as_secs() as i64,
167 nanos: now.subsec_nanos() as i32,
168 }),
169 };
170 // Best-effort send: if there are no subscribers the broadcast
171 // returns an error which we deliberately swallow. The emit
172 // caller's `await_response` will time out — that's the
173 // documented "no hook installed" path.
174 let _ = self.inner.sender.send(event);
175 hook_event_id
176 }
177
178 /// Register a single-shot response slot for `hook_event_id` and
179 /// emit at the same time. Returns the id and a future that
180 /// resolves to the hook's reply (or times out).
181 ///
182 /// Use this from the capture/merge code paths when you both want
183 /// to fire the event and wait for the reply atomically.
184 pub fn emit_and_wait(
185 &self,
186 event_name: impl Into<String>,
187 payload_json: impl Into<String>,
188 timeout: Duration,
189 ) -> (String, EmitWaiter) {
190 let (sender, receiver) = oneshot::channel();
191 let event_name = event_name.into();
192 let payload_json = payload_json.into();
193 let hook_event_id = OperationId::new().to_string();
194 // Reserve the slot before we broadcast so the response can't
195 // race ahead of the registration.
196 {
197 let mut pending = self
198 .inner
199 .pending
200 .lock()
201 .expect("hook broker pending map poisoned");
202 pending.insert(hook_event_id.clone(), ResponseSlot { sender });
203 }
204 let now = std::time::SystemTime::now()
205 .duration_since(std::time::UNIX_EPOCH)
206 .unwrap_or_default();
207 let event = ProtoHookEvent {
208 hook_event_id: hook_event_id.clone(),
209 event_name,
210 payload_json,
211 emitted_at: Some(Timestamp {
212 seconds: now.as_secs() as i64,
213 nanos: now.subsec_nanos() as i32,
214 }),
215 };
216 let _ = self.inner.sender.send(event);
217 let waiter = EmitWaiter {
218 broker: self.clone(),
219 hook_event_id: hook_event_id.clone(),
220 receiver,
221 timeout,
222 };
223 (hook_event_id, waiter)
224 }
225
226 /// Await a reply for `hook_event_id` with a deadline. Returns
227 /// `None` when the deadline elapses before a hook responds (the
228 /// emit caller's "no hook acted on this in time" branch).
229 ///
230 /// The slot must have been reserved via [`Self::emit_and_wait`]
231 /// — calling this for a never-registered id resolves to `None`
232 /// immediately.
233 pub async fn await_response(
234 &self,
235 hook_event_id: &str,
236 timeout: Duration,
237 ) -> Option<HookResponse> {
238 // Reservation is the responsibility of `emit_and_wait`. If
239 // a caller wants to register, then emit, then wait separately
240 // they can use this directly — but they must have called
241 // `register_pending` first (kept private to avoid mis-use).
242 let receiver = {
243 let mut pending = self
244 .inner
245 .pending
246 .lock()
247 .expect("hook broker pending map poisoned");
248 pending.remove(hook_event_id).map(|slot| slot.sender)
249 };
250 // If no slot exists, fall back to creating one on the fly so
251 // callers that didn't use `emit_and_wait` still work. We
252 // re-insert and then take a fresh receiver.
253 let receiver = match receiver {
254 Some(_already_taken) => {
255 // The sender is consumed — there's no way to await on
256 // it here without rebuilding the slot. Fall through
257 // to the fresh-slot path.
258 let (sender, receiver) = oneshot::channel();
259 let mut pending = self
260 .inner
261 .pending
262 .lock()
263 .expect("hook broker pending map poisoned");
264 pending.insert(hook_event_id.to_string(), ResponseSlot { sender });
265 receiver
266 }
267 None => {
268 let (sender, receiver) = oneshot::channel();
269 let mut pending = self
270 .inner
271 .pending
272 .lock()
273 .expect("hook broker pending map poisoned");
274 pending.insert(hook_event_id.to_string(), ResponseSlot { sender });
275 receiver
276 }
277 };
278 match tokio::time::timeout(timeout, receiver).await {
279 Ok(Ok(response)) => Some(response),
280 Ok(Err(_canceled)) => None,
281 Err(_elapsed) => {
282 // Drop the slot so a late reply doesn't pile up
283 // memory. The `RespondToHook` handler will report
284 // `accepted=false` for late deliveries.
285 let mut pending = self
286 .inner
287 .pending
288 .lock()
289 .expect("hook broker pending map poisoned");
290 pending.remove(hook_event_id);
291 None
292 }
293 }
294 }
295
296 /// Deliver a hook reply to the in-flight emit waiting on
297 /// `hook_event_id`. Called by the `RespondToHook` handler.
298 /// Returns `true` when the reply was delivered (a waiter was
299 /// present); `false` when no waiter is registered (timed out, or
300 /// already replied).
301 pub fn deliver_response(&self, hook_event_id: &str, response: HookResponse) -> bool {
302 let slot = {
303 let mut pending = self
304 .inner
305 .pending
306 .lock()
307 .expect("hook broker pending map poisoned");
308 pending.remove(hook_event_id)
309 };
310 match slot {
311 Some(slot) => slot.sender.send(response).is_ok(),
312 None => false,
313 }
314 }
315
316 /// Number of subscribers currently attached. Useful for tests.
317 #[cfg(test)]
318 fn subscriber_count(&self) -> usize {
319 self.inner.sender.receiver_count()
320 }
321}
322
323/// Future returned by [`HookEventBroadcaster::emit_and_wait`]. Holds
324/// the receiver plus a hook back to the broker so it can clean up the
325/// pending slot if the future is dropped before the reply lands.
326pub struct EmitWaiter {
327 broker: HookEventBroadcaster,
328 hook_event_id: String,
329 receiver: oneshot::Receiver<HookResponse>,
330 timeout: Duration,
331}
332
333impl EmitWaiter {
334 /// Resolve the waiter, returning `Some` on a fresh reply and
335 /// `None` on timeout or hook crash. Drops the broker's pending
336 /// slot in either path.
337 pub async fn wait(self) -> Option<HookResponse> {
338 let EmitWaiter {
339 broker,
340 hook_event_id,
341 receiver,
342 timeout,
343 } = self;
344 match tokio::time::timeout(timeout, receiver).await {
345 Ok(Ok(response)) => Some(response),
346 Ok(Err(_canceled)) => {
347 broker
348 .inner
349 .pending
350 .lock()
351 .expect("hook broker pending map poisoned")
352 .remove(&hook_event_id);
353 None
354 }
355 Err(_elapsed) => {
356 broker
357 .inner
358 .pending
359 .lock()
360 .expect("hook broker pending map poisoned")
361 .remove(&hook_event_id);
362 None
363 }
364 }
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    // `subscribe` registers its broadcast receiver before it returns, so an
    // event emitted right after subscribing is never missed; no yield is
    // needed before the first `emit`.

    #[tokio::test]
    async fn emit_round_trips_to_subscriber() {
        let broker = HookEventBroadcaster::new();
        let mut sub = broker.subscribe();
        let id = broker.emit("pre_capture", "{\"thread\":\"t1\"}");
        let event = sub.recv().await.expect("event");
        assert_eq!(event.hook_event_id, id);
        assert_eq!(event.event_name, "pre_capture");
        assert!(event.payload_json.contains("t1"));
        assert!(event.emitted_at.is_some());
    }

    #[tokio::test]
    async fn emit_and_wait_returns_delivered_reply() {
        let broker = HookEventBroadcaster::new();
        let _sub = broker.subscribe();
        let (id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_secs(1));
        let id_for_reply = id.clone();
        let broker_clone = broker.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let _ = broker_clone.deliver_response(
                &id_for_reply,
                HookResponse {
                    abort: "veto".into(),
                    extra: serde_json::Value::Null,
                },
            );
        });
        let response = waiter.wait().await.expect("response");
        assert_eq!(response.abort, "veto");
    }

    #[tokio::test]
    async fn emit_and_wait_times_out_with_no_reply() {
        let broker = HookEventBroadcaster::new();
        let _sub = broker.subscribe();
        let (_id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_millis(20));
        let response = waiter.wait().await;
        assert!(response.is_none());
    }

    #[tokio::test]
    async fn second_delivery_is_rejected() {
        let broker = HookEventBroadcaster::new();
        let (id, waiter) = broker.emit_and_wait("pre_capture", "{}", Duration::from_secs(1));
        let first = HookResponse {
            abort: "first".into(),
            extra: serde_json::Value::Null,
        };
        assert!(broker.deliver_response(&id, first));
        assert!(!broker.deliver_response(&id, HookResponse::default()));
        let response = waiter.wait().await.expect("first reply");
        assert_eq!(response.abort, "first");
    }

    #[tokio::test]
    async fn await_response_receives_reply_for_emit() {
        let broker = HookEventBroadcaster::new();
        let id = broker.emit("post_capture", "{}");
        let id_for_reply = id.clone();
        let broker_clone = broker.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let _ = broker_clone.deliver_response(
                &id_for_reply,
                HookResponse {
                    abort: "veto".into(),
                    extra: serde_json::Value::Null,
                },
            );
        });
        let response = broker
            .await_response(&id, Duration::from_secs(1))
            .await
            .expect("response");
        assert_eq!(response.abort, "veto");
    }

    #[tokio::test]
    async fn deliver_to_unknown_id_returns_false() {
        let broker = HookEventBroadcaster::new();
        let accepted = broker.deliver_response("no-such-id", HookResponse::default());
        assert!(!accepted);
    }

    #[tokio::test]
    async fn subscribers_are_independent() {
        let broker = HookEventBroadcaster::new();
        let mut a = broker.subscribe();
        let mut b = broker.subscribe();
        assert_eq!(broker.subscriber_count(), 2);
        broker.emit("post_capture", "{}");
        let event_a = a.recv().await.expect("a");
        let event_b = b.recv().await.expect("b");
        assert_eq!(event_a.hook_event_id, event_b.hook_event_id);
    }
}