Skip to main content

bamboo_engine/events/
account_sink.rs

//! Account-wide durable change-feed sink.
//!
//! This is the single choke point the feed is built around. The three existing
//! per-session event paths (the interactive execute forwarder, the synchronous
//! `publish_replayable_session_event` helper, and the engine forwarder) each
//! also hand their events to this sink via [`AccountEventSink::record`] (or
//! [`AccountEventSink::inbox`] for the dependency-free engine path).
//!
//! A **single writer task** owns sequence allocation, journal append, and the
//! live broadcast — in that order, per event — so "journal order == broadcast
//! order == seq order" holds trivially without scattering the invariant across
//! concurrent callers.
13
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use bamboo_agent_core::AgentEvent;
19use chrono::Utc;
20use tokio::sync::{broadcast, mpsc};
21
22use super::change_feed::ChangeEvent;
23use super::journal::EventJournal;
24
/// Capacity of the bounded `mpsc` inbox feeding the writer task (created in
/// [`AccountEventSink::new`]). Change events are low-volume, so this should
/// never fill; when it does, [`AccountEventSink::record`] drops the event with
/// a warning (and bumps the dropped counter) rather than blocking a forwarder.
const INBOX_CAPACITY: usize = 1024;

/// Live-tail ring capacity, passed to `broadcast::channel` in
/// [`AccountEventSink::new`]. Larger than the per-session channel because the
/// account feed multiplexes all sessions and a bigger ring reduces `Lagged`
/// for resuming clients (the journal is the durable backstop regardless).
const BROADCAST_CAPACITY: usize = 4096;

