1use std::sync::Arc;
13
14use tokio_util::sync::CancellationToken;
15
16use crate::error::{CoreError, Result};
17use crate::event::{RunEvent, RunHooks};
18use crate::message::{AgentMessage, ContentBlock, Role};
19use crate::model::{Model, ModelProvider, ModelRequest, StreamEvent};
20use crate::policy::{PolicyVerdict, ToolPolicy};
21use crate::thinking::ThinkingLevel;
22use crate::tool::{InvokeContext, Tool, ToolCall, ToolResult};
23
24#[derive(Debug, Clone)]
26pub struct RunConfig {
27 pub max_turns: usize,
29 pub thinking: ThinkingLevel,
31 pub turn_timeout_ms: Option<u64>,
34 pub max_tool_calls_per_turn: usize,
37 pub tool_concurrency: usize,
41}
42
43impl Default for RunConfig {
44 fn default() -> Self {
45 Self {
46 max_turns: 12,
47 thinking: ThinkingLevel::default(),
48 turn_timeout_ms: Some(120_000),
49 max_tool_calls_per_turn: 10,
50 tool_concurrency: 1,
51 }
52 }
53}
54
/// Successful result of an agent run.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Number of model turns taken, including the final tool-free one (always at least 1).
    pub turns: usize,
    /// The `Text` blocks of the most recent assistant message in the transcript,
    /// concatenated in order with no separator.
    pub final_text: String,
}
64
65#[async_trait::async_trait]
80pub trait TurnSink: Send + Sync {
81 async fn after_turn(&self, turn: usize, messages: &[AgentMessage]) -> Result<()>;
84}
85
86pub struct FanoutTurnSink {
100 sinks: Vec<Box<dyn TurnSink>>,
101}
102
103impl FanoutTurnSink {
104 #[must_use]
106 pub fn new() -> Self {
107 Self { sinks: Vec::new() }
108 }
109
110 #[must_use]
112 pub fn push(mut self, sink: Box<dyn TurnSink>) -> Self {
113 self.sinks.push(sink);
114 self
115 }
116
117 #[must_use]
119 pub fn len(&self) -> usize {
120 self.sinks.len()
121 }
122
123 #[must_use]
125 pub fn is_empty(&self) -> bool {
126 self.sinks.is_empty()
127 }
128}
129
130impl Default for FanoutTurnSink {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136#[async_trait::async_trait]
137impl TurnSink for FanoutTurnSink {
138 async fn after_turn(&self, turn: usize, messages: &[AgentMessage]) -> Result<()> {
139 for sink in &self.sinks {
140 sink.after_turn(turn, messages).await?;
141 }
142 Ok(())
143 }
144}
145
#[cfg(test)]
mod fanout_tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Test sink that logs every turn number it receives and, when `fail_at` matches the
    /// turn, returns an injected `Transport` error after logging it.
    struct RecordingSink {
        calls: Arc<Mutex<Vec<usize>>>,
        fail_at: Option<usize>,
    }

    #[async_trait::async_trait]
    impl TurnSink for RecordingSink {
        async fn after_turn(&self, turn: usize, _messages: &[AgentMessage]) -> Result<()> {
            self.calls.lock().expect("lock poisoned").push(turn);
            if self.fail_at == Some(turn) {
                return Err(crate::error::CoreError::Transport(format!(
                    "injected failure at turn {turn}"
                )));
            }
            Ok(())
        }
    }

    #[tokio::test]
    async fn fanout_calls_sinks_in_order() {
        let calls_a = Arc::new(Mutex::new(Vec::new()));
        let calls_b = Arc::new(Mutex::new(Vec::new()));
        let fanout = FanoutTurnSink::new()
            .push(Box::new(RecordingSink {
                calls: calls_a.clone(),
                fail_at: None,
            }))
            .push(Box::new(RecordingSink {
                calls: calls_b.clone(),
                fail_at: None,
            }));
        assert_eq!(fanout.len(), 2);
        TurnSink::after_turn(&fanout, 1, &[]).await.unwrap();
        assert_eq!(*calls_a.lock().unwrap(), vec![1]);
        assert_eq!(*calls_b.lock().unwrap(), vec![1]);
    }

    #[tokio::test]
    async fn fanout_propagates_error_and_stops() {
        let calls_a = Arc::new(Mutex::new(Vec::new()));
        let calls_b = Arc::new(Mutex::new(Vec::new()));
        let fanout = FanoutTurnSink::new()
            .push(Box::new(RecordingSink {
                calls: calls_a.clone(),
                fail_at: Some(2),
            }))
            .push(Box::new(RecordingSink {
                calls: calls_b.clone(),
                fail_at: None,
            }));
        let result = TurnSink::after_turn(&fanout, 2, &[]).await;
        // The failing child's own error must reach the caller unchanged — previously the
        // result was discarded, so "propagates" was never actually checked.
        match result {
            Err(crate::error::CoreError::Transport(msg)) => {
                assert_eq!(msg, "injected failure at turn 2");
            }
            other => panic!("expected the first sink's Transport error, got {other:?}"),
        }
        assert_eq!(*calls_a.lock().unwrap(), vec![2]);
        assert!(calls_b.lock().unwrap().is_empty(), "second sink ran");
    }
}
207
208pub async fn run_agent(
226 provider: &dyn ModelProvider,
227 tools: &[Arc<dyn Tool>],
228 messages: &mut Vec<AgentMessage>,
229 model: &Model,
230 config: &RunConfig,
231 cancel: &CancellationToken,
232 hooks: &RunHooks<'_>,
233) -> Result<RunOutcome> {
234 hooks.emit_event(|sid| RunEvent::SessionStarted { session: sid });
235 let mut turns = 0usize;
236 loop {
237 if cancel.is_cancelled() {
238 hooks.emit_event(|sid| crate::event::run_failed(sid, "cancelled"));
239 return Err(CoreError::Cancelled("agent run cancelled".into()));
240 }
241 if turns >= config.max_turns {
242 let msg = format!(
243 "max_turns ({}) exceeded — the model kept calling tools",
244 config.max_turns
245 );
246 hooks.emit_event(|sid| crate::event::run_failed(sid, msg.clone()));
247 return Err(CoreError::ModelResponse(msg));
248 }
249 turns += 1;
250 hooks.emit_event(|sid| RunEvent::TurnStarted {
251 session: sid,
252 turn: turns,
253 });
254
255 let request = ModelRequest {
256 model: model.clone(),
257 messages: messages.clone(),
258 tools: tools.iter().map(|t| t.definition()).collect(),
259 thinking: config.thinking,
260 params: Default::default(),
261 };
262 hooks.emit_event(|sid| RunEvent::ModelStarted {
263 session: sid,
264 turn: turns,
265 model: model.id.clone(),
266 });
267 let response =
269 match invoke_with_budget(provider, request, config.turn_timeout_ms, cancel).await {
270 Ok(r) => r,
271 Err(e) => {
272 hooks.emit_event(|sid| crate::event::run_failed(sid, e.to_string()));
273 return Err(e);
274 }
275 };
276 hooks.emit_event(|sid| RunEvent::ModelFinished {
277 session: sid,
278 turn: turns,
279 });
280 let tool_calls: Vec<(String, ToolCall)> = response
282 .messages
283 .iter()
284 .flat_map(|m| m.content.iter())
285 .filter_map(|block| {
286 if let ContentBlock::ToolUse { id, call } = block {
287 Some((id.clone(), call.clone()))
288 } else {
289 None
290 }
291 })
292 .collect();
293 messages.extend(response.messages);
295
296 if tool_calls.is_empty() {
297 let final_text = extract_final_text(messages);
299 if let Some(sink) = hooks.turn_sink {
301 sink.after_turn(turns, messages).await?;
302 }
303 hooks.emit_event(|sid| RunEvent::TurnFinished {
304 session: sid,
305 turn: turns,
306 });
307 return Ok(RunOutcome { turns, final_text });
308 }
309
310 if tool_calls.len() > config.max_tool_calls_per_turn {
312 let msg = format!(
313 "model issued {} tool calls in one turn (max {})",
314 tool_calls.len(),
315 config.max_tool_calls_per_turn
316 );
317 hooks.emit_event(|sid| crate::event::run_failed(sid, msg.clone()));
318 return Err(CoreError::ModelResponse(msg));
319 }
320
321 for (id, call) in &tool_calls {
323 hooks.emit_event(|sid| RunEvent::ToolStarted {
324 session: sid,
325 turn: turns,
326 tool: call.name.clone(),
327 call_id: id.clone(),
328 });
329 }
330
331 let results = execute_tool_calls(
335 tools,
336 &tool_calls,
337 cancel,
338 config.tool_concurrency,
339 hooks.policy,
340 )
341 .await;
342
343 for (i, (id, call)) in tool_calls.iter().enumerate() {
345 let result = &results[i];
346 let ok = tool_result_ok(result);
347 hooks.emit_event(|sid| RunEvent::ToolFinished {
348 session: sid,
349 turn: turns,
350 tool: call.name.clone(),
351 call_id: id.clone(),
352 ok,
353 });
354 let tool_msg = AgentMessage {
355 role: Role::Tool,
356 content: vec![ContentBlock::ToolResult {
357 tool_use_id: id.clone(),
358 content: serde_json::to_value(result)
359 .unwrap_or_else(|_| serde_json::json!({ "error": "serialize failed" })),
360 }],
361 };
362 messages.push(tool_msg);
363 }
364 if let Some(sink) = hooks.turn_sink {
368 sink.after_turn(turns, messages).await?;
369 }
370 hooks.emit_event(|sid| RunEvent::TurnFinished {
371 session: sid,
372 turn: turns,
373 });
374 }
375}
376
377#[derive(Debug, Clone, Default)]
380struct StreamedTurn {
381 text: String,
382 thinking: String,
383 tool_calls: Vec<(String, ToolCall)>,
384}
385
386async fn collect_streamed_turn(
392 stream: crate::model::StreamEventStream,
393 on_event: &mut (dyn FnMut(&StreamEvent) + Send),
394) -> Result<StreamedTurn> {
395 use futures::StreamExt;
396 let mut turn = StreamedTurn::default();
397 let mut s = stream;
398 while let Some(item) = s.next().await {
399 match item {
400 Ok(StreamEvent::TextDelta(t)) => {
401 on_event(&StreamEvent::TextDelta(t.clone()));
402 turn.text.push_str(&t);
403 }
404 Ok(StreamEvent::ThinkingDelta(t)) => {
405 turn.thinking.push_str(&t);
406 }
407 Ok(StreamEvent::ToolCall(call)) => {
408 let id = format!("call_{}", turn.tool_calls.len());
412 turn.tool_calls.push((id, call));
413 }
414 Ok(StreamEvent::Done) => break,
415 Err(e) => return Err(e),
416 }
417 }
418 Ok(turn)
419}
420
421#[allow(clippy::too_many_arguments)]
429pub async fn run_agent_streaming(
430 provider: &dyn ModelProvider,
431 tools: &[Arc<dyn Tool>],
432 messages: &mut Vec<AgentMessage>,
433 model: &Model,
434 config: &RunConfig,
435 cancel: &CancellationToken,
436 on_event: &mut (dyn FnMut(&StreamEvent) + Send),
437 hooks: &RunHooks<'_>,
438) -> Result<RunOutcome> {
439 hooks.emit_event(|sid| RunEvent::SessionStarted { session: sid });
440 let mut turns = 0usize;
441 loop {
442 if cancel.is_cancelled() {
443 hooks.emit_event(|sid| crate::event::run_failed(sid, "cancelled"));
444 return Err(CoreError::Cancelled("agent run cancelled".into()));
445 }
446 if turns >= config.max_turns {
447 let msg = format!(
448 "max_turns ({}) exceeded — the model kept calling tools",
449 config.max_turns
450 );
451 hooks.emit_event(|sid| crate::event::run_failed(sid, msg.clone()));
452 return Err(CoreError::ModelResponse(msg));
453 }
454 turns += 1;
455 hooks.emit_event(|sid| RunEvent::TurnStarted {
456 session: sid,
457 turn: turns,
458 });
459
460 let request = ModelRequest {
461 model: model.clone(),
462 messages: messages.clone(),
463 tools: tools.iter().map(|t| t.definition()).collect(),
464 thinking: config.thinking,
465 params: Default::default(),
466 };
467 hooks.emit_event(|sid| RunEvent::ModelStarted {
468 session: sid,
469 turn: turns,
470 model: model.id.clone(),
471 });
472 let stream = provider.stream(request);
474 let turn = match collect_streamed_turn(stream, on_event).await {
475 Ok(t) => t,
476 Err(e) => {
477 hooks.emit_event(|sid| crate::event::run_failed(sid, e.to_string()));
478 return Err(e);
479 }
480 };
481 hooks.emit_event(|sid| RunEvent::ModelFinished {
482 session: sid,
483 turn: turns,
484 });
485
486 let mut content: Vec<ContentBlock> = Vec::new();
488 if !turn.text.is_empty() {
489 content.push(ContentBlock::Text { text: turn.text });
490 }
491 for (id, call) in &turn.tool_calls {
492 content.push(ContentBlock::ToolUse {
493 id: id.clone(),
494 call: call.clone(),
495 });
496 }
497 messages.push(AgentMessage {
498 role: Role::Assistant,
499 content,
500 });
501
502 if turn.tool_calls.is_empty() {
503 let final_text = extract_final_text(messages);
504 if let Some(sink) = hooks.turn_sink {
506 sink.after_turn(turns, messages).await?;
507 }
508 hooks.emit_event(|sid| RunEvent::TurnFinished {
509 session: sid,
510 turn: turns,
511 });
512 return Ok(RunOutcome { turns, final_text });
513 }
514 if turn.tool_calls.len() > config.max_tool_calls_per_turn {
515 let msg = format!(
516 "model issued {} tool calls in one turn (max {})",
517 turn.tool_calls.len(),
518 config.max_tool_calls_per_turn
519 );
520 hooks.emit_event(|sid| crate::event::run_failed(sid, msg.clone()));
521 return Err(CoreError::ModelResponse(msg));
522 }
523
524 let owned_calls: Vec<(String, ToolCall)> = turn.tool_calls.clone();
525
526 for (id, call) in &owned_calls {
528 hooks.emit_event(|sid| RunEvent::ToolStarted {
529 session: sid,
530 turn: turns,
531 tool: call.name.clone(),
532 call_id: id.clone(),
533 });
534 }
535
536 let results = execute_tool_calls(
537 tools,
538 &owned_calls,
539 cancel,
540 config.tool_concurrency,
541 hooks.policy,
542 )
543 .await;
544
545 for (i, (id, call)) in owned_calls.iter().enumerate() {
547 let result = &results[i];
548 let ok = tool_result_ok(result);
549 hooks.emit_event(|sid| RunEvent::ToolFinished {
550 session: sid,
551 turn: turns,
552 tool: call.name.clone(),
553 call_id: id.clone(),
554 ok,
555 });
556 let tool_msg = AgentMessage {
557 role: Role::Tool,
558 content: vec![ContentBlock::ToolResult {
559 tool_use_id: id.clone(),
560 content: serde_json::to_value(result)
561 .unwrap_or_else(|_| serde_json::json!({ "error": "serialize failed" })),
562 }],
563 };
564 messages.push(tool_msg);
565 }
566 if let Some(sink) = hooks.turn_sink {
568 sink.after_turn(turns, messages).await?;
569 }
570 hooks.emit_event(|sid| RunEvent::TurnFinished {
571 session: sid,
572 turn: turns,
573 });
574 }
575}
576
/// Upper bound, in `char`s, on the panic text echoed back to the model.
const PANIC_SUMMARY_MAX_CHARS: usize = 200;

