use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use bamboo_agent_core::AgentEvent;
use chrono::Utc;
use tokio::sync::{broadcast, mpsc};
use super::change_feed::ChangeEvent;
use super::journal::EventJournal;
/// Capacity of the bounded mpsc inbox that feeds the writer task. When the
/// inbox is full, `AccountEventSink::record` drops the event instead of waiting.
const INBOX_CAPACITY: usize = 1024;
/// Capacity of the broadcast channel used to fan journaled events out to live
/// subscribers.
const BROADCAST_CAPACITY: usize = 4096;
/// Limit handed to `journal::prune` when a sink is constructed.
/// NOTE(review): presumably the number of journal files kept on disk — confirm
/// against `journal::prune`.
const RETAINED_FILES: usize = 64;
/// An event waiting in the inbox: the optional session id it is attributed to,
/// plus the event itself. The sequence number and timestamp are assigned later
/// by the writer task.
pub type PendingEvent = (Option<String>, AgentEvent);
/// Account-level change feed: accepts agent events, hands them to a background
/// writer task that assigns sequence numbers, appends them to the on-disk
/// journal and broadcasts them to live subscribers.
pub struct AccountEventSink {
/// Sequence counter shared with the writer task. Initialised from the highest
/// sequence number reported by `EventJournal::open`; read by `latest_seq`.
seq: Arc<AtomicU64>,
/// Sending half of the bounded inbox drained by the writer task.
tx: mpsc::Sender<PendingEvent>,
/// Broadcast sender kept so that `subscribe` can create new receivers.
broadcast: broadcast::Sender<Arc<ChangeEvent>>,
/// Directory the journal was opened in, as passed to `new`.
events_dir: PathBuf,
/// Number of events `record` failed to enqueue (inbox full or closed).
dropped: Arc<AtomicU64>,
}
impl AccountEventSink {
    /// Opens the journal in `events_dir` and starts the background writer task.
    ///
    /// Old journal files are pruned first; a prune failure is logged as a
    /// warning and does not abort construction. The sequence counter starts at
    /// the highest sequence number reported by `EventJournal::open`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced by `EventJournal::open`.
    ///
    /// # Panics
    ///
    /// Calls `tokio::spawn`, which panics when no Tokio runtime is active.
    pub fn new(events_dir: PathBuf) -> std::io::Result<Arc<Self>> {
        // Best-effort housekeeping: a failed prune must not prevent start-up.
        if let Err(e) = super::journal::prune(&events_dir, RETAINED_FILES) {
            tracing::warn!("change-feed journal prune failed: {e}");
        }

        let (journal, max_seq) = EventJournal::open(events_dir.clone())?;
        let counter = Arc::new(AtomicU64::new(max_seq));
        let (inbox_tx, inbox_rx) = mpsc::channel(INBOX_CAPACITY);
        let (fanout, _initial_rx) = broadcast::channel(BROADCAST_CAPACITY);

        // The writer owns the journal and the receiving end of the inbox; it
        // shares the counter and the broadcast sender with the sink.
        tokio::spawn(writer_loop(
            inbox_rx,
            journal,
            Arc::clone(&counter),
            fanout.clone(),
        ));

        Ok(Arc::new(Self {
            seq: counter,
            tx: inbox_tx,
            broadcast: fanout,
            events_dir,
            dropped: Arc::new(AtomicU64::new(0)),
        }))
    }

    /// Queues `event` for journaling and broadcast if it is a durable change.
    ///
    /// Events for which `is_durable_change()` returns `false` are ignored. The
    /// event is attributed to `session_id` when one is supplied, otherwise to
    /// whatever `event.session_id()` yields.
    ///
    /// This never waits: if the inbox is full or closed the event is dropped,
    /// the drop counter is incremented and a warning is logged.
    pub fn record(&self, session_id: Option<&str>, event: &AgentEvent) {
        if !event.is_durable_change() {
            return;
        }

        // An explicit caller-provided id wins over the one carried by the event.
        let sid = match session_id {
            Some(explicit) => Some(explicit.to_string()),
            None => event.session_id().map(|s| s.to_string()),
        };

        let pending: PendingEvent = (sid, event.clone());
        if self.tx.try_send(pending).is_ok() {
            return;
        }

        self.dropped.fetch_add(1, Ordering::Relaxed);
        tracing::warn!("account change-feed inbox full or closed; event dropped");
    }

    /// Returns a clone of the inbox sender.
    ///
    /// Events sent through it go straight to the writer task and therefore skip
    /// the durability filter and the drop accounting done by [`Self::record`].
    pub fn inbox(&self) -> mpsc::Sender<PendingEvent> {
        self.tx.clone()
    }

    /// Creates a new receiver for events broadcast by the writer task.
    pub fn subscribe(&self) -> broadcast::Receiver<Arc<ChangeEvent>> {
        self.broadcast.subscribe()
    }

    /// Directory the journal was opened in.
    pub fn events_dir(&self) -> &std::path::Path {
        self.events_dir.as_path()
    }

    /// Current value of the shared sequence counter.
    pub fn latest_seq(&self) -> u64 {
        self.seq.load(Ordering::SeqCst)
    }

