use super::event::EngineEvent;
/// Destination for [`EngineEvent`]s.
///
/// Implementors must be `Send + Sync`, and `emit` only receives `&self`, so any
/// state a sink keeps has to live behind interior mutability.
pub trait EngineSink: Send + Sync {
/// Hands one event to the sink; ownership of `event` moves to the sink.
fn emit(&self, event: EngineEvent);
}
/// An [`EngineSink`] that discards every event it is given.
pub struct NullSink;
impl EngineSink for NullSink {
/// Drops `_event` without doing anything else.
fn emit(&self, _event: EngineEvent) {}
}
/// Accumulates a bounded list of human-readable trace lines in memory.
///
/// At most `cap` lines are kept. The first line that does not fit is replaced
/// by a single truncation marker and every later line is dropped, until
/// [`BufferingSink::take_lines`] empties the buffer again.
pub struct BufferingSink {
    /// Buffered lines, oldest first.
    lines: std::sync::Mutex<Vec<String>>,
    /// Maximum number of real lines kept before the truncation marker.
    cap: usize,
}

impl BufferingSink {
    /// Creates a sink with a cap of 256 lines.
    pub fn new() -> Self {
        Self::with_cap(256)
    }

    /// Creates a sink that keeps at most `cap` lines, plus one truncation marker.
    pub fn with_cap(cap: usize) -> Self {
        Self {
            lines: std::sync::Mutex::new(Vec::new()),
            cap,
        }
    }

    /// Removes and returns all buffered lines, leaving the buffer empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn take_lines(&self) -> Vec<String> {
        std::mem::take(&mut *self.lines.lock().unwrap())
    }

    /// Appends `line` if there is room, otherwise records the truncation once.
    ///
    /// The buffer is only mutated here and in `take_lines`, so its length never
    /// exceeds `cap + 1`. A length of exactly `cap` therefore means "full, marker
    /// not written yet". Deciding on the length rather than on the text of the
    /// last line guarantees the marker is emitted even when a real line happens
    /// to start with '…'.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn push_capped(&self, line: String) {
        let mut guard = self.lines.lock().unwrap();
        if guard.len() < self.cap {
            guard.push(line);
        } else if guard.len() == self.cap {
            guard.push(format!("… (trace truncated at {} lines)", self.cap));
        }
        // len > cap: the marker is already in place; drop the line silently.
    }
}