/// Renders a caught panic payload as a short string suitable for a tool error result.
///
/// `&'static str` and `String` payloads are used verbatim; any other payload type
/// becomes a fixed placeholder. Text longer than [`PANIC_SUMMARY_MAX_CHARS`] chars is cut
/// to `PANIC_SUMMARY_MAX_CHARS - 1` chars plus a trailing `…`, so the result never
/// exceeds the limit.
fn summarize_panic(payload: &Box<dyn std::any::Any + Send>) -> String {
    let message = if let Some(text) = payload.downcast_ref::<&'static str>() {
        text.to_string()
    } else if let Some(text) = payload.downcast_ref::<String>() {
        text.clone()
    } else {
        "<non-string panic payload>".to_string()
    };

    // Only need to know whether there is a char past the limit, not the full count.
    if message.chars().nth(PANIC_SUMMARY_MAX_CHARS).is_none() {
        return message;
    }
    // Byte offset where the (MAX-1)th char starts, i.e. the end of the kept prefix; this
    // always lands on a char boundary.
    let cut = message
        .char_indices()
        .nth(PANIC_SUMMARY_MAX_CHARS - 1)
        .map_or(message.len(), |(byte_idx, _)| byte_idx);
    format!("{}…", &message[..cut])
}
602
603async fn execute_tool_call(
613 tools: &[Arc<dyn Tool>],
614 id: &str,
615 call: &ToolCall,
616 cancel: &CancellationToken,
617) -> ToolResult {
618 let Some(tool) = tools.iter().find(|t| t.definition().name == call.name) else {
619 return error_result(&format!("unknown tool: `{}`", call.name));
620 };
621 let ctx = InvokeContext {
622 tool_call_id: id.to_string(),
623 cancel: cancel.clone(),
624 };
625 use futures::FutureExt;
626 use std::panic::AssertUnwindSafe;
627 match AssertUnwindSafe(tool.execute(ctx, call.input.clone()))
628 .catch_unwind()
629 .await
630 {
631 Ok(Ok(result)) => result,
632 Ok(Err(err)) => error_result(&err.to_string()),
633 Err(payload) => {
634 let summary = summarize_panic(&payload);
635 tracing::warn!(
640 tool = %call.name,
641 call_id = %id,
642 "tool panicked; converted to model-visible error result"
643 );
644 error_result(&format!("tool `{}` panicked: {summary}", call.name))
645 }
646 }
647}
648
649async fn invoke_with_budget(
652 provider: &dyn ModelProvider,
653 request: ModelRequest,
654 turn_timeout_ms: Option<u64>,
655 cancel: &CancellationToken,
656) -> Result<crate::model::ModelResponse> {
657 if cancel.is_cancelled() {
659 return Err(CoreError::Cancelled("turn cancelled before invoke".into()));
660 }
661 let invoke_fut = provider.invoke(request);
662 match turn_timeout_ms {
663 Some(ms) => {
664 let timeout = tokio::time::timeout(std::time::Duration::from_millis(ms), invoke_fut);
665 tokio::select! {
666 biased;
667 _ = cancel.cancelled() => {
668 Err(CoreError::Cancelled("turn cancelled during invoke".into()))
669 }
670 res = timeout => {
671 res.map_err(|_| {
672 CoreError::Cancelled(format!(
673 "turn timed out after {ms}ms"
674 ))
675 })?
676 }
677 }
678 }
679 None => {
680 tokio::select! {
681 biased;
682 _ = cancel.cancelled() => {
683 Err(CoreError::Cancelled("turn cancelled during invoke".into()))
684 }
685 res = invoke_fut => res,
686 }
687 }
688 }
689}
690
691enum PolicyOutcome {
693 Execute,
695 Denied(ToolResult),
698}
699
700async fn policy_check(
706 policy: Option<&dyn ToolPolicy>,
707 id: &str,
708 call: &ToolCall,
709 cancel: &CancellationToken,
710) -> PolicyOutcome {
711 let Some(policy) = policy else {
712 return PolicyOutcome::Execute;
713 };
714 let ctx = InvokeContext {
715 tool_call_id: id.to_string(),
716 cancel: cancel.clone(),
717 };
718 match policy.check(&call.name, &call.input, &ctx).await {
719 PolicyVerdict::Allow => PolicyOutcome::Execute,
720 PolicyVerdict::Confirm(reason) => {
721 tracing::info!(
722 tool = %call.name,
723 call_id = %id,
724 "tool policy returned Confirm; treating as Allow for this run: {reason}"
725 );
726 PolicyOutcome::Execute
727 }
728 PolicyVerdict::Deny(reason) => {
729 PolicyOutcome::Denied(error_result(&format!("denied by policy: {reason}")))
730 }
731 }
732}
733
734async fn execute_tool_calls(
745 tools: &[Arc<dyn Tool>],
746 calls: &[(String, ToolCall)],
747 cancel: &CancellationToken,
748 tool_concurrency: usize,
749 policy: Option<&dyn ToolPolicy>,
750) -> Vec<ToolResult> {
751 if tool_concurrency <= 1 {
752 let mut out = Vec::with_capacity(calls.len());
753 for (id, call) in calls {
754 let result = match policy_check(policy, id, call, cancel).await {
755 PolicyOutcome::Execute => execute_tool_call(tools, id, call, cancel).await,
756 PolicyOutcome::Denied(result) => result,
757 };
758 out.push(result);
759 }
760 return out;
761 }
762
763 use tokio::task::JoinSet;
765 let mut indexed: Vec<Option<ToolResult>> = (0..calls.len()).map(|_| None).collect();
770 let mut set: JoinSet<(usize, ToolResult)> = JoinSet::new();
771 for (i, (id, call)) in calls.iter().enumerate() {
772 if let PolicyOutcome::Denied(result) = policy_check(policy, id, call, cancel).await {
777 if let Some(slot) = indexed.get_mut(i) {
778 *slot = Some(result);
779 }
780 continue;
781 }
782 let tool = tools
784 .iter()
785 .find(|t| t.definition().name == call.name)
786 .cloned();
787 let ctx_cancel = cancel.child_token();
788 let ctx = InvokeContext {
789 tool_call_id: id.clone(),
790 cancel: ctx_cancel,
791 };
792 let input = call.input.clone();
793 let id_owned = id.clone();
794 let call_name = call.name.clone();
795 set.spawn(async move {
796 let result = match tool {
802 Some(t) => {
803 use futures::FutureExt;
804 use std::panic::AssertUnwindSafe;
805 match AssertUnwindSafe(t.execute(ctx, input)).catch_unwind().await {
806 Ok(Ok(r)) => r,
807 Ok(Err(err)) => error_result(&err.to_string()),
808 Err(payload) => {
809 let summary = summarize_panic(&payload);
810 tracing::warn!(
811 tool = %call_name,
812 call_id = %id_owned,
813 "tool panicked; converted to model-visible error result"
814 );
815 error_result(&format!("tool `{call_name}` panicked: {summary}"))
816 }
817 }
818 }
819 None => error_result(&format!("unknown tool: `{id_owned}`")),
820 };
821 (i, result)
822 });
823 while set.len() >= tool_concurrency {
827 let res = set.join_next().await;
828 if res.is_none() {
829 break; }
831 record_join_result(res, &mut indexed);
832 }
833 }
834 while let Some(res) = set.join_next().await {
841 record_join_result(Some(res), &mut indexed);
842 }
843 indexed
844 .into_iter()
845 .map(|opt| opt.unwrap_or_else(|| error_result("tool task produced no result")))
846 .collect()
848}
849
850fn record_join_result(
855 res: Option<std::result::Result<(usize, ToolResult), tokio::task::JoinError>>,
856 indexed: &mut [Option<ToolResult>],
857) {
858 match res {
859 Some(Ok((i, result))) => {
860 if let Some(slot) = indexed.get_mut(i) {
861 *slot = Some(result);
862 }
863 }
864 Some(Err(join_err)) => {
865 let slot = indexed.iter().position(Option::is_none).unwrap_or(0);
866 if let Some(s) = indexed.get_mut(slot) {
867 *s = Some(error_result(&format!("tool task failed: {join_err}")));
868 }
869 }
870 None => {}
871 }
872}
873
874fn error_result(message: &str) -> ToolResult {
876 ToolResult {
877 content: vec![serde_json::json!({ "type": "text", "text": format!("Error: {message}") })],
878 details: None,
879 }
880}
881
882fn tool_result_ok(result: &ToolResult) -> bool {
886 !result.content.iter().any(|c| {
887 c.get("text")
888 .and_then(|t| t.as_str())
889 .is_some_and(|t| t.starts_with("Error:"))
890 })
891}
892
893fn extract_final_text(messages: &[AgentMessage]) -> String {
895 messages
896 .iter()
897 .rev()
898 .find(|m| m.role == Role::Assistant)
899 .map(|m| {
900 m.content
901 .iter()
902 .filter_map(|b| {
903 if let ContentBlock::Text { text } = b {
904 Some(text.as_str())
905 } else {
906 None
907 }
908 })
909 .collect::<Vec<_>>()
910 .join("")
911 })
912 .unwrap_or_default()
913}
914
915#[cfg(test)]
916mod tests {
917 use super::*;
923 use crate::model::ModelResponse;
924 use async_trait::async_trait;
925 use serde_json::{json, Value};
926
927 struct MockProvider {
929 responses: std::sync::Mutex<std::collections::VecDeque<ModelResponse>>,
930 }
931
932 impl MockProvider {
933 fn new(responses: Vec<Vec<AgentMessage>>) -> Self {
934 let responses = responses
935 .into_iter()
936 .map(|msgs| ModelResponse { messages: msgs })
937 .collect();
938 Self {
939 responses: std::sync::Mutex::new(responses),
940 }
941 }
942 }
943
944 #[async_trait]
945 impl ModelProvider for MockProvider {
946 async fn invoke(&self, _request: ModelRequest) -> Result<ModelResponse> {
947 let next = self
948 .responses
949 .lock()
950 .unwrap()
951 .pop_front()
952 .unwrap_or(ModelResponse { messages: vec![] });
953 Ok(next)
954 }
955 }
956
957 struct EchoTool;
959
960 #[async_trait]
961 impl Tool for EchoTool {
962 fn definition(&self) -> crate::tool::ToolDefinition {
963 crate::tool::ToolDefinition {
964 name: "echo".into(),
965 label: "Echo".into(),
966 description: "Echo back the provided text.".into(),
967 parameters: crate::tool::ParameterSchema::default(),
968 }
969 }
970
971 async fn execute(&self, _ctx: InvokeContext, input: Value) -> Result<ToolResult> {
972 let text = input
973 .get("text")
974 .and_then(Value::as_str)
975 .unwrap_or("(no text)")
976 .to_string();
977 Ok(ToolResult {
978 content: vec![json!({ "type": "text", "text": format!("echo: {text}") })],
979 details: None,
980 })
981 }
982 }
983
984 fn assistant_text(t: &str) -> AgentMessage {
985 AgentMessage {
986 role: Role::Assistant,
987 content: vec![ContentBlock::Text { text: t.into() }],
988 }
989 }
990
991 fn assistant_tool_use(id: &str, name: &str, input: Value) -> AgentMessage {
992 AgentMessage {
993 role: Role::Assistant,
994 content: vec![ContentBlock::ToolUse {
995 id: id.into(),
996 call: ToolCall {
997 name: name.into(),
998 input,
999 },
1000 }],
1001 }
1002 }
1003
1004 fn user(t: &str) -> AgentMessage {
1005 AgentMessage {
1006 role: Role::User,
1007 content: vec![ContentBlock::Text { text: t.into() }],
1008 }
1009 }
1010
1011 #[tokio::test]
1012 async fn loop_runs_tool_then_finishes() {
1013 let provider = MockProvider::new(vec![
1015 vec![assistant_tool_use(
1016 "call_1",
1017 "echo",
1018 json!({ "text": "hello" }),
1019 )],
1020 vec![assistant_text("done")],
1021 ]);
1022 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1023 let model = Model::new("mock/test");
1024 let mut messages = vec![user("please echo hello then say done")];
1025
1026 let outcome = run_agent(
1027 &provider,
1028 &tools,
1029 &mut messages,
1030 &model,
1031 &RunConfig::default(),
1032 &CancellationToken::new(),
1033 &RunHooks::default(),
1034 )
1035 .await
1036 .expect("loop should complete");
1037
1038 assert_eq!(outcome.turns, 2);
1039 assert_eq!(outcome.final_text, "done");
1040
1041 assert_eq!(messages.len(), 4);
1043 assert_eq!(messages[2].role, Role::Tool);
1044 match &messages[2].content[0] {
1046 ContentBlock::ToolResult { content, .. } => {
1047 let s = serde_json::to_string(content).unwrap_or_default();
1048 assert!(s.contains("echo: hello"), "tool result was: {s}");
1049 }
1050 other => panic!("expected ToolResult, got {other:?}"),
1051 }
1052 }
1053
1054 #[tokio::test]
1055 async fn loop_stops_when_no_tool_calls() {
1056 let provider = MockProvider::new(vec![vec![assistant_text("just text, no tools")]]);
1057 let tools: Vec<Arc<dyn Tool>> = vec![];
1058 let model = Model::new("mock/test");
1059 let mut messages = vec![user("hi")];
1060
1061 let outcome = run_agent(
1062 &provider,
1063 &tools,
1064 &mut messages,
1065 &model,
1066 &RunConfig::default(),
1067 &CancellationToken::new(),
1068 &RunHooks::default(),
1069 )
1070 .await
1071 .expect("loop should complete");
1072
1073 assert_eq!(outcome.turns, 1);
1074 assert_eq!(outcome.final_text, "just text, no tools");
1075 }
1076
1077 #[tokio::test]
1078 async fn loop_recovers_from_unknown_tool() {
1079 let provider = MockProvider::new(vec![
1082 vec![assistant_tool_use("c1", "nonexistent", json!({}))],
1083 vec![assistant_text("recovered")],
1084 ]);
1085 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1086 let model = Model::new("mock/test");
1087 let mut messages = vec![user("call a missing tool")];
1088
1089 let outcome = run_agent(
1090 &provider,
1091 &tools,
1092 &mut messages,
1093 &model,
1094 &RunConfig::default(),
1095 &CancellationToken::new(),
1096 &RunHooks::default(),
1097 )
1098 .await
1099 .expect("loop should recover");
1100
1101 assert_eq!(outcome.final_text, "recovered");
1102 let tool_msg = &messages[2];
1103 assert_eq!(tool_msg.role, Role::Tool);
1104 }
1105
1106 #[tokio::test]
1107 async fn loop_aborts_on_max_turns() {
1108 let repeat = || vec![assistant_tool_use("c", "echo", json!({ "text": "x" }))];
1110 let provider = MockProvider::new(vec![repeat(), repeat(), repeat(), repeat()]);
1111 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1112 let model = Model::new("mock/test");
1113 let mut messages = vec![user("loop forever")];
1114
1115 let result = run_agent(
1116 &provider,
1117 &tools,
1118 &mut messages,
1119 &model,
1120 &RunConfig {
1121 max_turns: 3,
1122 ..RunConfig::default()
1123 },
1124 &CancellationToken::new(),
1125 &RunHooks::default(),
1126 )
1127 .await;
1128
1129 assert!(result.is_err(), "must abort on max_turns");
1130 }
1131
1132 struct DenyAllPolicy;
1134
1135 #[async_trait]
1136 impl ToolPolicy for DenyAllPolicy {
1137 async fn check(&self, _tool: &str, _input: &Value, _ctx: &InvokeContext) -> PolicyVerdict {
1138 PolicyVerdict::Deny("blocked in test".into())
1139 }
1140 }
1141
1142 #[tokio::test]
1143 async fn policy_deny_blocks_tool_but_run_continues() {
1144 let provider = MockProvider::new(vec![
1148 vec![assistant_tool_use(
1149 "c1",
1150 "echo",
1151 json!({ "text": "secret" }),
1152 )],
1153 vec![assistant_text("done")],
1154 ]);
1155 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1156 let model = Model::new("mock/test");
1157 let mut messages = vec![user("call echo")];
1158 let policy = DenyAllPolicy;
1159 let hooks = RunHooks {
1160 policy: Some(&policy),
1161 ..RunHooks::default()
1162 };
1163
1164 let outcome = run_agent(
1165 &provider,
1166 &tools,
1167 &mut messages,
1168 &model,
1169 &RunConfig::default(),
1170 &CancellationToken::new(),
1171 &hooks,
1172 )
1173 .await
1174 .expect("loop completes despite denial");
1175
1176 assert_eq!(outcome.final_text, "done");
1177 let s = match &messages[2].content[0] {
1179 ContentBlock::ToolResult { content, .. } => content.to_string(),
1180 other => panic!("expected ToolResult, got {other:?}"),
1181 };
1182 assert!(s.contains("denied by policy"), "expected denial, got: {s}");
1183 assert!(
1184 !s.contains("echo: secret"),
1185 "denied tool must NOT have executed: {s}"
1186 );
1187 }
1188
1189 #[tokio::test]
1190 async fn policy_none_is_allow_all() {
1191 let provider = MockProvider::new(vec![
1194 vec![assistant_tool_use("c1", "echo", json!({ "text": "hi" }))],
1195 vec![assistant_text("done")],
1196 ]);
1197 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1198 let model = Model::new("mock/test");
1199 let mut messages = vec![user("call echo")];
1200
1201 let outcome = run_agent(
1202 &provider,
1203 &tools,
1204 &mut messages,
1205 &model,
1206 &RunConfig::default(),
1207 &CancellationToken::new(),
1208 &RunHooks::default(),
1209 )
1210 .await
1211 .expect("loop completes");
1212 assert_eq!(outcome.final_text, "done");
1213 let s = match &messages[2].content[0] {
1214 ContentBlock::ToolResult { content, .. } => content.to_string(),
1215 other => panic!("expected ToolResult, got {other:?}"),
1216 };
1217 assert!(s.contains("echo: hi"), "tool should have run: {s}");
1218 }
1219
1220 struct RecordingTool {
1225 name: String,
1226 log: Arc<std::sync::Mutex<Vec<String>>>,
1227 }
1228
1229 #[async_trait]
1230 impl Tool for RecordingTool {
1231 fn definition(&self) -> crate::tool::ToolDefinition {
1232 crate::tool::ToolDefinition {
1233 name: self.name.clone(),
1234 label: "Recording".into(),
1235 description: "Records each execution.".into(),
1236 parameters: crate::tool::ParameterSchema::default(),
1237 }
1238 }
1239
1240 async fn execute(&self, _ctx: InvokeContext, input: Value) -> Result<ToolResult> {
1241 let tag = input
1242 .get("tag")
1243 .and_then(Value::as_str)
1244 .unwrap_or("?")
1245 .to_string();
1246 self.log.lock().expect("lock poisoned").push(tag);
1247 Ok(ToolResult {
1248 content: vec![json!({ "type": "text", "text": "ran" })],
1249 details: None,
1250 })
1251 }
1252 }
1253
1254 #[tokio::test]
1264 async fn policy_deny_blocks_tools_on_the_parallel_path() {
1265 let log = Arc::new(std::sync::Mutex::new(Vec::new()));
1268 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(RecordingTool {
1269 name: "rec".into(),
1270 log: log.clone(),
1271 })];
1272 let turn = vec![
1273 assistant_tool_use("c1", "rec", json!({ "tag": "one" })),
1274 assistant_tool_use("c2", "rec", json!({ "tag": "two" })),
1275 assistant_tool_use("c3", "rec", json!({ "tag": "three" })),
1276 ];
1277 let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1278 let model = Model::new("mock/test");
1279 let mut messages = vec![user("call all three")];
1280 let config = RunConfig {
1281 tool_concurrency: 4,
1282 ..RunConfig::default()
1283 };
1284 let policy = DenyAllPolicy;
1285 let hooks = RunHooks {
1286 policy: Some(&policy),
1287 ..RunHooks::default()
1288 };
1289
1290 let outcome = run_agent(
1291 &provider,
1292 &tools,
1293 &mut messages,
1294 &model,
1295 &config,
1296 &CancellationToken::new(),
1297 &hooks,
1298 )
1299 .await
1300 .expect("loop completes despite denials");
1301
1302 assert_eq!(outcome.final_text, "done");
1303
1304 let executed = log.lock().expect("lock poisoned").clone();
1308 assert!(
1309 executed.is_empty(),
1310 "denied tools must NOT execute on the parallel path: ran {executed:?}"
1311 );
1312
1313 let results: Vec<String> = messages
1316 .iter()
1317 .filter(|m| m.role == Role::Tool)
1318 .filter_map(|m| match &m.content[0] {
1319 ContentBlock::ToolResult {
1320 tool_use_id,
1321 content,
1322 ..
1323 } => {
1324 let text = content.to_string();
1325 Some(format!("{tool_use_id}:{text}"))
1326 }
1327 _ => None,
1328 })
1329 .collect();
1330 assert_eq!(
1331 results.len(),
1332 3,
1333 "all 3 denied calls must produce a result slot: {results:?}"
1334 );
1335 for r in &results {
1336 assert!(
1337 r.contains("denied by policy"),
1338 "parallel-path denial must surface to the model: {r}"
1339 );
1340 }
1341 assert!(
1344 results[0].starts_with("c1:")
1345 && results[1].starts_with("c2:")
1346 && results[2].starts_with("c3:"),
1347 "denial slots must preserve issued order: {results:?}"
1348 );
1349 }
1350
1351 #[tokio::test]
1352 async fn loop_respects_cancellation() {
1353 let provider = MockProvider::new(vec![vec![assistant_text("never reached")]]);
1355 let tools: Vec<Arc<dyn Tool>> = vec![];
1356 let model = Model::new("mock/test");
1357 let mut messages = vec![user("hi")];
1358 let cancel = CancellationToken::new();
1359 cancel.cancel();
1360
1361 let result = run_agent(
1362 &provider,
1363 &tools,
1364 &mut messages,
1365 &model,
1366 &RunConfig::default(),
1367 &cancel,
1368 &RunHooks::default(),
1369 )
1370 .await;
1371
1372 assert!(matches!(result, Err(CoreError::Cancelled(_))));
1373 }
1374
1375 struct SlowProvider {
1378 delay_ms: u64,
1379 responses: std::sync::Mutex<std::collections::VecDeque<ModelResponse>>,
1380 }
1381
1382 impl SlowProvider {
1383 fn new(delay_ms: u64, responses: Vec<Vec<AgentMessage>>) -> Self {
1384 let responses = responses
1385 .into_iter()
1386 .map(|m| ModelResponse { messages: m })
1387 .collect();
1388 Self {
1389 delay_ms,
1390 responses: std::sync::Mutex::new(responses),
1391 }
1392 }
1393 }
1394
1395 #[async_trait]
1396 impl ModelProvider for SlowProvider {
1397 async fn invoke(&self, _request: ModelRequest) -> Result<ModelResponse> {
1398 tokio::time::sleep(std::time::Duration::from_millis(self.delay_ms)).await;
1399 let next = self
1400 .responses
1401 .lock()
1402 .unwrap()
1403 .pop_front()
1404 .unwrap_or(ModelResponse { messages: vec![] });
1405 Ok(next)
1406 }
1407 }
1408
1409 #[tokio::test]
1410 async fn turn_timeout_aborts_slow_provider() {
1411 let provider = SlowProvider::new(500, vec![vec![assistant_text("too slow")]]);
1413 let model = Model::new("mock/test");
1414 let mut messages = vec![user("hi")];
1415 let config = RunConfig {
1416 turn_timeout_ms: Some(100),
1417 ..RunConfig::default()
1418 };
1419
1420 let result = run_agent(
1421 &provider,
1422 &[],
1423 &mut messages,
1424 &model,
1425 &config,
1426 &CancellationToken::new(),
1427 &RunHooks::default(),
1428 )
1429 .await;
1430
1431 assert!(
1432 matches!(result, Err(CoreError::Cancelled(_))),
1433 "expected cancelled, got {result:?}"
1434 );
1435 }
1436
1437 #[tokio::test]
1438 async fn max_tool_calls_per_turn_rejects_runaway_response() {
1439 let runaway: Vec<AgentMessage> = (0..5)
1441 .map(|i| assistant_tool_use(&format!("c{i}"), "echo", json!({ "text": "x" })))
1442 .collect();
1443 let provider = MockProvider::new(vec![runaway]);
1444 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1445 let model = Model::new("mock/test");
1446 let mut messages = vec![user("call many tools")];
1447 let config = RunConfig {
1448 max_tool_calls_per_turn: 2,
1449 ..RunConfig::default()
1450 };
1451
1452 let result = run_agent(
1453 &provider,
1454 &tools,
1455 &mut messages,
1456 &model,
1457 &config,
1458 &CancellationToken::new(),
1459 &RunHooks::default(),
1460 )
1461 .await;
1462
1463 assert!(result.is_err(), "runaway tool calls must be rejected");
1464 let err = result.unwrap_err().to_string();
1465 assert!(err.contains("max"), "error should mention the cap: {err}");
1466 }
1467
    /// Test tool that sleeps for `delay_ms` and then records the call's `tag`
    /// argument. Tests use it to observe the order in which calls finish.
    struct OrderingTool {
        /// Tool name reported by `definition()`; also the name calls are routed by.
        name: String,
        /// How long `execute` sleeps before appending to `log`.
        delay_ms: u64,
        /// Completion log; an `Arc` so several instances can share one log.
        log: Arc<std::sync::Mutex<Vec<String>>>,
    }
