#[cfg(feature = "http-api")]
use std::sync::Arc;
#[cfg(feature = "http-api")]
use tokio::sync::mpsc;
#[cfg(feature = "http-api")]
use crate::reasoning::loop_types::{BufferedJournal, JournalEntry, JournalError, JournalWriter};
#[cfg(feature = "http-api")]
pub struct StreamingJournal {
inner: Arc<BufferedJournal>,
tx: mpsc::Sender<JournalEntry>,
}
#[cfg(feature = "http-api")]
impl StreamingJournal {
pub fn new(inner: Arc<BufferedJournal>, tx: mpsc::Sender<JournalEntry>) -> Self {
Self { inner, tx }
}
}
#[cfg(feature = "http-api")]
#[async_trait::async_trait]
impl JournalWriter for StreamingJournal {
async fn append(&self, entry: JournalEntry) -> Result<(), JournalError> {
self.inner.append(entry.clone()).await?;
if let Err(e) = self.tx.try_send(entry) {
tracing::debug!("Streaming journal channel full or closed: {}", e);
}
Ok(())
}
async fn next_sequence(&self) -> u64 {
self.inner.next_sequence().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::reasoning::loop_types::{LoopConfig, LoopEvent};
use crate::types::AgentId;
#[tokio::test]
async fn streaming_journal_forwards_to_channel() {
let inner = Arc::new(BufferedJournal::new(100));
let (tx, mut rx) = mpsc::channel(16);
let journal = StreamingJournal::new(inner.clone(), tx);
let entry = JournalEntry {
sequence: 0,
timestamp: chrono::Utc::now(),
agent_id: AgentId::new(),
iteration: 0,
event: LoopEvent::Started {
agent_id: AgentId::new(),
config: Box::new(LoopConfig::default()),
},
};
journal.append(entry).await.unwrap();
assert_eq!(inner.entries().await.len(), 1);
let received = rx.try_recv().unwrap();
assert_eq!(received.sequence, 0);
}
#[tokio::test]
async fn streaming_journal_does_not_block_when_channel_full() {
let inner = Arc::new(BufferedJournal::new(100));
let (tx, _rx) = mpsc::channel(1);
let journal = StreamingJournal::new(inner.clone(), tx);
let make_entry = |seq: u64| JournalEntry {
sequence: seq,
timestamp: chrono::Utc::now(),
agent_id: AgentId::new(),
iteration: 0,
event: LoopEvent::Started {
agent_id: AgentId::new(),
config: Box::new(LoopConfig::default()),
},
};
journal.append(make_entry(0)).await.unwrap();
journal.append(make_entry(1)).await.unwrap();
assert_eq!(inner.entries().await.len(), 2);
}
}