impl Default for BufferingSink {
    /// Equivalent to [`BufferingSink::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl EngineSink for BufferingSink {
    /// Renders the traced event kinds as one text line each and buffers it.
    ///
    /// Tool-call starts, info messages, approval requests and ask-user requests
    /// produce a line; every other event is ignored.
    fn emit(&self, event: EngineEvent) {
        let line = match event {
            EngineEvent::ToolCallStart { name, .. } => format!(" \u{1f527} {name}"),
            // Info messages are buffered verbatim.
            EngineEvent::Info { message } => message,
            EngineEvent::ApprovalRequest { tool_name, .. } => {
                format!(" \u{2398} approval auto-rejected for {tool_name} (no user channel)")
            }
            EngineEvent::AskUserRequest { question, .. } => {
                // Only the first 80 characters of the question are kept.
                let preview: String = question.chars().take(80).collect();
                format!(" \u{2398} ask-user auto-skipped: {preview}")
            }
            _ => return,
        };
        self.push_capped(line);
    }
}
/// An [`EngineSink`] that reports activity through a `BgStatusEmitter` and also
/// passes every event on to an inner [`BufferingSink`].
pub struct ForwardingBgSink {
// Receives every event given to `emit`; drained through `take_lines`.
inner: BufferingSink,
// Target of the `send_activity` calls made from `emit`.
emitter: crate::bg_agent::BgStatusEmitter,
}
impl ForwardingBgSink {
/// Builds a sink from an existing buffer and emitter.
pub fn new(inner: BufferingSink, emitter: crate::bg_agent::BgStatusEmitter) -> Self {
Self { inner, emitter }
}
/// Removes and returns the lines buffered by the inner [`BufferingSink`].
pub fn take_lines(&self) -> Vec<String> {
self.inner.take_lines()
}
}
impl EngineSink for ForwardingBgSink {
    /// Sends an activity for tool starts, tool results and info messages, then
    /// hands the event to the inner buffering sink.
    ///
    /// The inner sink receives every event, including those that produce no
    /// activity.
    fn emit(&self, event: EngineEvent) {
        let activity = match &event {
            EngineEvent::ToolCallStart { name, args, .. } => {
                Some(crate::engine::event::BgChildActivityKind::ToolStart {
                    tool_name: name.clone(),
                    summary: summarize_tool_call(name, args),
                })
            }
            EngineEvent::ToolCallResult { name, output, .. } => {
                Some(crate::engine::event::BgChildActivityKind::ToolEnd {
                    tool_name: name.clone(),
                    // Success is inferred from the output text, not from a status flag.
                    success: !looks_like_tool_error(output),
                })
            }
            EngineEvent::Info { message } => {
                Some(crate::engine::event::BgChildActivityKind::Info {
                    message: message.clone(),
                })
            }
            _ => None,
        };

        if let Some(activity) = activity {
            self.emitter.send_activity(activity);
        }
        self.inner.emit(event);
    }
}
fn summarize_tool_call(name: &str, args: &serde_json::Value) -> String {
const MAX_LEN: usize = 80;
let body = match name {
"Read" | "Edit" | "Write" | "Delete" => args
.get("path")
.and_then(|v| v.as_str())
.map(str::to_string),
"Bash" => args
.get("command")
.and_then(|v| v.as_str())
.map(str::to_string),
"Grep" => {
let pattern = args.get("pattern").and_then(|v| v.as_str()).unwrap_or("");
let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
Some(format!("{pattern} {path}"))
}
"InvokeAgent" => args
.get("agent")
.and_then(|v| v.as_str())
.map(str::to_string),
_ => None,
};
let body = body.unwrap_or_default();
let combined = if body.is_empty() {
name.to_string()
} else {
format!("{name} {body}")
};
if combined.chars().count() <= MAX_LEN {
combined
} else {
let truncated: String = combined.chars().take(MAX_LEN.saturating_sub(1)).collect();
format!("{truncated}\u{2026}")
}
}
/// Returns `true` when `output`, ignoring leading whitespace, begins with
/// `Error:` or with the cross-mark character U+274C.
fn looks_like_tool_error(output: &str) -> bool {
    const ERROR_PREFIXES: [&str; 2] = ["Error:", "\u{274c}"];

    let trimmed = output.trim_start();
    ERROR_PREFIXES
        .iter()
        .any(|prefix| trimmed.starts_with(*prefix))
}
/// An [`EngineSink`] decorator that writes selected events to a persistence
/// backend in addition to forwarding them to `inner`.
pub struct PersistingSink<'a> {
    /// Sink that receives every event after the persistence step.
    inner: &'a dyn EngineSink,
    /// Backend used to store session events.
    db: std::sync::Arc<dyn crate::persistence::Persistence>,
    /// Session the stored events are attributed to.
    session_id: String,
    /// Passed to the backend with every stored event. The `EngineSink` impl also
    /// uses its presence to choose which event kinds are stored.
    parent_tool_call_id: Option<String>,
}

impl<'a> PersistingSink<'a> {
    /// Creates a persisting wrapper around `inner`.
    pub fn new(
        inner: &'a dyn EngineSink,
        db: std::sync::Arc<dyn crate::persistence::Persistence>,
        session_id: String,
        parent_tool_call_id: Option<String>,
    ) -> Self {
        Self {
            inner,
            db,
            session_id,
            parent_tool_call_id,
        }
    }

