Skip to main content

fluers_core/
runner.rs

//! The pure agent turn-loop.
//!
//! This is the Rust heart of Flue's `agent-coordinator.ts` turn logic — but
//! *only* the pure loop: send messages + tool defs to a [`ModelProvider`],
//! append assistant messages, execute any tool calls, append their results,
//! and repeat until the model stops calling tools or `max_turns` is hit.
//!
//! The loop talks only to [`ModelProvider`] + `Arc<dyn Tool>`, plus the
//! optional [`RunHooks`] callbacks (event emission, a per-turn sink, and a
//! tool policy). It knows nothing about how sessions, sandboxes, or
//! persistence are implemented — those live in `fluers-runtime`'s coordinator
//! (MVP 3+).
11
12use std::sync::Arc;
13
14use tokio_util::sync::CancellationToken;
15
16use crate::error::{CoreError, Result};
17use crate::event::{RunEvent, RunHooks};
18use crate::message::{AgentMessage, ContentBlock, Role};
19use crate::model::{Model, ModelProvider, ModelRequest, StreamEvent};
20use crate::policy::{PolicyVerdict, ToolPolicy};
21use crate::thinking::ThinkingLevel;
22use crate::tool::{InvokeContext, Tool, ToolCall, ToolResult};
23
/// Configuration for a single agent run.
///
/// Every field is a budget or knob read by the turn loop; none of them is
/// validated up front.
#[derive(Debug, Clone)]
pub struct RunConfig {
    /// Maximum number of model turns before the loop aborts.
    ///
    /// The check happens at the top of each turn, so a value of `0` fails the
    /// run before the provider is ever called.
    pub max_turns: usize,
    /// Reasoning effort forwarded to the provider (copied into each
    /// `ModelRequest`).
    pub thinking: ThinkingLevel,
    /// Hard deadline for a single provider `invoke` call, in milliseconds.
    /// `None` disables the per-turn timeout (the outer `cancel` still applies).
    pub turn_timeout_ms: Option<u64>,
    /// Maximum number of tool calls the model may issue in a single turn
    /// before the loop rejects the response. Guards against runaway models.
    ///
    /// The limit is checked before any tool of that turn is executed.
    pub max_tool_calls_per_turn: usize,
    /// How many tool calls may run in parallel within a turn. `1` ⇒ fully
    /// sequential (deterministic); `0` is treated the same as `1`. Results are
    /// always appended in the order the model issued them, regardless of
    /// concurrency.
    pub tool_concurrency: usize,
}
42
43impl Default for RunConfig {
44    fn default() -> Self {
45        Self {
46            max_turns: 12,
47            thinking: ThinkingLevel::default(),
48            turn_timeout_ms: Some(120_000),
49            max_tool_calls_per_turn: 10,
50            tool_concurrency: 1,
51        }
52    }
53}
54
/// The outcome of a completed agent run.
///
/// Implements `PartialEq`/`Eq` so callers and tests can compare outcomes by
/// value (`assert_eq!(outcome, expected)`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunOutcome {
    /// How many model turns ran.
    pub turns: usize,
    /// The final assistant text (concatenated text blocks of the last
    /// assistant message). Empty if the model ended on a tool call.
    pub final_text: String,
}
64
/// A sink notified after each turn's messages are appended to the history.
///
/// This is the per-turn **seam** that lets a coordinator (in
/// `fluers-runtime`) persist a session, emit events, or snapshot state
/// between turns — *without* `fluers-core` depending on any of those
/// subsystems. It keeps the loop-home decision intact: the pure turn-loop
/// stays in `fluers-core`; the coordinator that drives persistence/events
/// lives in `fluers-runtime`.
///
/// The sink is `await`ed inside the loop after each turn's messages (both
/// the assistant turn and the tool results) are appended, so persistence of
/// turn *N* completes before turn *N+1* begins. That ordering is what makes
/// "resume-after-kill" faithful: the file on disk always reflects at least
/// all completed turns.
///
/// The sink is also notified for the final turn (the one in which the model
/// issues no tool calls) before the run returns its outcome.
#[async_trait::async_trait]
pub trait TurnSink: Send + Sync {
    /// Called after turn `turn` (1-indexed) with the full message history so
    /// far. Returning `Err` aborts the run with that error.
    ///
    /// # Errors
    ///
    /// Implementations return `Err` to abort the run; the loop propagates
    /// that error to its own caller.
    async fn after_turn(&self, turn: usize, messages: &[AgentMessage]) -> Result<()>;
}
85
/// A [`TurnSink`] that fans a turn out to multiple inner sinks, **in order**.
///
/// The run hooks carry only a single optional turn sink. When two concerns
/// both need per-turn observation (e.g. `SessionRunner` for persistence and
/// a memory sink for semantic recall), wrap them in a `FanoutTurnSink`. The
/// sinks are awaited sequentially: sink *N* completes before sink *N+1* runs.
///
/// Error semantics: the first sink to return `Err` aborts the remaining
/// sinks and propagates. Sinks that should be **fail-open** (never break the
/// run on their own errors — e.g. an optional memory store) must swallow their
/// own errors inside [`TurnSink::after_turn`] rather than returning `Err`.
///
/// An empty fanout is a no-op that always succeeds.
///
/// Construct with [`FanoutTurnSink::new`] / [`FanoutTurnSink::push`].
pub struct FanoutTurnSink {
    /// Inner sinks, stored (and later invoked) in insertion order.
    sinks: Vec<Box<dyn TurnSink>>,
}
102
103impl FanoutTurnSink {
104    /// Create an empty fanout sink.
105    #[must_use]
106    pub fn new() -> Self {
107        Self { sinks: Vec::new() }
108    }
109
110    /// Append an inner sink. Sinks run in insertion order.
111    #[must_use]
112    pub fn push(mut self, sink: Box<dyn TurnSink>) -> Self {
113        self.sinks.push(sink);
114        self
115    }
116
117    /// Number of inner sinks.
118    #[must_use]
119    pub fn len(&self) -> usize {
120        self.sinks.len()
121    }
122
123    /// Whether there are no inner sinks.
124    #[must_use]
125    pub fn is_empty(&self) -> bool {
126        self.sinks.is_empty()
127    }
128}
129
130impl Default for FanoutTurnSink {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136#[async_trait::async_trait]
137impl TurnSink for FanoutTurnSink {
138    async fn after_turn(&self, turn: usize, messages: &[AgentMessage]) -> Result<()> {
139        for sink in &self.sinks {
140            sink.after_turn(turn, messages).await?;
141        }
142        Ok(())
143    }
144}
145
#[cfg(test)]
mod fanout_tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// One ordered log shared by every sink in a test, so the *relative*
    /// order of calls across sinks can be asserted (per-sink logs cannot
    /// show which sink ran first).
    type CallLog = Arc<Mutex<Vec<(&'static str, usize)>>>;

    /// A sink that records `(label, turn)` for every call and optionally
    /// fails at a given turn.
    struct RecordingSink {
        label: &'static str,
        log: CallLog,
        fail_at: Option<usize>,
    }

    #[async_trait::async_trait]
    impl TurnSink for RecordingSink {
        async fn after_turn(&self, turn: usize, _messages: &[AgentMessage]) -> Result<()> {
            self.log
                .lock()
                .expect("lock poisoned")
                .push((self.label, turn));
            if self.fail_at == Some(turn) {
                return Err(crate::error::CoreError::Transport(format!(
                    "injected failure at turn {turn}"
                )));
            }
            Ok(())
        }
    }

    /// Build a two-sink fanout (`"a"` then `"b"`) writing to `log`; sink
    /// `"a"` fails at `fail_a_at` when set.
    fn two_sink_fanout(log: &CallLog, fail_a_at: Option<usize>) -> FanoutTurnSink {
        FanoutTurnSink::new()
            .push(Box::new(RecordingSink {
                label: "a",
                log: Arc::clone(log),
                fail_at: fail_a_at,
            }))
            .push(Box::new(RecordingSink {
                label: "b",
                log: Arc::clone(log),
                fail_at: None,
            }))
    }

    #[tokio::test]
    async fn fanout_calls_sinks_in_order() {
        let log: CallLog = Arc::new(Mutex::new(Vec::new()));
        let fanout = two_sink_fanout(&log, None);
        assert_eq!(fanout.len(), 2);
        TurnSink::after_turn(&fanout, 1, &[]).await.unwrap();
        // Both sinks ran exactly once, and "a" ran before "b".
        assert_eq!(*log.lock().unwrap(), vec![("a", 1), ("b", 1)]);
    }

    #[tokio::test]
    async fn fanout_propagates_error_and_stops() {
        // First sink fails at turn 2; the second sink must not be called and
        // the failure must reach the caller.
        let log: CallLog = Arc::new(Mutex::new(Vec::new()));
        let fanout = two_sink_fanout(&log, Some(2));
        let outcome = TurnSink::after_turn(&fanout, 2, &[]).await;
        assert!(
            matches!(outcome, Err(crate::error::CoreError::Transport(_))),
            "the first sink's error was not propagated"
        );
        assert_eq!(
            *log.lock().unwrap(),
            vec![("a", 2)],
            "second sink ran after the first one failed"
        );
    }

    #[tokio::test]
    async fn empty_fanout_is_a_successful_noop() {
        let fanout = FanoutTurnSink::default();
        assert!(fanout.is_empty());
        assert!(TurnSink::after_turn(&fanout, 1, &[]).await.is_ok());
    }
}
207
208/// Run the agent loop.
209///
210/// `messages` is seeded by the caller (typically a `System` message followed
211/// by a `User` message) and mutated in place as the loop appends assistant
212/// turns and tool results.
213///
214/// Budgets (from [`RunConfig`]):
215/// - `max_turns` caps total model turns.
216/// - `turn_timeout_ms` caps each provider `invoke`.
217/// - `max_tool_calls_per_turn` rejects runaway responses.
218///
219/// Concurrency: when `tool_concurrency > 1`, tool calls within a turn run on
220/// a `JoinSet` with the configured cap; results are always appended in the
221/// order the model issued them. `tool_concurrency == 1` is sequential.
222///
223/// Cancellation: the loop checks `cancel.is_cancelled()` between turns and
224/// composes it into each tool call.
225pub async fn run_agent(
226    provider: &dyn ModelProvider,
227    tools: &[Arc<dyn Tool>],
228    messages: &mut Vec<AgentMessage>,
229    model: &Model,
230    config: &RunConfig,
231    cancel: &CancellationToken,
232    hooks: &RunHooks<'_>,
233) -> Result<RunOutcome> {
234    hooks.emit_event(|sid| RunEvent::SessionStarted { session: sid });
235    let mut turns = 0usize;
236    loop {
237        if cancel.is_cancelled() {
238            hooks.emit_event(|sid| crate::event::run_failed(sid, "cancelled"));
239            return Err(CoreError::Cancelled("agent run cancelled".into()));
240        }
241        if turns >= config.max_turns {
242            let msg = format!(
243                "max_turns ({}) exceeded — the model kept calling tools",
244                config.max_turns
245            );
246            hooks.emit_event(|sid| crate::event::run_failed(sid, msg.clone()));
247            return Err(CoreError::ModelResponse(msg));
248        }
249        turns += 1;
250        hooks.emit_event(|sid| RunEvent::TurnStarted {
251            session: sid,
252            turn: turns,
253        });
254
255        let request = ModelRequest {
256            model: model.clone(),
257            messages: messages.clone(),
258            tools: tools.iter().map(|t| t.definition()).collect(),
259            thinking: config.thinking,
260            params: Default::default(),
261        };
262        hooks.emit_event(|sid| RunEvent::ModelStarted {
263            session: sid,
264            turn: turns,
265            model: model.id.clone(),
266        });
267        // Compose the per-turn timeout with the caller's cancellation token.
268        let response =
269            match invoke_with_budget(provider, request, config.turn_timeout_ms, cancel).await {
270                Ok(r) => r,
271                Err(e) => {
272                    hooks.emit_event(|sid| crate::event::run_failed(sid, e.to_string()));
273                    return Err(e);
274                }
275            };
276        hooks.emit_event(|sid| RunEvent::ModelFinished {
277            session: sid,
278            turn: turns,
279        });
280        // Snapshot this turn's tool calls *before* moving the messages into history.
281        let tool_calls: Vec<(String, ToolCall)> = response
282            .messages
283            .iter()
284            .flat_map(|m| m.content.iter())
285            .filter_map(|block| {
286                if let ContentBlock::ToolUse { id, call } = block {
287                    Some((id.clone(), call.clone()))
288                } else {
289                    None
290                }
291            })
292            .collect();
293        // Append the assistant turn(s) to the running history.
294        messages.extend(response.messages);
295
296        if tool_calls.is_empty() {
297            // No tool calls ⇒ the model finished. Extract final text.
298            let final_text = extract_final_text(messages);
299            // Notify the sink so the final state is persisted before returning.
300            if let Some(sink) = hooks.turn_sink {
301                sink.after_turn(turns, messages).await?;
302            }
303            hooks.emit_event(|sid| RunEvent::TurnFinished {
304                session: sid,
305                turn: turns,
306            });
307            return Ok(RunOutcome { turns, final_text });
308        }
309
310        // Reject runaway responses before executing anything.
311        if tool_calls.len() > config.max_tool_calls_per_turn {
312            let msg = format!(
313                "model issued {} tool calls in one turn (max {})",
314                tool_calls.len(),
315                config.max_tool_calls_per_turn
316            );
317            hooks.emit_event(|sid| crate::event::run_failed(sid, msg.clone()));
318            return Err(CoreError::ModelResponse(msg));
319        }
320
321        // Emit ToolStarted for each call, then execute.
322        for (id, call) in &tool_calls {
323            hooks.emit_event(|sid| RunEvent::ToolStarted {
324                session: sid,
325                turn: turns,
326                tool: call.name.clone(),
327                call_id: id.clone(),
328            });
329        }
330
331        // Execute the turn's tool calls (sequential or bounded-parallel) and
332        // append a Tool message per call, in the original order. The optional
333        // policy hook is consulted before each call (see `execute_tool_calls`).
334        let results = execute_tool_calls(
335            tools,
336            &tool_calls,
337            cancel,
338            config.tool_concurrency,
339            hooks.policy,
340        )
341        .await;
342
343        // Emit ToolFinished and append tool-result messages.
344        for (i, (id, call)) in tool_calls.iter().enumerate() {
345            let result = &results[i];
346            let ok = tool_result_ok(result);
347            hooks.emit_event(|sid| RunEvent::ToolFinished {
348                session: sid,
349                turn: turns,
350                tool: call.name.clone(),
351                call_id: id.clone(),
352                ok,
353            });
354            let tool_msg = AgentMessage {
355                role: Role::Tool,
356                content: vec![ContentBlock::ToolResult {
357                    tool_use_id: id.clone(),
358                    content: serde_json::to_value(result)
359                        .unwrap_or_else(|_| serde_json::json!({ "error": "serialize failed" })),
360                }],
361            };
362            messages.push(tool_msg);
363        }
364        // End of turn: notify the sink so the coordinator can persist/observe
365        // before the next turn begins. Persistence of turn N must complete
366        // before turn N+1 starts — this is what makes resume-after-kill faithful.
367        if let Some(sink) = hooks.turn_sink {
368            sink.after_turn(turns, messages).await?;
369        }
370        hooks.emit_event(|sid| RunEvent::TurnFinished {
371            session: sid,
372            turn: turns,
373        });
374    }
375}
376
/// A single turn's streamed events, reassembled into the assistant message +
/// the tool calls it issued. Consumed by [`run_agent_streaming`].
#[derive(Debug, Clone, Default)]
struct StreamedTurn {
    /// Concatenation of every text delta received, in arrival order.
    text: String,
    /// Concatenation of every thinking delta received, in arrival order.
    /// NOTE(review): accumulated during collection but not read by the
    /// streaming loop in the visible part of this file — confirm whether it
    /// should be persisted in the assistant message.
    thinking: String,
    /// Tool calls in arrival order, each paired with the id under which its
    /// result will be reported.
    tool_calls: Vec<(String, ToolCall)>,
}
385
386/// Reassemble a provider's [`StreamEvent`] stream into a [`StreamedTurn`].
387///
388/// `on_event` is invoked for every event (so callers can print deltas live);
389/// this function still returns the full reassembled turn so the loop can
390/// append the assistant message and execute tools.
391async fn collect_streamed_turn(
392    stream: crate::model::StreamEventStream,
393    on_event: &mut (dyn FnMut(&StreamEvent) + Send),
394) -> Result<StreamedTurn> {
395    use futures::StreamExt;
396    let mut turn = StreamedTurn::default();
397    let mut s = stream;
398    while let Some(item) = s.next().await {
399        match item {
400            Ok(StreamEvent::TextDelta(t)) => {
401                on_event(&StreamEvent::TextDelta(t.clone()));
402                turn.text.push_str(&t);
403            }
404            Ok(StreamEvent::ThinkingDelta(t)) => {
405                turn.thinking.push_str(&t);
406            }
407            Ok(StreamEvent::ToolCall(call)) => {
408                // Streaming tool calls are assigned synthetic `call_N` ids in
409                // arrival order. The provider already buffers full argument
410                // strings before emitting, so no incremental reassembly here.
411                let id = format!("call_{}", turn.tool_calls.len());
412                turn.tool_calls.push((id, call));
413            }
414            Ok(StreamEvent::Done) => break,
415            Err(e) => return Err(e),
416        }
417    }
418    Ok(turn)
419}
420
421/// Streaming variant of [`run_agent`].
422///
423/// Identical loop semantics (budgets, parallel tools, cancellation) but each
424/// provider turn is consumed via [`ModelProvider::stream`] and text deltas are
425/// forwarded to `on_event` *as they arrive*. Tool calls are reassembled from
426/// the stream before execution. Use this when you want live token-by-token
427/// output.
428#[allow(clippy::too_many_arguments)]
429pub async fn run_agent_streaming(
430    provider: &dyn ModelProvider,
431    tools: &[Arc<dyn Tool>],
432    messages: &mut Vec<AgentMessage>,
433    model: &Model,
434    config: &RunConfig,
435    cancel: &CancellationToken,
436    on_event: &mut (dyn FnMut(&StreamEvent) + Send),
437    hooks: &RunHooks<'_>,
438) -> Result<RunOutcome> {
439    hooks.emit_event(|sid| RunEvent::SessionStarted { session: sid });
440    let mut turns = 0usize;
441    loop {
442        if cancel.is_cancelled() {
443            hooks.emit_event(|sid| crate::event::run_failed(sid, "cancelled"));
444            return Err(CoreError::Cancelled("agent run cancelled".into()));
445        }
446        if turns >= config.max_turns {
447            let msg = format!(
448                "max_turns ({}) exceeded — the model kept calling tools",
449                config.max_turns
450            );
451            hooks.emit_event(|sid| crate::event::run_failed(sid, msg.clone()));
452            return Err(CoreError::ModelResponse(msg));
453        }
454        turns += 1;
455        hooks.emit_event(|sid| RunEvent::TurnStarted {
456            session: sid,
457            turn: turns,
458        });
459
460        let request = ModelRequest {
461            model: model.clone(),
462            messages: messages.clone(),
463            tools: tools.iter().map(|t| t.definition()).collect(),
464            thinking: config.thinking,
465            params: Default::default(),
466        };
467        hooks.emit_event(|sid| RunEvent::ModelStarted {
468            session: sid,
469            turn: turns,
470            model: model.id.clone(),
471        });
472        // Stream the turn, reassembling into an assistant message + tool calls.
473        let stream = provider.stream(request);
474        let turn = match collect_streamed_turn(stream, on_event).await {
475            Ok(t) => t,
476            Err(e) => {
477                hooks.emit_event(|sid| crate::event::run_failed(sid, e.to_string()));
478                return Err(e);
479            }
480        };
481        hooks.emit_event(|sid| RunEvent::ModelFinished {
482            session: sid,
483            turn: turns,
484        });
485
486        // Build the assistant message from the reassembled turn.
487        let mut content: Vec<ContentBlock> = Vec::new();
488        if !turn.text.is_empty() {
489            content.push(ContentBlock::Text { text: turn.text });
490        }
491        for (id, call) in &turn.tool_calls {
492            content.push(ContentBlock::ToolUse {
493                id: id.clone(),
494                call: call.clone(),
495            });
496        }
497        messages.push(AgentMessage {
498            role: Role::Assistant,
499            content,
500        });
501
502        if turn.tool_calls.is_empty() {
503            let final_text = extract_final_text(messages);
504            // Notify the sink so the final state is persisted before returning.
505            if let Some(sink) = hooks.turn_sink {
506                sink.after_turn(turns, messages).await?;
507            }
508            hooks.emit_event(|sid| RunEvent::TurnFinished {
509                session: sid,
510                turn: turns,
511            });
512            return Ok(RunOutcome { turns, final_text });
513        }
514        if turn.tool_calls.len() > config.max_tool_calls_per_turn {
515            let msg = format!(
516                "model issued {} tool calls in one turn (max {})",
517                turn.tool_calls.len(),
518                config.max_tool_calls_per_turn
519            );
520            hooks.emit_event(|sid| crate::event::run_failed(sid, msg.clone()));
521            return Err(CoreError::ModelResponse(msg));
522        }
523
524        let owned_calls: Vec<(String, ToolCall)> = turn.tool_calls.clone();
525
526        // Emit ToolStarted for each call, then execute.
527        for (id, call) in &owned_calls {
528            hooks.emit_event(|sid| RunEvent::ToolStarted {
529                session: sid,
530                turn: turns,
531                tool: call.name.clone(),
532                call_id: id.clone(),
533            });
534        }
535
536        let results = execute_tool_calls(
537            tools,
538            &owned_calls,
539            cancel,
540            config.tool_concurrency,
541            hooks.policy,
542        )
543        .await;
544
545        // Emit ToolFinished and append tool-result messages.
546        for (i, (id, call)) in owned_calls.iter().enumerate() {
547            let result = &results[i];
548            let ok = tool_result_ok(result);
549            hooks.emit_event(|sid| RunEvent::ToolFinished {
550                session: sid,
551                turn: turns,
552                tool: call.name.clone(),
553                call_id: id.clone(),
554                ok,
555            });
556            let tool_msg = AgentMessage {
557                role: Role::Tool,
558                content: vec![ContentBlock::ToolResult {
559                    tool_use_id: id.clone(),
560                    content: serde_json::to_value(result)
561                        .unwrap_or_else(|_| serde_json::json!({ "error": "serialize failed" })),
562                }],
563            };
564            messages.push(tool_msg);
565        }
566        // End of turn: notify the sink (see `run_agent` for rationale).
567        if let Some(sink) = hooks.turn_sink {
568            sink.after_turn(turns, messages).await?;
569        }
570        hooks.emit_event(|sid| RunEvent::TurnFinished {
571            session: sid,
572            turn: turns,
573        });
574    }
575}
576
/// Maximum length of a panic message we surface to the model (a panic payload
/// may carry arbitrarily large text). Matches `event::ERROR_SUMMARY_MAX_CHARS`.
const PANIC_SUMMARY_MAX_CHARS: usize = 200;

/// Render a panic payload into a bounded, model-safe summary string.
///
/// Panic payloads are `Box<dyn Any + Send>`; the common cases are `&'static str`
/// and `String`. Anything else falls back to a generic marker.
///
/// Messages of at most [`PANIC_SUMMARY_MAX_CHARS`] characters are returned
/// unchanged. Longer ones are cut to `PANIC_SUMMARY_MAX_CHARS - 1` characters
/// plus a trailing `…`, so the result never exceeds the limit. Lengths are
/// counted in `char`s, and the cut always lands on a character boundary.
///
/// The parameter deliberately stays `&Box<dyn Any + Send>`: with a
/// `&(dyn Any + Send)` parameter, a caller passing `&payload` would unsize the
/// `Box` itself into the trait object and both downcasts below would miss.
fn summarize_panic(payload: &Box<dyn std::any::Any + Send>) -> String {
    let mut raw = payload
        .downcast_ref::<&'static str>()
        .map(std::string::ToString::to_string)
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string());

    // Walk character boundaries lazily instead of collecting every `char` of a
    // potentially huge payload: only the first MAX + 1 boundaries are visited.
    let mut boundaries = raw.char_indices().map(|(offset, _)| offset);
    // Byte offset at which the last permitted character would start.
    let cut = boundaries.nth(PANIC_SUMMARY_MAX_CHARS - 1);
    // A further character means the message is longer than the limit.
    let overflows = boundaries.next().is_some();

    if let (Some(cut), true) = (cut, overflows) {
        raw.truncate(cut);
        raw.push('…');
    }
    raw
}
602
603/// Execute a single tool call, returning a result even on error (so the
604/// model can recover) rather than aborting the whole run.
605///
606/// **Panic safety:** a panic inside `tool.execute` is caught and converted to a
607/// model-visible `Error:` result, matching the parallel path's `JoinSet`
608/// behaviour. This keeps a buggy / hostile tool from aborting the whole run on
609/// the default (`tool_concurrency == 1`) path. `catch_unwind` is best-effort:
610/// it catches unwinding panics but not aborts (e.g. `panic = "abort"`, OOM,
611/// SIGSEGV).
612async fn execute_tool_call(
613    tools: &[Arc<dyn Tool>],
614    id: &str,
615    call: &ToolCall,
616    cancel: &CancellationToken,
617) -> ToolResult {
618    let Some(tool) = tools.iter().find(|t| t.definition().name == call.name) else {
619        return error_result(&format!("unknown tool: `{}`", call.name));
620    };
621    let ctx = InvokeContext {
622        tool_call_id: id.to_string(),
623        cancel: cancel.clone(),
624    };
625    use futures::FutureExt;
626    use std::panic::AssertUnwindSafe;
627    match AssertUnwindSafe(tool.execute(ctx, call.input.clone()))
628        .catch_unwind()
629        .await
630    {
631        Ok(Ok(result)) => result,
632        Ok(Err(err)) => error_result(&err.to_string()),
633        Err(payload) => {
634            let summary = summarize_panic(&payload);
635            // Log only structural metadata: panic payloads may contain user
636            // data/secrets, and `tracing` can export to a remote collector.
637            // The bounded `summary` goes only into the model-visible result
638            // (the agent's own context), never into telemetry.
639            tracing::warn!(
640                tool = %call.name,
641                call_id = %id,
642                "tool panicked; converted to model-visible error result"
643            );
644            error_result(&format!("tool `{}` panicked: {summary}", call.name))
645        }
646    }
647}
648
649/// Invoke the provider with a per-turn timeout composed with the caller's
650/// cancellation token.
651async fn invoke_with_budget(
652    provider: &dyn ModelProvider,
653    request: ModelRequest,
654    turn_timeout_ms: Option<u64>,
655    cancel: &CancellationToken,
656) -> Result<crate::model::ModelResponse> {
657    // Fast-path cancellation check.
658    if cancel.is_cancelled() {
659        return Err(CoreError::Cancelled("turn cancelled before invoke".into()));
660    }
661    let invoke_fut = provider.invoke(request);
662    match turn_timeout_ms {
663        Some(ms) => {
664            let timeout = tokio::time::timeout(std::time::Duration::from_millis(ms), invoke_fut);
665            tokio::select! {
666                biased;
667                _ = cancel.cancelled() => {
668                    Err(CoreError::Cancelled("turn cancelled during invoke".into()))
669                }
670                res = timeout => {
671                    res.map_err(|_| {
672                        CoreError::Cancelled(format!(
673                            "turn timed out after {ms}ms"
674                        ))
675                    })?
676                }
677            }
678        }
679        None => {
680            tokio::select! {
681                biased;
682                _ = cancel.cancelled() => {
683                    Err(CoreError::Cancelled("turn cancelled during invoke".into()))
684                }
685                res = invoke_fut => res,
686            }
687        }
688    }
689}
690
/// The result of consulting the [`ToolPolicy`] for one call.
///
/// Produced by `policy_check` and consumed by the tool-execution code, which
/// either dispatches the tool or records the carried denial in its place.
enum PolicyOutcome {
    /// The call is cleared to execute.
    Execute,
    /// The call is denied; carry the model-visible error result to append in
    /// place of executing the tool.
    Denied(ToolResult),
}
699
700/// Consult the optional policy hook for a single call.
701///
702/// `None` policy ⇒ allow-all. `Confirm` is treated as `Allow` with a logged
703/// note (a confirmation channel is out of scope for the loop itself). `Deny`
704/// yields a model-visible error result and the tool is not executed.
705async fn policy_check(
706    policy: Option<&dyn ToolPolicy>,
707    id: &str,
708    call: &ToolCall,
709    cancel: &CancellationToken,
710) -> PolicyOutcome {
711    let Some(policy) = policy else {
712        return PolicyOutcome::Execute;
713    };
714    let ctx = InvokeContext {
715        tool_call_id: id.to_string(),
716        cancel: cancel.clone(),
717    };
718    match policy.check(&call.name, &call.input, &ctx).await {
719        PolicyVerdict::Allow => PolicyOutcome::Execute,
720        PolicyVerdict::Confirm(reason) => {
721            tracing::info!(
722                tool = %call.name,
723                call_id = %id,
724                "tool policy returned Confirm; treating as Allow for this run: {reason}"
725            );
726            PolicyOutcome::Execute
727        }
728        PolicyVerdict::Deny(reason) => {
729            PolicyOutcome::Denied(error_result(&format!("denied by policy: {reason}")))
730        }
731    }
732}
733
734/// Execute all tool calls for a turn, returning results in the *original*
735/// order regardless of concurrency.
736///
737/// - `tool_concurrency <= 1` ⇒ sequential (deterministic, the default).
738/// - `tool_concurrency > 1` ⇒ bounded-parallel on a `JoinSet`; each task is
739///   handed its own child of the caller's `CancellationToken`.
740///
741/// When `policy` is set it is consulted **before** each call executes; a
742/// denied call is never dispatched and its slot carries a model-visible error
743/// result instead.
744async fn execute_tool_calls(
745    tools: &[Arc<dyn Tool>],
746    calls: &[(String, ToolCall)],
747    cancel: &CancellationToken,
748    tool_concurrency: usize,
749    policy: Option<&dyn ToolPolicy>,
750) -> Vec<ToolResult> {
751    if tool_concurrency <= 1 {
752        let mut out = Vec::with_capacity(calls.len());
753        for (id, call) in calls {
754            let result = match policy_check(policy, id, call, cancel).await {
755                PolicyOutcome::Execute => execute_tool_call(tools, id, call, cancel).await,
756                PolicyOutcome::Denied(result) => result,
757            };
758            out.push(result);
759        }
760        return out;
761    }
762
763    // Bounded-parallel path. Spawn one task per call, tagged with its index.
764    use tokio::task::JoinSet;
765    // Initialized up here (not at collection time) so the throttling loop can
766    // record early-completing tasks without dropping them. (Previously the
767    // throttle `join_next` discarded its drained result — a real bug that lost
768    // results whenever `calls.len() > tool_concurrency`.)
769    let mut indexed: Vec<Option<ToolResult>> = (0..calls.len()).map(|_| None).collect();
770    let mut set: JoinSet<(usize, ToolResult)> = JoinSet::new();
771    for (i, (id, call)) in calls.iter().enumerate() {
772        // Consult the policy hook before dispatching. A denied call fills its
773        // slot with an error result and is never spawned. (Awaited here, not
774        // inside the spawned task, so the borrowed `&dyn ToolPolicy` need not
775        // be `'static`.)
776        if let PolicyOutcome::Denied(result) = policy_check(policy, id, call, cancel).await {
777            if let Some(slot) = indexed.get_mut(i) {
778                *slot = Some(result);
779            }
780            continue;
781        }
782        // Find the tool by name now (cheap) so the task owns an `Arc<dyn Tool>`.
783        let tool = tools
784            .iter()
785            .find(|t| t.definition().name == call.name)
786            .cloned();
787        let ctx_cancel = cancel.child_token();
788        let ctx = InvokeContext {
789            tool_call_id: id.clone(),
790            cancel: ctx_cancel,
791        };
792        let input = call.input.clone();
793        let id_owned = id.clone();
794        let call_name = call.name.clone();
795        set.spawn(async move {
796            // Catch panics INSIDE the task so the original index `i` is
797            // preserved and the bounded summary reaches the model. (Previously
798            // the task could panic, and JoinSet::join_next would return Err
799            // with no index — record_join_result would fill the first empty
800            // slot, attributing the panic to the wrong call_id.)
801            let result = match tool {
802                Some(t) => {
803                    use futures::FutureExt;
804                    use std::panic::AssertUnwindSafe;
805                    match AssertUnwindSafe(t.execute(ctx, input)).catch_unwind().await {
806                        Ok(Ok(r)) => r,
807                        Ok(Err(err)) => error_result(&err.to_string()),
808                        Err(payload) => {
809                            let summary = summarize_panic(&payload);
810                            tracing::warn!(
811                                tool = %call_name,
812                                call_id = %id_owned,
813                                "tool panicked; converted to model-visible error result"
814                            );
815                            error_result(&format!("tool `{call_name}` panicked: {summary}"))
816                        }
817                    }
818                }
819                None => error_result(&format!("unknown tool: `{id_owned}`")),
820            };
821            (i, result)
822        });
823        // Cap concurrency by waiting for a slot to clear. Every drained result
824        // must be recorded (the throttle and final-collection loops share one
825        // recorder so nothing is dropped).
826        while set.len() >= tool_concurrency {
827            let res = set.join_next().await;
828            if res.is_none() {
829                break; // set drained (all spawned tasks already completed)
830            }
831            record_join_result(res, &mut indexed);
832        }
833    }
834    // Collect any remaining results, re-ordered by original index.
835    //
836    // A spawned task can panic (JoinSet swallows panics, returning `Err` from
837    // `join_next`). We must still return exactly `calls.len()` results so the
838    // caller's `results[i]` indexing can never panic. Missing/failed slots are
839    // filled with an error result.
840    while let Some(res) = set.join_next().await {
841        record_join_result(Some(res), &mut indexed);
842    }
843    indexed
844        .into_iter()
845        .map(|opt| opt.unwrap_or_else(|| error_result("tool task produced no result")))
846        // Order is already correct (indexed by position); this is a no-op guard.
847        .collect()
848}
849
850/// Record a `JoinSet::join_next()` outcome into the indexed results vec. Used
851/// by both the throttling loop and the final collection so no result is
852/// dropped. On a panic/cancel (`Err`) we fill the first still-empty slot
853/// (`join_next` on a panic doesn't report the index).
854fn record_join_result(
855    res: Option<std::result::Result<(usize, ToolResult), tokio::task::JoinError>>,
856    indexed: &mut [Option<ToolResult>],
857) {
858    match res {
859        Some(Ok((i, result))) => {
860            if let Some(slot) = indexed.get_mut(i) {
861                *slot = Some(result);
862            }
863        }
864        Some(Err(join_err)) => {
865            let slot = indexed.iter().position(Option::is_none).unwrap_or(0);
866            if let Some(s) = indexed.get_mut(slot) {
867                *s = Some(error_result(&format!("tool task failed: {join_err}")));
868            }
869        }
870        None => {}
871    }
872}
873
874/// Build a `ToolResult` carrying a single error text block.
875fn error_result(message: &str) -> ToolResult {
876    ToolResult {
877        content: vec![serde_json::json!({ "type": "text", "text": format!("Error: {message}") })],
878        details: None,
879    }
880}
881
882/// Heuristic: a [`ToolResult`] is "ok" unless any text block starts with
883/// `"Error:"`. This matches the [`error_result`] convention. (A future
884/// refactor should add an explicit `ok` field to `ToolResult`.)
885fn tool_result_ok(result: &ToolResult) -> bool {
886    !result.content.iter().any(|c| {
887        c.get("text")
888            .and_then(|t| t.as_str())
889            .is_some_and(|t| t.starts_with("Error:"))
890    })
891}
892
893/// Concatenate the text blocks of the last assistant message in `messages`.
894fn extract_final_text(messages: &[AgentMessage]) -> String {
895    messages
896        .iter()
897        .rev()
898        .find(|m| m.role == Role::Assistant)
899        .map(|m| {
900            m.content
901                .iter()
902                .filter_map(|b| {
903                    if let ContentBlock::Text { text } = b {
904                        Some(text.as_str())
905                    } else {
906                        None
907                    }
908                })
909                .collect::<Vec<_>>()
910                .join("")
911        })
912        .unwrap_or_default()
913}
914
915#[cfg(test)]
916mod tests {
917    //! Walking-skeleton tests for the agent loop.
918    //!
919    //! Uses a scripted mock provider and a mock `echo` tool — no network, no
920    //! sandbox, no API keys. CI-safe.
921
922    use super::*;
923    use crate::model::ModelResponse;
924    use async_trait::async_trait;
925    use serde_json::{json, Value};
926
927    /// A provider that returns scripted responses in order.
928    struct MockProvider {
929        responses: std::sync::Mutex<std::collections::VecDeque<ModelResponse>>,
930    }
931
932    impl MockProvider {
933        fn new(responses: Vec<Vec<AgentMessage>>) -> Self {
934            let responses = responses
935                .into_iter()
936                .map(|msgs| ModelResponse { messages: msgs })
937                .collect();
938            Self {
939                responses: std::sync::Mutex::new(responses),
940            }
941        }
942    }
943
944    #[async_trait]
945    impl ModelProvider for MockProvider {
946        async fn invoke(&self, _request: ModelRequest) -> Result<ModelResponse> {
947            let next = self
948                .responses
949                .lock()
950                .unwrap()
951                .pop_front()
952                .unwrap_or(ModelResponse { messages: vec![] });
953            Ok(next)
954        }
955    }
956
957    /// A tool that echoes its `text` input back.
958    struct EchoTool;
959
960    #[async_trait]
961    impl Tool for EchoTool {
962        fn definition(&self) -> crate::tool::ToolDefinition {
963            crate::tool::ToolDefinition {
964                name: "echo".into(),
965                label: "Echo".into(),
966                description: "Echo back the provided text.".into(),
967                parameters: crate::tool::ParameterSchema::default(),
968            }
969        }
970
971        async fn execute(&self, _ctx: InvokeContext, input: Value) -> Result<ToolResult> {
972            let text = input
973                .get("text")
974                .and_then(Value::as_str)
975                .unwrap_or("(no text)")
976                .to_string();
977            Ok(ToolResult {
978                content: vec![json!({ "type": "text", "text": format!("echo: {text}") })],
979                details: None,
980            })
981        }
982    }
983
984    fn assistant_text(t: &str) -> AgentMessage {
985        AgentMessage {
986            role: Role::Assistant,
987            content: vec![ContentBlock::Text { text: t.into() }],
988        }
989    }
990
991    fn assistant_tool_use(id: &str, name: &str, input: Value) -> AgentMessage {
992        AgentMessage {
993            role: Role::Assistant,
994            content: vec![ContentBlock::ToolUse {
995                id: id.into(),
996                call: ToolCall {
997                    name: name.into(),
998                    input,
999                },
1000            }],
1001        }
1002    }
1003
1004    fn user(t: &str) -> AgentMessage {
1005        AgentMessage {
1006            role: Role::User,
1007            content: vec![ContentBlock::Text { text: t.into() }],
1008        }
1009    }
1010
1011    #[tokio::test]
1012    async fn loop_runs_tool_then_finishes() {
1013        // Turn 1: model calls `echo`. Turn 2: model returns final text.
1014        let provider = MockProvider::new(vec![
1015            vec![assistant_tool_use(
1016                "call_1",
1017                "echo",
1018                json!({ "text": "hello" }),
1019            )],
1020            vec![assistant_text("done")],
1021        ]);
1022        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1023        let model = Model::new("mock/test");
1024        let mut messages = vec![user("please echo hello then say done")];
1025
1026        let outcome = run_agent(
1027            &provider,
1028            &tools,
1029            &mut messages,
1030            &model,
1031            &RunConfig::default(),
1032            &CancellationToken::new(),
1033            &RunHooks::default(),
1034        )
1035        .await
1036        .expect("loop should complete");
1037
1038        assert_eq!(outcome.turns, 2);
1039        assert_eq!(outcome.final_text, "done");
1040
1041        // History must contain: user, assistant(tool_use), tool(result), assistant(text).
1042        assert_eq!(messages.len(), 4);
1043        assert_eq!(messages[2].role, Role::Tool);
1044        // The tool result content must carry the echoed text.
1045        match &messages[2].content[0] {
1046            ContentBlock::ToolResult { content, .. } => {
1047                let s = serde_json::to_string(content).unwrap_or_default();
1048                assert!(s.contains("echo: hello"), "tool result was: {s}");
1049            }
1050            other => panic!("expected ToolResult, got {other:?}"),
1051        }
1052    }
1053
1054    #[tokio::test]
1055    async fn loop_stops_when_no_tool_calls() {
1056        let provider = MockProvider::new(vec![vec![assistant_text("just text, no tools")]]);
1057        let tools: Vec<Arc<dyn Tool>> = vec![];
1058        let model = Model::new("mock/test");
1059        let mut messages = vec![user("hi")];
1060
1061        let outcome = run_agent(
1062            &provider,
1063            &tools,
1064            &mut messages,
1065            &model,
1066            &RunConfig::default(),
1067            &CancellationToken::new(),
1068            &RunHooks::default(),
1069        )
1070        .await
1071        .expect("loop should complete");
1072
1073        assert_eq!(outcome.turns, 1);
1074        assert_eq!(outcome.final_text, "just text, no tools");
1075    }
1076
1077    #[tokio::test]
1078    async fn loop_recovers_from_unknown_tool() {
1079        // Model calls a tool that doesn't exist; loop must surface an error
1080        // result to the model and continue, not abort.
1081        let provider = MockProvider::new(vec![
1082            vec![assistant_tool_use("c1", "nonexistent", json!({}))],
1083            vec![assistant_text("recovered")],
1084        ]);
1085        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1086        let model = Model::new("mock/test");
1087        let mut messages = vec![user("call a missing tool")];
1088
1089        let outcome = run_agent(
1090            &provider,
1091            &tools,
1092            &mut messages,
1093            &model,
1094            &RunConfig::default(),
1095            &CancellationToken::new(),
1096            &RunHooks::default(),
1097        )
1098        .await
1099        .expect("loop should recover");
1100
1101        assert_eq!(outcome.final_text, "recovered");
1102        let tool_msg = &messages[2];
1103        assert_eq!(tool_msg.role, Role::Tool);
1104    }
1105
1106    #[tokio::test]
1107    async fn loop_aborts_on_max_turns() {
1108        // Every turn calls echo again → never terminates; must hit max_turns.
1109        let repeat = || vec![assistant_tool_use("c", "echo", json!({ "text": "x" }))];
1110        let provider = MockProvider::new(vec![repeat(), repeat(), repeat(), repeat()]);
1111        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1112        let model = Model::new("mock/test");
1113        let mut messages = vec![user("loop forever")];
1114
1115        let result = run_agent(
1116            &provider,
1117            &tools,
1118            &mut messages,
1119            &model,
1120            &RunConfig {
1121                max_turns: 3,
1122                ..RunConfig::default()
1123            },
1124            &CancellationToken::new(),
1125            &RunHooks::default(),
1126        )
1127        .await;
1128
1129        assert!(result.is_err(), "must abort on max_turns");
1130    }
1131
1132    /// A policy that denies every tool call (generic; no Fae types).
1133    struct DenyAllPolicy;
1134
1135    #[async_trait]
1136    impl ToolPolicy for DenyAllPolicy {
1137        async fn check(&self, _tool: &str, _input: &Value, _ctx: &InvokeContext) -> PolicyVerdict {
1138            PolicyVerdict::Deny("blocked in test".into())
1139        }
1140    }
1141
1142    #[tokio::test]
1143    async fn policy_deny_blocks_tool_but_run_continues() {
1144        // Turn 1: model calls echo. The policy denies it: the tool must NOT
1145        // execute, a denial error result is appended, and the loop continues
1146        // to turn 2's final text — matching the unknown-tool recovery path.
1147        let provider = MockProvider::new(vec![
1148            vec![assistant_tool_use(
1149                "c1",
1150                "echo",
1151                json!({ "text": "secret" }),
1152            )],
1153            vec![assistant_text("done")],
1154        ]);
1155        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1156        let model = Model::new("mock/test");
1157        let mut messages = vec![user("call echo")];
1158        let policy = DenyAllPolicy;
1159        let hooks = RunHooks {
1160            policy: Some(&policy),
1161            ..RunHooks::default()
1162        };
1163
1164        let outcome = run_agent(
1165            &provider,
1166            &tools,
1167            &mut messages,
1168            &model,
1169            &RunConfig::default(),
1170            &CancellationToken::new(),
1171            &hooks,
1172        )
1173        .await
1174        .expect("loop completes despite denial");
1175
1176        assert_eq!(outcome.final_text, "done");
1177        // The tool result must be the denial — proving the tool never ran.
1178        let s = match &messages[2].content[0] {
1179            ContentBlock::ToolResult { content, .. } => content.to_string(),
1180            other => panic!("expected ToolResult, got {other:?}"),
1181        };
1182        assert!(s.contains("denied by policy"), "expected denial, got: {s}");
1183        assert!(
1184            !s.contains("echo: secret"),
1185            "denied tool must NOT have executed: {s}"
1186        );
1187    }
1188
1189    #[tokio::test]
1190    async fn policy_none_is_allow_all() {
1191        // Default hooks (policy: None) must behave exactly as before: the tool
1192        // executes and its echoed output reaches the model.
1193        let provider = MockProvider::new(vec![
1194            vec![assistant_tool_use("c1", "echo", json!({ "text": "hi" }))],
1195            vec![assistant_text("done")],
1196        ]);
1197        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1198        let model = Model::new("mock/test");
1199        let mut messages = vec![user("call echo")];
1200
1201        let outcome = run_agent(
1202            &provider,
1203            &tools,
1204            &mut messages,
1205            &model,
1206            &RunConfig::default(),
1207            &CancellationToken::new(),
1208            &RunHooks::default(),
1209        )
1210        .await
1211        .expect("loop completes");
1212        assert_eq!(outcome.final_text, "done");
1213        let s = match &messages[2].content[0] {
1214            ContentBlock::ToolResult { content, .. } => content.to_string(),
1215            other => panic!("expected ToolResult, got {other:?}"),
1216        };
1217        assert!(s.contains("echo: hi"), "tool should have run: {s}");
1218    }
1219
1220    /// A tool that records every execution into a shared log. Used to prove a
1221    /// denied policy call never reaches `execute` — even under the parallel
1222    /// (`tool_concurrency > 1`) path. (The sequential deny test above inspects
1223    /// the result text; this one inspects whether `execute` ran at all.)
1224    struct RecordingTool {
1225        name: String,
1226        log: Arc<std::sync::Mutex<Vec<String>>>,
1227    }
1228
1229    #[async_trait]
1230    impl Tool for RecordingTool {
1231        fn definition(&self) -> crate::tool::ToolDefinition {
1232            crate::tool::ToolDefinition {
1233                name: self.name.clone(),
1234                label: "Recording".into(),
1235                description: "Records each execution.".into(),
1236                parameters: crate::tool::ParameterSchema::default(),
1237            }
1238        }
1239
1240        async fn execute(&self, _ctx: InvokeContext, input: Value) -> Result<ToolResult> {
1241            let tag = input
1242                .get("tag")
1243                .and_then(Value::as_str)
1244                .unwrap_or("?")
1245                .to_string();
1246            self.log.lock().expect("lock poisoned").push(tag);
1247            Ok(ToolResult {
1248                content: vec![json!({ "type": "text", "text": "ran" })],
1249                details: None,
1250            })
1251        }
1252    }
1253
1254    /// Regression guard for the security-critical parallel-path invariant: the
1255    /// policy hook MUST be consulted under `tool_concurrency > 1`, every denied
1256    /// call MUST be skipped (its tool never executes), and each denied slot
1257    /// MUST carry a model-visible denial result so the loop continues.
1258    ///
1259    /// (The "checked before `JoinSet::spawn`" placement is enforced by the
1260    /// borrow checker — the borrowed `&dyn ToolPolicy` is not `'static` and so
1261    /// cannot move into a spawned task; `policy_check` is awaited outside the
1262    /// task. This test pins the *behavior* that placement guarantees.)
1263    #[tokio::test]
1264    async fn policy_deny_blocks_tools_on_the_parallel_path() {
1265        // One turn, 3 tool calls, concurrency = 4 (well into the parallel path).
1266        // DenyAllPolicy refuses every call: none of the 3 tools may execute.
1267        let log = Arc::new(std::sync::Mutex::new(Vec::new()));
1268        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(RecordingTool {
1269            name: "rec".into(),
1270            log: log.clone(),
1271        })];
1272        let turn = vec![
1273            assistant_tool_use("c1", "rec", json!({ "tag": "one" })),
1274            assistant_tool_use("c2", "rec", json!({ "tag": "two" })),
1275            assistant_tool_use("c3", "rec", json!({ "tag": "three" })),
1276        ];
1277        let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1278        let model = Model::new("mock/test");
1279        let mut messages = vec![user("call all three")];
1280        let config = RunConfig {
1281            tool_concurrency: 4,
1282            ..RunConfig::default()
1283        };
1284        let policy = DenyAllPolicy;
1285        let hooks = RunHooks {
1286            policy: Some(&policy),
1287            ..RunHooks::default()
1288        };
1289
1290        let outcome = run_agent(
1291            &provider,
1292            &tools,
1293            &mut messages,
1294            &model,
1295            &config,
1296            &CancellationToken::new(),
1297            &hooks,
1298        )
1299        .await
1300        .expect("loop completes despite denials");
1301
1302        assert_eq!(outcome.final_text, "done");
1303
1304        // Negative control: NO tool executed. If the parallel path skipped the
1305        // policy check (or checked inside the task after dispatch), the log
1306        // would be non-empty.
1307        let executed = log.lock().expect("lock poisoned").clone();
1308        assert!(
1309            executed.is_empty(),
1310            "denied tools must NOT execute on the parallel path: ran {executed:?}"
1311        );
1312
1313        // Every denied slot carries a model-visible denial result, in issued
1314        // order, so the model can recover.
1315        let results: Vec<String> = messages
1316            .iter()
1317            .filter(|m| m.role == Role::Tool)
1318            .filter_map(|m| match &m.content[0] {
1319                ContentBlock::ToolResult {
1320                    tool_use_id,
1321                    content,
1322                    ..
1323                } => {
1324                    let text = content.to_string();
1325                    Some(format!("{tool_use_id}:{text}"))
1326                }
1327                _ => None,
1328            })
1329            .collect();
1330        assert_eq!(
1331            results.len(),
1332            3,
1333            "all 3 denied calls must produce a result slot: {results:?}"
1334        );
1335        for r in &results {
1336            assert!(
1337                r.contains("denied by policy"),
1338                "parallel-path denial must surface to the model: {r}"
1339            );
1340        }
1341        // Slots remain in issued order (c1, c2, c3) — the denial must not
1342        // reshuffle results under concurrency.
1343        assert!(
1344            results[0].starts_with("c1:")
1345                && results[1].starts_with("c2:")
1346                && results[2].starts_with("c3:"),
1347            "denial slots must preserve issued order: {results:?}"
1348        );
1349    }
1350
1351    #[tokio::test]
1352    async fn loop_respects_cancellation() {
1353        // Cancel before starting.
1354        let provider = MockProvider::new(vec![vec![assistant_text("never reached")]]);
1355        let tools: Vec<Arc<dyn Tool>> = vec![];
1356        let model = Model::new("mock/test");
1357        let mut messages = vec![user("hi")];
1358        let cancel = CancellationToken::new();
1359        cancel.cancel();
1360
1361        let result = run_agent(
1362            &provider,
1363            &tools,
1364            &mut messages,
1365            &model,
1366            &RunConfig::default(),
1367            &cancel,
1368            &RunHooks::default(),
1369        )
1370        .await;
1371
1372        assert!(matches!(result, Err(CoreError::Cancelled(_))));
1373    }
1374
1375    /// A provider that sleeps for a fixed duration before each response,
1376    /// used to exercise `turn_timeout_ms`.
1377    struct SlowProvider {
1378        delay_ms: u64,
1379        responses: std::sync::Mutex<std::collections::VecDeque<ModelResponse>>,
1380    }
1381
1382    impl SlowProvider {
1383        fn new(delay_ms: u64, responses: Vec<Vec<AgentMessage>>) -> Self {
1384            let responses = responses
1385                .into_iter()
1386                .map(|m| ModelResponse { messages: m })
1387                .collect();
1388            Self {
1389                delay_ms,
1390                responses: std::sync::Mutex::new(responses),
1391            }
1392        }
1393    }
1394
1395    #[async_trait]
1396    impl ModelProvider for SlowProvider {
1397        async fn invoke(&self, _request: ModelRequest) -> Result<ModelResponse> {
1398            tokio::time::sleep(std::time::Duration::from_millis(self.delay_ms)).await;
1399            let next = self
1400                .responses
1401                .lock()
1402                .unwrap()
1403                .pop_front()
1404                .unwrap_or(ModelResponse { messages: vec![] });
1405            Ok(next)
1406        }
1407    }
1408
1409    #[tokio::test]
1410    async fn turn_timeout_aborts_slow_provider() {
1411        // Provider sleeps 500ms; turn timeout is 100ms.
1412        let provider = SlowProvider::new(500, vec![vec![assistant_text("too slow")]]);
1413        let model = Model::new("mock/test");
1414        let mut messages = vec![user("hi")];
1415        let config = RunConfig {
1416            turn_timeout_ms: Some(100),
1417            ..RunConfig::default()
1418        };
1419
1420        let result = run_agent(
1421            &provider,
1422            &[],
1423            &mut messages,
1424            &model,
1425            &config,
1426            &CancellationToken::new(),
1427            &RunHooks::default(),
1428        )
1429        .await;
1430
1431        assert!(
1432            matches!(result, Err(CoreError::Cancelled(_))),
1433            "expected cancelled, got {result:?}"
1434        );
1435    }
1436
1437    #[tokio::test]
1438    async fn max_tool_calls_per_turn_rejects_runaway_response() {
1439        // Model issues 5 tool calls in one turn; cap is 2.
1440        let runaway: Vec<AgentMessage> = (0..5)
1441            .map(|i| assistant_tool_use(&format!("c{i}"), "echo", json!({ "text": "x" })))
1442            .collect();
1443        let provider = MockProvider::new(vec![runaway]);
1444        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1445        let model = Model::new("mock/test");
1446        let mut messages = vec![user("call many tools")];
1447        let config = RunConfig {
1448            max_tool_calls_per_turn: 2,
1449            ..RunConfig::default()
1450        };
1451
1452        let result = run_agent(
1453            &provider,
1454            &tools,
1455            &mut messages,
1456            &model,
1457            &config,
1458            &CancellationToken::new(),
1459            &RunHooks::default(),
1460        )
1461        .await;
1462
1463        assert!(result.is_err(), "runaway tool calls must be rejected");
1464        let err = result.unwrap_err().to_string();
1465        assert!(err.contains("max"), "error should mention the cap: {err}");
1466    }
1467
1468    /// A tool that records the order in which its invocations *complete*.
1469    struct OrderingTool {
1470        name: String,
1471        delay_ms: u64,
1472        log: Arc<std::sync::Mutex<Vec<String>>>,
1473    }
1474
1475    #[async_trait]
1476    impl Tool for OrderingTool {
1477        fn definition(&self) -> crate::tool::ToolDefinition {
1478            crate::tool::ToolDefinition {
1479                name: self.name.clone(),
1480                label: "Ordering".into(),
1481                description: "Records completion order.".into(),
1482                parameters: crate::tool::ParameterSchema::default(),
1483            }
1484        }
1485
1486        async fn execute(&self, _ctx: InvokeContext, input: Value) -> Result<ToolResult> {
1487            tokio::time::sleep(std::time::Duration::from_millis(self.delay_ms)).await;
1488            self.log.lock().unwrap().push(
1489                input
1490                    .get("tag")
1491                    .and_then(Value::as_str)
1492                    .unwrap_or("?")
1493                    .to_string(),
1494            );
1495            Ok(ToolResult {
1496                content: vec![json!({ "type": "text", "text": "ok" })],
1497                details: None,
1498            })
1499        }
1500    }
1501
1502    #[tokio::test]
1503    async fn parallel_tool_calls_preserve_result_order() {
1504        // Two tool calls in one turn. The first is slow, the second fast.
1505        // With concurrency > 1 they finish out of order, but the appended
1506        // Tool messages must remain in the model's issued order.
1507        let log = Arc::new(std::sync::Mutex::new(Vec::new()));
1508        let tools: Vec<Arc<dyn Tool>> = vec![
1509            Arc::new(OrderingTool {
1510                name: "slow".into(),
1511                delay_ms: 60,
1512                log: log.clone(),
1513            }),
1514            Arc::new(OrderingTool {
1515                name: "fast".into(),
1516                delay_ms: 5,
1517                log: log.clone(),
1518            }),
1519        ];
1520        let turn = vec![
1521            assistant_tool_use("c1", "slow", json!({ "tag": "slow" })),
1522            assistant_tool_use("c2", "fast", json!({ "tag": "fast" })),
1523        ];
1524        let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1525        let model = Model::new("mock/test");
1526        let mut messages = vec![user("call both")];
1527        let config = RunConfig {
1528            tool_concurrency: 4,
1529            ..RunConfig::default()
1530        };
1531
1532        let outcome = run_agent(
1533            &provider,
1534            &tools,
1535            &mut messages,
1536            &model,
1537            &config,
1538            &CancellationToken::new(),
1539            &RunHooks::default(),
1540        )
1541        .await
1542        .expect("loop should complete");
1543
1544        assert_eq!(outcome.final_text, "done");
1545
1546        // The completion log is [fast, slow] (fast finished first), proving
1547        // they actually ran in parallel rather than sequentially.
1548        let completed = log.lock().unwrap().clone();
1549        assert_eq!(
1550            completed,
1551            vec!["fast", "slow"],
1552            "tools must have run concurrently: {completed:?}"
1553        );
1554
1555        // But the appended Tool messages must be in issued order: c1 then c2.
1556        let tool_ids: Vec<String> = messages
1557            .iter()
1558            .filter(|m| m.role == Role::Tool)
1559            .filter_map(|m| match &m.content[0] {
1560                ContentBlock::ToolResult { tool_use_id, .. } => Some(tool_use_id.clone()),
1561                _ => None,
1562            })
1563            .collect();
1564        assert_eq!(
1565            tool_ids,
1566            vec!["c1", "c2"],
1567            "results must be appended in issued order: {tool_ids:?}"
1568        );
1569    }
1570
1571    /// A tool whose `execute` panics. The parallel path must NOT propagate the
1572    /// panic; it must fill that slot with an error result and keep going.
1573    struct PanickingTool;
1574
1575    #[async_trait]
1576    impl Tool for PanickingTool {
1577        fn definition(&self) -> crate::tool::ToolDefinition {
1578            crate::tool::ToolDefinition {
1579                name: "boom".into(),
1580                label: "Boom".into(),
1581                description: "Always panics.".into(),
1582                parameters: crate::tool::ParameterSchema::default(),
1583            }
1584        }
1585
1586        async fn execute(&self, _ctx: InvokeContext, _input: Value) -> Result<ToolResult> {
1587            panic!("deliberate tool panic");
1588        }
1589    }
1590
1591    #[tokio::test]
1592    async fn parallel_path_survives_a_task_panic() {
1593        // Two tool calls in one turn: one panics, one succeeds. The loop must
1594        // not panic; it must append two Tool messages (one error result, one ok).
1595        let turn = vec![
1596            assistant_tool_use("c1", "boom", json!({})),
1597            assistant_tool_use("c2", "echo", json!({ "text": "survived" })),
1598        ];
1599        let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1600        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(PanickingTool), Arc::new(EchoTool)];
1601        let model = Model::new("mock/test");
1602        let mut messages = vec![user("call both")];
1603        let config = RunConfig {
1604            tool_concurrency: 4,
1605            ..RunConfig::default()
1606        };
1607
1608        // This must not panic.
1609        let outcome = run_agent(
1610            &provider,
1611            &tools,
1612            &mut messages,
1613            &model,
1614            &config,
1615            &CancellationToken::new(),
1616            &RunHooks::default(),
1617        )
1618        .await
1619        .expect("loop must survive a tool panic");
1620
1621        assert_eq!(outcome.final_text, "done");
1622        // Exactly two Tool messages appended (one per call), in issued order.
1623        let tool_ids: Vec<String> = messages
1624            .iter()
1625            .filter(|m| m.role == Role::Tool)
1626            .filter_map(|m| match &m.content[0] {
1627                ContentBlock::ToolResult { tool_use_id, .. } => Some(tool_use_id.clone()),
1628                _ => None,
1629            })
1630            .collect();
1631        assert_eq!(
1632            tool_ids,
1633            vec!["c1", "c2"],
1634            "both results must be present despite the panic: {tool_ids:?}"
1635        );
1636    }
1637
1638    /// The DEFAULT (sequential, `tool_concurrency == 1`) path must ALSO survive
1639    /// a tool panic — converted to a model-visible `Error:` result, not an
1640    /// aborting unwind. This is the gap the dev-config-UX red-team flagged:
1641    /// the parallel path had JoinSet panic isolation, but the sequential path
1642    /// (the default) did not until `execute_tool_call` grew `catch_unwind`.
1643    #[tokio::test]
1644    async fn sequential_path_survives_a_tool_panic() {
1645        let turn = vec![
1646            assistant_tool_use("c1", "boom", json!({})),
1647            assistant_tool_use("c2", "echo", json!({ "text": "survived" })),
1648        ];
1649        let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1650        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(PanickingTool), Arc::new(EchoTool)];
1651        let model = Model::new("mock/test");
1652        let mut messages = vec![user("call both")];
1653        // tool_concurrency == 1 → sequential path (the default).
1654        let config = RunConfig::default();
1655
1656        // This must not panic.
1657        let outcome = run_agent(
1658            &provider,
1659            &tools,
1660            &mut messages,
1661            &model,
1662            &config,
1663            &CancellationToken::new(),
1664            &RunHooks::default(),
1665        )
1666        .await
1667        .expect("sequential path must survive a tool panic");
1668        assert_eq!(outcome.final_text, "done");
1669
1670        // Both Tool results present, in issued order.
1671        let results: Vec<&ContentBlock> = messages
1672            .iter()
1673            .filter(|m| m.role == Role::Tool)
1674            .flat_map(|m| m.content.iter())
1675            .collect();
1676        assert_eq!(results.len(), 2, "both results appended");
1677        // c1 (the panic) → contains Error: + panicked; c2 (echo) → the echoed text.
1678        let c1_str = match &results[0] {
1679            ContentBlock::ToolResult { content, .. } => content.to_string(),
1680            _ => String::new(),
1681        };
1682        assert!(
1683            c1_str.contains("Error:"),
1684            "panic must surface as an Error: result, got: {c1_str}"
1685        );
1686        assert!(
1687            c1_str.contains("panicked"),
1688            "error result should mention the panic: {c1_str}"
1689        );
1690    }
1691
1692    /// Parallel-path panics must be attributed to the CORRECT call_id (not the
1693    /// first empty slot) and carry the bounded panic summary — matching the
1694    /// sequential path's quality. Before the fix, the task could panic, and
1695    /// JoinSet::join_next returned Err with no index, so record_join_result
1696    /// filled the first empty slot with a generic 'tool task failed' message.
1697    #[tokio::test]
1698    async fn parallel_path_panic_preserves_call_id_and_summary() {
1699        // c1 → boom (panics); c2 → echo (succeeds). Both run in parallel.
1700        let turn = vec![
1701            assistant_tool_use("c1", "boom", json!({})),
1702            assistant_tool_use("c2", "echo", json!({ "text": "ok" })),
1703        ];
1704        let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1705        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(PanickingTool), Arc::new(EchoTool)];
1706        let model = Model::new("mock/test");
1707        let mut messages = vec![user("call both")];
1708        let config = RunConfig {
1709            tool_concurrency: 4,
1710            ..RunConfig::default()
1711        };
1712
1713        let outcome = run_agent(
1714            &provider,
1715            &tools,
1716            &mut messages,
1717            &model,
1718            &config,
1719            &CancellationToken::new(),
1720            &RunHooks::default(),
1721        )
1722        .await
1723        .expect("run survives parallel panic");
1724        assert_eq!(outcome.final_text, "done");
1725
1726        // c1 (the panic) must be in the FIRST slot with a 'panicked' summary,
1727        // NOT a generic 'tool task failed'. c2 (echo) in the second slot.
1728        let tool_msgs: Vec<(&String, String)> = messages
1729            .iter()
1730            .filter(|m| m.role == Role::Tool)
1731            .flat_map(|m| m.content.iter())
1732            .filter_map(|b| match b {
1733                ContentBlock::ToolResult {
1734                    tool_use_id,
1735                    content,
1736                } => Some((tool_use_id, content.to_string())),
1737                _ => None,
1738            })
1739            .collect();
1740        assert_eq!(tool_msgs.len(), 2, "both results present");
1741        // Correct call_id attribution (c1 first, c2 second).
1742        assert_eq!(tool_msgs[0].0, "c1", "c1 attributed correctly");
1743        assert_eq!(tool_msgs[1].0, "c2", "c2 attributed correctly");
1744        // The panic carries the bounded summary (not the generic message).
1745        assert!(
1746            tool_msgs[0].1.contains("panicked"),
1747            "parallel panic should carry bounded summary, got: {}",
1748            tool_msgs[0].1
1749        );
1750        assert!(
1751            tool_msgs[0].1.contains("Error:"),
1752            "should be an Error: result, got: {}",
1753            tool_msgs[0].1
1754        );
1755    }
1756
1757    /// Regression test for a pre-existing bug in the parallel path: the
1758    /// throttling loop (`while set.len() >= tool_concurrency { join_next }`)
1759    /// used to **discard** the drained result, so when `calls.len()` exceeded
1760    /// `tool_concurrency`, early-completing results were lost and the run
1761    /// ended up with "produced no result" error results in their slots. The
1762    /// fix records every `join_next` via a shared recorder.
1763    #[tokio::test]
1764    async fn parallel_path_keeps_all_results_under_throttling() {
1765        // 3 calls, concurrency = 2 → the throttle loop must drain-and-record
1766        // (not drain-and-drop) the first task that completes while the third
1767        // is waiting for a slot.
1768        let turn = vec![
1769            assistant_tool_use("c1", "echo", json!({ "text": "one" })),
1770            assistant_tool_use("c2", "echo", json!({ "text": "two" })),
1771            assistant_tool_use("c3", "echo", json!({ "text": "three" })),
1772        ];
1773        let provider = MockProvider::new(vec![turn, vec![assistant_text("done")]]);
1774        let tools: Vec<Arc<dyn Tool>> = vec![Arc::new(EchoTool)];
1775        let model = Model::new("mock/test");
1776        let mut messages = vec![user("call all three")];
1777        let config = RunConfig {
1778            tool_concurrency: 2,
1779            ..RunConfig::default()
1780        };
1781
1782        let outcome = run_agent(
1783            &provider,
1784            &tools,
1785            &mut messages,
1786            &model,
1787            &config,
1788            &CancellationToken::new(),
1789            &RunHooks::default(),
1790        )
1791        .await
1792        .expect("run completes");
1793        assert_eq!(outcome.final_text, "done");
1794
1795        // All three Tool results must be present, in issued order, with the
1796        // correct echoed text in each slot (proving the RIGHT result landed,
1797        // not a placeholder).
1798        let results: Vec<String> = messages
1799            .iter()
1800            .filter(|m| m.role == Role::Tool)
1801            .flat_map(|m| m.content.iter())
1802            .filter_map(|b| match b {
1803                ContentBlock::ToolResult {
1804                    tool_use_id,
1805                    content,
1806                } => {
1807                    let text = content
1808                        .get("content")
1809                        .and_then(|c| c.get(0))
1810                        .and_then(|c| c.get("text"))
1811                        .and_then(|t| t.as_str())
1812                        .unwrap_or("<missing>");
1813                    Some(format!("{tool_use_id}={text}"))
1814                }
1815                _ => None,
1816            })
1817            .collect();
1818        assert_eq!(
1819            results,
1820            vec!["c1=echo: one", "c2=echo: two", "c3=echo: three"],
1821            "all 3 results must survive throttling, in order, with correct text: {results:?}"
1822        );
1823    }
1824
1825    #[test]
1826    fn summarize_panic_handles_string_payloads() {
1827        let p: Box<dyn std::any::Any + Send> = Box::new("boom!".to_string());
1828        assert_eq!(summarize_panic(&p), "boom!");
1829    }
1830
1831    #[test]
1832    fn summarize_panic_handles_str_payloads() {
1833        let s: &'static str = "static boom";
1834        let p: Box<dyn std::any::Any + Send> = Box::new(s);
1835        assert_eq!(summarize_panic(&p), "static boom");
1836    }
1837
1838    #[test]
1839    fn summarize_panic_bounds_huge_payloads() {
1840        let huge = "x".repeat(10_000);
1841        let p: Box<dyn std::any::Any + Send> = Box::new(huge);
1842        let summary = summarize_panic(&p);
1843        assert!(
1844            summary.chars().count() <= PANIC_SUMMARY_MAX_CHARS,
1845            "summary not bounded: {} chars",
1846            summary.chars().count()
1847        );
1848        assert!(
1849            summary.ends_with('…'),
1850            "should end with ellipsis: {summary}"
1851        );
1852    }
1853
1854    #[test]
1855    fn summarize_panic_falls_back_for_non_string_payloads() {
1856        let p: Box<dyn std::any::Any + Send> = Box::new(42_i32);
1857        let summary = summarize_panic(&p);
1858        assert!(
1859            summary.contains("non-string"),
1860            "expected fallback marker: {summary}"
1861        );
1862    }
1863
1864    // ── Event emission tests (MVP 4c) ──
1865
1866    use crate::event::{EventSink, RunEvent};
1867    use std::sync::{Arc, Mutex};
1868    use uuid::Uuid;
1869
1870    /// A recording sink that collects every emitted event.
1871    struct RecordingSink {
1872        events: Arc<Mutex<Vec<RunEvent>>>,
1873    }
1874
1875    impl EventSink for RecordingSink {
1876        fn emit(&self, event: RunEvent) {
1877            self.events.lock().expect("lock poisoned").push(event);
1878        }
1879    }
1880
1881    #[tokio::test]
1882    async fn text_only_run_emits_complete_event_sequence() {
1883        let provider = MockProvider::new(vec![vec![assistant_text("hello")]]);
1884        let tools: Vec<Arc<dyn Tool>> = vec![];
1885        let model = Model::new("mock/test");
1886        let mut messages = vec![user("hi")];
1887        let sink = Arc::new(Mutex::new(Vec::new()));
1888        let hooks = RunHooks {
1889            session_id: Some(Uuid::nil()),
1890            turn_sink: None,
1891            event_sink: Some(&RecordingSink {
1892                events: sink.clone(),
1893            } as &dyn EventSink),
1894            policy: None,
1895        };
1896
1897        run_agent(
1898            &provider,
1899            &tools,
1900            &mut messages,
1901            &model,
1902            &RunConfig::default(),
1903            &CancellationToken::new(),
1904            &hooks,
1905        )
1906        .await
1907        .expect("run");
1908
1909        let events = sink.lock().expect("lock poisoned").clone();
1910        // Text-only turn: Session → Turn → Model(S/F) → TurnFinished
1911        assert!(events
1912            .iter()
1913            .any(|e| matches!(e, RunEvent::SessionStarted { .. })));
1914        assert!(events
1915            .iter()
1916            .any(|e| matches!(e, RunEvent::TurnStarted { turn: 1, .. })));
1917        assert!(events.iter().any(
1918            |e| matches!(e, RunEvent::ModelStarted { turn: 1, model, .. } if model == "mock/test")
1919        ));
1920        assert!(events
1921            .iter()
1922            .any(|e| matches!(e, RunEvent::ModelFinished { turn: 1, .. })));
1923        assert!(events
1924            .iter()
1925            .any(|e| matches!(e, RunEvent::TurnFinished { turn: 1, .. })));
1926        // No tool events for a text-only turn.
1927        assert!(!events
1928            .iter()
1929            .any(|e| matches!(e, RunEvent::ToolStarted { .. })));
1930    }
1931
1932    #[tokio::test]
1933    async fn tool_run_emits_tool_started_finished() {
1934        let echo_tool = Arc::new(EchoTool) as Arc<dyn Tool>;
1935        let tools = vec![echo_tool.clone()];
1936        let provider = MockProvider::new(vec![
1937            vec![assistant_tool_use(
1938                "call-1",
1939                "echo",
1940                json!({ "text": "hi" }),
1941            )],
1942            vec![assistant_text("done")],
1943        ]);
1944        let model = Model::new("mock/test");
1945        let mut messages = vec![user("echo hi")];
1946        let sink = Arc::new(Mutex::new(Vec::new()));
1947        let hooks = RunHooks {
1948            session_id: Some(Uuid::nil()),
1949            turn_sink: None,
1950            event_sink: Some(&RecordingSink {
1951                events: sink.clone(),
1952            } as &dyn EventSink),
1953            policy: None,
1954        };
1955
1956        run_agent(
1957            &provider,
1958            &tools,
1959            &mut messages,
1960            &model,
1961            &RunConfig::default(),
1962            &CancellationToken::new(),
1963            &hooks,
1964        )
1965        .await
1966        .expect("run");
1967
1968        let events = sink.lock().expect("lock poisoned").clone();
1969        // Turn 1 has a tool call → ToolStarted then ToolFinished.
1970        assert!(
1971            events.iter().any(|e| matches!(e, RunEvent::ToolStarted { turn: 1, tool, call_id, .. } if tool == "echo" && call_id == "call-1")),
1972            "missing ToolStarted for echo/call-1"
1973        );
1974        assert!(
1975            events.iter().any(|e| matches!(e, RunEvent::ToolFinished { turn: 1, tool, call_id, ok: true, .. } if tool == "echo" && call_id == "call-1")),
1976            "missing ToolFinished for echo/call-1"
1977        );
1978        // Two turns (tool then text).
1979        assert!(events
1980            .iter()
1981            .any(|e| matches!(e, RunEvent::TurnFinished { turn: 2, .. })));
1982    }
1983
1984    #[tokio::test]
1985    async fn no_events_when_session_id_is_none() {
1986        let provider = MockProvider::new(vec![vec![assistant_text("hello")]]);
1987        let tools: Vec<Arc<dyn Tool>> = vec![];
1988        let model = Model::new("mock/test");
1989        let mut messages = vec![user("hi")];
1990        let sink = Arc::new(Mutex::new(Vec::new()));
1991        let hooks = RunHooks {
1992            session_id: None, // no session → no events
1993            turn_sink: None,
1994            event_sink: Some(&RecordingSink {
1995                events: sink.clone(),
1996            } as &dyn EventSink),
1997            policy: None,
1998        };
1999
2000        run_agent(
2001            &provider,
2002            &tools,
2003            &mut messages,
2004            &model,
2005            &RunConfig::default(),
2006            &CancellationToken::new(),
2007            &hooks,
2008        )
2009        .await
2010        .expect("run");
2011
2012        assert!(
2013            sink.lock().expect("lock poisoned").is_empty(),
2014            "events emitted with no session_id"
2015        );
2016    }
2017}