Skip to main content

agent_block_core/bus/
event.rs

//! Event type flowing through the [`EventBus`](crate::bus::EventBus).
//!
//! Each [`Event`] carries a `kind` string used for handler dispatch, an `id`
//! for correlation/logging, a `payload` (source-defined), a `meta` map
//! (source-defined), and an optional `ack_tx` one-shot channel used to return
//! the Lua handler's return value back to the source that produced the
//! request (e.g. a mesh request/response round-trip).
//!
//! The `ack_tx` is `Option` because some sources are fire-and-forget
//! (e.g. a future webhook broadcast) and do not need a response.
11
12use serde_json::Value;
13use tokio::sync::oneshot;
14
15use agent_block_types::error::BlockError;
16
17/// Result carried back to the originating source via [`Event::ack_tx`].
18pub type AckResult = Result<Value, BlockError>;
19
20/// Sender half of the ack channel. Carried inside [`Event`].
21pub type AckSender = oneshot::Sender<AckResult>;
22
23/// Receiver half of the ack channel. Held by whatever source produced the
24/// event and awaits the handler's return value.
25///
26/// Used by `Event::with_ack` callers (ST4 adapters: webhook/WSS/timer).
27/// Kept exported for downstream consumers; not referenced within the ST3
28/// cut where the mesh adapter in `host.rs` drives the ack loop directly.
29#[allow(dead_code)]
30pub type AckReceiver = oneshot::Receiver<AckResult>;
31
32/// A normalized event flowing through the bus.
33///
34/// Ownership: produced by a [`Source`](crate::bus::Source), moved through a
35/// bounded `mpsc::Sender<Event>` into the single dispatcher loop. The
36/// dispatcher consumes the `ack_tx` (via `Option::take`) to send the
37/// handler's return value back to the source.
38#[derive(Debug)]
39pub struct Event {
40    /// Dispatch key. Matched against `bus.on(kind, fn)` registrations.
41    pub kind: String,
42    /// Correlation id (source-assigned). Used in tracing/logging.
43    pub id: String,
44    /// Source-defined payload. Converted to Lua table at dispatch time.
45    pub payload: Value,
46    /// Source-defined metadata (e.g. mesh `from`, timestamps). Converted to
47    /// Lua table at dispatch time.
48    pub meta: Value,
49    /// Optional one-shot channel used to return the Lua handler's result.
50    /// `None` for fire-and-forget sources.
51    pub ack_tx: Option<AckSender>,
52}
53
54impl Event {
55    /// Construct a new event without an ack channel (fire-and-forget).
56    ///
57    /// Intended for ST4 adapters (webhook broadcast / timer). Not used by
58    /// the ST3 mesh path (which needs the ack round-trip).
59    #[allow(dead_code)]
60    pub fn fire_and_forget(kind: impl Into<String>, id: impl Into<String>, payload: Value) -> Self {
61        Self {
62            kind: kind.into(),
63            id: id.into(),
64            payload,
65            meta: Value::Null,
66            ack_tx: None,
67        }
68    }
69
70    /// Construct a new event paired with a fresh ack channel. Returns the
71    /// event (to be pushed to the bus) and the receiver half (to be awaited
72    /// by the source).
73    ///
74    /// Used by the dispatcher's in-crate tests and by forthcoming ST4
75    /// adapters. The ST3 mesh adapter constructs `Event` directly to keep
76    /// control over the `meta` map and ack sender lifetime.
77    #[allow(dead_code)]
78    pub fn with_ack(
79        kind: impl Into<String>,
80        id: impl Into<String>,
81        payload: Value,
82        meta: Value,
83    ) -> (Self, AckReceiver) {
84        let (tx, rx) = oneshot::channel();
85        let evt = Self {
86            kind: kind.into(),
87            id: id.into(),
88            payload,
89            meta,
90            ack_tx: Some(tx),
91        };
92        (evt, rx)
93    }
94
95    /// Send `result` on `ack_tx` if it is still present. Logs a warning when
96    /// the receiver has been dropped (tracing-missing-on-err policy).
97    ///
98    /// Returns `Ok(())` when the ack was delivered or the event was
99    /// fire-and-forget. Returns `Err(BlockError::Bus)` only when the
100    /// receiver had been dropped — the caller can decide whether to treat
101    /// that as fatal.
102    pub fn deliver_ack(&mut self, result: AckResult) -> Result<(), BlockError> {
103        let Some(tx) = self.ack_tx.take() else {
104            return Ok(());
105        };
106        if let Err(dropped) = tx.send(result) {
107            tracing::warn!(
108                kind = %self.kind,
109                id = %self.id,
110                "ack receiver dropped; handler result discarded: {:?}",
111                dropped.as_ref().map(|_| "ok").unwrap_or_else(|e| match e {
112                    BlockError::Bus(_) => "bus-err",
113                    _ => "other-err",
114                })
115            );
116            return Err(BlockError::Bus(format!(
117                "ack receiver dropped (kind={}, id={})",
118                self.kind, self.id
119            )));
120        }
121        Ok(())
122    }
123}