    /// Stores one session event on a detached background task.
    ///
    /// Persistence is best-effort: a failed insert is logged and otherwise
    /// ignored. Each call spawns its own task, and nothing here orders the
    /// resulting writes relative to each other.
    ///
    /// `tokio::spawn` panics when no Tokio runtime is entered on the calling
    /// thread. `emit` is a synchronous API that any thread may call, so the
    /// runtime handle is looked up explicitly and the event is dropped with a
    /// warning when there is none.
    fn persist(&self, kind: &'static str, payload: String) {
        let handle = match tokio::runtime::Handle::try_current() {
            Ok(handle) => handle,
            Err(e) => {
                tracing::warn!(
                    error = %e, kind, session_id = %self.session_id,
                    "no Tokio runtime available; session event not persisted"
                );
                return;
            }
        };

        // The task must own its data because it may outlive `self`.
        let db = self.db.clone();
        let session_id = self.session_id.clone();
        let parent = self.parent_tool_call_id.clone();
        handle.spawn(async move {
            if let Err(e) = db
                .insert_session_event(&session_id, kind, &payload, parent.as_deref())
                .await
            {
                tracing::warn!(
                    error = %e, kind, session_id,
                    "failed to persist session event"
                );
            }
        });
    }
}
impl EngineSink for PersistingSink<'_> {
    /// Persists the events relevant to this sink's role, then forwards the
    /// event to the inner sink.
    ///
    /// Without a parent tool call id, info messages are stored as `INFO` and
    /// background-task updates as their JSON serialization. With one, the
    /// traced event kinds are rendered as a text line and stored as
    /// `SUB_AGENT_EVENT`. Every event is forwarded, stored or not.
    fn emit(&self, event: EngineEvent) {
        use crate::persistence::session_event_kind as sek;

        if self.parent_tool_call_id.is_none() {
            match &event {
                EngineEvent::Info { message } => self.persist(sek::INFO, message.clone()),
                EngineEvent::BgTaskUpdate { .. } => {
                    // An update that fails to serialize is skipped, not reported.
                    if let Ok(json) = serde_json::to_string(&event) {
                        self.persist(sek::BG_TASK_UPDATE, json);
                    }
                }
                _ => {}
            }
        } else {
            let line = match &event {
                EngineEvent::Info { message } => Some(message.clone()),
                EngineEvent::ToolCallStart { name, .. } => Some(format!(" \u{1f527} {name}")),
                EngineEvent::ApprovalRequest { tool_name, .. } => Some(format!(
                    " \u{2398} approval auto-rejected for {tool_name} (no user channel)"
                )),
                EngineEvent::AskUserRequest { question, .. } => {
                    // Only the first 80 characters of the question are stored.
                    let truncated: String = question.chars().take(80).collect();
                    Some(format!(" \u{2398} ask-user auto-skipped: {truncated}"))
                }
                _ => None,
            };
            if let Some(line) = line {
                self.persist(sek::SUB_AGENT_EVENT, line);
            }
        }

        self.inner.emit(event);
    }
}
/// Sink for tests: records events and can broadcast them to subscribers.
///
/// Only compiled for tests or with the `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
#[derive(Debug, Default)]
pub struct TestSink {
// Events recorded by `emit`, in the order they were pushed.
events: std::sync::Mutex<Vec<EngineEvent>>,
// Broadcast sender; starts as `None` and is created by the first `subscribe` call.
broadcaster: std::sync::Mutex<Option<tokio::sync::broadcast::Sender<EngineEvent>>>,
}
#[cfg(any(test, feature = "test-support"))]
impl TestSink {
    /// Creates an empty sink with no broadcast channel yet.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns a snapshot (clone) of every event recorded so far.
    pub fn events(&self) -> Vec<EngineEvent> {
        self.events.lock().unwrap().clone()
    }

    /// Number of events recorded so far.
    pub fn len(&self) -> usize {
        self.events.lock().unwrap().len()
    }

    /// `true` when no event has been recorded.
    pub fn is_empty(&self) -> bool {
        self.events.lock().unwrap().is_empty()
    }

    /// Returns a receiver for events broadcast from now on.
    ///
    /// The broadcast channel (capacity 256) is created on the first call.
    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<EngineEvent> {
        let mut guard = self.broadcaster.lock().unwrap();
        guard
            .get_or_insert_with(|| tokio::sync::broadcast::channel(256).0)
            .subscribe()
    }