1474
1475 #[async_trait]
1476 impl Tool for OrderingTool {
1477 fn definition(&self) -> crate::tool::ToolDefinition {
1478 crate::tool::ToolDefinition {
1479 name: self.name.clone(),
1480 label: "Ordering".into(),
1481 description: "Records completion order.".into(),
1482 parameters: crate::tool::ParameterSchema::default(),
1483 }
1484 }
1485
1486 async fn execute(&self, _ctx: InvokeContext, input: Value) -> Result<ToolResult> {
1487 tokio::time::sleep(std::time::Duration::from_millis(self.delay_ms)).await;
1488 self.log.lock().unwrap().push(
1489 input
1490 .get("tag")
1491 .and_then(Value::as_str)
1492 .unwrap_or("?")
1493 .to_string(),
1494 );
1495 Ok(ToolResult {
1496 content: vec![json!({ "type": "text", "text": "ok" })],
1497 details: None,
1498 })
1499 }
1500 }
1501
1502 #[tokio::test]
1503 async fn parallel_tool_calls_preserve_result_order() {
1504 let log = Arc::new(std::sync::Mutex::new(Vec::new()));
1508 let tools: Vec<Arc<dyn Tool>> = vec![
1509 Arc::new(OrderingTool {
1510 name: "slow".into(),
1511 delay_ms: 60,
1512 log: log.clone(),
1513 }),
1514 Arc::new(OrderingTool {
1515 name: "fast".into(),
1516 delay_ms: 5,
1517 log: log.clone(),
1518 }),
1519 ];
1520 let turn = vec![
1521 assistant_tool_use("c1", "slow", json!({ "tag": "slow" })),
1522 assistant_tool_use("c2", "fast", json!({ "tag": "fast" })),
1523 ];
1524 let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1525 let model = Model::new("mock/test");
1526 let mut messages = vec![user("call both")];
1527 let config = RunConfig {
1528 tool_concurrency: 4,
1529 ..RunConfig::default()
1530 };
1531
1532 let outcome = run_agent(
1533 &provider,
1534 &tools,
1535 &mut messages,
1536 &model,
1537 &config,
1538 &CancellationToken::new(),
1539 &RunHooks::default(),
1540 )
1541 .await
1542 .expect("loop should complete");
1543
1544 assert_eq!(outcome.final_text, "done");
1545
1546 let completed = log.lock().unwrap().clone();
1549 assert_eq!(
1550 completed,
1551 vec!["fast", "slow"],
1552 "tools must have run concurrently: {completed:?}"
1553 );
1554
1555 let tool_ids: Vec<String> = messages
1557 .iter()
1558 .filter(|m| m.role == Role::Tool)
1559 .filter_map(|m| match &m.content[0] {
1560 ContentBlock::ToolResult { tool_use_id, .. } => Some(tool_use_id.clone()),
1561 _ => None,
1562 })
1563 .collect();
1564 assert_eq!(
1565 tool_ids,
1566 vec!["c1", "c2"],
1567 "results must be appended in issued order: {tool_ids:?}"
1568 );
1569 }
1570
    /// Test tool named `boom` whose `execute` always panics. It checks that a
    /// tool panic becomes an error result instead of aborting the run.
    struct PanickingTool;
