agent_block_core/bus/event.rs
//! Event type flowing through the [`EventBus`](crate::bus::EventBus).
//!
//! Each [`Event`] carries a `kind` string used for handler dispatch, an `id`
//! for correlation/logging, a `payload` (source-defined), a `meta` map
//! (source-defined), and an optional `ack_tx` one-shot channel used to return
//! the Lua handler's return value back to the source that produced the
//! request (e.g. a mesh request/response round-trip).
//!
//! `ack_tx` is an `Option` because some sources are fire-and-forget
//! (e.g. a future webhook broadcast) and do not need a response.
11
12use serde_json::Value;
13use tokio::sync::oneshot;
14
15use agent_block_types::error::BlockError;
16
/// Result carried back to the originating source via [`Event::ack_tx`].
///
/// `Ok` carries the handler's return value as a JSON [`Value`]; `Err`
/// carries a [`BlockError`].
pub type AckResult = Result<Value, BlockError>;
19
/// Sender half of the one-shot ack channel. Carried inside [`Event`] as
/// [`Event::ack_tx`] and used to send a single [`AckResult`] back to the
/// source.
pub type AckSender = oneshot::Sender<AckResult>;
22
/// Receiver half of the one-shot ack channel. Held by whichever source
/// produced the event; the source awaits the handler's [`AckResult`] on it.
///
/// Returned to `Event::with_ack` callers (ST4 adapters: webhook/WSS/timer).
/// Kept exported for downstream consumers; not referenced within the ST3
/// cut, where the mesh adapter in `host.rs` drives the ack loop directly.
// `dead_code` is allowed because nothing in the ST3 cut names this alias yet.
#[allow(dead_code)]
pub type AckReceiver = oneshot::Receiver<AckResult>;
31
/// A normalized event flowing through the bus.
///
/// Ownership: produced by a [`Source`](crate::bus::Source), moved through a
/// bounded `mpsc::Sender<Event>` into the single dispatcher loop. The
/// dispatcher consumes the `ack_tx` (via `Option::take`) to send the
/// handler's return value back to the source.
///
/// All fields are public, so a source may build an `Event` with a struct
/// literal instead of going through the constructors.
#[derive(Debug)]
pub struct Event {
    /// Dispatch key. Matched against `bus.on(kind, fn)` registrations.
    pub kind: String,
    /// Correlation id (source-assigned). Used in tracing/logging, including
    /// the warning emitted when the ack receiver has been dropped.
    pub id: String,
    /// Source-defined payload. Converted to Lua table at dispatch time
    /// (NOTE(review): the conversion lives in the dispatcher, not in this
    /// file — confirm how non-object values are handled).
    pub payload: Value,
    /// Source-defined metadata (e.g. mesh `from`, timestamps). Converted to
    /// Lua table at dispatch time. `Event::fire_and_forget` sets this to
    /// `Value::Null`.
    pub meta: Value,
    /// Optional one-shot channel used to return the Lua handler's result.
    /// `None` for fire-and-forget sources, and also after
    /// `Event::deliver_ack` has taken the sender out.
    pub ack_tx: Option<AckSender>,
}
53
54impl Event {
55 /// Construct a new event without an ack channel (fire-and-forget).
56 ///
57 /// Intended for ST4 adapters (webhook broadcast / timer). Not used by
58 /// the ST3 mesh path (which needs the ack round-trip).
59 #[allow(dead_code)]
60 pub fn fire_and_forget(kind: impl Into<String>, id: impl Into<String>, payload: Value) -> Self {
61 Self {
62 kind: kind.into(),
63 id: id.into(),
64 payload,
65 meta: Value::Null,
66 ack_tx: None,
67 }
68 }
69
70 /// Construct a new event paired with a fresh ack channel. Returns the
71 /// event (to be pushed to the bus) and the receiver half (to be awaited
72 /// by the source).
73 ///
74 /// Used by the dispatcher's in-crate tests and by forthcoming ST4
75 /// adapters. The ST3 mesh adapter constructs `Event` directly to keep
76 /// control over the `meta` map and ack sender lifetime.
77 #[allow(dead_code)]
78 pub fn with_ack(
79 kind: impl Into<String>,
80 id: impl Into<String>,
81 payload: Value,
82 meta: Value,
83 ) -> (Self, AckReceiver) {
84 let (tx, rx) = oneshot::channel();
85 let evt = Self {
86 kind: kind.into(),
87 id: id.into(),
88 payload,
89 meta,
90 ack_tx: Some(tx),
91 };
92 (evt, rx)
93 }
94
95 /// Send `result` on `ack_tx` if it is still present. Logs a warning when
96 /// the receiver has been dropped (tracing-missing-on-err policy).
97 ///
98 /// Returns `Ok(())` when the ack was delivered or the event was
99 /// fire-and-forget. Returns `Err(BlockError::Bus)` only when the
100 /// receiver had been dropped — the caller can decide whether to treat
101 /// that as fatal.
102 pub fn deliver_ack(&mut self, result: AckResult) -> Result<(), BlockError> {
103 let Some(tx) = self.ack_tx.take() else {
104 return Ok(());
105 };
106 if let Err(dropped) = tx.send(result) {
107 tracing::warn!(
108 kind = %self.kind,
109 id = %self.id,
110 "ack receiver dropped; handler result discarded: {:?}",
111 dropped.as_ref().map(|_| "ok").unwrap_or_else(|e| match e {
112 BlockError::Bus(_) => "bus-err",
113 _ => "other-err",
114 })
115 );
116 return Err(BlockError::Bus(format!(
117 "ack receiver dropped (kind={}, id={})",
118 self.kind, self.id
119 )));
120 }
121 Ok(())
122 }
123}