use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::event::AgentEvent;
use super::bridge::EventBridge;
use super::heal;
use super::message::{LoopMessage, UserMessage, loop_message_to_value};
use super::run::run_agent_loop;
use super::steering::steering_from_queue;
use super::stream::StreamFn;
use super::tool::{AbortSignal, LoopTool};
use super::types::{Context, LoopConfig, QueueMode, ToolExecutionMode};
pub struct LoopRunner {
pub event_rx: mpsc::Receiver<AgentEvent>,
pub task: JoinHandle<()>,
pub signal: AbortSignal,
}
impl LoopRunner {
pub fn into_agent_runner(self) -> crate::agent::runner::AgentRunner {
let (interject_tx, mut interject_rx) = mpsc::channel::<()>(64);
let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(64);
let signal_for_interject = self.signal.clone();
let signal_for_cancel = self.signal.clone();
tokio::spawn(async move {
if interject_rx.recv().await.is_some() {
signal_for_interject.interject();
while interject_rx.try_recv().is_ok() {}
}
});
tokio::spawn(async move {
if cancel_rx.recv().await.is_some() {
signal_for_cancel.cancel();
while cancel_rx.try_recv().is_ok() {}
}
});
crate::agent::runner::AgentRunner {
event_rx: self.event_rx,
task: self.task,
interject_tx,
cancel_tx,
}
}
}
pub fn rig_message_to_loop_messages(m: rig::completion::Message) -> Vec<LoopMessage> {
use super::message::{AssistantMessage, ContentBlock, StopReason, ToolResultMessage};
use rig::completion::message::{AssistantContent, Message, UserContent};
match m {
Message::System { .. } => Vec::new(),
Message::User { content } => {
let mut text_parts: Vec<String> = Vec::new();
let mut tool_results: Vec<LoopMessage> = Vec::new();
for part in content.into_iter() {
match part {
UserContent::Text(t) => text_parts.push(t.text),
UserContent::ToolResult(tr) => {
let body = tr
.content
.into_iter()
.filter_map(|c| match c {
rig::completion::message::ToolResultContent::Text(t) => {
Some(t.text)
}
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
tool_results.push(LoopMessage::ToolResult(ToolResultMessage {
tool_call_id: tr.id,
tool_name: String::new(), content: vec![ContentBlock::Text { text: body }],
details: serde_json::Value::Null,
is_error: false,
}));
}
_ => {}
}
}
let mut out = Vec::new();
if !text_parts.is_empty() {
out.push(LoopMessage::User(UserMessage {
content: text_parts.join("\n"),
}));
}
out.extend(tool_results);
out
}
Message::Assistant { content, .. } => {
let mut blocks: Vec<ContentBlock> = Vec::new();
for part in content.into_iter() {
match part {
AssistantContent::Text(t) => blocks.push(ContentBlock::Text { text: t.text }),
AssistantContent::ToolCall(tc) => {
blocks.push(ContentBlock::ToolCall {
id: tc.id,
name: tc.function.name,
arguments: tc.function.arguments,
});
}
AssistantContent::Reasoning(r) => {
let text = r
.content
.iter()
.filter_map(|c| match c {
rig::completion::message::ReasoningContent::Text {
text, ..
} => Some(text.clone()),
rig::completion::message::ReasoningContent::Summary(s) => {
Some(s.clone())
}
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
blocks.push(ContentBlock::Thinking { text });
}
AssistantContent::Image(_) => {}
}
}
if blocks.is_empty() {
Vec::new()
} else {
let has_tool = blocks
.iter()
.any(|b| matches!(b, ContentBlock::ToolCall { .. }));
let stop_reason = if has_tool {
StopReason::ToolUse
} else {
StopReason::Stop
};
vec![LoopMessage::Assistant(AssistantMessage {
content: blocks,
stop_reason,
error_message: None,
})]
}
}
}
}
pub fn rig_history_to_loop_messages(history: Vec<rig::completion::Message>) -> Vec<LoopMessage> {
history
.into_iter()
.flat_map(rig_message_to_loop_messages)
.collect()
}
pub fn rig_history_system_prompt(history: &[rig::completion::Message]) -> String {
use rig::completion::message::Message;
history
.iter()
.filter_map(|m| match m {
Message::System { content } => Some(content.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("\n\n")
}
pub struct LoopSpawnConfig {
pub stream_fn: StreamFn,
pub system_prompt: String,
pub history: Vec<LoopMessage>,
pub initial_prompt: String,
pub tools: Vec<Arc<dyn LoopTool>>,
#[cfg(feature = "plugin")]
pub plugin_mgr: Option<Arc<Mutex<crate::plugin::PluginManager>>>,
pub steering_queue: Option<Arc<Mutex<VecDeque<String>>>>,
pub tool_execution: ToolExecutionMode,
pub event_channel_capacity: usize,
pub provider_name: Option<String>,
pub model_name: Option<String>,
pub summarize_fn: Option<crate::agent::compression::SummarizeFn>,
pub tool_def_filter: Option<Arc<std::sync::Mutex<std::collections::HashSet<String>>>>,
pub dynamic_tool_search: bool,
pub escalation_stream_fn: Option<StreamFn>,
pub escalation_provider_name: Option<String>,
pub escalation_max_per_session: Option<usize>,
pub file_touch_tracker:
Option<std::sync::Arc<crate::agent::agent_loop::context_depth::FileTouchTracker>>,
pub verifier: Option<std::sync::Arc<crate::agent::agent_loop::verifier::VerifierGate>>,
pub critic_fn: Option<crate::agent::agent_loop::critic::CriticFn>,
pub goal: Option<String>,
pub max_turns: Option<usize>,
pub bg_store: Option<crate::agent::tools::background::BackgroundStore>,
pub memory_provider: Option<std::sync::Arc<dyn crate::extras::memory_provider::MemoryProvider>>,
}
impl LoopSpawnConfig {
pub fn minimal(stream_fn: StreamFn, prompt: impl Into<String>) -> Self {
Self {
stream_fn,
system_prompt: String::new(),
history: Vec::new(),
initial_prompt: prompt.into(),
tools: Vec::new(),
provider_name: None,
model_name: None,
#[cfg(feature = "plugin")]
plugin_mgr: None,
steering_queue: None,
tool_execution: ToolExecutionMode::Parallel,
event_channel_capacity: 256,
summarize_fn: None,
tool_def_filter: None,
dynamic_tool_search: false,
escalation_stream_fn: None,
escalation_provider_name: None,
escalation_max_per_session: None,
file_touch_tracker: None,
verifier: None,
critic_fn: None,
goal: None,
max_turns: None,
bg_store: None,
memory_provider: None,
}
}
}
pub fn spawn_loop_runner(cfg: LoopSpawnConfig) -> LoopRunner {
let (event_tx, event_rx) = mpsc::channel::<AgentEvent>(cfg.event_channel_capacity);
let signal = AbortSignal::new();
let signal_for_task = signal.clone();
#[cfg_attr(not(feature = "plugin"), allow(unused_mut))]
let mut loop_config = LoopConfig {
convert_to_llm: default_convert_to_llm(),
transform_context: None,
compaction_hooks: None,
get_api_key: None,
api_key: None,
tool_execution: cfg.tool_execution,
before_tool_call: None,
after_tool_call: None,
prepare_next_turn: None,
should_stop_after_turn: None,
get_steering_messages: cfg
.steering_queue
.map(|q| steering_from_queue(q, QueueMode::All)),
get_followup_messages: cfg
.bg_store
.clone()
.map(|store| crate::agent::tools::background::followup_from_background_store(store)),
reasoning: None,
thinking_budgets: None,
headers: std::collections::HashMap::new(),
metadata: std::collections::HashMap::new(),
request_timeout: None,
provider_name: cfg.provider_name.clone(),
model_name: cfg.model_name.clone(),
compact_model: None,
storm_mutating_tools: None,
storm_exempt_tools: None,
repair_stats: std::sync::Arc::new(
crate::agent::agent_loop::tool_input_repair::RepairStats::new(),
),
truncation_notes: std::sync::Arc::new(std::sync::Mutex::new(
std::collections::HashMap::new(),
)),
tool_def_filter: cfg.tool_def_filter.clone(),
dynamic_tool_search: cfg.dynamic_tool_search,
escalation_stream_fn: cfg.escalation_stream_fn.clone(),
escalation_provider_name: cfg.escalation_provider_name.clone(),
escalation_pending: std::sync::Arc::new(std::sync::Mutex::new(None)),
escalation_max_per_session: cfg.escalation_max_per_session.unwrap_or(3),
escalation_remaining: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(
cfg.escalation_max_per_session.unwrap_or(3),
)),
file_touch_tracker: cfg.file_touch_tracker.clone(),
verifier: cfg.verifier.clone(),
critic_fn: cfg.critic_fn.clone(),
goal: cfg.goal.clone(),
max_turns: cfg.max_turns,
};
#[cfg(feature = "plugin")]
{
if let Some(pm) = cfg.plugin_mgr {
loop_config.before_tool_call = Some(
super::plugin_hooks::before_hook_from_plugin_manager(pm.clone()),
);
loop_config.after_tool_call = Some(
super::plugin_hooks::after_hook_from_plugin_manager(pm.clone()),
);
if loop_config.transform_context.is_none() {
loop_config.transform_context = Some(
super::plugin_hooks::transform_context_from_plugin_manager(pm.clone()),
);
}
if loop_config.compaction_hooks.is_none() {
loop_config.compaction_hooks = Some(
super::plugin_hooks::compaction_hooks_from_plugin_manager(pm.clone()),
);
}
loop_config.prepare_next_turn = Some(
super::plugin_hooks::prepare_next_turn_from_plugin_manager(pm.clone()),
);
loop_config.should_stop_after_turn =
Some(super::plugin_hooks::should_stop_after_turn_from_plugin_manager(pm.clone()));
if loop_config.get_steering_messages.is_none() {
loop_config.get_steering_messages = Some(
super::plugin_hooks::get_steering_messages_from_plugin_manager(pm.clone()),
);
}
let plugin_followup =
super::plugin_hooks::get_followup_messages_from_plugin_manager(pm);
loop_config.get_followup_messages = match loop_config.get_followup_messages.take() {
Some(bg_followup) => Some(std::sync::Arc::new(move || {
let bg = bg_followup.clone();
let pl = plugin_followup.clone();
Box::pin(async move {
let mut out = bg().await;
out.extend(pl().await);
out
})
})),
None => Some(plugin_followup),
};
}
}
let mut context = Context {
system_prompt: cfg.system_prompt,
messages: cfg.history.iter().map(loop_message_to_value).collect(),
tools: cfg.tools,
};
let prompts = vec![LoopMessage::User(UserMessage {
content: cfg.initial_prompt,
})];
let stream_fn = cfg.stream_fn;
let summarize_fn = cfg.summarize_fn.clone();
let memory_provider = cfg.memory_provider.clone();
let task = tokio::spawn(async move {
let (loop_tx, mut loop_rx) = mpsc::channel(256);
let event_tx_inner = event_tx.clone();
let signal_inner = signal_for_task.clone();
let heal_result =
heal::heal_loaded_messages(&context.messages, heal::DEFAULT_MAX_RESULT_CHARS);
if heal_result.healed_count > 0 {
tracing::info!(
target: "dirge::agent_loop",
healed = %heal_result.healed_count,
chars_saved = %heal_result.chars_saved,
"healed {} message(s) after session restore",
heal_result.healed_count,
);
context.messages = heal_result.messages;
}
let loop_future = async move {
let _final_messages = run_agent_loop(
prompts,
context,
loop_config,
signal_inner,
&loop_tx,
&stream_fn,
summarize_fn,
memory_provider,
)
.await;
drop(loop_tx);
};
let pump_future = async {
let mut bridge = EventBridge::new();
while let Some(loop_evt) = loop_rx.recv().await {
for agent_evt in bridge.translate(loop_evt) {
if event_tx_inner.send(agent_evt).await.is_err() {
return;
}
}
}
};
tokio::join!(loop_future, pump_future);
});
LoopRunner {
event_rx,
task,
signal,
}
}
pub fn default_convert_to_llm() -> super::types::ConvertToLlmFn {
Arc::new(|messages: &[Value]| {
messages
.iter()
.filter(|m| {
let role = m.get("role").and_then(|r| r.as_str()).unwrap_or("");
matches!(
role,
"user" | "assistant" | "tool" | "toolResult" | "system"
)
})
.cloned()
.collect()
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::agent::agent_loop::message::{
AssistantMessage, ContentBlock, StopReason, StreamEvent,
};
use crate::agent::agent_loop::result::LoopToolResult;
use crate::agent::agent_loop::tool::LoopToolUpdate;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
async fn drain(mut rx: mpsc::Receiver<AgentEvent>) -> Vec<AgentEvent> {
let mut out = Vec::new();
while let Some(e) = rx.recv().await {
out.push(e);
}
out
}
fn canned_factory(responses: Vec<AssistantMessage>) -> StreamFn {
let counter = Arc::new(AtomicUsize::new(0));
let responses = Arc::new(responses);
Arc::new(move |_ctx, _opts| {
let n = counter.fetch_add(1, Ordering::SeqCst);
let msg = responses.get(n).cloned().unwrap_or_else(|| {
AssistantMessage::new(
vec![ContentBlock::Text {
text: "fallback".to_string(),
}],
StopReason::Stop,
)
});
let reason = msg.stop_reason;
Box::pin(futures::stream::iter(vec![StreamEvent::Done {
reason,
message: msg,
usage: None,
}]))
})
}
fn text_response(s: &str) -> AssistantMessage {
AssistantMessage::new(
vec![ContentBlock::Text {
text: s.to_string(),
}],
StopReason::Stop,
)
}
fn tool_response(id: &str, name: &str, args: Value) -> AssistantMessage {
AssistantMessage::new(
vec![ContentBlock::ToolCall {
id: id.to_string(),
name: name.to_string(),
arguments: args,
}],
StopReason::ToolUse,
)
}
#[derive(Debug)]
struct EchoTool;
impl LoopTool for EchoTool {
fn name(&self) -> &str {
"echo"
}
fn description(&self) -> &str {
"Echo"
}
fn label(&self) -> &str {
"Echo"
}
fn parameters(&self) -> &Value {
static EMPTY: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
EMPTY.get_or_init(|| serde_json::json!({"type": "object"}))
}
fn execute<'a>(
&'a self,
_id: &'a str,
args: Value,
_signal: AbortSignal,
_on_update: LoopToolUpdate,
) -> Pin<Box<dyn Future<Output = Result<LoopToolResult, String>> + Send + 'a>> {
Box::pin(async move {
Ok(LoopToolResult {
content: vec![serde_json::json!({"type": "text", "text": "ok"})],
details: args,
terminate: None,
})
})
}
}
#[tokio::test]
async fn spawn_emits_expected_event_sequence_for_text_response() {
let cfg =
LoopSpawnConfig::minimal(canned_factory(vec![text_response("Hello world")]), "hi");
let runner = spawn_loop_runner(cfg);
let events = drain(runner.event_rx).await;
let kinds: Vec<&str> = events.iter().map(agent_event_kind).collect();
for required in ["TurnStart", "TurnEnd", "Done"] {
assert!(kinds.contains(&required), "missing {required} in {kinds:?}");
}
let done = events
.iter()
.find_map(|e| match e {
AgentEvent::Done { response, .. } => Some(response.clone()),
_ => None,
})
.expect("Done must be emitted");
assert_eq!(done, "Hello world");
let _ = runner.task.await;
}
#[tokio::test]
async fn spawn_handles_tool_call_then_final_text() {
let mut cfg = LoopSpawnConfig::minimal(
canned_factory(vec![
tool_response("call-1", "echo", serde_json::json!({"v": 1})),
text_response("done"),
]),
"go",
);
cfg.tools.push(Arc::new(EchoTool));
cfg.tool_execution = ToolExecutionMode::Sequential;
let runner = spawn_loop_runner(cfg);
let events = drain(runner.event_rx).await;
let kinds: Vec<&str> = events.iter().map(agent_event_kind).collect();
for required in [
"TurnStart",
"ToolCall",
"ToolStarted",
"ToolResult",
"TurnEnd",
"Done",
] {
assert!(kinds.contains(&required), "missing {required} in {kinds:?}");
}
let _ = runner.task.await;
}
#[tokio::test]
async fn spawn_with_steering_queue_injects_mid_run() {
let queue = Arc::new(Mutex::new(VecDeque::<String>::new()));
let queue_writer = queue.clone();
let saw = Arc::new(Mutex::new(false));
let saw_clone = saw.clone();
let counter = Arc::new(AtomicUsize::new(0));
let factory: StreamFn = Arc::new(move |llm_ctx, _opts| {
let n = counter.fetch_add(1, Ordering::SeqCst);
if n == 1 {
let found = llm_ctx.messages.iter().any(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("user")
&& m.get("content")
.and_then(|c| c.as_str())
.map(|s| s.contains("interrupt"))
== Some(true)
});
*saw_clone.lock().unwrap() = found;
} else if n == 0 {
queue_writer
.lock()
.unwrap()
.push_back("interrupt".to_string());
}
let msg = if n == 0 {
tool_response("call-1", "echo", serde_json::json!({}))
} else {
text_response("done")
};
let reason = msg.stop_reason;
Box::pin(futures::stream::iter(vec![StreamEvent::Done {
reason,
message: msg,
usage: None,
}]))
});
let mut cfg = LoopSpawnConfig::minimal(factory, "start");
cfg.tools.push(Arc::new(EchoTool));
cfg.tool_execution = ToolExecutionMode::Sequential;
cfg.steering_queue = Some(queue);
let runner = spawn_loop_runner(cfg);
let _events = drain(runner.event_rx).await;
let _ = runner.task.await;
assert!(
*saw.lock().unwrap(),
"steering should have injected the interrupt for the second LLM call"
);
}
#[tokio::test]
async fn spawn_injects_fewshot_exemplars_for_tool_task() {
let saw = Arc::new(Mutex::new(false));
let saw_clone = saw.clone();
let factory: StreamFn = Arc::new(move |llm_ctx, _opts| {
let found = llm_ctx.messages.iter().any(|m| {
m.get("content")
.and_then(|c| c.as_str())
.map(|s| s.contains("[Tool-use examples]"))
== Some(true)
});
*saw_clone.lock().unwrap() = found;
let msg = text_response("done");
let reason = msg.stop_reason;
Box::pin(futures::stream::iter(vec![StreamEvent::Done {
reason,
message: msg,
usage: None,
}]))
});
let cfg =
LoopSpawnConfig::minimal(factory, "change the handle_login function in the auth file");
let runner = spawn_loop_runner(cfg);
let _events = drain(runner.event_rx).await;
let _ = runner.task.await;
assert!(
*saw.lock().unwrap(),
"few-shot exemplars should be injected into the context for a tool task"
);
}
#[tokio::test]
async fn spawn_omits_fewshot_exemplars_for_offtopic_task() {
let saw = Arc::new(Mutex::new(false));
let saw_clone = saw.clone();
let factory: StreamFn = Arc::new(move |llm_ctx, _opts| {
let found = llm_ctx.messages.iter().any(|m| {
m.get("content")
.and_then(|c| c.as_str())
.map(|s| s.contains("[Tool-use examples]"))
== Some(true)
});
*saw_clone.lock().unwrap() = found;
let msg = text_response("hi");
let reason = msg.stop_reason;
Box::pin(futures::stream::iter(vec![StreamEvent::Done {
reason,
message: msg,
usage: None,
}]))
});
let cfg = LoopSpawnConfig::minimal(factory, "what is your favorite color");
let runner = spawn_loop_runner(cfg);
let _events = drain(runner.event_rx).await;
let _ = runner.task.await;
assert!(
!*saw.lock().unwrap(),
"no exemplars should be injected for an off-topic task"
);
}
#[tokio::test]
async fn spawn_exposes_working_abort_signal() {
let cfg = LoopSpawnConfig::minimal(canned_factory(vec![text_response("hi")]), "x");
let runner = spawn_loop_runner(cfg);
let s = runner.signal.clone();
s.cancel();
assert!(runner.signal.is_cancelled());
let _ = runner.task.await;
}
#[cfg(feature = "plugin")]
#[tokio::test]
async fn spawn_with_plugin_block_hook_blocks_tool() {
use crate::plugin::PluginManager;
let pm = match PluginManager::try_new() {
Ok(mgr) => Arc::new(Mutex::new(mgr)),
Err(_) => {
eprintln!("[skipped] PluginManager::try_new failed");
return;
}
};
{
let mut mgr = pm.lock().unwrap();
mgr.eval(r#"(defn deny [_ctx] (harness/block "policy"))"#)
.unwrap();
mgr.register("on-tool-start", "deny");
}
let factory = canned_factory(vec![
tool_response("call-1", "echo", serde_json::json!({})),
text_response("done"),
]);
let mut cfg = LoopSpawnConfig::minimal(factory, "go");
cfg.tools.push(Arc::new(EchoTool));
cfg.tool_execution = ToolExecutionMode::Sequential;
cfg.plugin_mgr = Some(pm);
let runner = spawn_loop_runner(cfg);
let events = drain(runner.event_rx).await;
let _ = runner.task.await;
let found_block_text = events.iter().any(|e| match e {
AgentEvent::ToolResult { output, .. } => output.contains("policy"),
_ => false,
});
assert!(
found_block_text,
"expected ToolResult to convey 'policy' block reason; got {events:?}"
);
}
#[tokio::test]
async fn parent_idle_during_subagent_run_resumes_on_completion() {
use crate::agent::tools::background::{BackgroundStore, TaskState};
let store = BackgroundStore::new();
store.insert("sub-1".into());
let saw_reminder = Arc::new(Mutex::new(false));
let saw_clone = saw_reminder.clone();
let counter = Arc::new(AtomicUsize::new(0));
let store_for_factory = store.clone();
let factory: StreamFn = Arc::new(move |llm_ctx, _opts| {
let n = counter.fetch_add(1, Ordering::SeqCst);
match n {
0 => {
store_for_factory.notify(
"sub-1",
TaskState::Completed("subagent finished work".into()),
);
}
1 => {
let found = llm_ctx.messages.iter().any(|m| {
m.get("role").and_then(|r| r.as_str()) == Some("user")
&& m.get("content").and_then(|c| c.as_str()).map(|s| {
s.contains("[task sub-1] completed:")
&& s.contains("subagent finished work")
}) == Some(true)
});
*saw_clone.lock().unwrap() = found;
}
_ => {}
}
let msg = if n == 0 {
text_response("initial work done; awaiting subagent")
} else {
text_response("acknowledged subagent result")
};
let reason = msg.stop_reason;
Box::pin(futures::stream::iter(vec![StreamEvent::Done {
reason,
message: msg,
usage: None,
}]))
});
let mut cfg = LoopSpawnConfig::minimal(factory, "start work, then wait");
cfg.bg_store = Some(store.clone());
let runner = spawn_loop_runner(cfg);
let _events = drain(runner.event_rx).await;
let _ = runner.task.await;
assert!(
*saw_reminder.lock().unwrap(),
"second LLM call must see the [task sub-1] completed marker; \
the parent loop should have re-entered the inner loop with \
the subagent completion as a pending user message",
);
assert!(
store.drain_notifications().is_empty(),
"completion must be consumed exactly once",
);
}
#[tokio::test]
async fn no_bg_store_means_no_followup_injection() {
let mut cfg = LoopSpawnConfig::minimal(canned_factory(vec![text_response("done")]), "hi");
cfg.bg_store = None;
let runner = spawn_loop_runner(cfg);
let events = drain(runner.event_rx).await;
let _ = runner.task.await;
let turn_ends = events
.iter()
.filter(|e| matches!(e, AgentEvent::TurnEnd { .. }))
.count();
assert_eq!(turn_ends, 1, "expected single turn; got {turn_ends}");
}
fn agent_event_kind(e: &AgentEvent) -> &'static str {
match e {
AgentEvent::Token(_) => "Token",
AgentEvent::Reasoning(_) => "Reasoning",
AgentEvent::ToolCall { .. } => "ToolCall",
AgentEvent::ToolStarted { .. } => "ToolStarted",
AgentEvent::ToolResult { .. } => "ToolResult",
AgentEvent::Error(_) => "Error",
AgentEvent::ContextOverflow { .. } => "ContextOverflow",
AgentEvent::Done { .. } => "Done",
AgentEvent::TurnStart { .. } => "TurnStart",
AgentEvent::TurnEnd { .. } => "TurnEnd",
AgentEvent::Usage { .. } => "Usage",
AgentEvent::Interjected { .. } => "Interjected",
AgentEvent::CustomMessage { .. } => "CustomMessage",
AgentEvent::UserMessage { .. } => "UserMessage",
AgentEvent::CompactionStarted { .. } => "CompactionStarted",
AgentEvent::ContextCompacted { .. } => "ContextCompacted",
AgentEvent::CheckpointRefresh { .. } => "CheckpointRefresh",
AgentEvent::RetryNotice { .. } => "RetryNotice",
AgentEvent::SystemNotice { .. } => "SystemNotice",
AgentEvent::RepairStats { .. } => "RepairStats",
AgentEvent::EscalationActivated { .. } => "EscalationActivated",
}
}
#[tokio::test]
async fn default_convert_to_llm_filters_custom_messages() {
use std::sync::Mutex;
let history = vec![
LoopMessage::User(UserMessage {
content: "first user".to_string(),
}),
LoopMessage::Custom(serde_json::json!({
"role": "custom",
"content": "UI-only notification",
})),
LoopMessage::Assistant(AssistantMessage::new(
vec![ContentBlock::Text {
text: "first answer".to_string(),
}],
StopReason::Stop,
)),
];
let observed: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
let observed_clone = observed.clone();
let stream_fn: StreamFn = Arc::new(move |ctx, _opts| {
*observed_clone.lock().unwrap() = ctx.messages.clone();
let msg = AssistantMessage::new(
vec![ContentBlock::Text {
text: "done".to_string(),
}],
StopReason::Stop,
);
Box::pin(futures::stream::iter(vec![StreamEvent::Done {
reason: StopReason::Stop,
message: msg,
usage: None,
}]))
});
let mut cfg = LoopSpawnConfig::minimal(stream_fn, "next turn please");
cfg.history = history;
let runner = spawn_loop_runner(cfg);
let _ = drain(runner.event_rx).await;
let _ = runner.task.await;
let seen = observed.lock().unwrap().clone();
let roles: Vec<String> = seen
.iter()
.map(|m| {
m.get("role")
.and_then(|r| r.as_str())
.unwrap_or("?")
.to_string()
})
.collect();
assert!(
!roles.contains(&"custom".to_string()),
"Custom messages must be filtered before the LLM; got roles: {roles:?}"
);
assert_eq!(
roles.len(),
3,
"expected 3 LLM-visible messages; got {roles:?}"
);
}
}