1574
1575 #[async_trait]
1576 impl Tool for PanickingTool {
1577 fn definition(&self) -> crate::tool::ToolDefinition {
1578 crate::tool::ToolDefinition {
1579 name: "boom".into(),
1580 label: "Boom".into(),
1581 description: "Always panics.".into(),
1582 parameters: crate::tool::ParameterSchema::default(),
1583 }
1584 }
1585
1586 async fn execute(&self, _ctx: InvokeContext, _input: Value) -> Result<ToolResult> {
1587 panic!("deliberate tool panic");
1588 }
1589 }
1590
1591 #[tokio::test]
1592 async fn parallel_path_survives_a_task_panic() {
1593 let turn = vec![
1596 assistant_tool_use("c1", "boom", json!({})),
1597 assistant_tool_use("c2", "echo", json!({ "text": "survived" })),
1598 ];
1599 let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1600 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(PanickingTool), Arc::new(EchoTool)];
1601 let model = Model::new("mock/test");
1602 let mut messages = vec![user("call both")];
1603 let config = RunConfig {
1604 tool_concurrency: 4,
1605 ..RunConfig::default()
1606 };
1607
1608 let outcome = run_agent(
1610 &provider,
1611 &tools,
1612 &mut messages,
1613 &model,
1614 &config,
1615 &CancellationToken::new(),
1616 &RunHooks::default(),
1617 )
1618 .await
1619 .expect("loop must survive a tool panic");
1620
1621 assert_eq!(outcome.final_text, "done");
1622 let tool_ids: Vec<String> = messages
1624 .iter()
1625 .filter(|m| m.role == Role::Tool)
1626 .filter_map(|m| match &m.content[0] {
1627 ContentBlock::ToolResult { tool_use_id, .. } => Some(tool_use_id.clone()),
1628 _ => None,
1629 })
1630 .collect();
1631 assert_eq!(
1632 tool_ids,
1633 vec!["c1", "c2"],
1634 "both results must be present despite the panic: {tool_ids:?}"
1635 );
1636 }
1637
1638 #[tokio::test]
1644 async fn sequential_path_survives_a_tool_panic() {
1645 let turn = vec![
1646 assistant_tool_use("c1", "boom", json!({})),
1647 assistant_tool_use("c2", "echo", json!({ "text": "survived" })),
1648 ];
1649 let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1650 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(PanickingTool), Arc::new(EchoTool)];
1651 let model = Model::new("mock/test");
1652 let mut messages = vec![user("call both")];
1653 let config = RunConfig::default();
1655
1656 let outcome = run_agent(
1658 &provider,
1659 &tools,
1660 &mut messages,
1661 &model,
1662 &config,
1663 &CancellationToken::new(),
1664 &RunHooks::default(),
1665 )
1666 .await
1667 .expect("sequential path must survive a tool panic");
1668 assert_eq!(outcome.final_text, "done");
1669
1670 let results: Vec<&ContentBlock> = messages
1672 .iter()
1673 .filter(|m| m.role == Role::Tool)
1674 .flat_map(|m| m.content.iter())
1675 .collect();
1676 assert_eq!(results.len(), 2, "both results appended");
1677 let c1_str = match &results[0] {
1679 ContentBlock::ToolResult { content, .. } => content.to_string(),
1680 _ => String::new(),
1681 };
1682 assert!(
1683 c1_str.contains("Error:"),
1684 "panic must surface as an Error: result, got: {c1_str}"
1685 );
1686 assert!(
1687 c1_str.contains("panicked"),
1688 "error result should mention the panic: {c1_str}"
1689 );
1690 }
1691
1692 #[tokio::test]
1698 async fn parallel_path_panic_preserves_call_id_and_summary() {
1699 let turn = vec![
1701 assistant_tool_use("c1", "boom", json!({})),
1702 assistant_tool_use("c2", "echo", json!({ "text": "ok" })),
1703 ];
1704 let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1705 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(PanickingTool), Arc::new(EchoTool)];
1706 let model = Model::new("mock/test");
1707 let mut messages = vec![user("call both")];
1708 let config = RunConfig {
1709 tool_concurrency: 4,
1710 ..RunConfig::default()
1711 };
1712
1713 let outcome = run_agent(
1714 &provider,
1715 &tools,
1716 &mut messages,
1717 &model,
1718 &config,
1719 &CancellationToken::new(),
1720 &RunHooks::default(),
1721 )
1722 .await
1723 .expect("run survives parallel panic");
1724 assert_eq!(outcome.final_text, "done");
1725
1726 let tool_msgs: Vec<(&String, String)> = messages
1729 .iter()
1730 .filter(|m| m.role == Role::Tool)
1731 .flat_map(|m| m.content.iter())
1732 .filter_map(|b| match b {
1733 ContentBlock::ToolResult {
1734 tool_use_id,
1735 content,
1736 } => Some((tool_use_id, content.to_string())),
1737 _ => None,
1738 })
1739 .collect();
1740 assert_eq!(tool_msgs.len(), 2, "both results present");
1741 assert_eq!(tool_msgs[0].0, "c1", "c1 attributed correctly");
1743 assert_eq!(tool_msgs[1].0, "c2", "c2 attributed correctly");
1744 assert!(
1746 tool_msgs[0].1.contains("panicked"),
1747 "parallel panic should carry bounded summary, got: {}",
1748 tool_msgs[0].1
1749 );
1750 assert!(
1751 tool_msgs[0].1.contains("Error:"),
1752 "should be an Error: result, got: {}",
1753 tool_msgs[0].1
1754 );
1755 }
1756
1757 #[tokio::test]
1764 async fn parallel_path_keeps_all_results_under_throttling() {
1765 let turn = vec![
1769 assistant_tool_use("c1", "echo", json!({ "text": "one" })),
1770 assistant_tool_use("c2", "echo", json!({ "text": "two" })),
1771 assistant_tool_use("c3", "echo", json!({ "text": "three" })),
1772 ];
1773 let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1774 let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1775 let model = Model::new("mock/test");
1776 let mut messages = vec![user("call all three")];
1777 let config = RunConfig {
1778 tool_concurrency: 2,
1779 ..RunConfig::default()
1780 };
1781
1782 let outcome = run_agent(
1783 &provider,
1784 &tools,
1785 &mut messages,
1786 &model,
1787 &config,
1788 &CancellationToken::new(),
1789 &RunHooks::default(),
1790 )
1791 .await
1792 .expect("run completes");
1793 assert_eq!(outcome.final_text, "done");
1794
1795 let results: Vec<String> = messages
1799 .iter()
1800 .filter(|m| m.role == Role::Tool)
1801 .flat_map(|m| m.content.iter())
1802 .filter_map(|b| match b {
1803 ContentBlock::ToolResult {
1804 tool_use_id,
1805 content,
1806 } => {
1807 let text = content
1808 .get("content")
1809 .and_then(|c| c.get(0))
1810 .and_then(|c| c.get("text"))
1811 .and_then(|t| t.as_str())
1812 .unwrap_or("<missing>");
1813 Some(format!("{tool_use_id}={text}"))
1814 }
1815 _ => None,
1816 })
1817 .collect();
1818 assert_eq!(
1819 results,
1820 vec!["c1=echo: one", "c2=echo: two", "c3=echo: three"],
1821 "all 3 results must survive throttling, in order, with correct text: {results:?}"
1822 );
1823 }
1824
1825 #[test]
1826 fn summarize_panic_handles_string_payloads() {
1827 let p: Box<dyn std::any::Any + Send> = Box::new("boom!".to_string());
1828 assert_eq!(summarize_panic(&p), "boom!");
1829 }
1830
1831 #[test]
1832 fn summarize_panic_handles_str_payloads() {
1833 let s: &'static str = "static boom";
1834 let p: Box<dyn std::any::Any + Send> = Box::new(s);
1835 assert_eq!(summarize_panic(&p), "static boom");
1836 }
1837
1838 #[test]
1839 fn summarize_panic_bounds_huge_payloads() {
1840 let huge = "x".repeat(10_000);
1841 let p: Box<dyn std::any::Any + Send> = Box::new(huge);
1842 let summary = summarize_panic(&p);
1843 assert!(
1844 summary.chars().count() <= PANIC_SUMMARY_MAX_CHARS,
1845 "summary not bounded: {} chars",
1846 summary.chars().count()
1847 );
1848 assert!(
1849 summary.ends_with('…'),
1850 "should end with ellipsis: {summary}"
1851 );
1852 }
1853
1854 #[test]
1855 fn summarize_panic_falls_back_for_non_string_payloads() {
1856 let p: Box<dyn std::any::Any + Send> = Box::new(42_i32);
1857 let summary = summarize_panic(&p);
1858 assert!(
1859 summary.contains("non-string"),
1860 "expected fallback marker: {summary}"
1861 );
1862 }
1863
1864 use crate::event::{EventSink, RunEvent};
1867 use std::sync::{Arc, Mutex};
1868 use uuid::Uuid;
1869
    /// Event sink that appends every emitted `RunEvent` to a shared vector so
    /// tests can inspect what a run reported.
    struct RecordingSink {
        /// Captured events, in the order `emit` received them.
        events: Arc<Mutex<Vec<RunEvent>>>,
    }
