use super::event::EngineEvent;
pub trait EngineSink: Send + Sync {
fn emit(&self, event: EngineEvent);
}
pub struct NullSink;
impl EngineSink for NullSink {
fn emit(&self, _event: EngineEvent) {}
}
pub struct BufferingSink {
lines: std::sync::Mutex<Vec<String>>,
cap: usize,
}
impl BufferingSink {
pub fn new() -> Self {
Self::with_cap(256)
}
pub fn with_cap(cap: usize) -> Self {
Self {
lines: std::sync::Mutex::new(Vec::new()),
cap,
}
}
pub fn take_lines(&self) -> Vec<String> {
std::mem::take(&mut *self.lines.lock().unwrap())
}
fn push_capped(&self, line: String) {
let mut guard = self.lines.lock().unwrap();
if guard.len() < self.cap {
guard.push(line);
} else if guard.last().map(|l| !l.starts_with('…')).unwrap_or(true) {
guard.push(format!("… (trace truncated at {} lines)", self.cap));
}
}
}
impl Default for BufferingSink {
fn default() -> Self {
Self::new()
}
}
impl EngineSink for BufferingSink {
fn emit(&self, event: EngineEvent) {
match event {
EngineEvent::ToolCallStart { name, .. } => {
self.push_capped(format!(" \u{1f527} {name}"));
}
EngineEvent::Info { message } => {
self.push_capped(message);
}
EngineEvent::ApprovalRequest { tool_name, .. } => {
self.push_capped(format!(
" \u{2398} approval auto-rejected for {tool_name} (no user channel)"
));
}
EngineEvent::AskUserRequest { question, .. } => {
self.push_capped(format!(
" \u{2398} ask-user auto-skipped: {}",
question.chars().take(80).collect::<String>()
));
}
_ => {}
}
}
}
pub struct PersistingSink<'a> {
inner: &'a dyn EngineSink,
db: std::sync::Arc<dyn crate::persistence::Persistence>,
session_id: String,
parent_tool_call_id: Option<String>,
}
impl<'a> PersistingSink<'a> {
pub fn new(
inner: &'a dyn EngineSink,
db: std::sync::Arc<dyn crate::persistence::Persistence>,
session_id: String,
parent_tool_call_id: Option<String>,
) -> Self {
Self {
inner,
db,
session_id,
parent_tool_call_id,
}
}
fn persist(&self, kind: &'static str, payload: String) {
let db = self.db.clone();
let session_id = self.session_id.clone();
let parent = self.parent_tool_call_id.clone();
tokio::spawn(async move {
if let Err(e) = db
.insert_session_event(&session_id, kind, &payload, parent.as_deref())
.await
{
tracing::warn!(
error = %e, kind, session_id,
"failed to persist session event"
);
}
});
}
}
impl EngineSink for PersistingSink<'_> {
fn emit(&self, event: EngineEvent) {
use crate::persistence::session_event_kind as sek;
if self.parent_tool_call_id.is_some() {
match &event {
EngineEvent::Info { message } => {
self.persist(sek::SUB_AGENT_EVENT, message.clone());
}
EngineEvent::ToolCallStart { name, .. } => {
self.persist(sek::SUB_AGENT_EVENT, format!(" \u{1f527} {name}"));
}
EngineEvent::ApprovalRequest { tool_name, .. } => {
self.persist(
sek::SUB_AGENT_EVENT,
format!(
" \u{2398} approval auto-rejected for {tool_name} (no user channel)"
),
);
}
EngineEvent::AskUserRequest { question, .. } => {
let truncated: String = question.chars().take(80).collect();
self.persist(
sek::SUB_AGENT_EVENT,
format!(" \u{2398} ask-user auto-skipped: {truncated}"),
);
}
_ => {}
}
} else {
match &event {
EngineEvent::Info { message } => {
self.persist(sek::INFO, message.clone());
}
EngineEvent::BgTaskUpdate { .. } => {
if let Ok(json) = serde_json::to_string(&event) {
self.persist(sek::BG_TASK_UPDATE, json);
}
}
_ => {}
}
}
self.inner.emit(event);
}
}
#[cfg(any(test, feature = "test-support"))]
#[derive(Debug, Default)]
pub struct TestSink {
events: std::sync::Mutex<Vec<EngineEvent>>,
broadcaster: std::sync::Mutex<Option<tokio::sync::broadcast::Sender<EngineEvent>>>,
}
#[cfg(any(test, feature = "test-support"))]
impl TestSink {
pub fn new() -> Self {
Self::default()
}
pub fn events(&self) -> Vec<EngineEvent> {
self.events.lock().unwrap().clone()
}
pub fn len(&self) -> usize {
self.events.lock().unwrap().len()
}
pub fn is_empty(&self) -> bool {
self.events.lock().unwrap().is_empty()
}
pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<EngineEvent> {
let mut guard = self.broadcaster.lock().unwrap();
let sender = guard.get_or_insert_with(|| {
let (tx, _) = tokio::sync::broadcast::channel(256);
tx
});
sender.subscribe()
}
pub async fn wait_for<F>(
&self,
timeout: std::time::Duration,
pred: F,
) -> Result<EngineEvent, &'static str>
where
F: Fn(&EngineEvent) -> bool,
{
let mut rx = self.subscribe();
if let Some(ev) = self.events().into_iter().find(|e| pred(e)) {
return Ok(ev);
}
let deadline = tokio::time::Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err("timeout waiting for predicate");
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(ev)) if pred(&ev) => return Ok(ev),
Ok(Ok(_)) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
return Err("sink closed");
}
Err(_) => return Err("timeout waiting for predicate"),
}
}
}
}
#[cfg(any(test, feature = "test-support"))]
impl EngineSink for TestSink {
fn emit(&self, event: EngineEvent) {
if let Some(tx) = self.broadcaster.lock().unwrap().as_ref() {
let _ = tx.send(event.clone());
}
self.events.lock().unwrap().push(event);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sink_collects_events() {
let sink = TestSink::new();
assert!(sink.is_empty());
sink.emit(EngineEvent::ResponseStart);
sink.emit(EngineEvent::TextDelta {
text: "hello".into(),
});
sink.emit(EngineEvent::TextDone);
assert_eq!(sink.len(), 3);
let events = sink.events();
assert!(matches!(events[0], EngineEvent::ResponseStart));
assert!(matches!(&events[1], EngineEvent::TextDelta { text } if text == "hello"));
assert!(matches!(events[2], EngineEvent::TextDone));
}
#[test]
fn test_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<TestSink>();
}
#[test]
fn test_trait_object_works() {
let sink: Box<dyn EngineSink> = Box::new(TestSink::new());
sink.emit(EngineEvent::Info {
message: "test".into(),
});
}
#[test]
fn buffering_sink_records_tool_calls_and_info() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::ToolCallStart {
id: "t1".into(),
name: "Read".into(),
args: serde_json::json!({"path": "foo.txt"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::Info {
message: " \u{26a1} cache hit".into(),
});
sink.emit(EngineEvent::ToolCallStart {
id: "t2".into(),
name: "Bash".into(),
args: serde_json::json!({"command": "ls"}),
is_sub_agent: false,
});
let lines = sink.take_lines();
assert_eq!(lines.len(), 3);
assert!(lines[0].contains("Read"), "got: {}", lines[0]);
assert!(lines[1].contains("cache hit"), "got: {}", lines[1]);
assert!(lines[2].contains("Bash"), "got: {}", lines[2]);
}
#[test]
fn buffering_sink_drops_streaming_text() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::TextDelta {
text: "hello".into(),
});
sink.emit(EngineEvent::TextDelta {
text: " world".into(),
});
sink.emit(EngineEvent::TextDone);
sink.emit(EngineEvent::ThinkingDelta {
text: "reasoning".into(),
});
assert!(sink.take_lines().is_empty());
}
#[test]
fn buffering_sink_records_auto_reject_for_approval() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::ApprovalRequest {
id: "a1".into(),
tool_name: "Delete".into(),
detail: "foo.txt".into(),
preview: None,
effect: crate::tools::ToolEffect::Destructive,
});
let lines = sink.take_lines();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("Delete"));
assert!(
lines[0].contains("auto-rejected"),
"approval-without-channel must be marked as auto-rejected; got: {}",
lines[0]
);
}
#[test]
fn buffering_sink_caps_runaway_traces() {
let sink = BufferingSink::with_cap(3);
for i in 0..10 {
sink.emit(EngineEvent::Info {
message: format!("line {i}"),
});
}
let lines = sink.take_lines();
assert_eq!(lines.len(), 4, "got: {lines:?}");
assert!(lines.last().unwrap().starts_with('\u{2026}'));
assert!(lines.last().unwrap().contains("truncated"));
}
#[test]
fn buffering_sink_take_drains() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::Info {
message: "a".into(),
});
assert_eq!(sink.take_lines().len(), 1);
assert!(sink.take_lines().is_empty());
}
#[test]
fn buffering_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<BufferingSink>();
}
}