bamboo_engine/events/
account_sink.rs1use std::path::PathBuf;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use bamboo_agent_core::AgentEvent;
19use chrono::Utc;
20use tokio::sync::{broadcast, mpsc};
21
22use super::change_feed::ChangeEvent;
23use super::journal::EventJournal;
24
25const INBOX_CAPACITY: usize = 1024;
29
30const BROADCAST_CAPACITY: usize = 4096;
34
35const RETAINED_FILES: usize = 64;
39
40pub type PendingEvent = (Option<String>, AgentEvent);
43
44pub struct AccountEventSink {
46 seq: Arc<AtomicU64>,
49 tx: mpsc::Sender<PendingEvent>,
51 broadcast: broadcast::Sender<Arc<ChangeEvent>>,
53 events_dir: PathBuf,
55 dropped: Arc<AtomicU64>,
57}
58
59impl AccountEventSink {
60 pub fn new(events_dir: PathBuf) -> std::io::Result<Arc<Self>> {
62 if let Err(e) = super::journal::prune(&events_dir, RETAINED_FILES) {
64 tracing::warn!("change-feed journal prune failed: {e}");
65 }
66 let (journal, max_seq) = EventJournal::open(events_dir.clone())?;
67 let seq = Arc::new(AtomicU64::new(max_seq));
68 let (tx, rx) = mpsc::channel(INBOX_CAPACITY);
69 let (btx, _brx) = broadcast::channel(BROADCAST_CAPACITY);
70
71 let sink = Arc::new(Self {
72 seq: seq.clone(),
73 tx,
74 broadcast: btx.clone(),
75 events_dir,
76 dropped: Arc::new(AtomicU64::new(0)),
77 });
78
79 tokio::spawn(writer_loop(rx, journal, seq, btx));
80 Ok(sink)
81 }
82
83 pub fn record(&self, session_id: Option<&str>, event: &AgentEvent) {
91 if !event.is_durable_change() {
92 return;
93 }
94 let sid = session_id
95 .map(|s| s.to_string())
96 .or_else(|| event.session_id().map(|s| s.to_string()));
97 if self.tx.try_send((sid, event.clone())).is_err() {
98 self.dropped.fetch_add(1, Ordering::Relaxed);
99 tracing::warn!("account change-feed inbox full or closed; event dropped");
100 }
101 }
102
103 pub fn inbox(&self) -> mpsc::Sender<PendingEvent> {
108 self.tx.clone()
109 }
110
111 pub fn subscribe(&self) -> broadcast::Receiver<Arc<ChangeEvent>> {
114 self.broadcast.subscribe()
115 }
116
117 pub fn events_dir(&self) -> &std::path::Path {
119 &self.events_dir
120 }
121
122 pub fn latest_seq(&self) -> u64 {
124 self.seq.load(Ordering::SeqCst)
125 }
126
127 pub fn dropped_count(&self) -> u64 {
129 self.dropped.load(Ordering::Relaxed)
130 }
131}
132
133async fn writer_loop(
134 mut rx: mpsc::Receiver<PendingEvent>,
135 mut journal: EventJournal,
136 seq: Arc<AtomicU64>,
137 broadcast: broadcast::Sender<Arc<ChangeEvent>>,
138) {
139 while let Some((session_id, event)) = rx.recv().await {
140 let next = seq.fetch_add(1, Ordering::SeqCst) + 1;
142 let ce = ChangeEvent {
143 seq: next,
144 ts: Utc::now(),
145 session_id,
146 event,
147 };
148 if let Err(e) = journal.append(&ce) {
149 tracing::error!("failed to append change event {} to journal: {e}", ce.seq);
150 }
151 let _ = broadcast.send(Arc::new(ce));
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use super::*;
160 use crate::events::journal;
161
162 fn deletion(id: &str) -> AgentEvent {
163 AgentEvent::SessionDeleted {
164 session_id: id.to_string(),
165 }
166 }
167
168 #[tokio::test]
169 async fn assigns_monotonic_seq_and_journals() {
170 let dir = tempfile::tempdir().unwrap();
171 let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();
172
173 let mut rx = sink.subscribe();
174 for i in 0..3 {
175 sink.record(None, &deletion(&format!("s{i}")));
176 }
177
178 for expected in 1..=3u64 {
180 let ce = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
181 .await
182 .expect("not timed out")
183 .expect("not closed");
184 assert_eq!(ce.seq, expected);
185 }
186
187 let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
189 assert_eq!(
190 journaled.iter().map(|e| e.seq).collect::<Vec<_>>(),
191 vec![1, 2, 3]
192 );
193 assert_eq!(sink.latest_seq(), 3);
194 }
195
196 #[tokio::test]
197 async fn ephemeral_events_are_dropped_from_feed() {
198 let dir = tempfile::tempdir().unwrap();
199 let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();
200
201 sink.record(
202 Some("s1"),
203 &AgentEvent::Token {
204 content: "hi".into(),
205 },
206 );
207 sink.record(Some("s1"), &deletion("s1"));
208
209 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
211 let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
212 assert_eq!(journaled.len(), 1, "only the durable event is journaled");
213 assert_eq!(journaled[0].seq, 1);
214 }
215
216 #[tokio::test]
217 async fn terminal_event_routes_via_caller_session_id() {
218 let dir = tempfile::tempdir().unwrap();
219 let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();
220
221 sink.record(
222 Some("sess-7"),
223 &AgentEvent::Complete {
224 usage: Default::default(),
225 },
226 );
227 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
228 let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
229 assert_eq!(journaled.len(), 1);
230 assert_eq!(journaled[0].session_id.as_deref(), Some("sess-7"));
231 }
232}