1874
1875 impl EventSink for RecordingSink {
1876 fn emit(&self, event: RunEvent) {
1877 self.events.lock().expect("lock poisoned").push(event);
1878 }
1879 }
1880
1881 #[tokio::test]
1882 async fn text_only_run_emits_complete_event_sequence() {
1883 let provider = MockProvider::new(vec![vec![assistant_text("hello")]]);
1884 let tools: Vec<Arc<dyn Tool>> = vec![];
1885 let model = Model::new("mock/test");
1886 let mut messages = vec![user("hi")];
1887 let sink = Arc::new(Mutex::new(Vec::new()));
1888 let hooks = RunHooks {
1889 session_id: Some(Uuid::nil()),
1890 turn_sink: None,
1891 event_sink: Some(&RecordingSink {
1892 events: sink.clone(),
1893 } as &dyn EventSink),
1894 policy: None,
1895 };
1896
1897 run_agent(
1898 &provider,
1899 &tools,
1900 &mut messages,
1901 &model,
1902 &RunConfig::default(),
1903 &CancellationToken::new(),
1904 &hooks,
1905 )
1906 .await
1907 .expect("run");
1908
1909 let events = sink.lock().expect("lock poisoned").clone();
1910 assert!(events
1912 .iter()
1913 .any(|e| matches!(e, RunEvent::SessionStarted { .. })));
1914 assert!(events
1915 .iter()
1916 .any(|e| matches!(e, RunEvent::TurnStarted { turn: 1, .. })));
1917 assert!(events.iter().any(
1918 |e| matches!(e, RunEvent::ModelStarted { turn: 1, model, .. } if model == "mock/test")
1919 ));
1920 assert!(events
1921 .iter()
1922 .any(|e| matches!(e, RunEvent::ModelFinished { turn: 1, .. })));
1923 assert!(events
1924 .iter()
1925 .any(|e| matches!(e, RunEvent::TurnFinished { turn: 1, .. })));
1926 assert!(!events
1928 .iter()
1929 .any(|e| matches!(e, RunEvent::ToolStarted { .. })));
1930 }
1931
1932 #[tokio::test]
1933 async fn tool_run_emits_tool_started_finished() {
1934 let echo_tool = Arc::new(EchoTool) as Arc<dyn Tool>;
1935 let tools = vec![echo_tool.clone()];
1936 let provider = MockProvider::new(vec![
1937 vec![assistant_tool_use(
1938 "call-1",
1939 "echo",
1940 json!({ "text": "hi" }),
1941 )],
1942 vec![assistant_text("done")],
1943 ]);
1944 let model = Model::new("mock/test");
1945 let mut messages = vec![user("echo hi")];
1946 let sink = Arc::new(Mutex::new(Vec::new()));
1947 let hooks = RunHooks {
1948 session_id: Some(Uuid::nil()),
1949 turn_sink: None,
1950 event_sink: Some(&RecordingSink {
1951 events: sink.clone(),
1952 } as &dyn EventSink),
1953 policy: None,
1954 };
1955
1956 run_agent(
1957 &provider,
1958 &tools,
1959 &mut messages,
1960 &model,
1961 &RunConfig::default(),
1962 &CancellationToken::new(),
1963 &hooks,
1964 )
1965 .await
1966 .expect("run");
1967
1968 let events = sink.lock().expect("lock poisoned").clone();
1969 assert!(
1971 events.iter().any(|e| matches!(e, RunEvent::ToolStarted { turn: 1, tool, call_id, .. } if tool == "echo" && call_id == "call-1")),
1972 "missing ToolStarted for echo/call-1"
1973 );
1974 assert!(
1975 events.iter().any(|e| matches!(e, RunEvent::ToolFinished { turn: 1, tool, call_id, ok: true, .. } if tool == "echo" && call_id == "call-1")),
1976 "missing ToolFinished for echo/call-1"
1977 );
1978 assert!(events
1980 .iter()
1981 .any(|e| matches!(e, RunEvent::TurnFinished { turn: 2, .. })));
1982 }
1983
1984 #[tokio::test]
1985 async fn no_events_when_session_id_is_none() {
1986 let provider = MockProvider::new(vec![vec![assistant_text("hello")]]);
1987 let tools: Vec<Arc<dyn Tool>> = vec![];
1988 let model = Model::new("mock/test");
1989 let mut messages = vec![user("hi")];
1990 let sink = Arc::new(Mutex::new(Vec::new()));
1991 let hooks = RunHooks {
1992 session_id: None, turn_sink: None,
1994 event_sink: Some(&RecordingSink {
1995 events: sink.clone(),
1996 } as &dyn EventSink),
1997 policy: None,
1998 };
1999
2000 run_agent(
2001 &provider,
2002 &tools,
2003 &mut messages,
2004 &model,
2005 &RunConfig::default(),
2006 &CancellationToken::new(),
2007 &hooks,
2008 )
2009 .await
2010 .expect("run");
2011
2012 assert!(
2013 sink.lock().expect("lock poisoned").is_empty(),
2014 "events emitted with no session_id"
2015 );
2016 }
2017}