/// Boot-time retention: the prune step in [`AccountEventSink::new`] keeps at
/// most this many journal files (each ~8 MiB), so disk stays bounded
/// (~512 MiB) without semantic compaction. Clients whose cursor falls below
/// the retained window full-resync via `feed_reset`.
const RETAINED_FILES: usize = 64;
39
/// An event accepted by the sink before seq/ts assignment: `(session_id,
/// event)`. The session id is the caller's known routing context, and is
/// `None` when neither the caller nor the event supplies one. The writer task
/// stamps each pending event with a sequence number and timestamp to build
/// the [`ChangeEvent`] that is journaled and broadcast.
pub type PendingEvent = (Option<String>, AgentEvent);
43
/// Account-wide durable change-feed sink. Construct once per [`AppState`].
///
/// This is a handle onto the writer task spawned by [`Self::new`]: producers
/// enqueue through [`Self::record`] or a cloned [`Self::inbox`], and consumers
/// follow the live tail through [`Self::subscribe`].
pub struct AccountEventSink {
    /// Last-assigned seq, shared with the writer task. The writer task owns
    /// assignment; this is exposed only for diagnostics
    /// ([`Self::latest_seq`]).
    seq: Arc<AtomicU64>,
    /// Inbox to the writer task.
    tx: mpsc::Sender<PendingEvent>,
    /// Live account tail. `Arc` keeps fan-out to many subscribers cheap.
    broadcast: broadcast::Sender<Arc<ChangeEvent>>,
    /// Journal directory, for stateless replay reads on the `/stream` path.
    events_dir: PathBuf,
    /// Count of events `record` could not enqueue because the inbox was full
    /// or closed (should remain 0).
    dropped: Arc<AtomicU64>,
}
58
59impl AccountEventSink {
60    /// Open the journal (recovering the max seq) and spawn the writer task.
61    pub fn new(events_dir: PathBuf) -> std::io::Result<Arc<Self>> {
62        // Boot-time retention pruning before opening the writer.
63        if let Err(e) = super::journal::prune(&events_dir, RETAINED_FILES) {
64            tracing::warn!("change-feed journal prune failed: {e}");
65        }
66        let (journal, max_seq) = EventJournal::open(events_dir.clone())?;
67        let seq = Arc::new(AtomicU64::new(max_seq));
68        let (tx, rx) = mpsc::channel(INBOX_CAPACITY);
69        let (btx, _brx) = broadcast::channel(BROADCAST_CAPACITY);
70
71        let sink = Arc::new(Self {
72            seq: seq.clone(),
73            tx,
74            broadcast: btx.clone(),
75            events_dir,
76            dropped: Arc::new(AtomicU64::new(0)),
77        });
78
79        tokio::spawn(writer_loop(rx, journal, seq, btx));
80        Ok(sink)
81    }
82
83    /// Record an event onto the change feed, if it is durable.
84    ///
85    /// `session_id` is the caller's known session context (forwarders are
86    /// per-session) and wins over [`AgentEvent::session_id`] so terminal events
87    /// (`Complete`/`Cancelled`/`Error`, which carry no id) still route to the
88    /// right session. Never blocks: ephemeral events are filtered out before
89    /// any clone, then the durable event is `try_send`-ed.
90    pub fn record(&self, session_id: Option<&str>, event: &AgentEvent) {
91        if !event.is_durable_change() {
92            return;
93        }
94        let sid = session_id
95            .map(|s| s.to_string())
96            .or_else(|| event.session_id().map(|s| s.to_string()));
97        if self.tx.try_send((sid, event.clone())).is_err() {
98            self.dropped.fetch_add(1, Ordering::Relaxed);
99            tracing::warn!("account change-feed inbox full or closed; event dropped");
100        }
101    }
102
103    /// Clone of the writer inbox, for dependency-free callers (the engine
104    /// forwarder) that cannot reference `AppState`/this type's `record`. Such
105    /// callers must filter with [`AgentEvent::is_durable_change`] before
106    /// sending so ephemeral token traffic never crosses the channel.
107    pub fn inbox(&self) -> mpsc::Sender<PendingEvent> {
108        self.tx.clone()
109    }
110
111    /// Subscribe to the live account tail. Subscribe *before* reading the
112    /// journal on the `/stream` path so the replay→live handoff cannot gap.
113    pub fn subscribe(&self) -> broadcast::Receiver<Arc<ChangeEvent>> {
114        self.broadcast.subscribe()
115    }
116
117    /// Journal directory, for stateless replay reads.
118    pub fn events_dir(&self) -> &std::path::Path {
119        &self.events_dir
120    }
121
122    /// Last-assigned seq (0 if none yet).
123    pub fn latest_seq(&self) -> u64 {
124        self.seq.load(Ordering::SeqCst)
125    }
126
127    /// Number of events dropped due to a full inbox (diagnostics).
128    pub fn dropped_count(&self) -> u64 {
129        self.dropped.load(Ordering::Relaxed)
130    }
131}
132
133async fn writer_loop(
134    mut rx: mpsc::Receiver<PendingEvent>,
135    mut journal: EventJournal,
136    seq: Arc<AtomicU64>,
137    broadcast: broadcast::Sender<Arc<ChangeEvent>>,
138) {
139    while let Some((session_id, event)) = rx.recv().await {
140        // Single writer → fetch_add is the seq allocator. 1-based.
141        let next = seq.fetch_add(1, Ordering::SeqCst) + 1;
142        let ce = ChangeEvent {
143            seq: next,
144            ts: Utc::now(),
145            session_id,
146            event,
147        };
148        if let Err(e) = journal.append(&ce) {
149            tracing::error!("failed to append change event {} to journal: {e}", ce.seq);
150        }
151        // Lossy by design: with no live subscribers, or if all lag, the durable
152        // journal remains the source of truth for resume.
153        let _ = broadcast.send(Arc::new(ce));
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::journal;
    use std::time::Duration;

    /// Upper bound on how long a test waits for the writer task to publish.
    const RECV_TIMEOUT: Duration = Duration::from_secs(1);

    /// A durable event that carries its own session id.
    fn deletion(id: &str) -> AgentEvent {
        AgentEvent::SessionDeleted {
            session_id: id.to_string(),
        }
    }

    /// Wait for the next live event.
    ///
    /// The writer broadcasts only after the journal append, so once this
    /// returns the event has already been handed to the journal. This lets
    /// the tests synchronise without timing-dependent sleeps.
    async fn next_event(rx: &mut broadcast::Receiver<Arc<ChangeEvent>>) -> Arc<ChangeEvent> {
        tokio::time::timeout(RECV_TIMEOUT, rx.recv())
            .await
            .expect("not timed out")
            .expect("not closed")
    }

    #[tokio::test]
    async fn assigns_monotonic_seq_and_journals() {
        let dir = tempfile::tempdir().unwrap();
        let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();

        let mut rx = sink.subscribe();
        for i in 0..3 {
            sink.record(None, &deletion(&format!("s{i}")));
        }

        // Live tail delivers seq 1,2,3 in order.
        for expected in 1..=3u64 {
            let ce = next_event(&mut rx).await;
            assert_eq!(ce.seq, expected);
        }

        // Journal has the same three, in order.
        let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
        assert_eq!(
            journaled.iter().map(|e| e.seq).collect::<Vec<_>>(),
            vec![1, 2, 3]
        );
        assert_eq!(sink.latest_seq(), 3);
    }

    #[tokio::test]
    async fn ephemeral_events_are_dropped_from_feed() {
        let dir = tempfile::tempdir().unwrap();
        let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();

        // Subscribe first so the durable event below cannot be missed.
        let mut rx = sink.subscribe();
        sink.record(
            Some("s1"),
            &AgentEvent::Token {
                content: "hi".into(),
            },
        );
        sink.record(Some("s1"), &deletion("s1"));

        // The token is filtered out before the inbox, so the first (and only)
        // event on the live tail is the deletion, carrying seq 1.
        let ce = next_event(&mut rx).await;
        assert_eq!(ce.seq, 1);

        let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
        assert_eq!(journaled.len(), 1, "only the durable event is journaled");
        assert_eq!(journaled[0].seq, 1);
    }

    #[tokio::test]
    async fn terminal_event_routes_via_caller_session_id() {
        let dir = tempfile::tempdir().unwrap();
        let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();

        let mut rx = sink.subscribe();
        sink.record(
            Some("sess-7"),
            &AgentEvent::Complete {
                usage: Default::default(),
            },
        );

        // The live tail and the journal must agree on the routed session.
        let ce = next_event(&mut rx).await;
        assert_eq!(ce.session_id.as_deref(), Some("sess-7"));

        let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
        assert_eq!(journaled.len(), 1);
        assert_eq!(journaled[0].session_id.as_deref(), Some("sess-7"));
    }
}