use super::event::EngineEvent;
pub trait EngineSink: Send + Sync {
fn emit(&self, event: EngineEvent);
}
pub struct NullSink;
impl EngineSink for NullSink {
fn emit(&self, _event: EngineEvent) {}
}
pub struct BufferingSink {
lines: std::sync::Mutex<Vec<String>>,
cap: usize,
}
impl BufferingSink {
pub fn new() -> Self {
Self::with_cap(256)
}
pub fn with_cap(cap: usize) -> Self {
Self {
lines: std::sync::Mutex::new(Vec::new()),
cap,
}
}
pub fn take_lines(&self) -> Vec<String> {
std::mem::take(&mut *self.lines.lock().unwrap())
}
fn push_capped(&self, line: String) {
let mut guard = self.lines.lock().unwrap();
if guard.len() < self.cap {
guard.push(line);
} else if guard.last().map(|l| !l.starts_with('…')).unwrap_or(true) {
guard.push(format!("… (trace truncated at {} lines)", self.cap));
}
}
}
impl Default for BufferingSink {
fn default() -> Self {
Self::new()
}
}
impl EngineSink for BufferingSink {
fn emit(&self, event: EngineEvent) {
match event {
EngineEvent::ToolCallStart { name, .. } => {
self.push_capped(format!(" \u{1f527} {name}"));
}
EngineEvent::Info { message } => {
self.push_capped(message);
}
EngineEvent::ApprovalRequest { tool_name, .. } => {
self.push_capped(format!(
" \u{2398} approval auto-rejected for {tool_name} (no user channel)"
));
}
EngineEvent::AskUserRequest { question, .. } => {
self.push_capped(format!(
" \u{2398} ask-user auto-skipped: {}",
question.chars().take(80).collect::<String>()
));
}
_ => {}
}
}
}
pub struct ForwardingChildSink {
inner: BufferingSink,
emitter: crate::child_agent::ChildStatusEmitter,
}
impl ForwardingChildSink {
pub fn new(inner: BufferingSink, emitter: crate::child_agent::ChildStatusEmitter) -> Self {
Self { inner, emitter }
}
pub fn take_lines(&self) -> Vec<String> {
self.inner.take_lines()
}
}
impl EngineSink for ForwardingChildSink {
fn emit(&self, event: EngineEvent) {
match &event {
EngineEvent::ToolCallStart { name, args, .. } => {
self.emitter.send_activity(
crate::engine::event::ChildAgentActivityKind::ToolStart {
tool_name: name.clone(),
summary: summarize_tool_call(name, args),
},
);
}
EngineEvent::ToolCallResult { name, output, .. } => {
let success = !looks_like_tool_error(output);
self.emitter
.send_activity(crate::engine::event::ChildAgentActivityKind::ToolEnd {
tool_name: name.clone(),
success,
});
}
EngineEvent::Info { message } => {
self.emitter
.send_activity(crate::engine::event::ChildAgentActivityKind::Info {
message: message.clone(),
});
}
_ => {}
}
self.inner.emit(event);
}
}
pub struct FgForwardingSink<'a> {
inner: &'a dyn EngineSink,
emitter: crate::child_agent::ChildStatusEmitter,
}
impl<'a> FgForwardingSink<'a> {
pub fn new(inner: &'a dyn EngineSink, emitter: crate::child_agent::ChildStatusEmitter) -> Self {
Self { inner, emitter }
}
}
impl EngineSink for FgForwardingSink<'_> {
fn emit(&self, event: EngineEvent) {
match &event {
EngineEvent::ToolCallStart { name, args, .. } => {
self.emitter.send_activity(
crate::engine::event::ChildAgentActivityKind::ToolStart {
tool_name: name.clone(),
summary: summarize_tool_call(name, args),
},
);
}
EngineEvent::ToolCallResult { name, output, .. } => {
let success = !looks_like_tool_error(output);
self.emitter
.send_activity(crate::engine::event::ChildAgentActivityKind::ToolEnd {
tool_name: name.clone(),
success,
});
}
EngineEvent::Info { message } => {
self.emitter
.send_activity(crate::engine::event::ChildAgentActivityKind::Info {
message: message.clone(),
});
}
_ => {}
}
self.inner.emit(event);
}
}
fn summarize_tool_call(name: &str, args: &serde_json::Value) -> String {
const MAX_LEN: usize = 80;
let body = match name {
"Read" | "Edit" | "Write" | "Delete" => args
.get("path")
.and_then(|v| v.as_str())
.map(str::to_string),
"Bash" => args
.get("command")
.and_then(|v| v.as_str())
.map(str::to_string),
"Grep" => {
let pattern = args.get("pattern").and_then(|v| v.as_str()).unwrap_or("");
let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
Some(format!("{pattern} {path}"))
}
"InvokeAgent" => args
.get("agent")
.and_then(|v| v.as_str())
.map(str::to_string),
_ => None,
};
let body = body.unwrap_or_default();
let combined = if body.is_empty() {
name.to_string()
} else {
format!("{name} {body}")
};
if combined.chars().count() <= MAX_LEN {
combined
} else {
let truncated: String = combined.chars().take(MAX_LEN.saturating_sub(1)).collect();
format!("{truncated}\u{2026}")
}
}
fn looks_like_tool_error(output: &str) -> bool {
let head = output.trim_start();
head.starts_with("Error:") || head.starts_with("\u{274c}")
}
pub struct PersistingSink<'a> {
inner: &'a dyn EngineSink,
db: std::sync::Arc<dyn crate::persistence::Persistence>,
session_id: String,
parent_tool_call_id: Option<String>,
}
impl<'a> PersistingSink<'a> {
pub fn new(
inner: &'a dyn EngineSink,
db: std::sync::Arc<dyn crate::persistence::Persistence>,
session_id: String,
parent_tool_call_id: Option<String>,
) -> Self {
Self {
inner,
db,
session_id,
parent_tool_call_id,
}
}
fn persist(&self, kind: &'static str, payload: String) {
let db = self.db.clone();
let session_id = self.session_id.clone();
let parent = self.parent_tool_call_id.clone();
tokio::spawn(async move {
if let Err(e) = db
.insert_session_event(&session_id, kind, &payload, parent.as_deref())
.await
{
tracing::warn!(
error = %e, kind, session_id,
"failed to persist session event"
);
}
});
}
}
pub(crate) fn classify_for_persist(
event: &EngineEvent,
parent_tool_call_id: Option<&str>,
) -> Option<(&'static str, String)> {
use crate::persistence::session_event_kind as sek;
if parent_tool_call_id.is_some() {
match event {
EngineEvent::Info { message } => Some((sek::SUB_AGENT_EVENT, message.clone())),
EngineEvent::ToolCallStart { name, .. } => {
Some((sek::SUB_AGENT_EVENT, format!(" \u{1f527} {name}")))
}
EngineEvent::ApprovalRequest { tool_name, .. } => Some((
sek::SUB_AGENT_EVENT,
format!(" \u{2398} approval auto-rejected for {tool_name} (no user channel)"),
)),
EngineEvent::AskUserRequest { question, .. } => {
let truncated: String = question.chars().take(80).collect();
Some((
sek::SUB_AGENT_EVENT,
format!(" \u{2398} ask-user auto-skipped: {truncated}"),
))
}
_ => None,
}
} else {
match event {
EngineEvent::Info { message } => Some((sek::INFO, message.clone())),
EngineEvent::ChildTaskUpdate { .. } => {
serde_json::to_string(event)
.ok()
.map(|json| (sek::BG_TASK_UPDATE, json))
}
_ => None,
}
}
}
impl EngineSink for PersistingSink<'_> {
fn emit(&self, event: EngineEvent) {
if let Some((kind, payload)) =
classify_for_persist(&event, self.parent_tool_call_id.as_deref())
{
self.persist(kind, payload);
}
self.inner.emit(event);
}
}
#[cfg(any(test, feature = "test-support"))]
#[derive(Debug, Default)]
pub struct TestSink {
events: std::sync::Mutex<Vec<EngineEvent>>,
broadcaster: std::sync::Mutex<Option<tokio::sync::broadcast::Sender<EngineEvent>>>,
}
#[cfg(any(test, feature = "test-support"))]
impl TestSink {
pub fn new() -> Self {
Self::default()
}
pub fn events(&self) -> Vec<EngineEvent> {
self.events.lock().unwrap().clone()
}
pub fn len(&self) -> usize {
self.events.lock().unwrap().len()
}
pub fn is_empty(&self) -> bool {
self.events.lock().unwrap().is_empty()
}
pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<EngineEvent> {
let mut guard = self.broadcaster.lock().unwrap();
let sender = guard.get_or_insert_with(|| {
let (tx, _) = tokio::sync::broadcast::channel(256);
tx
});
sender.subscribe()
}
pub async fn wait_for<F>(
&self,
timeout: std::time::Duration,
pred: F,
) -> Result<EngineEvent, &'static str>
where
F: Fn(&EngineEvent) -> bool,
{
let mut rx = self.subscribe();
if let Some(ev) = self.events().into_iter().find(|e| pred(e)) {
return Ok(ev);
}
let deadline = tokio::time::Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err("timeout waiting for predicate");
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(ev)) if pred(&ev) => return Ok(ev),
Ok(Ok(_)) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
return Err("sink closed");
}
Err(_) => return Err("timeout waiting for predicate"),
}
}
}
pub async fn wait_for_map<F, T>(
&self,
timeout: std::time::Duration,
mut extract: F,
) -> Result<T, &'static str>
where
F: FnMut(&EngineEvent) -> Option<T>,
{
let mut rx = self.subscribe();
if let Some(value) = self.events().iter().find_map(&mut extract) {
return Ok(value);
}
let deadline = tokio::time::Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err("timeout waiting for predicate");
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(ev)) => {
if let Some(value) = extract(&ev) {
return Ok(value);
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
return Err("sink closed");
}
Err(_) => return Err("timeout waiting for predicate"),
}
}
}
}
#[cfg(any(test, feature = "test-support"))]
impl EngineSink for TestSink {
fn emit(&self, event: EngineEvent) {
if let Some(tx) = self.broadcaster.lock().unwrap().as_ref() {
let _ = tx.send(event.clone());
}
self.events.lock().unwrap().push(event);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sink_collects_events() {
let sink = TestSink::new();
assert!(sink.is_empty());
sink.emit(EngineEvent::ResponseStart);
sink.emit(EngineEvent::TextDelta {
text: "hello".into(),
});
sink.emit(EngineEvent::TextDone);
assert_eq!(sink.len(), 3);
let events = sink.events();
assert!(matches!(events[0], EngineEvent::ResponseStart));
assert!(matches!(&events[1], EngineEvent::TextDelta { text } if text == "hello"));
assert!(matches!(events[2], EngineEvent::TextDone));
}
#[test]
fn test_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<TestSink>();
}
#[test]
fn test_trait_object_works() {
let sink: Box<dyn EngineSink> = Box::new(TestSink::new());
sink.emit(EngineEvent::Info {
message: "test".into(),
});
}
#[test]
fn buffering_sink_records_tool_calls_and_info() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::ToolCallStart {
id: "t1".into(),
name: "Read".into(),
args: serde_json::json!({"path": "foo.txt"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::Info {
message: " \u{26a1} cache hit".into(),
});
sink.emit(EngineEvent::ToolCallStart {
id: "t2".into(),
name: "Bash".into(),
args: serde_json::json!({"command": "ls"}),
is_sub_agent: false,
});
let lines = sink.take_lines();
assert_eq!(lines.len(), 3);
assert!(lines[0].contains("Read"), "got: {}", lines[0]);
assert!(lines[1].contains("cache hit"), "got: {}", lines[1]);
assert!(lines[2].contains("Bash"), "got: {}", lines[2]);
}
#[test]
fn buffering_sink_drops_streaming_text() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::TextDelta {
text: "hello".into(),
});
sink.emit(EngineEvent::TextDelta {
text: " world".into(),
});
sink.emit(EngineEvent::TextDone);
sink.emit(EngineEvent::ThinkingDelta {
text: "reasoning".into(),
});
assert!(sink.take_lines().is_empty());
}
#[test]
fn buffering_sink_records_auto_reject_for_approval() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::ApprovalRequest {
id: "a1".into(),
tool_name: "Delete".into(),
detail: "foo.txt".into(),
preview: None,
effect: crate::tools::ToolEffect::Destructive,
});
let lines = sink.take_lines();
assert_eq!(lines.len(), 1);
assert!(lines[0].contains("Delete"));
assert!(
lines[0].contains("auto-rejected"),
"approval-without-channel must be marked as auto-rejected; got: {}",
lines[0]
);
}
#[test]
fn buffering_sink_caps_runaway_traces() {
let sink = BufferingSink::with_cap(3);
for i in 0..10 {
sink.emit(EngineEvent::Info {
message: format!("line {i}"),
});
}
let lines = sink.take_lines();
assert_eq!(lines.len(), 4, "got: {lines:?}");
assert!(lines.last().unwrap().starts_with('\u{2026}'));
assert!(lines.last().unwrap().contains("truncated"));
}
#[test]
fn buffering_sink_take_drains() {
let sink = BufferingSink::new();
sink.emit(EngineEvent::Info {
message: "a".into(),
});
assert_eq!(sink.take_lines().len(), 1);
assert!(sink.take_lines().is_empty());
}
#[test]
fn buffering_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<BufferingSink>();
}
fn make_test_emitter(
task_id: u32,
) -> (
std::sync::Arc<crate::child_agent::ChildAgentRegistry>,
crate::child_agent::ChildStatusEmitter,
) {
let registry = crate::child_agent::new_shared();
let (status_tx, _status_rx) =
tokio::sync::watch::channel(crate::child_agent::AgentStatus::Pending);
let emitter = crate::child_agent::ChildStatusEmitter::new(
task_id,
None,
true,
status_tx,
registry.clone(),
);
(registry, emitter)
}
#[test]
fn forwarding_child_sink_emits_tool_start_and_end_to_registry() {
let (registry, emitter) = make_test_emitter(7);
let sink = ForwardingChildSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::ToolCallStart {
id: "t1".into(),
name: "Read".into(),
args: serde_json::json!({"path": "src/auth.rs"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::ToolCallResult {
id: "t1".into(),
name: "Read".into(),
output: "<file contents>".into(),
});
let drained = registry.drain_status_events();
assert_eq!(
drained.len(),
2,
"each interesting event should fan out exactly one ChildAgentActivity"
);
assert!(matches!(
&drained[0],
EngineEvent::ChildAgentActivity {
task_id: 7,
kind: crate::engine::event::ChildAgentActivityKind::ToolStart { tool_name, summary },
..
} if tool_name == "Read" && summary.contains("src/auth.rs")
));
assert!(matches!(
&drained[1],
EngineEvent::ChildAgentActivity {
kind: crate::engine::event::ChildAgentActivityKind::ToolEnd { tool_name, success: true },
..
} if tool_name == "Read"
));
}
#[test]
fn forwarding_child_sink_classifies_tool_errors() {
let (registry, emitter) = make_test_emitter(1);
let sink = ForwardingChildSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::ToolCallResult {
id: "t1".into(),
name: "Bash".into(),
output: "Error: command not found".into(),
});
let drained = registry.drain_status_events();
assert!(matches!(
&drained[0],
EngineEvent::ChildAgentActivity {
kind: crate::engine::event::ChildAgentActivityKind::ToolEnd { success: false, .. },
..
}
));
}
#[test]
fn forwarding_child_sink_preserves_buffering_for_post_completion_drain() {
let (_registry, emitter) = make_test_emitter(1);
let sink = ForwardingChildSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::ToolCallStart {
id: "t1".into(),
name: "Read".into(),
args: serde_json::json!({"path": "foo"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::Info {
message: " \u{26a1} cache hit".into(),
});
let lines = sink.take_lines();
assert_eq!(lines.len(), 2);
assert!(lines[0].contains("Read"));
assert!(lines[1].contains("cache hit"));
}
#[test]
fn forwarding_child_sink_drops_streaming_text() {
let (registry, emitter) = make_test_emitter(1);
let sink = ForwardingChildSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::TextDelta {
text: "hello".into(),
});
sink.emit(EngineEvent::ThinkingDelta {
text: "reasoning".into(),
});
sink.emit(EngineEvent::TextDone);
assert!(registry.drain_status_events().is_empty());
assert!(sink.take_lines().is_empty());
}
#[test]
fn forwarding_child_sink_summarizes_known_tool_args() {
let (registry, emitter) = make_test_emitter(1);
let sink = ForwardingChildSink::new(BufferingSink::new(), emitter);
sink.emit(EngineEvent::ToolCallStart {
id: "a".into(),
name: "Bash".into(),
args: serde_json::json!({"command": "cargo test"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::ToolCallStart {
id: "b".into(),
name: "Grep".into(),
args: serde_json::json!({"pattern": "TODO", "path": "src/"}),
is_sub_agent: false,
});
sink.emit(EngineEvent::ToolCallStart {
id: "c".into(),
name: "InvokeAgent".into(),
args: serde_json::json!({"agent": "reviewer", "prompt": "x"}),
is_sub_agent: false,
});
let drained = registry.drain_status_events();
let summaries: Vec<String> = drained
.iter()
.filter_map(|e| match e {
EngineEvent::ChildAgentActivity {
kind: crate::engine::event::ChildAgentActivityKind::ToolStart { summary, .. },
..
} => Some(summary.clone()),
_ => None,
})
.collect();
assert_eq!(summaries.len(), 3);
assert!(summaries[0].contains("cargo test"), "got: {}", summaries[0]);
assert!(
summaries[1].contains("TODO") && summaries[1].contains("src/"),
"got: {}",
summaries[1]
);
assert!(summaries[2].contains("reviewer"), "got: {}", summaries[2]);
}
#[test]
fn forwarding_child_sink_truncates_long_summaries() {
let (registry, emitter) = make_test_emitter(1);
let sink = ForwardingChildSink::new(BufferingSink::new(), emitter);
let long_cmd = "x".repeat(500);
sink.emit(EngineEvent::ToolCallStart {
id: "a".into(),
name: "Bash".into(),
args: serde_json::json!({"command": long_cmd}),
is_sub_agent: false,
});
let drained = registry.drain_status_events();
let summary = match &drained[0] {
EngineEvent::ChildAgentActivity {
kind: crate::engine::event::ChildAgentActivityKind::ToolStart { summary, .. },
..
} => summary.clone(),
_ => panic!("expected ToolStart"),
};
assert!(summary.chars().count() <= 80);
assert!(summary.ends_with('\u{2026}'));
}
#[test]
fn forwarding_child_sink_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<ForwardingChildSink>();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wait_for_map_returns_extracted_value_for_already_emitted_event() {
let sink = TestSink::new();
sink.emit(EngineEvent::Info {
message: "first".into(),
});
sink.emit(EngineEvent::Info {
message: "second".into(),
});
let result: Result<String, _> = sink
.wait_for_map(std::time::Duration::from_secs(1), |e| match e {
EngineEvent::Info { message } if message == "second" => Some(message.clone()),
_ => None,
})
.await;
assert_eq!(result.unwrap(), "second");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wait_for_map_resolves_on_event_emitted_after_subscribe() {
let sink = std::sync::Arc::new(TestSink::new());
let sink2 = std::sync::Arc::clone(&sink);
let task = tokio::spawn(async move {
tokio::task::yield_now().await;
sink2.emit(EngineEvent::Info {
message: "hello".into(),
});
});
let value: String = sink
.wait_for_map(std::time::Duration::from_secs(5), |e| match e {
EngineEvent::Info { message } => Some(message.clone()),
_ => None,
})
.await
.expect("should resolve before timeout");
assert_eq!(value, "hello");
task.await.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wait_for_map_skips_non_matching_events() {
use crate::engine::event::TurnEndReason;
let sink = std::sync::Arc::new(TestSink::new());
let sink2 = std::sync::Arc::clone(&sink);
tokio::spawn(async move {
sink2.emit(EngineEvent::Info {
message: "noise-1".into(),
});
sink2.emit(EngineEvent::Info {
message: "noise-2".into(),
});
sink2.emit(EngineEvent::TurnEnd {
turn_id: "t-1".into(),
reason: TurnEndReason::Complete,
});
sink2.emit(EngineEvent::Info {
message: "noise-3".into(),
});
});
let turn_id: String = sink
.wait_for_map(std::time::Duration::from_secs(5), |e| match e {
EngineEvent::TurnEnd { turn_id, .. } => Some(turn_id.clone()),
_ => None,
})
.await
.expect("TurnEnd should arrive");
assert_eq!(turn_id, "t-1");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wait_for_map_returns_err_on_timeout() {
let sink = TestSink::new();
sink.emit(EngineEvent::Info {
message: "unrelated".into(),
});
let result: Result<(), _> = sink
.wait_for_map(std::time::Duration::from_millis(50), |e| match e {
EngineEvent::TurnEnd { .. } => Some(()),
_ => None,
})
.await;
assert!(result.is_err(), "expected timeout error, got {result:?}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wait_for_returns_event_for_already_emitted_match() {
use crate::engine::event::TurnEndReason;
let sink = TestSink::new();
sink.emit(EngineEvent::TurnEnd {
turn_id: "t-1".into(),
reason: TurnEndReason::Complete,
});
let ev = sink
.wait_for(std::time::Duration::from_secs(1), |e| {
matches!(e, EngineEvent::TurnEnd { .. })
})
.await
.expect("already-emitted event should resolve immediately");
assert!(matches!(ev, EngineEvent::TurnEnd { .. }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wait_for_returns_err_on_timeout() {
let sink = TestSink::new();
let result = sink
.wait_for(std::time::Duration::from_millis(50), |e| {
matches!(e, EngineEvent::TurnEnd { .. })
})
.await;
assert!(result.is_err());
}
use crate::child_agent::AgentStatus;
use crate::persistence::session_event_kind as sek;
fn child_task_update_event() -> EngineEvent {
EngineEvent::ChildTaskUpdate {
task_id: 42,
spawner: Some(7),
is_background: true,
status: AgentStatus::Pending,
}
}
#[test]
fn classify_top_level_persists_info_with_message_payload() {
let event = EngineEvent::Info {
message: "hello".into(),
};
let decision = classify_for_persist(&event, None);
assert_eq!(decision, Some((sek::INFO, "hello".to_string())));
}
#[test]
fn classify_top_level_persists_child_task_update_with_json_payload() {
let event = child_task_update_event();
let (kind, payload) =
classify_for_persist(&event, None).expect("ChildTaskUpdate must persist top-level");
assert_eq!(kind, sek::BG_TASK_UPDATE);
let parsed: serde_json::Value =
serde_json::from_str(&payload).expect("payload must parse as JSON");
assert_eq!(parsed["task_id"], 42);
assert_eq!(parsed["is_background"], true);
}
#[test]
fn classify_top_level_skips_non_allowlisted_events() {
let cases = [
EngineEvent::ResponseStart,
EngineEvent::TextDelta { text: "a".into() },
EngineEvent::TextDone,
EngineEvent::ToolCallStart {
id: "call-1".into(),
name: "Read".into(),
args: serde_json::json!({"path": "f.txt"}),
is_sub_agent: false,
},
];
for event in cases {
assert_eq!(
classify_for_persist(&event, None),
None,
"top-level must skip {event:?}"
);
}
}
#[test]
fn classify_sub_agent_persists_info_with_sub_agent_kind() {
let event = EngineEvent::Info {
message: "sub said hi".into(),
};
let decision = classify_for_persist(&event, Some("parent-call"));
assert_eq!(
decision,
Some((sek::SUB_AGENT_EVENT, "sub said hi".to_string()))
);
}
#[test]
fn classify_sub_agent_persists_tool_call_start_as_rendered_line() {
let event = EngineEvent::ToolCallStart {
id: "c-1".into(),
name: "Read".into(),
args: serde_json::json!({}),
is_sub_agent: false,
};
let (kind, payload) = classify_for_persist(&event, Some("parent-call"))
.expect("ToolCallStart must persist on sub-agent path");
assert_eq!(kind, sek::SUB_AGENT_EVENT);
assert_eq!(payload, " \u{1f527} Read");
}
#[test]
fn classify_sub_agent_persists_approval_request_as_auto_reject_line() {
let event = EngineEvent::ApprovalRequest {
id: "a-1".into(),
tool_name: "WriteFile".into(),
detail: "write 1 file".into(),
preview: None,
effect: crate::tools::ToolEffect::LocalMutation,
};
let (kind, payload) = classify_for_persist(&event, Some("parent-call"))
.expect("ApprovalRequest must persist on sub-agent path");
assert_eq!(kind, sek::SUB_AGENT_EVENT);
assert!(
payload.contains("approval auto-rejected for WriteFile"),
"payload should explain the auto-rejection: {payload:?}"
);
}
#[test]
fn classify_sub_agent_truncates_ask_user_question_to_80_chars() {
let long_question = "q".repeat(200);
let event = EngineEvent::AskUserRequest {
id: "q-1".into(),
question: long_question,
options: Vec::new(),
};
let (kind, payload) = classify_for_persist(&event, Some("parent-call"))
.expect("AskUserRequest must persist on sub-agent path");
assert_eq!(kind, sek::SUB_AGENT_EVENT);
let prefix = " \u{2398} ask-user auto-skipped: ";
let question_part = payload.strip_prefix(prefix).expect("prefix should match");
assert_eq!(
question_part.chars().count(),
80,
"question must truncate to 80 chars, got {} in {payload:?}",
question_part.chars().count()
);
}
#[test]
fn classify_sub_agent_skips_child_task_update() {
let event = child_task_update_event();
assert_eq!(classify_for_persist(&event, Some("parent-call")), None);
}
#[test]
fn classify_sub_agent_skips_other_non_allowlisted_events() {
let cases = [
EngineEvent::ResponseStart,
EngineEvent::TextDelta { text: "a".into() },
EngineEvent::TextDone,
];
for event in cases {
assert_eq!(
classify_for_persist(&event, Some("parent-call")),
None,
"sub-agent must skip {event:?}"
);
}
}
}