Skip to main content

clark_agent/
run.rs

1//! The canonical agent loop.
2//!
3//! One free function each for run/start and continue — no god-class.
4//!
5//! Shape:
6//!
7//! ```text
8//! agent_start
9//!  └ loop:                          ← outer (follow-up) loop
10//!     turn_start
11//!     [pending steering messages]   ← injected before LLM call
12//!     stream assistant response     ← StreamFn → AssistantMessage
13//!     execute tool batch (if any)   ← parallel/sequential dispatch
14//!     turn_end
15//!     ↻ until no more tool calls AND no steering ready
16//!     check follow-up               ← post-stop injection
17//!  agent_end
18//! ```
19//!
20//! Termination is unanimous-tool-vote: a batch ends the run only when
21//! every finalized tool result sets `terminate = true`. One tool wanting
22//! to stop does not stop the batch.
23
24use futures::stream::StreamExt;
25use std::time::{SystemTime, UNIX_EPOCH};
26use tokio_util::sync::CancellationToken;
27
28use crate::config::LoopConfig;
29use crate::error::{LoopError, StreamError};
30use crate::event::AgentEvent;
31use crate::exec::{execute_tool_batch, ExecutedBatch};
32use crate::plugin::TransformContext;
33use crate::stream::{ReasoningEffort, StreamErrorKind, StreamEvent, StreamRequest, ToolSchema};
34use crate::types::{
35    AgentContext, AgentMessage, AssistantContent, StopReason, ToolResultContent, Usage,
36};
37
/// Total number of attempts (initial try included) made for one turn when
/// the provider stream comes back empty.
const EMPTY_STREAM_MAX_ATTEMPTS: u8 = 3;
/// Base delay between empty-stream retries; multiplied by the attempt count.
const EMPTY_STREAM_RETRY_INITIAL_DELAY: std::time::Duration = std::time::Duration::from_millis(250);
/// Total number of attempts (initial try included) made for one turn when
/// the transport reports zero actionable output.
const ZERO_OUTPUT_TRANSPORT_MAX_ATTEMPTS: u8 = 2;
/// Base delay between zero-output transport retries; multiplied by the
/// attempt count.
const ZERO_OUTPUT_TRANSPORT_RETRY_INITIAL_DELAY: std::time::Duration =
    std::time::Duration::from_millis(500);
/// Runtime-context text added to the retry context after a zero-output
/// transport failure. A header line followed by a single instruction
/// paragraph.
const ZERO_OUTPUT_TRANSPORT_RECOVERY_CONTEXT: &str = concat!(
    "[runtime context — transport recovery, not user instruction]\n",
    "The previous provider attempt produced no actionable output: no visible assistant text and no usable tool call reached the runtime. ",
    "It may have produced private-only reasoning or an unusable burst of partial tool calls. ",
    "Do not continue with private reasoning only. Re-read the latest observation and immediately choose exactly one next structured tool call; ",
    "if the answer is ready, use the final response tool.",
);

/// Upper bound on back-to-back plain-text-fallback nudges. Once this many
/// nudges have been spent, the loop stops nudging and synthesizes a
/// terminal tool result as a last resort. Two nudges followed by one
/// synthesize keeps recovery bounded without relying on a
/// caller-configured `empty_outcome_retry_budget`.
const MAX_PLAIN_TEXT_NUDGE_RETRIES: usize = 2;
55
/// How a run that did not fail came to an end.
///
/// Separates a natural stop from stops caused by budget pressure, so a
/// caller (for example a parent agent reading a subagent's tool result)
/// can tell a complete answer from a partial one. Every variant is a
/// non-error outcome; hard failures are reported as [`LoopError`] instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopOutcome {
    /// The happy path: the model produced a final assistant turn with no
    /// tool calls and no steering was pending.
    Done,
    /// The graceful turn-limit plugin injected its wrap-up steering
    /// message and the model closed out cleanly inside the grace window.
    /// The result text is a deliberate close-out rather than a truncated
    /// transcript.
    WrappedUp,
    /// The run stopped at `max_iterations` before the model wrapped up.
    /// Earlier turns remain in the transcript, and the latest assistant
    /// turn may still have had tool calls pending.
    HitMaxIterations,
}

impl LoopOutcome {
    /// Returns `true` when the outcome stands for a clean, non-partial
    /// final answer.
    pub fn is_complete(self) -> bool {
        match self {
            Self::Done | Self::WrappedUp => true,
            Self::HitMaxIterations => false,
        }
    }