    /// Waits until an event satisfying `pred` has been seen, or `timeout` elapses.
    ///
    /// Already-recorded events are checked first; after that, events arriving
    /// on the broadcast channel are checked as they come in.
    ///
    /// # Errors
    ///
    /// Returns `"timeout waiting for predicate"` when the deadline passes and
    /// `"sink closed"` when the broadcast channel reports it is closed.
    pub async fn wait_for<F>(
        &self,
        timeout: std::time::Duration,
        pred: F,
    ) -> Result<EngineEvent, &'static str>
    where
        F: Fn(&EngineEvent) -> bool,
    {
        // Subscribe before scanning so that anything broadcast after the scan
        // still reaches `rx`.
        let mut rx = self.subscribe();
        if let Some(ev) = self.events().into_iter().find(|e| pred(e)) {
            return Ok(ev);
        }

        let deadline = tokio::time::Instant::now() + timeout;
        loop {
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            if remaining.is_zero() {
                return Err("timeout waiting for predicate");
            }
            match tokio::time::timeout(remaining, rx.recv()).await {
                Ok(Ok(ev)) if pred(&ev) => return Ok(ev),
                Ok(Ok(_)) => {}
                Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => {
                    // The receiver fell behind and some broadcasts were skipped.
                    // A match may have been among them, so consult the recorded
                    // events instead of waiting for a repeat that may never come.
                    if let Some(ev) = self.events().into_iter().find(|e| pred(e)) {
                        return Ok(ev);
                    }
                }
                Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                    return Err("sink closed");
                }
                Err(_) => return Err("timeout waiting for predicate"),
            }
        }
    }
}
#[cfg(any(test, feature = "test-support"))]
impl EngineSink for TestSink {
    /// Records `event`, then broadcasts it if a channel exists.
    ///
    /// The order matters for `wait_for`, which subscribes and then scans the
    /// recorded events. Recording first means an event is always visible to
    /// that scan or delivered on the channel. Broadcasting first leaves a
    /// window where an event sent before the subscription and recorded after
    /// the scan is seen by neither.
    fn emit(&self, event: EngineEvent) {
        self.events.lock().unwrap().push(event.clone());
        if let Some(tx) = self.broadcaster.lock().unwrap().as_ref() {
            // `send` only fails when there is no live receiver; that is fine here.
            let _ = tx.send(event);
        }
    }
}
// Unit tests for the sinks in this file: `TestSink`, `BufferingSink` and
// `ForwardingBgSink`.
#[cfg(test)]
mod tests {
use super::*;
// `TestSink` records events in emission order and reports the right count.
#[test]
fn test_sink_collects_events() {
let sink = TestSink::new();
assert!(sink.is_empty());
sink.emit(EngineEvent::ResponseStart);
sink.emit(EngineEvent::TextDelta {
text: "hello".into(),
});
sink.emit(EngineEvent::TextDone);
assert_eq!(sink.len(), 3);
let events = sink.events();
assert!(matches!(events[0], EngineEvent::ResponseStart));
assert!(matches!(&events[1], EngineEvent::TextDelta { text } if text == "hello"));
assert!(matches!(events[2], EngineEvent::TextDone));
}
// Compile-time check: `TestSink` satisfies the `Send + Sync` bound of `EngineSink`.
#[test]
fn test_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<TestSink>();
}
// `EngineSink` can be used as a boxed trait object.
#[test]
fn test_trait_object_works() {
let sink: Box<dyn EngineSink> = Box::new(TestSink::new());
sink.emit(EngineEvent::Info {
message: "test".into(),
});
}
// Tool-call starts and info messages each become one buffered line, in order.
#[test]
fn buffering_sink_records_tool_calls_and_info() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::ToolCallStart {
id: "t1".into(),
name: "Read".into(),
args: serde_json::json!({"path": "foo.txt"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::Info {
message: " \u{26a1} cache hit".into(),
});
sink.emit(EngineEvent::ToolCallStart {
id: "t2".into(),
name: "Bash".into(),
args: serde_json::json!({"command": "ls"}),
is_sub_agent: false,
});
let lines = sink.take_lines();
assert_eq!(lines.len(), 3);
assert!(lines[0].contains("Read"), "got: {}", lines[0]);
assert!(lines[1].contains("cache hit"), "got: {}", lines[1]);
assert!(lines[2].contains("Bash"), "got: {}", lines[2]);
}
// Text and thinking stream events produce no buffered lines.
#[test]
fn buffering_sink_drops_streaming_text() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::TextDelta {
text: "hello".into(),
});
sink.emit(EngineEvent::TextDelta {
text: " world".into(),
});
sink.emit(EngineEvent::TextDone);
sink.emit(EngineEvent::ThinkingDelta {
text: "reasoning".into(),
});
assert!(sink.take_lines().is_empty());
}
// An approval request is recorded as a single "auto-rejected" line naming the tool.
#[test]
fn buffering_sink_records_auto_reject_for_approval() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::ApprovalRequest {
id: "a1".into(),
tool_name: "Delete".into(),
detail: "foo.txt".into(),
preview: None,
effect: crate::tools::ToolEffect::Destructive,
});
let lines = sink.take_lines();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("Delete"));
assert!(
lines[0].contains("auto-rejected"),
"approval-without-channel must be marked as auto-rejected; got: {}",
lines[0]
);
}
// With a cap of 3 and ten info events, the buffer holds three lines plus one
// truncation marker.
#[test]
fn buffering_sink_caps_runaway_traces() {
let sink = BufferingSink::with_cap(3);
for i in 0..10 {
sink.emit(EngineEvent::Info {
message: format!("line {i}"),
});
}
let lines = sink.take_lines();
assert_eq!(lines.len(), 4, "got: {lines:?}");
assert!(lines.last().unwrap().starts_with('\u{2026}'));
assert!(lines.last().unwrap().contains("truncated"));
}
// `take_lines` empties the buffer: a second call returns nothing.
#[test]
fn buffering_sink_take_drains() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::Info {
message: "a".into(),
});
assert_eq!(sink.take_lines().len(), 1);
assert!(sink.take_lines().is_empty());
}
// Compile-time check: `BufferingSink` is `Send + Sync`.
#[test]
fn buffering_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<BufferingSink>();
}
// Builds a fresh shared registry and a `BgStatusEmitter` for `task_id` that is
// given a clone of that registry. The watch receiver is dropped when this
// helper returns.
fn make_test_emitter(
task_id: u32,
) -> (
std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
crate::bg_agent::BgStatusEmitter,
) {
let registry = crate::bg_agent::new_shared();
let (status_tx, _status_rx) =
tokio::sync::watch::channel(crate::bg_agent::AgentStatus::Pending);
let emitter =
crate::bg_agent::BgStatusEmitter::new(task_id, None, status_tx, registry.clone());
(registry, emitter)
}
// A tool start followed by its result yields exactly two activities in the
// registry: a `ToolStart` carrying the summary, then a successful `ToolEnd`.
#[test]
fn forwarding_bg_sink_emits_tool_start_and_end_to_registry() {
let (registry, emitter) = make_test_emitter(7);
let sink = ForwardingBgSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::ToolCallStart {
id: "t1".into(),
name: "Read".into(),
args: serde_json::json!({"path": "src/auth.rs"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::ToolCallResult {
id: "t1".into(),
name: "Read".into(),
output: "<file contents>".into(),
});
let drained = registry.drain_status_events();
assert_eq!(
drained.len(),
2,
"each interesting event should fan out exactly one BgChildActivity"
);
assert!(matches!(
&drained[0],
EngineEvent::BgChildActivity {
task_id: 7,
kind: crate::engine::event::BgChildActivityKind::ToolStart { tool_name, summary },
..
} if tool_name == "Read" && summary.contains("src/auth.rs")
));
assert!(matches!(
&drained[1],
EngineEvent::BgChildActivity {
kind: crate::engine::event::BgChildActivityKind::ToolEnd { tool_name, success: true },
..
} if tool_name == "Read"
));
}
// Output beginning with "Error:" is reported as an unsuccessful `ToolEnd`.
#[test]
fn forwarding_bg_sink_classifies_tool_errors() {
let (registry, emitter) = make_test_emitter(1);
let sink = ForwardingBgSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::ToolCallResult {
id: "t1".into(),
name: "Bash".into(),
output: "Error: command not found".into(),
});
let drained = registry.drain_status_events();
assert!(matches!(
&drained[0],
EngineEvent::BgChildActivity {
kind: crate::engine::event::BgChildActivityKind::ToolEnd { success: false, .. },
..
}
));
}
// Forwarding does not bypass the inner buffer: the lines are still available
// through `take_lines`.
#[test]
fn forwarding_bg_sink_preserves_buffering_for_post_completion_drain() {
let (_registry, emitter) = make_test_emitter(1);
let sink = ForwardingBgSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::ToolCallStart {
id: "t1".into(),
name: "Read".into(),
args: serde_json::json!({"path": "foo"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::Info {
message: " \u{26a1} cache hit".into(),
});
let lines = sink.take_lines();
assert_eq!(lines.len(), 2);
assert!(lines[0].contains("Read"));
assert!(lines[1].contains("cache hit"));
}
// Streaming text and thinking events reach neither the registry nor the buffer.
#[test]
fn forwarding_bg_sink_drops_streaming_text() {
let (registry, emitter) = make_test_emitter(1);
let sink = ForwardingBgSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::TextDelta {
text: "hello".into(),
});
sink.emit(EngineEvent::ThinkingDelta {
text: "reasoning".into(),
});
sink.emit(EngineEvent::TextDone);
assert!(registry.drain_status_events().is_empty());
assert!(sink.take_lines().is_empty());
}
// Summaries for Bash, Grep and InvokeAgent contain the command, the
// pattern and path, and the agent name respectively.
#[test]
fn forwarding_bg_sink_summarizes_known_tool_args() {
let (registry, emitter) = make_test_emitter(1);
let sink = ForwardingBgSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::ToolCallStart {
id: "a".into(),
name: "Bash".into(),
args: serde_json::json!({"command": "cargo test"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::ToolCallStart {
id: "b".into(),
name: "Grep".into(),
args: serde_json::json!({"pattern": "TODO", "path": "src/"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::ToolCallStart {
id: "c".into(),
name: "InvokeAgent".into(),
args: serde_json::json!({"agent": "reviewer", "prompt": "x"}),
is_sub_agent: false,
});
let drained = registry.drain_status_events();
// Keep only the summaries of the `ToolStart` activities, in order.
let summaries: Vec<String> = drained
.iter()
.filter_map(|e| match e {
EngineEvent::BgChildActivity {
kind: crate::engine::event::BgChildActivityKind::ToolStart { summary, .. },
..
} => Some(summary.clone()),
_ => None,
})
.collect();
assert_eq!(summaries.len(), 3);
assert!(summaries[0].contains("cargo test"), "got: {}", summaries[0]);
assert!(
summaries[1].contains("TODO") && summaries[1].contains("src/"),
"got: {}",
summaries[1]
);
assert!(summaries[2].contains("reviewer"), "got: {}", summaries[2]);
}
// A 500-character command is cut to at most 80 characters and ends with an ellipsis.
#[test]
fn forwarding_bg_sink_truncates_long_summaries() {
let (registry, emitter) = make_test_emitter(1);
let sink = ForwardingBgSink::new(BufferingSink::new(), emitter);
let long_cmd = "x".repeat(500);
sink.emit(EngineEvent::ToolCallStart {
id: "a".into(),
name: "Bash".into(),
args: serde_json::json!({"command": long_cmd}),
is_sub_agent: false,
});
let drained = registry.drain_status_events();
let summary = match &drained[0] {
EngineEvent::BgChildActivity {
kind: crate::engine::event::BgChildActivityKind::ToolStart { summary, .. },
..
} => summary.clone(),
_ => panic!("expected ToolStart"),
};
assert!(summary.chars().count() <= 80);
assert!(summary.ends_with('\u{2026}'));
}
// Compile-time check: `ForwardingBgSink` is `Send + Sync`.
#[test]
fn forwarding_bg_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<ForwardingBgSink>();
}
}