use std::sync::Arc;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
use tokio::sync::broadcast;
use super::metadata::EventMetadata;
use super::store::EventStore;
use crate::notifications::build_broadcast_listener;
/// Default capacity for an event broadcast channel.
///
/// NOTE(review): nothing in this file references the constant; presumably
/// callers pass it as the `capacity` argument of `EventBroadcastState::new`.
/// Confirm against the call sites.
pub const DEFAULT_BROADCAST_CAPACITY: usize = 256;
/// Pairs an event store with a Tokio broadcast channel carrying events of type `T`.
///
/// `build_persisting_listener` uses both halves: it hands the sender to the
/// broadcast listener and appends events received from the channel to the store.
pub struct EventBroadcastState<T> {
/// Shared handle to the event store that received events are appended to.
pub store: Arc<EventStore<T>>,
/// Sending half of the broadcast channel; `subscribe()` on it yields a new receiver.
pub broadcast: broadcast::Sender<T>,
}
impl<T> std::fmt::Debug for EventBroadcastState<T> {
    /// Renders the state as a non-exhaustive struct showing only `subscribers`.
    ///
    /// The store and the sender are deliberately left out, so this impl needs
    /// no `Debug` bound on `T`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Number of receivers currently attached to the broadcast channel.
        let subscribers = self.broadcast.receiver_count();

        let mut rendered = f.debug_struct("EventBroadcastState");
        rendered.field("subscribers", &subscribers);
        rendered.finish_non_exhaustive()
    }
}
impl<T> EventBroadcastState<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Creates a state around `store` with a fresh broadcast channel.
    ///
    /// `capacity` is raised to at least 1 before the channel is created, because
    /// Tokio's `broadcast::channel` panics on a zero capacity. The receiver that
    /// comes back with the sender is discarded; consumers attach later through
    /// `broadcast.subscribe()`.
    pub fn new(store: Arc<EventStore<T>>, capacity: usize) -> Self {
        let effective_capacity = std::cmp::max(capacity, 1);
        let (sender, _) = broadcast::channel(effective_capacity);
        Self {
            store,
            broadcast: sender,
        }
    }
}
pub fn build_persisting_listener<T>(
method_name: &'static str,
state: Arc<EventBroadcastState<T>>,
) -> Arc<dyn Fn(Value) + Send + Sync>
where
T: EventMetadata + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
let inner = build_broadcast_listener::<T>(method_name, state.broadcast.clone());
let mut tap = state.broadcast.subscribe();
let store = state.store.clone();
tokio::spawn(async move {
loop {
match tap.recv().await {
Ok(event) => {
if let Err(e) = store.append(&event).await {
tracing::warn!(
method = method_name,
error = %e,
"events::build_persisting_listener: store append failed",
);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
inner
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::store::{EventStore, ListFilter, DEFAULT_TABLE};
    use nexo_tool_meta::admin::agent_events::{AgentEventKind, TranscriptRole};
    use std::time::Duration;
    use uuid::Uuid;

    /// Notification method name shared by every test listener.
    const METHOD: &str = "nexo/notify/agent_event";

    /// JSON payload for a `transcript_appended` event from agent "ana" with the given `seq`.
    fn transcript_value(seq: u64) -> Value {
        let sent_at_ms = 1000 + seq;
        serde_json::json!({
            "kind": "transcript_appended",
            "agent_id": "ana",
            "session_id": Uuid::nil(),
            "seq": seq,
            "role": "user",
            "body": "hi",
            "sent_at_ms": sent_at_ms,
            "source_plugin": "whatsapp"
        })
    }

    /// Broadcast state backed by a new in-memory store and a 16-slot channel.
    async fn fresh_state() -> Arc<EventBroadcastState<AgentEventKind>> {
        let store = EventStore::open_memory(DEFAULT_TABLE).await.unwrap();
        Arc::new(EventBroadcastState::new(Arc::new(store), 16))
    }

    /// Polls the store up to 50 times, 10 ms apart, and reports whether exactly
    /// one row for agent "ana" was ever listed.
    async fn single_ana_row_lands(state: &EventBroadcastState<AgentEventKind>) -> bool {
        for _ in 0..50 {
            let filter = ListFilter {
                agent_id: Some("ana".into()),
                limit: 10,
                ..Default::default()
            };
            let rows = state.store.list(&filter).await.unwrap();
            if rows.len() == 1 {
                return true;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        false
    }

    #[tokio::test]
    async fn listener_appends_to_store_and_broadcasts() {
        let state = fresh_state().await;
        let mut rx = state.broadcast.subscribe();
        let listener = build_persisting_listener(METHOD, Arc::clone(&state));

        listener(transcript_value(0));

        // The live subscriber sees the event first...
        let live = rx.recv().await.unwrap();
        assert!(matches!(live, AgentEventKind::TranscriptAppended { .. }));

        // ...and the background tap eventually writes it to the store.
        let landed = single_ana_row_lands(&state).await;
        assert!(landed, "store append never landed");
    }

    #[tokio::test]
    async fn listener_skips_malformed_payload() {
        let state = fresh_state().await;
        let listener = build_persisting_listener(METHOD, Arc::clone(&state));

        listener(serde_json::json!({"not": "a real event"}));

        // Give the background task a chance to run before checking that nothing was stored.
        tokio::time::sleep(Duration::from_millis(20)).await;
        let everything = ListFilter {
            limit: 10,
            ..Default::default()
        };
        let rows = state.store.list(&everything).await.unwrap();
        assert!(rows.is_empty());
    }

    #[tokio::test]
    async fn listener_continues_broadcast_when_no_subscribers() {
        let state = fresh_state().await;
        let listener = build_persisting_listener(METHOD, Arc::clone(&state));

        listener(transcript_value(0));

        let landed = single_ana_row_lands(&state).await;
        assert!(
            landed,
            "append did not land despite zero external subscribers"
        );
    }

    // Never called; presumably exists only so the `TranscriptRole` import above
    // counts as used. `dead_code` is allowed for that reason.
    #[allow(dead_code)]
    fn _force_use(_: TranscriptRole) {}
}