    /// A short, stable identifier for logs and tool-result prefixes.
    pub fn label(self) -> &'static str {
        match self {
            Self::Done => "done",
            Self::WrappedUp => "wrapped_up",
            Self::HitMaxIterations => "hit_max_iterations",
        }
    }
}
93
94/// Result of a completed run: emitted messages plus a typed outcome label.
95///
96/// Returned by [`run`] and [`run_continue`]. `messages` is the slice of
97/// messages produced **during this run** (not the full transcript).
98/// `outcome` lets callers distinguish a natural close from a budget-driven
99/// one without inspecting message content.
100#[derive(Debug, Clone)]
101pub struct RunResult {
102    pub messages: Vec<AgentMessage>,
103    pub outcome: LoopOutcome,
104}
105
106/// Run the loop with one or more starting prompts.
107///
108/// The prompts are appended to the context's existing message list, then
109/// the loop runs until natural stop (no more tool calls, no follow-up).
110/// Returns the messages produced **during this run** plus a typed outcome
111/// label — not the full transcript. Callers that want the full transcript
112/// should fold prior messages into their own state, or read from the
113/// event sink.
114pub async fn run(
115    prompts: Vec<AgentMessage>,
116    context: AgentContext,
117    config: &LoopConfig,
118    signal: CancellationToken,
119) -> Result<RunResult, LoopError> {
120    let mut current = context;
121    let mut new_messages = prompts.clone();
122
123    current.messages.extend(prompts.iter().cloned());
124
125    emit(config, AgentEvent::AgentStart).await;
126    if let Some(identity) = current.identity.clone() {
127        emit(config, AgentEvent::RunIdentified { identity }).await;
128    }
129    emit(config, AgentEvent::TurnStart).await;
130    for prompt in &prompts {
131        emit(
132            config,
133            AgentEvent::MessageStart {
134                message: prompt.clone(),
135            },
136        )
137        .await;
138        emit(
139            config,
140            AgentEvent::MessageEnd {
141                message: prompt.clone(),
142            },
143        )
144        .await;
145    }
146
147    let outcome = inner_run(&mut current, &mut new_messages, config, &signal).await?;
148
149    Ok(RunResult {
150        messages: new_messages,
151        outcome,
152    })
153}
154
155/// Continue an existing context without adding a new prompt.
156///
157/// Used when the trailing message is already a `User` (e.g., steering
158/// queued externally) or `ToolResult` (e.g., an out-of-band tool result
159/// was injected). Errors if the trailing message is `Assistant` — the
160/// model would not respond to its own message.
161pub async fn run_continue(
162    context: AgentContext,
163    config: &LoopConfig,
164    signal: CancellationToken,
165) -> Result<RunResult, LoopError> {
166    let last = context
167        .messages
168        .last()
169        .ok_or_else(|| LoopError::InvalidContinuation("no messages in context".into()))?;
170    if matches!(last, AgentMessage::Assistant { .. }) {
171        return Err(LoopError::InvalidContinuation(
172            "trailing message is assistant".into(),
173        ));
174    }
175
176    let mut current = context;
177    let mut new_messages = Vec::new();
178
179    emit(config, AgentEvent::AgentStart).await;
180    if let Some(identity) = current.identity.clone() {
181        emit(config, AgentEvent::RunIdentified { identity }).await;
182    }
183    emit(config, AgentEvent::TurnStart).await;
184
185    let outcome = inner_run(&mut current, &mut new_messages, config, &signal).await?;
186
187    Ok(RunResult {
188        messages: new_messages,
189        outcome,
190    })
191}
192
193// ─── Internals ─────────────────────────────────────────────────────
194
195async fn emit(config: &LoopConfig, event: AgentEvent) {
196    config.event_sink.emit(event.clone()).await;
197    for observer in &config.plugins.event_observer {
198        observer.on_event(&event).await;
199    }
200}
201
/// Drive the agent loop to completion and report how it ended.
///
/// `current` is the live conversation and `new_messages` is the tail of
/// messages produced by this run; every message the loop creates
/// (steering, assistant, tool results, nudges, synthesized results,
/// cap-time follow-ups) is pushed onto both.
///
/// Each pass of the inner `while` loop:
/// 1. checks `signal` for cancellation and `config.max_iterations`,
/// 2. injects pending steering messages (emitting `MessageStart` /
///    `MessageEnd` for each),
/// 3. streams one assistant message via `stream_with_max_tokens_recovery`,
/// 4. either executes the assistant's tool calls, or — when there are
///    none and `config.plain_text_terminal_fallback_tool` is set — pushes
///    a recovery nudge or synthesizes a terminal tool result,
/// 5. emits `TurnEnd` and re-collects steering unless the batch
///    terminated.
///
/// When the inner loop exits, follow-up sources are consulted (unless the
/// last batch terminated). Their messages re-enter the outer loop as
/// pending steering, except when the iteration cap was hit: then they are
/// only appended to the transcript.
///
/// # Errors
///
/// - `LoopError::Aborted` when `signal` is cancelled at the top of a pass,
///   or when the assistant message carries `StopReason::Aborted`.
/// - `LoopError::Stream(StreamError::Transient(..))` when the assistant
///   message carries `StopReason::Error`.
/// - `LoopError::EmptyOutcomeBudgetExhausted` when the last turn had no
///   tool calls and `empty_outcomes_seen` exceeds
///   `config.empty_outcome_retry_budget`.
/// - Any error propagated by `stream_with_max_tokens_recovery` or
///   `execute_tool_batch`.
///
/// NOTE(review): the cancellation return and the two `?` propagation
/// points return without emitting `AgentEnd`, while the stop-reason error
/// path and the budget-exhausted path do emit it — confirm this asymmetry
/// is intended.
///
/// NOTE(review): outcome classification only tests `iterations >= max`,
/// so a run whose final, terminating turn is exactly the `max`-th
/// iteration is labelled `HitMaxIterations` — confirm this is the desired
/// label for that case.
202async fn inner_run(
203    current: &mut AgentContext,
204    new_messages: &mut Vec<AgentMessage>,
205    config: &LoopConfig,
206    signal: &CancellationToken,
207) -> Result<LoopOutcome, LoopError> {
208    let mut first_turn = true;
209    let mut iterations: usize = 0;
210    let mut empty_outcomes_seen: usize = 0;
211    let mut last_turn_stopped_without_tool = false;
212    let mut plain_text_terminal_fallback_candidate: Option<AgentMessage> = None;
213
214    // Steering messages may already be queued (caller produced them
215    // before calling `run`).
216    let mut pending = collect_steering(config).await;
217
218    'outer: loop {
219        let mut has_more_tool_calls = true;
220        // Did the most recent tool batch vote terminate? Reset per
221        // outer iteration so a follow-up-driven re-entry starts clean.
222        //
223        // When the batch produces a unanimous terminator (every
224        // finalized result votes `terminate = true`), the run is over —
225        // `SteeringSource` and `FollowUpSource` plugins must NOT
226        // re-prompt the model with another LLM call. Without this
227        // guard a steering source whose firing condition lined up
228        // with the same turn (e.g. `graceful_turn_limit` reaching
229        // its soft limit on the same turn the model delivered)
230        // would inject a wrap-up message and the loop would burn
231        // another turn after a clean delivery — observed in production,
232        // where a model drifted into hallucinated content on the
233        // wrap-up re-entry after the prior batch had already produced
234        // the correct terminal delivery.
235        let mut last_batch_terminated = false;
236
237        while has_more_tool_calls || !pending.is_empty() {
238            if signal.is_cancelled() {
239                return Err(LoopError::Aborted);
240            }
241            if let Some(max) = config.max_iterations {
242                if iterations >= max {
243                    // Hit the iteration cap. Break out of the inner
244                    // loop so the follow-up sources get one last
245                    // chance to inject a terminator nudge before the
246                    // run ends. The outer loop's own cap-check (added
247                    // below) ensures we don't loop forever.
248                    break;
249                }
250            }
251            iterations += 1;
252
               // The caller (`run` / `run_continue`) already emitted the
               // first `TurnStart`, so only later turns emit one here.
253            if !first_turn {
254                emit(config, AgentEvent::TurnStart).await;
255            } else {
256                first_turn = false;
257            }
258
259            // Inject any pending steering messages before the next LLM call.
260            if !pending.is_empty() {
261                for msg in pending.drain(..) {
262                    emit(
263                        config,
264                        AgentEvent::MessageStart {
265                            message: msg.clone(),
266                        },
267                    )
268                    .await;
269                    emit(
270                        config,
271                        AgentEvent::MessageEnd {
272                            message: msg.clone(),
273                        },
274                    )
275                    .await;
276                    current.messages.push(msg.clone());
277                    new_messages.push(msg);
278                }
279            }
280
281            // Stream one assistant response, applying the configured
282            // max-tokens recovery ladder if a turn comes back
283            // truncated. `iteration` is 0-indexed and counts LLM calls
284            // within this run — `iterations` was already incremented
285            // above for cap-checking, so the 0-indexed turn number is
286            // `iterations - 1`.
287            let (assistant, turn_allowlist) =
288                stream_with_max_tokens_recovery(current, config, signal, iterations - 1).await?;
289            // The assistant message must land in *both* the live conversation
290            // (so the next turn's request body includes it — providers reject
291            // tool messages that don't follow a matching assistant tool_call)
292            // and the run's emitted-messages tail.
293            current.messages.push(assistant.clone());
294            new_messages.push(assistant.clone());
295
296            // Stop on stream-level error/abort. Well-behaved
297            // transports surface these as `StreamEvent::Error`, which
298            // `stream_assistant_response` converts to `LoopError`
299            // before returning. Keep this branch as a guard for
300            // transports that incorrectly finalize a `Done` message
301            // with an error stop reason.
302            let stop_reason = match &assistant {
303                AgentMessage::Assistant { stop_reason, .. } => *stop_reason,
304                _ => StopReason::Other,
305            };
306            if matches!(stop_reason, StopReason::Error | StopReason::Aborted) {
307                let loop_error = match &assistant {
308                    AgentMessage::Assistant {
309                        stop_reason: StopReason::Aborted,
310                        ..
311                    } => LoopError::Aborted,
312                    AgentMessage::Assistant { error_message, .. } => LoopError::Stream(
313                        StreamError::Transient(error_message.clone().unwrap_or_else(|| {
314                            "assistant stream ended with error stop reason".into()
315                        })),
316                    ),
317                    _ => LoopError::Stream(StreamError::Transient(
318                        "assistant stream ended with error stop reason".into(),
319                    )),
320                };
321                emit(
322                    config,
323                    AgentEvent::TurnEnd {
324                        message: assistant,
325                        tool_results: Vec::new(),
326                    },
327                )
328                .await;
329                emit(
330                    config,
331                    AgentEvent::AgentEnd {
332                        messages: new_messages.clone(),
333                    },
334                )
335                .await;
336                return Err(loop_error);
337            }
338
339            // Extract tool calls.
340            let tool_calls: Vec<_> = match &assistant {
341                AgentMessage::Assistant { content, .. } => {
342                    content.tool_calls().into_iter().cloned().collect()
343                }
344                _ => Vec::new(),
345            };
346            last_turn_stopped_without_tool = tool_calls.is_empty();
347            if last_turn_stopped_without_tool {
348                empty_outcomes_seen = empty_outcomes_seen.saturating_add(1);
349            }
350
351            let mut tool_result_messages = Vec::new();
352            has_more_tool_calls = false;
353
354            if tool_calls.is_empty() {
355                if let Some(tool_name) = config.plain_text_terminal_fallback_tool.as_deref() {
356                    let eager = config.plain_text_terminal_fallback_eager;
357                    let terminal_tool_names = config.protocol.terminal_tool_names();
358                    let narrowed_to_terminators = is_terminal_only_allowlist(
359                        turn_allowlist.as_ref(),
360                        tool_name,
361                        &terminal_tool_names,
362                    );
363                    let preserve_plain_text_candidate = plain_assistant_text(&assistant)
364                        .is_some_and(|text| should_preserve_plain_text_terminal_candidate(&text));
365                    if plain_text_terminal_fallback_candidate.is_none()
366                        && preserve_plain_text_candidate
367                    {
368                        plain_text_terminal_fallback_candidate = Some(assistant.clone());
369                    }
370                    let nudge_mode = config.plain_text_terminal_fallback_eager_nudge
371                        && eager
372                        && !narrowed_to_terminators
373                        && empty_outcomes_seen <= MAX_PLAIN_TEXT_NUDGE_RETRIES;
374                    if nudge_mode {
375                        // Catalog still contains real work tools (e.g. `plan`)
376                        // but the model emitted prose. Inject an explicit
377                        // protocol-recovery system message and force the
378                        // inner loop to re-stream rather than laundering
379                        // the prose into a synthetic `message_result`.
380                        // After MAX_PLAIN_TEXT_NUDGE_RETRIES the synthesizer
381                        // below fires as a last resort, preferring the first
382                        // preserved non-clarifying answer so retry drift does
383                        // not replace a good response with recovery chatter.
384                        //
385                        // Push directly into `current.messages` (mirrors the
386                        // synthesize path) rather than `pending`, which is
387                        // overwritten by `collect_steering` at end-of-iter.
388                        // Set `has_more_tool_calls = true` to satisfy the
389                        // inner while-loop's continuation predicate.
390                        //
391                        // The recovery prose comes from the active
392                        // `ProtocolPolicy` (which may name the product's
393                        // delivery / ask tools); the core falls back to a
394                        // generic, vocabulary-free nudge.
395                        let available_tool_names: Vec<&str> =
396                            config.tools.iter().map(|t| t.name()).collect();
397                        let nudge_text = config
398                            .protocol
399                            .plain_text_recovery_prompt(crate::protocol::PlainTextRecoveryContext {
400                                messages: &current.messages,
401                                iteration: iterations - 1,
402                                available_tool_names: &available_tool_names,
403                                terminal_fallback_tool: Some(tool_name),
404                            })
405                            .unwrap_or_else(|| {
406                                crate::protocol::DEFAULT_PLAIN_TEXT_RECOVERY_PROMPT.to_string()
407                            });
408                        let nudge = AgentMessage::System {
409                            content: nudge_text,
410                            timestamp: Some(now_ms()),
411                        };
412                        current.messages.push(nudge.clone());
413                        new_messages.push(nudge);
414                        has_more_tool_calls = true;
415                    } else if let Some(result_msg) = synthesize_plain_text_terminal_result(
416                        plain_text_terminal_fallback_candidate
417                            .as_ref()
418                            .unwrap_or(&assistant),
419                        turn_allowlist.as_ref(),
420                        tool_name,
421                        eager,
422                        &terminal_tool_names,
423                    ) {
                           // A synthesized terminal result counts as a
                           // terminating batch: clear the stuck-tracking state
                           // and suppress further steering / follow-up.
424                        plain_text_terminal_fallback_candidate = None;
425                        last_turn_stopped_without_tool = false;
426                        empty_outcomes_seen = 0;
427                        last_batch_terminated = true;
428                        current.messages.push(result_msg.clone());
429                        new_messages.push(result_msg.clone());
430                        tool_result_messages.push(result_msg);
431                    }
432                }
433            } else {
434                let ExecutedBatch {
435                    messages,
436                    terminate,
437                } = execute_tool_batch(
438                    &assistant,
439                    tool_calls,
440                    current,
441                    config,
442                    signal,
443                    turn_allowlist.as_ref(),
444                )
445                .await?;
446
447                // A real tool batch is forward progress; the empty-outcome
448                // budget tracks being stuck, not lifetime empty stops.
449                empty_outcomes_seen = 0;
450                plain_text_terminal_fallback_candidate = None;
451                tool_result_messages = messages;
452                has_more_tool_calls = !terminate;
453                last_batch_terminated = terminate;
454
455                for result_msg in &tool_result_messages {
456                    current.messages.push(result_msg.clone());
457                    new_messages.push(result_msg.clone());
458                }
459            }
460
461            emit(
462                config,
463                AgentEvent::TurnEnd {
464                    message: assistant,
465                    tool_results: tool_result_messages,
466                },
467            )
468            .await;
469
470            // Drain any new steering messages that arrived during the
471            // turn — except when the batch just emitted a unanimous
472            // terminator. A clean terminator vote is the model's
473            // "we're done" signal; further steering would re-prompt
474            // past the delivery and let the model drift.
475            pending = if last_batch_terminated {
476                Vec::new()
477            } else {
478                collect_steering(config).await
479            };
480        }
481
482        // Inner loop exhausted: either (a) the model produced no tool
483        // calls AND no steering is queued, or (b) we hit the iteration
484        // cap. In either case, give the follow-up sources one last
485        // chance to inject a terminator nudge before declaring the
486        // run done. To prevent infinite looping when a follow-up
487        // re-arms but we're already past the cap, exit unconditionally
488        // if the cap was hit.
489        let cap_hit = config.max_iterations.is_some_and(|max| iterations >= max);
490        // Skip the follow-up source pass when the last batch
491        // terminated for the same reason steering is skipped above:
492        // a clean terminator vote means the run is done; follow-up
493        // sources exist to nudge the model toward a terminator when
494        // it failed to emit one, not to overrule one it already cast.
495        let follow_up = if last_batch_terminated {
496            Vec::new()
497        } else {
498            collect_follow_up(config).await
499        };
           // The empty-outcome budget is enforced only when the most recent
           // assistant turn had no tool calls and no synthesized terminal
           // result reset the flag.
500        if last_turn_stopped_without_tool {
501            if let Some(budget) = config.empty_outcome_retry_budget {
502                if empty_outcomes_seen > budget {
503                    emit(
504                        config,
505                        AgentEvent::AgentEnd {
506                            messages: new_messages.clone(),
507                        },
508                    )
509                    .await;
510                    return Err(LoopError::EmptyOutcomeBudgetExhausted {
511                        budget,
512                        observed: empty_outcomes_seen,
513                    });
514                }
515            }
516        }
517        if !follow_up.is_empty() && !cap_hit {
518            pending = follow_up;
519            continue 'outer;
520        }
521        // If the cap was hit but a follow-up was produced, append it
522        // to the transcript so listeners see the final nudge — but do
523        // NOT re-enter the LLM loop. The user-facing run still ends
524        // with this message as the last appended turn.
525        if cap_hit {
526            for msg in follow_up {
527                emit(
528                    config,
529                    AgentEvent::MessageStart {
530                        message: msg.clone(),
531                    },
532                )
533                .await;
534                emit(
535                    config,
536                    AgentEvent::MessageEnd {
537                        message: msg.clone(),
538                    },
539                )
540                .await;
541                current.messages.push(msg.clone());
542                new_messages.push(msg);
543            }
544        }
545
546        break;
547    }
548
549    emit(
550        config,
551        AgentEvent::AgentEnd {
552            messages: new_messages.clone(),
553        },
554    )
555    .await;
556
557    // Classify outcome.
558    // - HitMaxIterations: hard cap was reached before the model stopped
559    //   tool-calling. The transcript may end on a turn that wanted to do
560    //   more.
561    // - WrappedUp: the graceful-turn-limit plugin fired its one-shot
562    //   wrap-up steer AND we exited naturally (cap not hit). The model
563    //   responded to the warning and produced a clean close.
564    // - Done: natural termination with no budget pressure.
565    let cap_hit_final = config.max_iterations.is_some_and(|max| iterations >= max);
566    let wrap_up_fired = config
567        .grace_signal
568        .as_ref()
569        .is_some_and(|flag| flag.load(std::sync::atomic::Ordering::Relaxed));
570    let outcome = if cap_hit_final {
571        LoopOutcome::HitMaxIterations
572    } else if wrap_up_fired {
573        LoopOutcome::WrappedUp
574    } else {
575        LoopOutcome::Done
576    };
577    Ok(outcome)
578}
579
580async fn collect_steering(config: &LoopConfig) -> Vec<AgentMessage> {
581    let mut out = Vec::new();
582    for source in &config.plugins.steering {
583        out.extend(source.next_steering_messages().await);
584    }
585    out
586}
587
588async fn collect_follow_up(config: &LoopConfig) -> Vec<AgentMessage> {
589    let mut out = Vec::new();
590    for source in &config.plugins.follow_up {
591        out.extend(source.next_follow_up_messages().await);
592    }
593    out
594}
595
596fn synthesize_plain_text_terminal_result(
597    assistant: &AgentMessage,
598    turn_allowlist: Option<&std::collections::HashSet<String>>,
599    tool_name: &str,
600    eager: bool,
601    terminal_tool_names: &std::collections::HashSet<String>,
602) -> Option<AgentMessage> {
603    // The default contract is "only convert plain text once the runtime
604    // has narrowed the catalog to terminators" — preserves strict
605    // delivery shape for everyone else. When `eager` is set the gate is
606    // lifted: the host has signalled this provider can never honor
607    // forced tool choice, so prose IS the failure mode and the nudge
608    // cycle that normally narrows the allowlist would just burn turns.
609    if !eager && !is_terminal_only_allowlist(turn_allowlist, tool_name, terminal_tool_names) {
610        return None;
611    }
612    let text = plain_assistant_text(assistant)?;
613    Some(AgentMessage::ToolResult {
614        tool_call_id: format!("plain_text_terminal_fallback_{}", now_ms()),
615        tool_name: tool_name.to_string(),
616        content: ToolResultContent::text(text),
617        is_error: false,
618        narration: Some(
619            "Converted plain assistant text into terminal delivery for an auto-tool-choice provider."
620                .to_string(),
621        ),
622        details: None,
623        timestamp: Some(now_ms()),
624    })
625}
626
627fn plain_assistant_text(assistant: &AgentMessage) -> Option<String> {
628    let AgentMessage::Assistant { content, .. } = assistant else {
629        return None;
630    };
631    let text = crate::strip_thinking_tags(&content.plain_text())
632        .trim()
633        .to_string();
634    (!text.is_empty()).then_some(text)
635}
636
637fn should_preserve_plain_text_terminal_candidate(text: &str) -> bool {
638    !looks_like_permission_or_clarification_question(text)
639}
640
/// Heuristic: does `text` read like the model asking for permission or
/// clarification rather than delivering an answer?
///
/// Text without a `?` never matches. Otherwise it matches when its
/// ASCII-lowercased, trimmed form starts with one of a fixed set of
/// question openers, or when it is at most 500 bytes long and mentions
/// "what" together with "next" or "continue".
fn looks_like_permission_or_clarification_question(text: &str) -> bool {
    const QUESTION_OPENERS: [&str; 9] = [
        "would you like",
        "shall i",
        "should i",
        "do you want",
        "what would you like",
        "what do you need",
        "what's your next move",
        "what is your next move",
        "continue what",
    ];

    let candidate = text.trim();
    if !candidate.contains('?') {
        return false;
    }

    let lowered = candidate.to_ascii_lowercase();
    if QUESTION_OPENERS
        .iter()
        .any(|opener| lowered.starts_with(*opener))
    {
        return true;
    }

    // Short messages that ask "what ... next/continue" also count.
    let is_short = candidate.len() <= 500;
    let mentions_what = lowered.contains("what");
    let mentions_progress = lowered.contains("next") || lowered.contains("continue");
    is_short && mentions_what && mentions_progress
}
665
/// Reports whether a turn's allowlist has narrowed to "terminal only":
/// it includes the configured fallback terminal tool and every other
/// entry is itself a terminal/delivery tool.
///
/// Which *other* names count as terminal is decided by the active
/// [`crate::protocol::ProtocolPolicy`]
/// ([`crate::protocol::ProtocolPolicy::terminal_tool_names`]); the core
/// hardcodes no product tool names. Under the default policy (empty extra
/// set) an allowlist is terminal-only exactly when it holds nothing but
/// the fallback tool itself.
///
/// A missing (`None`) or empty allowlist is never terminal-only.
fn is_terminal_only_allowlist(
    turn_allowlist: Option<&std::collections::HashSet<String>>,
    terminal_tool: &str,
    terminal_tool_names: &std::collections::HashSet<String>,
) -> bool {
    match turn_allowlist {
        None => false,
        Some(allowed) if allowed.is_empty() || !allowed.contains(terminal_tool) => false,
        Some(allowed) => allowed
            .iter()
            .all(|name| name.as_str() == terminal_tool || terminal_tool_names.contains(name)),
    }
}
688
689// ─── Stream one assistant response ─────────────────────────────────
690
691/// Wrap [`stream_assistant_response`] with the configured max-output-
692/// tokens recovery ladder. When recovery is disabled (the default), this
693/// reduces to a single call. When enabled, a `StopReason::MaxTokens`
694/// turn is discarded and the next attempt re-streams with a larger
695/// cap until the ladder runs out or the model produces a non-truncated
696/// turn.
697///
698/// Discarded turns *do* fire `MessageStart`/`MessageEnd` from the
699/// inner streamer — listeners that care must correlate via the
700/// `OutputTokensEscalation` event that this wrapper emits before each
701/// retry. Persistence layers should treat the message that immediately
702/// precedes an `OutputTokensEscalation` as overridden by the next
703/// `MessageEnd`.
704async fn stream_with_max_tokens_recovery(
705    context: &AgentContext,
706    config: &LoopConfig,
707    signal: &CancellationToken,
708    iteration: usize,
709) -> Result<(AgentMessage, Option<std::collections::HashSet<String>>), LoopError> {
710    let mut current_cap = config.max_output_tokens;
711    let mut max_tokens_attempt: u8 = 0;
712    let mut empty_stream_attempts: u8 = 0;
713    let mut zero_output_transport_attempts: u8 = 0;
714    let mut zero_output_recovery_context: Option<AgentContext> = None;
715    let mut reasoning = config.reasoning;
716
717    loop {
718        let attempt_context = zero_output_recovery_context.as_ref().unwrap_or(context);
719        let (assistant, allowlist) = match stream_assistant_response(
720            attempt_context,
721            config,
722            signal,
723            iteration,
724            current_cap,
725            reasoning,
726        )
727        .await
728        {
729            Ok(pair) => pair,
730            Err(LoopError::Stream(StreamError::Empty))
731                if empty_stream_attempts + 1 < EMPTY_STREAM_MAX_ATTEMPTS =>
732            {
733                empty_stream_attempts = empty_stream_attempts.saturating_add(1);
734                let delay = EMPTY_STREAM_RETRY_INITIAL_DELAY * u32::from(empty_stream_attempts);
735                tokio::select! {
736                    _ = signal.cancelled() => return Err(LoopError::Aborted),
737                    _ = tokio::time::sleep(delay) => {}
738                }
739                continue;
740            }
741            Err(LoopError::Stream(StreamError::ZeroOutputTransport(_)))
742                if zero_output_transport_attempts + 1 < ZERO_OUTPUT_TRANSPORT_MAX_ATTEMPTS =>
743            {
744                zero_output_transport_attempts = zero_output_transport_attempts.saturating_add(1);
745                zero_output_recovery_context =
746                    Some(context_with_zero_output_transport_recovery(context));
747                reasoning = zero_output_transport_retry_reasoning(config.reasoning);
748                let delay = ZERO_OUTPUT_TRANSPORT_RETRY_INITIAL_DELAY
749                    * u32::from(zero_output_transport_attempts);
750                tokio::select! {
751                    _ = signal.cancelled() => return Err(LoopError::Aborted),
752                    _ = tokio::time::sleep(delay) => {}
753                }
754                continue;
755            }
756            Err(err) => return Err(err),
757        };
758
759        let stop_reason = match &assistant {
760            AgentMessage::Assistant { stop_reason, .. } => *stop_reason,
761            _ => StopReason::Other,
762        };
763        if stop_reason != StopReason::MaxTokens {
764            return Ok((assistant, allowlist));
765        }
766        let Some(recovery) = config.max_output_tokens_recovery.as_ref() else {
767            return Ok((assistant, allowlist));
768        };
769        if max_tokens_attempt >= recovery.max_attempts {
770            return Ok((assistant, allowlist));
771        }
772        // No starting cap means there's no number to scale from. Refuse
773        // recovery rather than guess — the deployment hadn't pinned a
774        // cap, so the truncation came from a provider-side limit we
775        // don't know how to raise.
776        let Some(prev_cap) = current_cap else {
777            return Ok((assistant, allowlist));
778        };
779        let Some(new_cap) = recovery.next_cap(prev_cap, max_tokens_attempt) else {
780            return Ok((assistant, allowlist));
781        };
782
783        max_tokens_attempt = max_tokens_attempt.saturating_add(1);
784        emit(
785            config,
786            AgentEvent::OutputTokensEscalation {
787                attempt: max_tokens_attempt,
788                prev_cap,
789                new_cap,
790            },
791        )
792        .await;
793        current_cap = Some(new_cap);
794        // Discard the truncated `assistant` by simply not pushing it
795        // into the caller's transcript. The MessageStart/MessageEnd
796        // events for it already fired from the inner streamer; the
797        // OutputTokensEscalation event above is the listener's signal
798        // to roll the previous pair back from any projection.
799    }
800}
801
802async fn stream_assistant_response(
803    context: &AgentContext,
804    config: &LoopConfig,
805    signal: &CancellationToken,
806    iteration: usize,
807    max_output_tokens: Option<u32>,
808    reasoning: ReasoningEffort,
809) -> Result<(AgentMessage, Option<std::collections::HashSet<String>>), LoopError> {
810    // Apply context transforms in registration order. The
811    // `TransformContext` carries the cancellation signal plus a few
812    // cheap observables (model id, iteration, last-turn provider
813    // usage, token estimator) so each transform can decide locally
814    // without the loop widening the trait per-knob.
815    let last_provider_usage = last_provider_usage(&context.messages);
816    let cx = TransformContext {
817        signal,
818        model_id: config.model_id.as_deref().unwrap_or(""),
819        iteration,
820        last_provider_usage: last_provider_usage.as_ref(),
821        estimator: &*config.token_estimator,
822    };
823    let mut messages = context.messages.clone();
824    // Each transform's diff is observable so post-mortems can attribute
825    // a specific compaction (shrinker, microcompactor, history-repair,
826    // …) to the missing slice the model went on to misuse. Cloning is
827    // cheap relative to the actual transform work, and the eval-side
828    // observer is the one consumer that wants this much detail; other
829    // sinks ignore the variant.
830    for transform in &config.plugins.context_transform {
831        // Cheap pre-check: plugins that can locally decide they have
832        // nothing to do (no browser snapshots, history under budget, …)
833        // skip the clone + diff-event entirely. Default impl returns
834        // `true`, so plugins that haven't opted in still run on every
835        // round.
836        if !transform.should_run(&messages, &cx) {
837            continue;
838        }
839        let before = messages.clone();
840        messages = transform.transform(messages, &cx).await;
841        emit(
842            config,
843            AgentEvent::ContextTransformApplied {
844                iteration,
845                plugin: transform.name(),
846                before,
847                after: messages.clone(),
848            },
849        )
850        .await;
851    }
852
853    // Consult any registered ToolGate plugins for a per-turn allowlist.
854    // Each plugin returns `Some(set)` to narrow the advertised tools for
855    // exactly this LLM call. Multiple plugins compose by intersection;
856    // `None` plugins do not constrain. See `ToolGate` docs for rationale.
857    let allowlist = collect_tool_allowlist_with_events(config, iteration, &messages).await;
858
859    let tools = build_tool_schemas(config, allowlist.as_ref());
860    // Final snapshot of what the loop is about to send, captured after
861    // every transform/gate. Observers (eval per-turn dump, debugger,
862    // replay) take this as the source of truth for "what did the
863    // model see this turn?".
864    emit(
865        config,
866        AgentEvent::ProviderRequestPrepared {
867            iteration,
868            model_id: config.model_id.clone(),
869            system_prompt: context.system_prompt.clone(),
870            messages: messages.clone(),
871            tools: tools.clone(),
872            temperature: config.temperature,
873            max_output_tokens,
874        },
875    )
876    .await;
877    let request = StreamRequest {
878        system_prompt: context.system_prompt.clone(),
879        messages,
880        tools,
881        temperature: config.temperature,
882        max_output_tokens,
883        reasoning,
884        provider_extras: config
885            .provider_extras
886            .clone()
887            .unwrap_or(serde_json::Value::Null),
888        // `tool_choice: "required"` on every turn. The LLM-in-charge
889        // contract is "context → LLM → tool call → append result →
890        // repeat" — the model's job is to pick a tool, not emit
891        // narration. This assumes the catalog includes a terminal
892        // text-delivery tool, so required-on-every-turn doesn't trap the
893        // model: when the work is done it calls that delivery tool to
894        // return the answer. If the model loops on verification instead,
895        // the bug is in the catalog or prompt — not in the requirement.
896        force_tool_call: true,
897    };
898
899    let mut stream = config.stream.stream(request, signal.clone()).await;
900
901    let mut last_partial: Option<AgentMessage> = None;
902
903    while let Some(event) = stream.next().await {
904        match event {
905            StreamEvent::Start { partial } => {
906                emit(
907                    config,
908                    AgentEvent::MessageStart {
909                        message: partial.clone(),
910                    },
911                )
912                .await;
913                last_partial = Some(partial);
914            }
915            StreamEvent::Chunk(chunk) => {
916                if let Some(ref partial) = last_partial {
917                    emit(
918                        config,
919                        AgentEvent::MessageUpdate {
920                            partial: partial.clone(),
921                            chunk,
922                        },
923                    )
924                    .await;
925                }
926            }
927            StreamEvent::Done { message } => {
928                emit(
929                    config,
930                    AgentEvent::MessageEnd {
931                        message: message.clone(),
932                    },
933                )
934                .await;
935                return Ok((message, allowlist));
936            }
937            StreamEvent::Error {
938                partial,
939                kind,
940                message,
941            } => {
942                let stop_reason = match kind {
943                    StreamErrorKind::Aborted => StopReason::Aborted,
944                    _ => StopReason::Error,
945                };
946                let error_message = AgentMessage::Assistant {
947                    content: match &partial {
948                        AgentMessage::Assistant { content, .. } => content.clone(),
949                        _ => AssistantContent { blocks: Vec::new() },
950                    },
951                    stop_reason,
952                    error_message: Some(message.clone()),
953                    timestamp: Some(now_ms()),
954                    usage: None,
955                };
956                emit(
957                    config,
958                    AgentEvent::MessageEnd {
959                        message: error_message.clone(),
960                    },
961                )
962                .await;
963                return Err(loop_error_from_stream_kind(kind, message));
964            }
965        }
966    }
967
968    // Stream ended without `Done` or `Error`. Synthesize an empty
969    // assistant message so the loop can recover.
970    let empty = AgentMessage::Assistant {
971        content: AssistantContent { blocks: Vec::new() },
972        stop_reason: StopReason::Error,
973        error_message: Some("stream ended without terminal event".into()),
974        timestamp: Some(now_ms()),
975        usage: None,
976    };
977    emit(
978        config,
979        AgentEvent::MessageEnd {
980            message: empty.clone(),
981        },
982    )
983    .await;
984    Err(LoopError::Stream(StreamError::Empty))
985}
986
987fn context_with_zero_output_transport_recovery(context: &AgentContext) -> AgentContext {
988    let mut recovered = context.clone();
989    recovered.messages.push(AgentMessage::System {
990        content: ZERO_OUTPUT_TRANSPORT_RECOVERY_CONTEXT.to_string(),
991        timestamp: Some(now_ms()),
992    });
993    recovered
994}
995
996fn zero_output_transport_retry_reasoning(reasoning: ReasoningEffort) -> ReasoningEffort {
997    match reasoning {
998        ReasoningEffort::Medium | ReasoningEffort::High | ReasoningEffort::XHigh => {
999            ReasoningEffort::Minimal
1000        }
1001        ReasoningEffort::None | ReasoningEffort::Minimal | ReasoningEffort::Low => reasoning,
1002    }
1003}
1004
1005fn loop_error_from_stream_kind(kind: StreamErrorKind, message: String) -> LoopError {
1006    // StreamFn implementations own transport retries. Once an error
1007    // reaches the loop, it is the terminal outcome of that provider
1008    // attempt and must not be reclassified as a successful assistant
1009    // turn.
1010    match kind {
1011        StreamErrorKind::Transient => LoopError::Stream(StreamError::Transient(message)),
1012        StreamErrorKind::ProviderRateLimited => {
1013            LoopError::Stream(StreamError::ProviderRateLimited(message))
1014        }
1015        StreamErrorKind::ZeroOutputTransport => {
1016            LoopError::Stream(StreamError::ZeroOutputTransport(message))
1017        }
1018        StreamErrorKind::Fatal => LoopError::Stream(StreamError::Fatal(message)),
1019        StreamErrorKind::Empty => LoopError::Stream(StreamError::Empty),
1020        StreamErrorKind::Aborted => LoopError::Aborted,
1021        StreamErrorKind::ContextOverflow => {
1022            LoopError::Stream(StreamError::ContextOverflow(message))
1023        }
1024    }
1025}
1026
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. The
/// `u128` millisecond count is converted with a checked conversion and
/// saturates at `u64::MAX` rather than being truncated by an `as` cast.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
1033
1034/// Walk back through `messages` and return the most recent provider
1035/// usage block reported on an assistant turn, if any. `None` on the
1036/// very first turn or when the active provider doesn't surface usage.
1037fn last_provider_usage(messages: &[AgentMessage]) -> Option<Usage> {
1038    messages.iter().rev().find_map(|message| match message {
1039        AgentMessage::Assistant {
1040            usage: Some(usage), ..
1041        } => Some(usage.clone()),
1042        _ => None,
1043    })
1044}
1045
1046fn build_tool_schemas(
1047    config: &LoopConfig,
1048    allowlist: Option<&std::collections::HashSet<String>>,
1049) -> Vec<ToolSchema> {
1050    config
1051        .tools
1052        .iter()
1053        .filter(|tool| match allowlist {
1054            Some(set) => set.contains(tool.name()),
1055            None => true,
1056        })
1057        .map(|tool| ToolSchema {
1058            name: tool.name().to_string(),
1059            description: tool.description().to_string(),
1060            parameters: tool.parameters_schema(),
1061        })
1062        .collect()
1063}
1064
1065/// Poll every registered `ToolGate` plugin and intersect their
1066/// allowlists. Returns `None` when no plugin returned an allowlist
1067/// (the common case — no narrowing). Returns `Some(set)` when at
1068/// least one plugin is gating; multiple gates compose by intersection
1069/// unless their non-empty allowlists conflict to an empty set, in which
1070/// case the highest-priority gate wins and a typed conflict event is
1071/// emitted.
1072/// Resolve the per-turn tool allowlist by composing every registered
1073/// `ToolGate` plugin (intersection) and emit one
1074/// [`AgentEvent::ToolGateApplied`] per gate so observers can attribute
1075/// the final allowlist to specific plugins.
1076async fn collect_tool_allowlist_with_events(
1077    config: &LoopConfig,
1078    iteration: usize,
1079    messages: &[AgentMessage],
1080) -> Option<std::collections::HashSet<String>> {
1081    if config.plugins.tool_gate.is_empty() {
1082        return None;
1083    }
1084    let conversation_id = config.conversation_id.as_deref();
1085    let available_tool_names: Vec<&str> = config.tools.iter().map(|t| t.name()).collect();
1086    let mut decisions: Vec<GateAllowDecision> = Vec::new();
1087    for gate in &config.plugins.tool_gate {
1088        let ctx = crate::plugin::ToolGateContext {
1089            iteration,
1090            messages,
1091            conversation_id,
1092            available_tool_names: &available_tool_names,
1093        };
1094        let decision = gate.next_turn_tool_allowlist(ctx).await;
1095        emit(
1096            config,
1097            AgentEvent::ToolGateApplied {
1098                iteration,
1099                plugin: gate.name(),
1100                allow: decision.as_ref().map(|set| {
1101                    let mut sorted: Vec<String> = set.iter().cloned().collect();
1102                    sorted.sort();
1103                    sorted
1104                }),
1105            },
1106        )
1107        .await;
1108        if let Some(set) = decision {
1109            let suppresses_advisory =
1110                gate.suppresses_advisory_gates(crate::plugin::ToolGateContext {
1111                    iteration,
1112                    messages,
1113                    conversation_id,
1114                    available_tool_names: &available_tool_names,
1115                });
1116            decisions.push(GateAllowDecision {
1117                plugin: gate.name(),
1118                priority: gate.conflict_priority(),
1119                class: gate.tool_gate_class(),
1120                suppresses_advisory,
1121                allow: set,
1122            });
1123        }
1124    }
1125    let suppression_priority = decisions
1126        .iter()
1127        .filter(|decision| decision.suppresses_advisory)
1128        .map(|decision| decision.priority)
1129        .max();
1130    let active_decisions = decisions
1131        .iter()
1132        .filter(|decision| {
1133            !matches!(
1134                suppression_priority,
1135                Some(priority)
1136                    if decision.class == crate::plugin::ToolGateClass::Advisory
1137                        && decision.priority < priority
1138            )
1139        })
1140        .collect::<Vec<_>>();
1141    let mut combined: Option<std::collections::HashSet<String>> = None;
1142    for decision in &active_decisions {
1143        combined = Some(match combined {
1144            Some(prev) => prev.intersection(&decision.allow).cloned().collect(),
1145            None => decision.allow.clone(),
1146        });
1147    }
1148    if combined.as_ref().is_some_and(|allow| allow.is_empty()) {
1149        let non_empty_decisions = active_decisions
1150            .iter()
1151            .filter(|decision| !decision.allow.is_empty())
1152            .map(|decision| (decision.plugin, decision.priority, decision.allow.clone()))
1153            .collect::<Vec<_>>();
1154        let resolved = resolve_empty_tool_gate_intersection(&non_empty_decisions);
1155        let (chosen_plugin, allow, reason) = match resolved {
1156            Some((plugin, allow, reason)) => (Some(plugin.to_string()), allow, reason),
1157            None => (
1158                None,
1159                std::collections::HashSet::new(),
1160                "all gating plugins returned empty allowlists".to_string(),
1161            ),
1162        };
1163        let sorted_allow = sorted_tool_names(&allow);
1164        emit(
1165            config,
1166            AgentEvent::ToolGateConflictResolved {
1167                iteration,
1168                plugins: active_decisions
1169                    .iter()
1170                    .map(|decision| decision.plugin.to_string())
1171                    .collect(),
1172                chosen_plugin,
1173                allow: sorted_allow,
1174                reason,
1175            },
1176        )
1177        .await;
1178        return if allow.is_empty() { None } else { Some(allow) };
1179    }
1180    combined
1181}
1182
/// One gate's allowlist for the current turn, recorded only for gates
/// that returned `Some(set)`, together with the metadata used to compose
/// it with the other gates' decisions.
struct GateAllowDecision {
    /// Name of the gate plugin, as returned by `gate.name()`.
    plugin: &'static str,
    /// Value of `gate.conflict_priority()`; compared when suppressing
    /// advisory gates and when resolving an empty intersection.
    priority: i32,
    /// Value of `gate.tool_gate_class()`; `Advisory` decisions can be
    /// dropped by a higher-priority suppressing gate.
    class: crate::plugin::ToolGateClass,
    /// Whether the gate asked to suppress advisory gates this turn.
    suppresses_advisory: bool,
    /// The tool names this gate allows.
    allow: std::collections::HashSet<String>,
}
1190
/// Choose which gate's allowlist wins when the gates' allowlists
/// intersect to nothing.
///
/// Each entry of `decisions` is `(plugin name, priority, allowlist)`.
/// The winner has the highest priority; ties go to the smaller
/// allowlist, then to the lexicographically smaller plugin name.
/// Returns the winner's name, a clone of its allowlist and a
/// human-readable reason, or `None` when `decisions` is empty.
fn resolve_empty_tool_gate_intersection(
    decisions: &[(&'static str, i32, std::collections::HashSet<String>)],
) -> Option<(&'static str, std::collections::HashSet<String>, String)> {
    let (plugin, priority, allow) = decisions.iter().max_by(|left, right| {
        // Size and name are compared right-to-left so that "smaller"
        // ranks as "greater" for `max_by`.
        left.1
            .cmp(&right.1)
            .then_with(|| right.2.len().cmp(&left.2.len()))
            .then_with(|| right.0.cmp(left.0))
    })?;

    let reason = format!(
        "empty intersection repaired by highest-priority owner `{plugin}` (priority {priority})"
    );
    Some((*plugin, allow.clone(), reason))
}
1212
/// Clone the names in `set` into a vector sorted in ascending order, so
/// event payloads built from a `HashSet` are deterministic.
fn sorted_tool_names(set: &std::collections::HashSet<String>) -> Vec<String> {
    let mut names = Vec::with_capacity(set.len());
    names.extend(set.iter().cloned());
    // Set members are distinct, so an unstable sort yields the same
    // order as a stable one.
    names.sort_unstable();
    names
}
1218
1219#[cfg(test)]
1220mod tests {
1221    use super::*;
1222    use crate::config::AgentBuilder;
1223    use crate::plugin::{
1224        FollowUpSource, Plugin, PluginCapabilities, ToolGate, ToolGateClass, ToolGateContext,
1225    };
1226    use crate::stream::{ReasoningEffort, StreamFn};
1227    use crate::types::{AssistantBlock, UserContent};
1228    use futures::stream::{self, BoxStream};
1229    use std::sync::{
1230        atomic::{AtomicUsize, Ordering},
1231        Arc, Mutex,
1232    };
1233
1234    fn empty_assistant_message() -> AgentMessage {
1235        AgentMessage::Assistant {
1236            content: AssistantContent { blocks: Vec::new() },
1237            stop_reason: StopReason::Other,
1238            error_message: None,
1239            timestamp: None,
1240            usage: None,
1241        }
1242    }
1243
1244    fn text_assistant_message(text: impl Into<String>) -> AgentMessage {
1245        AgentMessage::Assistant {
1246            content: AssistantContent::text(text),
1247            stop_reason: StopReason::EndTurn,
1248            error_message: None,
1249            timestamp: None,
1250            usage: None,
1251        }
1252    }
1253
1254    fn tool_call_assistant_message(name: impl Into<String>, id: impl Into<String>) -> AgentMessage {
1255        AgentMessage::Assistant {
1256            content: AssistantContent::with_tool_calls(
1257                None,
1258                vec![crate::tool::ToolCall {
1259                    id: id.into(),
1260                    name: name.into(),
1261                    arguments: serde_json::json!({}),
1262                }],
1263            ),
1264            stop_reason: StopReason::ToolUse,
1265            error_message: None,
1266            timestamp: None,
1267            usage: None,
1268        }
1269    }
1270
1271    #[derive(Default)]
1272    struct EmptyThenTextStream {
1273        calls: AtomicUsize,
1274    }
1275
1276    #[derive(Default)]
1277    struct ZeroOutputThenTextStream {
1278        calls: AtomicUsize,
1279        requests: Mutex<Vec<StreamRequest>>,
1280    }
1281
1282    impl ZeroOutputThenTextStream {
1283        fn requests(&self) -> Vec<StreamRequest> {
1284            self.requests.lock().unwrap().clone()
1285        }
1286    }
1287
1288    #[derive(Default)]
1289    struct RepeatedTextStream {
1290        calls: AtomicUsize,
1291    }
1292
1293    #[derive(Default)]
1294    struct EmptyStopsAroundProgressStream {
1295        calls: AtomicUsize,
1296    }
1297
1298    struct CountingFollowUp {
1299        remaining: AtomicUsize,
1300    }
1301
1302    struct TerminalOnlyGate;
1303    struct TerminalWithStatusGate;
1304
1305    /// A product protocol policy that declares several delivery/status
1306    /// tools (beyond the configured fallback tool) as terminal, so an
1307    /// allowlist narrowed to `{message_info, message_result}` still
1308    /// classifies as terminal-only. The core ships none of these names;
1309    /// they live behind the policy.
1310    struct TestTerminalPolicy;
1311    impl crate::protocol::ProtocolPolicy for TestTerminalPolicy {
1312        fn terminal_tool_names(&self) -> std::collections::HashSet<String> {
1313            [
1314                "message_info",
1315                "message_ask",
1316                "message_result",
1317                "terminator",
1318            ]
1319            .iter()
1320            .map(|s| s.to_string())
1321            .collect()
1322        }
1323    }
1324    struct StaticAllowGate {
1325        name: &'static str,
1326        tools: &'static [&'static str],
1327        priority: i32,
1328        class: ToolGateClass,
1329        suppresses_advisory: bool,
1330    }
1331
1332    impl Plugin for TerminalOnlyGate {
1333        fn name(&self) -> &'static str {
1334            "terminal_only_gate"
1335        }
1336
1337        fn capabilities(&self) -> PluginCapabilities {
1338            PluginCapabilities::tool_gate()
1339        }
1340    }
1341
1342    #[async_trait::async_trait]
1343    impl ToolGate for TerminalOnlyGate {
1344        async fn next_turn_tool_allowlist(
1345            &self,
1346            _ctx: ToolGateContext<'_>,
1347        ) -> Option<std::collections::HashSet<String>> {
1348            Some(["message_result".to_string()].into_iter().collect())
1349        }
1350    }
1351
1352    impl Plugin for TerminalWithStatusGate {
1353        fn name(&self) -> &'static str {
1354            "terminal_with_status_gate"
1355        }
1356
1357        fn capabilities(&self) -> PluginCapabilities {
1358            PluginCapabilities::tool_gate()
1359        }
1360    }
1361
1362    #[async_trait::async_trait]
1363    impl ToolGate for TerminalWithStatusGate {
1364        async fn next_turn_tool_allowlist(
1365            &self,
1366            _ctx: ToolGateContext<'_>,
1367        ) -> Option<std::collections::HashSet<String>> {
1368            Some(
1369                ["message_info".to_string(), "message_result".to_string()]
1370                    .into_iter()
1371                    .collect(),
1372            )
1373        }
1374    }
1375
1376    impl Plugin for StaticAllowGate {
1377        fn name(&self) -> &'static str {
1378            self.name
1379        }
1380
1381        fn capabilities(&self) -> PluginCapabilities {
1382            PluginCapabilities::tool_gate()
1383        }
1384    }
1385
1386    #[async_trait::async_trait]
1387    impl ToolGate for StaticAllowGate {
1388        fn conflict_priority(&self) -> i32 {
1389            self.priority
1390        }
1391
1392        fn tool_gate_class(&self) -> ToolGateClass {
1393            self.class
1394        }
1395
1396        fn suppresses_advisory_gates(&self, _ctx: ToolGateContext<'_>) -> bool {
1397            self.suppresses_advisory
1398        }
1399
1400        async fn next_turn_tool_allowlist(
1401            &self,
1402            _ctx: ToolGateContext<'_>,
1403        ) -> Option<std::collections::HashSet<String>> {
1404            Some(self.tools.iter().map(|name| (*name).to_string()).collect())
1405        }
1406    }
1407
1408    impl CountingFollowUp {
1409        fn new(remaining: usize) -> Self {
1410            Self {
1411                remaining: AtomicUsize::new(remaining),
1412            }
1413        }
1414    }
1415
1416    impl Plugin for CountingFollowUp {
1417        fn name(&self) -> &'static str {
1418            "counting_follow_up"
1419        }
1420
1421        fn capabilities(&self) -> PluginCapabilities {
1422            PluginCapabilities::follow_up()
1423        }
1424    }
1425
1426    #[async_trait::async_trait]
1427    impl FollowUpSource for CountingFollowUp {
1428        async fn next_follow_up_messages(&self) -> Vec<AgentMessage> {
1429            let used = self
1430                .remaining
1431                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| {
1432                    remaining.checked_sub(1)
1433                })
1434                .unwrap_or(0);
1435            if used == 0 {
1436                return Vec::new();
1437            }
1438            vec![AgentMessage::System {
1439                content: "retry after no-tool stop".into(),
1440                timestamp: None,
1441            }]
1442        }
1443    }
1444
1445    #[async_trait::async_trait]
1446    impl StreamFn for EmptyThenTextStream {
1447        async fn stream(
1448            &self,
1449            _request: StreamRequest,
1450            _signal: CancellationToken,
1451        ) -> BoxStream<'static, StreamEvent> {
1452            let call = self.calls.fetch_add(1, Ordering::SeqCst);
1453            let partial = empty_assistant_message();
1454            if call == 0 {
1455                return Box::pin(stream::iter(vec![
1456                    StreamEvent::Start {
1457                        partial: partial.clone(),
1458                    },
1459                    StreamEvent::Error {
1460                        partial,
1461                        kind: StreamErrorKind::Empty,
1462                        message: "empty provider response".to_string(),
1463                    },
1464                ]));
1465            }
1466            Box::pin(stream::iter(vec![
1467                StreamEvent::Start { partial },
1468                StreamEvent::Done {
1469                    message: text_assistant_message("recovered"),
1470                },
1471            ]))
1472        }
1473    }
1474
1475    #[async_trait::async_trait]
1476    impl StreamFn for RepeatedTextStream {
1477        async fn stream(
1478            &self,
1479            _request: StreamRequest,
1480            _signal: CancellationToken,
1481        ) -> BoxStream<'static, StreamEvent> {
1482            let call = self.calls.fetch_add(1, Ordering::SeqCst);
1483            let partial = empty_assistant_message();
1484            Box::pin(stream::iter(vec![
1485                StreamEvent::Start { partial },
1486                StreamEvent::Done {
1487                    message: text_assistant_message(format!("plain stop {call}")),
1488                },
1489            ]))
1490        }
1491    }
1492
1493    #[async_trait::async_trait]
1494    impl StreamFn for EmptyStopsAroundProgressStream {
1495        async fn stream(
1496            &self,
1497            _request: StreamRequest,
1498            _signal: CancellationToken,
1499        ) -> BoxStream<'static, StreamEvent> {
1500            let call = self.calls.fetch_add(1, Ordering::SeqCst);
1501            let partial = empty_assistant_message();
1502            let message = match call {
1503                0 | 2 | 4 => text_assistant_message(format!("plain stop {call}")),
1504                1 | 3 => tool_call_assistant_message("progress", format!("tc-progress-{call}")),
1505                5 => tool_call_assistant_message("terminator", "tc-terminator"),
1506                other => panic!("unexpected stream call after terminal turn: {other}"),
1507            };
1508            Box::pin(stream::iter(vec![
1509                StreamEvent::Start { partial },
1510                StreamEvent::Done { message },
1511            ]))
1512        }
1513    }
1514
1515    #[async_trait::async_trait]
1516    impl StreamFn for ZeroOutputThenTextStream {
1517        async fn stream(
1518            &self,
1519            request: StreamRequest,
1520            _signal: CancellationToken,
1521        ) -> BoxStream<'static, StreamEvent> {
1522            self.requests.lock().unwrap().push(request);
1523            let call = self.calls.fetch_add(1, Ordering::SeqCst);
1524            let partial = empty_assistant_message();
1525            if call == 0 {
1526                return Box::pin(stream::iter(vec![
1527                    StreamEvent::Start {
1528                        partial: partial.clone(),
1529                    },
1530                    StreamEvent::Error {
1531                        partial,
1532                        kind: StreamErrorKind::ZeroOutputTransport,
1533                        message: "response body decode failed before output".to_string(),
1534                    },
1535                ]));
1536            }
1537            Box::pin(stream::iter(vec![
1538                StreamEvent::Start { partial },
1539                StreamEvent::Done {
1540                    message: text_assistant_message("recovered from transport"),
1541                },
1542            ]))
1543        }
1544    }
1545
1546    #[test]
1547    fn wrapped_up_is_complete() {
1548        assert!(LoopOutcome::Done.is_complete());
1549        assert!(LoopOutcome::WrappedUp.is_complete());
1550        assert!(!LoopOutcome::HitMaxIterations.is_complete());
1551    }
1552
1553    #[tokio::test]
1554    async fn empty_stream_response_is_retried_before_returning() {
1555        let stream = Arc::new(EmptyThenTextStream::default());
1556        let config = AgentBuilder::new()
1557            .stream(stream.clone())
1558            .model_id("test-model")
1559            .build()
1560            .expect("config builds");
1561        let context = AgentContext::new("system").with_messages(vec![AgentMessage::User {
1562            content: UserContent::Text("continue".to_string()),
1563            timestamp: None,
1564        }]);
1565
1566        let (assistant, _allowlist) =
1567            stream_with_max_tokens_recovery(&context, &config, &CancellationToken::new(), 0)
1568                .await
1569                .expect("second stream attempt should recover");
1570
1571        let AgentMessage::Assistant { content, .. } = assistant else {
1572            panic!("expected assistant response");
1573        };
1574        assert_eq!(content.plain_text(), "recovered");
1575        assert_eq!(stream.calls.load(Ordering::SeqCst), 2);
1576    }
1577
1578    #[tokio::test]
1579    async fn zero_output_transport_error_is_retried_before_returning() {
1580        let stream = Arc::new(ZeroOutputThenTextStream::default());
1581        let config = AgentBuilder::new()
1582            .stream(stream.clone())
1583            .model_id("test-model")
1584            .reasoning(ReasoningEffort::High)
1585            .build()
1586            .expect("config builds");
1587        let context = AgentContext::new("system").with_messages(vec![AgentMessage::User {
1588            content: UserContent::Text("continue".to_string()),
1589            timestamp: None,
1590        }]);
1591
1592        let (assistant, _allowlist) =
1593            stream_with_max_tokens_recovery(&context, &config, &CancellationToken::new(), 0)
1594                .await
1595                .expect("second zero-output transport attempt should recover");
1596
1597        let AgentMessage::Assistant { content, .. } = assistant else {
1598            panic!("expected assistant response");
1599        };
1600        assert_eq!(content.plain_text(), "recovered from transport");
1601        assert_eq!(stream.calls.load(Ordering::SeqCst), 2);
1602
1603        let requests = stream.requests();
1604        assert_eq!(requests.len(), 2);
1605        assert_eq!(requests[0].reasoning, ReasoningEffort::High);
1606        assert_eq!(
1607            requests[1].reasoning,
1608            ReasoningEffort::Minimal,
1609            "zero-output replay should lower high reasoning so reasoning-heavy private-only spins can produce a tool call"
1610        );
1611        assert!(
1612            requests[1].messages.iter().any(|message| matches!(
1613                message,
1614                AgentMessage::System { content, .. }
1615                    if content.contains("transport recovery")
1616                        && content.contains("no visible assistant text")
1617                        && content.contains("no usable tool call")
1618                        && content.contains("unusable burst of partial tool calls")
1619                        && content.contains("exactly one next structured tool call")
1620                        && content.contains("next structured tool call")
1621            )),
1622            "zero-output replay must carry explicit recovery context"
1623        );
1624    }
1625
1626    /// `StreamFn` that emits one assistant turn with a single
1627    /// `terminator` tool call, then panics on subsequent invocations
1628    /// — the test asserts the loop never re-enters the LLM.
1629    struct TerminatorOnlyStream {
1630        calls: AtomicUsize,
1631    }
1632
1633    impl Default for TerminatorOnlyStream {
1634        fn default() -> Self {
1635            Self {
1636                calls: AtomicUsize::new(0),
1637            }
1638        }
1639    }
1640
1641    #[async_trait::async_trait]
1642    impl StreamFn for TerminatorOnlyStream {
1643        async fn stream(
1644            &self,
1645            _request: StreamRequest,
1646            _signal: CancellationToken,
1647        ) -> BoxStream<'static, StreamEvent> {
1648            let call = self.calls.fetch_add(1, Ordering::SeqCst);
1649            assert_eq!(
1650                call, 0,
1651                "terminate-on-turn-1 test must NOT re-enter the LLM after a successful terminator"
1652            );
1653            let partial = empty_assistant_message();
1654            let assistant = AgentMessage::Assistant {
1655                content: AssistantContent {
1656                    blocks: vec![AssistantBlock::ToolCall(crate::tool::ToolCall {
1657                        id: "tc-terminator-1".into(),
1658                        name: "terminator".into(),
1659                        arguments: serde_json::json!({}),
1660                    })],
1661                },
1662                stop_reason: StopReason::ToolUse,
1663                error_message: None,
1664                timestamp: None,
1665                usage: None,
1666            };
1667            Box::pin(stream::iter(vec![
1668                StreamEvent::Start { partial },
1669                StreamEvent::Done { message: assistant },
1670            ]))
1671        }
1672    }
1673
1674    /// Tool that always votes `terminate=true`. Mirrors the contract a
1675    /// downstream terminal/delivery tool upholds.
1676    struct TerminatorTool;
1677
1678    #[async_trait::async_trait]
1679    impl crate::tool::AgentTool for TerminatorTool {
1680        fn name(&self) -> &str {
1681            "terminator"
1682        }
1683
1684        fn description(&self) -> &str {
1685            "test terminator"
1686        }
1687
1688        fn parameters_schema(&self) -> serde_json::Value {
1689            serde_json::json!({"type": "object"})
1690        }
1691
1692        async fn execute(
1693            &self,
1694            _call_id: &str,
1695            _args: serde_json::Value,
1696            _signal: CancellationToken,
1697            _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
1698        ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
1699            Ok(crate::tool::ToolResult {
1700                content: vec![crate::types::ToolResultBlock::Text(
1701                    crate::types::TextContent {
1702                        text: "delivered".into(),
1703                    },
1704                )],
1705                is_error: false,
1706                details: serde_json::Value::Null,
1707                terminate: true,
1708                narration: None,
1709            })
1710        }
1711    }
1712
1713    struct ProgressTool;
1714
1715    #[async_trait::async_trait]
1716    impl crate::tool::AgentTool for ProgressTool {
1717        fn name(&self) -> &str {
1718            "progress"
1719        }
1720
1721        fn description(&self) -> &str {
1722            "test progress tool"
1723        }
1724
1725        fn parameters_schema(&self) -> serde_json::Value {
1726            serde_json::json!({"type": "object"})
1727        }
1728
1729        async fn execute(
1730            &self,
1731            _call_id: &str,
1732            _args: serde_json::Value,
1733            _signal: CancellationToken,
1734            _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
1735        ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
1736            Ok(crate::tool::ToolResult::text("made progress"))
1737        }
1738    }
1739
1740    /// `SteeringSource` that always returns one wrap-up message. Used
1741    /// to prove the loop does NOT poll steering after a terminator
1742    /// vote (otherwise this would re-enter the LLM and trip the
1743    /// `assert_eq!(call, 0)` in `TerminatorOnlyStream`).
1744    struct AlwaysSteer {
1745        polls: Arc<AtomicUsize>,
1746    }
1747
1748    impl Plugin for AlwaysSteer {
1749        fn name(&self) -> &'static str {
1750            "always_steer"
1751        }
1752
1753        fn capabilities(&self) -> PluginCapabilities {
1754            PluginCapabilities {
1755                steering: true,
1756                ..PluginCapabilities::default()
1757            }
1758        }
1759    }
1760
1761    #[async_trait::async_trait]
1762    impl crate::plugin::SteeringSource for AlwaysSteer {
1763        async fn next_steering_messages(&self) -> Vec<AgentMessage> {
1764            self.polls.fetch_add(1, Ordering::SeqCst);
1765            vec![AgentMessage::System {
1766                content: "wrap up now".into(),
1767                timestamp: None,
1768            }]
1769        }
1770    }
1771
1772    #[tokio::test]
1773    async fn terminator_vote_skips_post_batch_steering_collection() {
1774        // Regression: a `SteeringSource` whose firing condition lines
1775        // up with the same turn the model delivers (e.g.
1776        // `graceful_turn_limit` reaching its soft limit on the delivery
1777        // turn) used to re-enter the loop and prompt the model for
1778        // ANOTHER turn after a clean terminator. The model's drift on
1779        // that extra turn corrupted the user-visible answer in
1780        // production. With the fix, a unanimous terminator vote is a
1781        // hard exit — steering sources are not polled once the run has
1782        // decided it's done.
1783        let stream = Arc::new(TerminatorOnlyStream::default());
1784        let polls = Arc::new(AtomicUsize::new(0));
1785        let mut tool_registry = crate::tool::ToolRegistry::new();
1786        tool_registry = tool_registry.with(Arc::new(TerminatorTool));
1787        let config = AgentBuilder::new()
1788            .stream(stream.clone())
1789            .model_id("test-model")
1790            .tools(tool_registry)
1791            .steering(AlwaysSteer {
1792                polls: polls.clone(),
1793            })
1794            .build()
1795            .expect("config builds");
1796        let context = AgentContext::new("system");
1797        let prompts = vec![AgentMessage::User {
1798            content: UserContent::Text("deliver".to_string()),
1799            timestamp: None,
1800        }];
1801
1802        let result = run(prompts, context, &config, CancellationToken::new())
1803            .await
1804            .expect("run completes after one terminator turn");
1805
1806        // Exactly one LLM call — the terminator turn.
1807        assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
1808        // Outcome is a clean Done, not WrappedUp (no graceful flag) and
1809        // not HitMaxIterations.
1810        assert_eq!(result.outcome, LoopOutcome::Done);
1811        // Steering source is consulted exactly once — the pre-loop
1812        // priming poll at the top of `inner_run`. After the terminator
1813        // batch, `collect_steering` MUST NOT fire again.
1814        assert_eq!(
1815            polls.load(Ordering::SeqCst),
1816            1,
1817            "steering source polled more than once — terminator vote did not gate post-batch re-entry"
1818        );
1819    }
1820
1821    /// `FollowUpSource` that always emits one nudge. Counts polls so
1822    /// the test can prove `collect_follow_up` is NOT invoked after a
1823    /// terminator batch.
1824    struct AlwaysFollowUp {
1825        polls: Arc<AtomicUsize>,
1826    }
1827
1828    impl Plugin for AlwaysFollowUp {
1829        fn name(&self) -> &'static str {
1830            "always_follow_up"
1831        }
1832
1833        fn capabilities(&self) -> PluginCapabilities {
1834            PluginCapabilities::follow_up()
1835        }
1836    }
1837
1838    #[async_trait::async_trait]
1839    impl FollowUpSource for AlwaysFollowUp {
1840        async fn next_follow_up_messages(&self) -> Vec<AgentMessage> {
1841            self.polls.fetch_add(1, Ordering::SeqCst);
1842            vec![AgentMessage::System {
1843                content: "deliver something".into(),
1844                timestamp: None,
1845            }]
1846        }
1847    }
1848
1849    #[tokio::test]
1850    async fn terminator_vote_skips_post_batch_follow_up_collection() {
1851        // Mirror of the steering test for the follow-up source path.
1852        // `FollowUpSource` exists to nudge the model toward a
1853        // terminator when it failed to emit one — not to overrule a
1854        // terminator the model already cast. After a clean delivery,
1855        // follow-up must be silent.
1856        let stream = Arc::new(TerminatorOnlyStream::default());
1857        let polls = Arc::new(AtomicUsize::new(0));
1858        let mut tool_registry = crate::tool::ToolRegistry::new();
1859        tool_registry = tool_registry.with(Arc::new(TerminatorTool));
1860        let config = AgentBuilder::new()
1861            .stream(stream.clone())
1862            .model_id("test-model")
1863            .tools(tool_registry)
1864            .follow_up(AlwaysFollowUp {
1865                polls: polls.clone(),
1866            })
1867            .build()
1868            .expect("config builds");
1869        let context = AgentContext::new("system");
1870        let prompts = vec![AgentMessage::User {
1871            content: UserContent::Text("deliver".to_string()),
1872            timestamp: None,
1873        }];
1874
1875        let result = run(prompts, context, &config, CancellationToken::new())
1876            .await
1877            .expect("run completes after one terminator turn");
1878
1879        assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
1880        assert_eq!(result.outcome, LoopOutcome::Done);
1881        assert_eq!(
1882            polls.load(Ordering::SeqCst),
1883            0,
1884            "follow-up source polled after a terminator vote — terminator did not gate post-batch re-entry"
1885        );
1886    }
1887
1888    #[tokio::test]
1889    async fn exhausted_empty_outcome_budget_returns_typed_loop_error() {
1890        let stream = Arc::new(RepeatedTextStream::default());
1891        let config = AgentBuilder::new()
1892            .stream(stream.clone())
1893            .model_id("test-model")
1894            .empty_outcome_retry_budget(1)
1895            .follow_up(CountingFollowUp::new(1))
1896            .build()
1897            .expect("config builds");
1898        let context = AgentContext::new("system");
1899        let prompts = vec![AgentMessage::User {
1900            content: UserContent::Text("continue".to_string()),
1901            timestamp: None,
1902        }];
1903
1904        let err = run(prompts, context, &config, CancellationToken::new())
1905            .await
1906            .expect_err("second no-tool stop should exhaust the budget");
1907
1908        assert!(
1909            matches!(
1910                err,
1911                LoopError::EmptyOutcomeBudgetExhausted {
1912                    budget: 1,
1913                    observed: 2,
1914                }
1915            ),
1916            "unexpected error: {err:?}"
1917        );
1918        assert_eq!(stream.calls.load(Ordering::SeqCst), 2);
1919    }
1920
1921    #[tokio::test]
1922    async fn empty_tool_gate_intersection_prefers_delivery_repair_owner() {
1923        let (sink, mut rx) = crate::event::ChannelSink::new();
1924        let config = AgentBuilder::new()
1925            .stream(Arc::new(RepeatedTextStream::default()))
1926            .event_sink(Arc::new(sink))
1927            .tool_gate_arc(Arc::new(StaticAllowGate {
1928                name: "delivery_repair_gate",
1929                tools: &["browser_interact"],
1930                priority: 100,
1931                class: ToolGateClass::Required,
1932                suppresses_advisory: false,
1933            }))
1934            .tool_gate_arc(Arc::new(StaticAllowGate {
1935                name: "terminal_message_guard",
1936                tools: &["message_result"],
1937                priority: 10,
1938                class: ToolGateClass::Required,
1939                suppresses_advisory: false,
1940            }))
1941            .build()
1942            .expect("config builds");
1943
1944        let allow = collect_tool_allowlist_with_events(&config, 3, &[])
1945            .await
1946            .expect("conflict repair should keep a non-empty allowlist");
1947
1948        assert_eq!(
1949            allow,
1950            ["browser_interact".to_string()].into_iter().collect()
1951        );
1952
1953        let mut saw_conflict = false;
1954        while let Ok(event) = rx.try_recv() {
1955            if let AgentEvent::ToolGateConflictResolved {
1956                chosen_plugin,
1957                allow,
1958                ..
1959            } = event
1960            {
1961                saw_conflict = true;
1962                assert_eq!(chosen_plugin.as_deref(), Some("delivery_repair_gate"));
1963                assert_eq!(allow, vec!["browser_interact".to_string()]);
1964            }
1965        }
1966        assert!(saw_conflict, "tool-gate deadlock should be diagnosable");
1967    }
1968
1969    #[tokio::test]
1970    async fn repair_owner_suppresses_advisory_gate_before_plan_only_intersection() {
1971        let config = AgentBuilder::new()
1972            .stream(Arc::new(RepeatedTextStream::default()))
1973            .tool_gate_arc(Arc::new(StaticAllowGate {
1974                name: "delivery_repair_gate",
1975                tools: &["plan", "file_write"],
1976                priority: 100,
1977                class: ToolGateClass::Required,
1978                suppresses_advisory: true,
1979            }))
1980            .tool_gate_arc(Arc::new(StaticAllowGate {
1981                name: "wrap_up_gate",
1982                tools: &["plan", "message_result", "message_ask"],
1983                priority: 0,
1984                class: ToolGateClass::Advisory,
1985                suppresses_advisory: false,
1986            }))
1987            .build()
1988            .expect("config builds");
1989
1990        let allow = collect_tool_allowlist_with_events(&config, 3, &[])
1991            .await
1992            .expect("repair owner should keep its own allowlist");
1993
1994        assert_eq!(
1995            allow,
1996            ["plan".to_string(), "file_write".to_string()]
1997                .into_iter()
1998                .collect()
1999        );
2000    }
2001
2002    #[tokio::test]
2003    async fn productive_tool_batch_resets_empty_outcome_budget() {
2004        let stream = Arc::new(EmptyStopsAroundProgressStream::default());
2005        let mut tool_registry = crate::tool::ToolRegistry::new();
2006        tool_registry = tool_registry
2007            .with(Arc::new(ProgressTool))
2008            .with(Arc::new(TerminatorTool));
2009        let config = AgentBuilder::new()
2010            .stream(stream.clone())
2011            .model_id("test-model")
2012            .tools(tool_registry)
2013            .empty_outcome_retry_budget(1)
2014            .follow_up(CountingFollowUp::new(3))
2015            .build()
2016            .expect("config builds");
2017        let context = AgentContext::new("system");
2018        let prompts = vec![AgentMessage::User {
2019            content: UserContent::Text("continue".to_string()),
2020            timestamp: None,
2021        }];
2022
2023        let result = run(prompts, context, &config, CancellationToken::new())
2024            .await
2025            .expect("productive tool batches should reset the empty-outcome budget");
2026
2027        assert_eq!(result.outcome, LoopOutcome::Done);
2028        assert_eq!(stream.calls.load(Ordering::SeqCst), 6);
2029    }
2030
2031    #[tokio::test]
2032    async fn terminal_only_plain_text_fallback_synthesizes_terminal_result() {
2033        let stream = Arc::new(RepeatedTextStream::default());
2034        let mut tool_registry = crate::tool::ToolRegistry::new();
2035        tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2036        let config = AgentBuilder::new()
2037            .stream(stream.clone())
2038            .model_id("auto-tool-provider")
2039            .tools(tool_registry)
2040            .tool_gate_arc(Arc::new(TerminalOnlyGate))
2041            .plain_text_terminal_fallback_tool("message_result")
2042            .empty_outcome_retry_budget(0)
2043            .build()
2044            .expect("config builds");
2045        let context = AgentContext::new("system");
2046        let prompts = vec![AgentMessage::User {
2047            content: UserContent::Text("answer directly".to_string()),
2048            timestamp: None,
2049        }];
2050
2051        let result = run(prompts, context, &config, CancellationToken::new())
2052            .await
2053            .expect("plain text should be converted on terminal-only turn");
2054
2055        assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
2056        assert_eq!(result.outcome, LoopOutcome::Done);
2057        assert!(result.messages.iter().any(|message| matches!(
2058            message,
2059            AgentMessage::ToolResult {
2060                tool_name,
2061                content,
2062                is_error: false,
2063                ..
2064            } if tool_name == "message_result"
2065                && content.plain_text() == "plain stop 0"
2066        )));
2067    }
2068
2069    #[tokio::test]
2070    async fn eager_plain_text_fallback_fires_without_terminal_only_allowlist() {
2071        // Providers in the "auto-when-forced" class can never be
2072        // wire-forced into a tool call, so prose IS their failure mode.
2073        // The eager flag lifts the "allowlist must already be narrowed"
2074        // precondition so the fallback fires on the FIRST plain-text
2075        // stop instead of after a narrowing gate has burned 2-3 nudge
2076        // turns.
2077        //
2078        // No `tool_gate_arc` is installed in this test, so the catalog
2079        // stays at the full registry — exactly the situation where the
2080        // non-eager path would refuse to convert and the run would die
2081        // on the empty-outcome budget.
2082        let stream = Arc::new(RepeatedTextStream::default());
2083        let mut tool_registry = crate::tool::ToolRegistry::new();
2084        tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2085        let config = AgentBuilder::new()
2086            .stream(stream.clone())
2087            .model_id("auto-tool-provider-eager")
2088            .tools(tool_registry)
2089            .plain_text_terminal_fallback_tool("message_result")
2090            .plain_text_terminal_fallback_eager(true)
2091            .empty_outcome_retry_budget(0)
2092            .build()
2093            .expect("config builds");
2094        let context = AgentContext::new("system");
2095        let prompts = vec![AgentMessage::User {
2096            content: UserContent::Text("answer directly".to_string()),
2097            timestamp: None,
2098        }];
2099
2100        let result = run(prompts, context, &config, CancellationToken::new())
2101            .await
2102            .expect("eager fallback should convert plain text on first stop");
2103
2104        assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
2105        assert_eq!(result.outcome, LoopOutcome::Done);
2106        assert!(result.messages.iter().any(|message| matches!(
2107            message,
2108            AgentMessage::ToolResult {
2109                tool_name,
2110                content,
2111                is_error: false,
2112                ..
2113            } if tool_name == "message_result"
2114                && content.plain_text() == "plain stop 0"
2115        )));
2116    }
2117
2118    #[tokio::test]
2119    async fn eager_nudge_mode_injects_protocol_recovery_before_synthesizing() {
2120        // With `plain_text_terminal_fallback_eager_nudge(true)` the eager
2121        // path nudges the model with a protocol-recovery system message
2122        // on each consecutive plain-text stop, up to
2123        // `MAX_PLAIN_TEXT_NUDGE_RETRIES`. After the cap a synthesizer
2124        // fires as a last resort so the run still terminates with the
2125        // model's prose as the delivered text — never silently, never
2126        // forever. Verifies the recovery path is observable in the
2127        // emitted message stream (the model sees the nudges in context)
2128        // and that the synthesizer ultimately delivers the first
2129        // substantive plain-text answer, not later recovery drift.
2130        let stream = Arc::new(RepeatedTextStream::default());
2131        let mut tool_registry = crate::tool::ToolRegistry::new();
2132        tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2133        let config = AgentBuilder::new()
2134            .stream(stream.clone())
2135            .model_id("auto-tool-provider-eager-nudge")
2136            .tools(tool_registry)
2137            .plain_text_terminal_fallback_tool("message_result")
2138            .plain_text_terminal_fallback_eager(true)
2139            .plain_text_terminal_fallback_eager_nudge(true)
2140            .build()
2141            .expect("config builds");
2142        let context = AgentContext::new("system");
2143        let prompts = vec![AgentMessage::User {
2144            content: UserContent::Text("answer directly".to_string()),
2145            timestamp: None,
2146        }];
2147
2148        let result = run(prompts, context, &config, CancellationToken::new())
2149            .await
2150            .expect("nudge mode should eventually synthesize after retries");
2151
2152        // MAX_PLAIN_TEXT_NUDGE_RETRIES = 2 → two nudges fire, then on the
2153        // third empty stop the synthesizer takes over. Total LLM calls = 3.
2154        assert_eq!(stream.calls.load(Ordering::SeqCst), 3);
2155        assert_eq!(result.outcome, LoopOutcome::Done);
2156
2157        let nudge_count = result
2158            .messages
2159            .iter()
2160            .filter(|m| matches!(m, AgentMessage::System { content, .. } if content == crate::protocol::DEFAULT_PLAIN_TEXT_RECOVERY_PROMPT))
2161            .count();
2162        assert_eq!(
2163            nudge_count, 2,
2164            "expected two protocol-recovery system messages in the run output, got {nudge_count}",
2165        );
2166
2167        let synthesized_text = result
2168            .messages
2169            .iter()
2170            .find_map(|message| match message {
2171                AgentMessage::ToolResult {
2172                    tool_name,
2173                    content,
2174                    is_error: false,
2175                    ..
2176                } if tool_name == "message_result" => Some(content.plain_text()),
2177                _ => None,
2178            })
2179            .expect("a terminal tool result should be synthesized as last resort");
2180        assert_eq!(
2181            synthesized_text, "plain stop 0",
2182            "synthesizer should deliver the first preserved plain text, not later recovery drift",
2183        );
2184    }
2185
2186    #[test]
2187    fn plain_text_fallback_candidate_skips_obvious_clarifying_questions() {
2188        assert!(!should_preserve_plain_text_terminal_candidate(
2189            "Continue what, exactly? What's your next move?"
2190        ));
2191        assert!(!should_preserve_plain_text_terminal_candidate(
2192            "Would you like me to proceed?"
2193        ));
2194        assert!(should_preserve_plain_text_terminal_candidate(
2195            "# Machine Learning\n\nMachine learning is the branch of artificial intelligence that studies systems which improve from data."
2196        ));
2197    }
2198
2199    #[tokio::test]
2200    async fn non_eager_plain_text_fallback_still_requires_narrowed_allowlist() {
2201        // Default behaviour preserved: when eager is NOT set and the
2202        // turn allowlist is the full catalog, plain text is NOT
2203        // converted — the run dies on the empty-outcome budget,
2204        // matching the pre-eager contract for every other model.
2205        let stream = Arc::new(RepeatedTextStream::default());
2206        let mut tool_registry = crate::tool::ToolRegistry::new();
2207        tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2208        let config = AgentBuilder::new()
2209            .stream(stream.clone())
2210            .model_id("non-eager-provider")
2211            .tools(tool_registry)
2212            .plain_text_terminal_fallback_tool("message_result")
2213            // Eager NOT set → defaults to false.
2214            .empty_outcome_retry_budget(0)
2215            .build()
2216            .expect("config builds");
2217        let context = AgentContext::new("system");
2218        let prompts = vec![AgentMessage::User {
2219            content: UserContent::Text("answer directly".to_string()),
2220            timestamp: None,
2221        }];
2222
2223        let err = run(prompts, context, &config, CancellationToken::new())
2224            .await
2225            .expect_err("non-eager fallback must not convert without narrowed allowlist");
2226
2227        assert!(
2228            matches!(err, LoopError::EmptyOutcomeBudgetExhausted { .. }),
2229            "unexpected error: {err:?}"
2230        );
2231    }
2232
2233    #[tokio::test]
2234    async fn terminal_plain_text_fallback_allows_status_delivery_gate() {
2235        let stream = Arc::new(RepeatedTextStream::default());
2236        let mut tool_registry = crate::tool::ToolRegistry::new();
2237        tool_registry = tool_registry.with(Arc::new(TerminalNamedTool("message_result")));
2238        let config = AgentBuilder::new()
2239            .stream(stream.clone())
2240            .model_id("auto-tool-provider")
2241            .tools(tool_registry)
2242            .protocol_policy(Arc::new(TestTerminalPolicy))
2243            .tool_gate_arc(Arc::new(TerminalWithStatusGate))
2244            .plain_text_terminal_fallback_tool("message_result")
2245            .empty_outcome_retry_budget(0)
2246            .build()
2247            .expect("config builds");
2248        let context = AgentContext::new("system");
2249        let prompts = vec![AgentMessage::User {
2250            content: UserContent::Text("answer directly".to_string()),
2251            timestamp: None,
2252        }];
2253
2254        let result = run(prompts, context, &config, CancellationToken::new())
2255            .await
2256            .expect(
2257                "plain text should be converted when only status and terminal tools are allowed",
2258            );
2259
2260        assert_eq!(stream.calls.load(Ordering::SeqCst), 1);
2261        assert_eq!(result.outcome, LoopOutcome::Done);
2262        assert!(result.messages.iter().any(|message| matches!(
2263            message,
2264            AgentMessage::ToolResult {
2265                tool_name,
2266                content,
2267                is_error: false,
2268                ..
2269            } if tool_name == "message_result"
2270                && content.plain_text() == "plain stop 0"
2271        )));
2272    }
2273
2274    struct TerminalNamedTool(&'static str);
2275
2276    #[async_trait::async_trait]
2277    impl crate::tool::AgentTool for TerminalNamedTool {
2278        fn name(&self) -> &str {
2279            self.0
2280        }
2281
2282        fn description(&self) -> &str {
2283            "test terminal tool"
2284        }
2285
2286        fn parameters_schema(&self) -> serde_json::Value {
2287            serde_json::json!({"type": "object"})
2288        }
2289
2290        async fn execute(
2291            &self,
2292            _call_id: &str,
2293            _args: serde_json::Value,
2294            _signal: CancellationToken,
2295            _update: tokio::sync::mpsc::UnboundedSender<crate::tool::ToolResult>,
2296        ) -> Result<crate::tool::ToolResult, crate::error::ToolError> {
2297            Ok(crate::tool::ToolResult {
2298                content: vec![crate::types::ToolResultBlock::Text(
2299                    crate::types::TextContent {
2300                        text: "not used".into(),
2301                    },
2302                )],
2303                is_error: false,
2304                details: serde_json::Value::Null,
2305                terminate: true,
2306                narration: None,
2307            })
2308        }
2309    }
2310}