    /// Number of events [`Self::record`] has dropped so far.
    pub fn dropped_count(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}
/// Background task that turns pending events into journaled, broadcast
/// [`ChangeEvent`]s.
///
/// For every item received on `rx` it assigns the next sequence number and the
/// current UTC timestamp, appends the event to `journal`, publishes the new
/// sequence number through `seq`, and finally broadcasts the event. The loop
/// ends once every inbox sender has been dropped.
///
/// The sequence number is published only *after* the append attempt, so a
/// reader that observes a value through `seq` never sees a number whose append
/// is still in flight.
///
/// A failed append is logged and otherwise tolerated: the sequence number is
/// still consumed and the event is still broadcast, so live subscribers keep
/// working while the journal is unavailable. A broadcast with no active
/// receivers is not an error.
async fn writer_loop(
    mut rx: mpsc::Receiver<PendingEvent>,
    mut journal: EventJournal,
    seq: Arc<AtomicU64>,
    broadcast: broadcast::Sender<Arc<ChangeEvent>>,
) {
    while let Some((session_id, event)) = rx.recv().await {
        // Within this file this task is the only code that modifies `seq`
        // (the sink only loads it), so a separate load and store cannot race
        // with another writer. `wrapping_add` matches `fetch_add`'s overflow
        // behaviour.
        let next = seq.load(Ordering::SeqCst).wrapping_add(1);
        let ce = ChangeEvent {
            seq: next,
            ts: Utc::now(),
            session_id,
            event,
        };
        if let Err(e) = journal.append(&ce) {
            tracing::error!("failed to append change event {} to journal: {e}", ce.seq);
        }
        // Publish the sequence number only now that the append has been
        // attempted, and before subscribers are notified, so that
        // `latest_seq()` is never ahead of the journal write and never behind
        // a broadcast event.
        seq.store(next, Ordering::SeqCst);
        // `send` only fails when there are no receivers, which is fine.
        let _ = broadcast.send(Arc::new(ce));
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::journal;
    use std::time::Duration;

    /// Longest a test waits for the writer task to publish a single event.
    const PUBLISH_TIMEOUT: Duration = Duration::from_secs(1);

    /// Builds a durable `SessionDeleted` event for session `id`.
    fn deletion(id: &str) -> AgentEvent {
        AgentEvent::SessionDeleted {
            session_id: id.to_string(),
        }
    }

    /// Waits for the next broadcast event.
    ///
    /// The writer broadcasts an event only after it has attempted the journal
    /// append, so once this returns the journal can be inspected without
    /// relying on a fixed sleep.
    async fn next_published(rx: &mut broadcast::Receiver<Arc<ChangeEvent>>) -> Arc<ChangeEvent> {
        tokio::time::timeout(PUBLISH_TIMEOUT, rx.recv())
            .await
            .expect("not timed out")
            .expect("not closed")
    }

    #[tokio::test]
    async fn assigns_monotonic_seq_and_journals() {
        let dir = tempfile::tempdir().unwrap();
        let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();
        let mut rx = sink.subscribe();
        for i in 0..3 {
            sink.record(None, &deletion(&format!("s{i}")));
        }
        for expected in 1..=3u64 {
            let ce = next_published(&mut rx).await;
            assert_eq!(ce.seq, expected);
        }
        let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
        assert_eq!(
            journaled.iter().map(|e| e.seq).collect::<Vec<_>>(),
            vec![1, 2, 3]
        );
        assert_eq!(sink.latest_seq(), 3);
    }

    #[tokio::test]
    async fn ephemeral_events_are_dropped_from_feed() {
        let dir = tempfile::tempdir().unwrap();
        let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();
        // Subscribe before recording so the broadcast cannot be missed.
        let mut rx = sink.subscribe();
        sink.record(
            Some("s1"),
            &AgentEvent::Token {
                content: "hi".into(),
            },
        );
        sink.record(Some("s1"), &deletion("s1"));
        // The inbox is FIFO, so if the token had been enqueued it would be
        // published first; the first published event must be the deletion.
        let ce = next_published(&mut rx).await;
        assert_eq!(ce.seq, 1);
        assert!(
            matches!(&ce.event, AgentEvent::SessionDeleted { .. }),
            "the ephemeral token must not reach the feed"
        );
        let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
        assert_eq!(journaled.len(), 1, "only the durable event is journaled");
        assert_eq!(journaled[0].seq, 1);
    }

    #[tokio::test]
    async fn terminal_event_routes_via_caller_session_id() {
        let dir = tempfile::tempdir().unwrap();
        let sink = AccountEventSink::new(dir.path().to_path_buf()).unwrap();
        // Subscribe before recording so the broadcast cannot be missed.
        let mut rx = sink.subscribe();
        sink.record(
            Some("sess-7"),
            &AgentEvent::Complete {
                usage: Default::default(),
            },
        );
        let ce = next_published(&mut rx).await;
        assert_eq!(ce.session_id.as_deref(), Some("sess-7"));
        let journaled = journal::read_since(sink.events_dir(), 0).unwrap();
        assert_eq!(journaled.len(), 1);
        assert_eq!(journaled[0].session_id.as_deref(), Some("sess-7"));
    }
}