Skip to main content

bob_runtime/
scheduler.rs

1//! # Scheduler
2//!
3//! Scheduler: loop guard and 6-state turn FSM.
4//!
5//! ## Overview
6//!
7//! The scheduler is the core orchestration component that manages agent turn execution.
8//! It implements a finite state machine (FSM) with the following states:
9//!
10//! 1. **Start**: Initialize turn, load session
11//! 2. **LLM Call**: Send request to LLM
12//! 3. **Parse Action**: Parse LLM response into structured action
13//! 4. **Execute Tool**: Run tool if action is a tool call
14//! 5. **Update State**: Update session state with results
15//! 6. **Loop/Finish**: Either continue or complete the turn
16//!
17//! ## Loop Guard
18//!
19//! The [`LoopGuard`] ensures turn termination by tracking:
20//! - Number of steps (LLM calls)
21//! - Number of tool calls
22//! - Consecutive errors
23//! - Elapsed time
24//!
25//! If any limit is exceeded, the turn is terminated with an appropriate [`GuardReason`].
26//!
27//! ## Example
28//!
29//! ```rust,ignore
30//! use bob_runtime::scheduler::LoopGuard;
31//! use bob_core::types::TurnPolicy;
32//!
33//! let policy = TurnPolicy::default();
34//! let mut guard = LoopGuard::new(policy);
35//!
36//! while guard.can_continue() {
37//!     guard.record_step();
38//!     // Execute step...
39//! }
40//! ```
41
42use std::sync::Arc;
43
44use bob_core::{
45    error::AgentError,
46    journal::{JournalEntry, ToolJournalPort},
47    normalize_tool_list,
48    ports::{
49        ApprovalPort, ArtifactStorePort, ContextCompactorPort, CostMeterPort, EventSink, LlmPort,
50        SessionStore, ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
51    },
52    types::{
53        AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
54        AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
55        GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
56    },
57};
58use futures_util::StreamExt;
59use tokio::time::Instant;
60
/// Default capacity for the bounded streaming channel.
///
/// NOTE(review): the consumer is below this chunk (`run_turn_stream`); presumably
/// producers block (backpressure) once 256 events are queued — confirm against
/// the channel construction site.
const STREAM_CHANNEL_CAPACITY: usize = 256;
63
/// Safety net that guarantees turn termination by tracking steps,
/// tool calls, consecutive errors, and elapsed time against [`TurnPolicy`] limits.
#[derive(Debug)]
pub struct LoopGuard {
    /// Limits being enforced: max steps, max tool calls, max consecutive
    /// errors, and the turn timeout in milliseconds.
    policy: TurnPolicy,
    /// Scheduler steps (LLM calls) recorded so far via [`Self::record_step`].
    steps: u32,
    /// Tool invocations recorded so far via [`Self::record_tool_call`].
    tool_calls: u32,
    /// Errors since the last success; cleared by [`Self::reset_errors`].
    consecutive_errors: u32,
    /// Wall-clock start of the turn, compared against `policy.turn_timeout_ms`.
    start: Instant,
}
74
75impl LoopGuard {
76    /// Create a new guard tied to the given policy.
77    #[must_use]
78    pub fn new(policy: TurnPolicy) -> Self {
79        Self { policy, steps: 0, tool_calls: 0, consecutive_errors: 0, start: Instant::now() }
80    }
81
82    /// Returns `true` if the turn may continue executing.
83    #[must_use]
84    pub fn can_continue(&self) -> bool {
85        self.steps < self.policy.max_steps &&
86            self.tool_calls < self.policy.max_tool_calls &&
87            self.consecutive_errors < self.policy.max_consecutive_errors &&
88            !self.timed_out()
89    }
90
91    /// Record one scheduler step.
92    pub fn record_step(&mut self) {
93        self.steps += 1;
94    }
95
96    /// Record one tool call.
97    pub fn record_tool_call(&mut self) {
98        self.tool_calls += 1;
99    }
100
101    /// Record a consecutive error.
102    pub fn record_error(&mut self) {
103        self.consecutive_errors += 1;
104    }
105
106    /// Reset the consecutive-error counter (e.g. after a successful call).
107    pub fn reset_errors(&mut self) {
108        self.consecutive_errors = 0;
109    }
110
111    /// The reason the guard stopped the turn.
112    ///
113    /// Only meaningful when [`can_continue`](Self::can_continue) returns `false`.
114    #[must_use]
115    pub fn reason(&self) -> GuardReason {
116        if self.steps >= self.policy.max_steps {
117            GuardReason::MaxSteps
118        } else if self.tool_calls >= self.policy.max_tool_calls {
119            GuardReason::MaxToolCalls
120        } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
121            GuardReason::MaxConsecutiveErrors
122        } else if self.timed_out() {
123            GuardReason::TurnTimeout
124        } else {
125            // Fallback — shouldn't be called when `can_continue()` is true.
126            GuardReason::Cancelled
127        }
128    }
129
130    /// Returns `true` if the turn has exceeded its time budget.
131    #[must_use]
132    pub fn timed_out(&self) -> bool {
133        self.start.elapsed().as_millis() >= u128::from(self.policy.turn_timeout_ms)
134    }
135}
136
137// ── Default system instructions (v1) ─────────────────────────────────
138
/// Baseline system prompt used when the request supplies none (v1 wording).
/// A non-empty request prompt is appended after this baseline, not substituted
/// for it — see `resolve_system_instructions`.
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = "\
You are a helpful AI assistant. \
Think step by step before answering. \
When you need external information, use the available tools.";
/// Maximum back-to-back identical tool calls (same name + arguments) before the
/// scheduler short-circuits the call with an error result to break loops.
const MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS: u32 = 3;
144
145fn resolve_system_instructions(req: &AgentRequest) -> String {
146    let Some(skills_prompt) = req.context.system_prompt.as_deref() else {
147        return DEFAULT_SYSTEM_INSTRUCTIONS.to_string();
148    };
149
150    if skills_prompt.trim().is_empty() {
151        DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
152    } else {
153        format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
154    }
155}
156
157fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
158    req.context.selected_skills.clone()
159}
160
/// Normalized tool allow/deny lists extracted from the request's tool policy.
#[derive(Debug, Clone, Default)]
struct ToolCallPolicy {
    /// Tool names to reject, already passed through `normalize_tool_list`.
    deny_tools: Vec<String>,
    /// Optional allow-list (also normalized). Presumably `None` means "no
    /// allow-list restriction" — actual semantics are enforced by
    /// `ToolPolicyPort::is_tool_allowed`, which is declared elsewhere.
    allow_tools: Option<Vec<String>>,
}
166
167fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
168    let deny_tools =
169        normalize_tool_list(req.context.tool_policy.deny_tools.iter().map(String::as_str));
170    let allow_tools = req
171        .context
172        .tool_policy
173        .allow_tools
174        .as_ref()
175        .map(|tools| normalize_tool_list(tools.iter().map(String::as_str)));
176    ToolCallPolicy { deny_tools, allow_tools }
177}
178
179fn prompt_options_for_mode(
180    dispatch_mode: crate::DispatchMode,
181    llm: &dyn LlmPort,
182    output_schema: Option<serde_json::Value>,
183) -> crate::prompt::PromptBuildOptions {
184    let mut opts = match dispatch_mode {
185        crate::DispatchMode::PromptGuided => crate::prompt::PromptBuildOptions::default(),
186        crate::DispatchMode::NativePreferred => {
187            if llm.capabilities().native_tool_calling {
188                crate::prompt::PromptBuildOptions {
189                    include_action_schema: false,
190                    include_tool_schema: false,
191                    ..Default::default()
192                }
193            } else {
194                crate::prompt::PromptBuildOptions::default()
195            }
196        }
197    };
198    opts.structured_output = output_schema;
199    opts
200}
201
202fn parse_action_for_mode(
203    dispatch_mode: crate::DispatchMode,
204    llm: &dyn LlmPort,
205    response: &bob_core::types::LlmResponse,
206) -> Result<AgentAction, crate::action::ActionParseError> {
207    match dispatch_mode {
208        crate::DispatchMode::PromptGuided => crate::action::parse_action(&response.content),
209        crate::DispatchMode::NativePreferred => {
210            if llm.capabilities().native_tool_calling &&
211                let Some(tool_call) = response.tool_calls.first()
212            {
213                return Ok(AgentAction::ToolCall {
214                    name: tool_call.name.clone(),
215                    arguments: tool_call.arguments.clone(),
216                });
217            }
218            crate::action::parse_action(&response.content)
219        }
220    }
221}
222
223#[expect(
224    clippy::too_many_arguments,
225    reason = "tool execution needs explicit policy, approval, and timeout dependencies"
226)]
227async fn execute_tool_call(
228    tools: &dyn ToolPort,
229    guard: &mut LoopGuard,
230    tool_call: ToolCall,
231    policy: &ToolCallPolicy,
232    tool_policy_port: &dyn ToolPolicyPort,
233    approval_port: &dyn ApprovalPort,
234    approval_context: &ApprovalContext,
235    timeout_ms: u64,
236) -> ToolResult {
237    if !tool_policy_port.is_tool_allowed(
238        &tool_call.name,
239        &policy.deny_tools,
240        policy.allow_tools.as_deref(),
241    ) {
242        guard.record_error();
243        return ToolResult {
244            name: tool_call.name.clone(),
245            output: serde_json::json!({
246                "error": format!("tool '{}' denied by policy", tool_call.name)
247            }),
248            is_error: true,
249        };
250    }
251
252    match approval_port.approve_tool_call(&tool_call, approval_context).await {
253        Ok(ApprovalDecision::Approved) => {}
254        Ok(ApprovalDecision::Denied { reason }) => {
255            guard.record_error();
256            return ToolResult {
257                name: tool_call.name.clone(),
258                output: serde_json::json!({"error": reason}),
259                is_error: true,
260            };
261        }
262        Err(err) => {
263            guard.record_error();
264            return ToolResult {
265                name: tool_call.name.clone(),
266                output: serde_json::json!({"error": err.to_string()}),
267                is_error: true,
268            };
269        }
270    }
271
272    match tokio::time::timeout(
273        std::time::Duration::from_millis(timeout_ms),
274        tools.call_tool(tool_call.clone()),
275    )
276    .await
277    {
278        Ok(Ok(result)) => {
279            guard.reset_errors();
280            result
281        }
282        Ok(Err(err)) => {
283            guard.record_error();
284            ToolResult {
285                name: tool_call.name,
286                output: serde_json::json!({"error": err.to_string()}),
287                is_error: true,
288            }
289        }
290        Err(_) => {
291            guard.record_error();
292            ToolResult {
293                name: tool_call.name,
294                output: serde_json::json!({"error": "tool call timed out"}),
295                is_error: true,
296            }
297        }
298    }
299}
300
301// ── Turn Loop FSM ────────────────────────────────────────────────────
302
303/// Execute a single agent turn as a 6-state FSM.
304///
305/// States: Start → BuildPrompt → LlmInfer → ParseAction → CallTool → Done.
306/// The loop guard guarantees termination under all conditions.
307pub async fn run_turn(
308    llm: &dyn LlmPort,
309    tools: &dyn ToolPort,
310    store: &dyn SessionStore,
311    events: &dyn EventSink,
312    req: AgentRequest,
313    policy: &TurnPolicy,
314    default_model: &str,
315) -> Result<AgentRunResult, AgentError> {
316    let tool_policy = crate::DefaultToolPolicyPort;
317    let approval = crate::AllowAllApprovalPort;
318    let checkpoint_store = crate::NoOpCheckpointStorePort;
319    let artifact_store = crate::NoOpArtifactStorePort;
320    let cost_meter = crate::NoOpCostMeterPort;
321    let compactor = crate::prompt::WindowContextCompactor::default();
322    let journal = crate::NoOpToolJournalPort;
323    run_turn_with_extensions(
324        llm,
325        tools,
326        store,
327        events,
328        req,
329        policy,
330        default_model,
331        &tool_policy,
332        &approval,
333        crate::DispatchMode::NativePreferred,
334        &checkpoint_store,
335        &artifact_store,
336        &cost_meter,
337        &compactor,
338        &journal,
339    )
340    .await
341}
342
343/// Execute a single turn with explicit policy/approval controls.
344#[cfg_attr(
345    not(test),
346    expect(
347        dead_code,
348        reason = "reserved wrapper for partial control injection in external integrations"
349    )
350)]
351#[expect(
352    clippy::too_many_arguments,
353    reason = "wrapper exposes explicit dependency ports for compatibility and testability"
354)]
355pub(crate) async fn run_turn_with_controls(
356    llm: &dyn LlmPort,
357    tools: &dyn ToolPort,
358    store: &dyn SessionStore,
359    events: &dyn EventSink,
360    req: AgentRequest,
361    policy: &TurnPolicy,
362    default_model: &str,
363    tool_policy_port: &dyn ToolPolicyPort,
364    approval_port: &dyn ApprovalPort,
365) -> Result<AgentRunResult, AgentError> {
366    let checkpoint_store = crate::NoOpCheckpointStorePort;
367    let artifact_store = crate::NoOpArtifactStorePort;
368    let cost_meter = crate::NoOpCostMeterPort;
369    let compactor = crate::prompt::WindowContextCompactor::default();
370    let journal = crate::NoOpToolJournalPort;
371    run_turn_with_extensions(
372        llm,
373        tools,
374        store,
375        events,
376        req,
377        policy,
378        default_model,
379        tool_policy_port,
380        approval_port,
381        crate::DispatchMode::PromptGuided,
382        &checkpoint_store,
383        &artifact_store,
384        &cost_meter,
385        &compactor,
386        &journal,
387    )
388    .await
389}
390
/// Execute a single turn with all extensibility controls injected.
///
/// This is the core FSM driver; the other `run_turn*` entrypoints delegate
/// here after choosing real or no-op implementations for each port.
/// Per iteration of the main loop: check cancellation and budgets → build the
/// prompt (with progressive tool summaries) → call the LLM → checkpoint →
/// parse the action → either finish, ask the user, run a tool, or re-prompt
/// after a parse/validation failure. Termination is guaranteed by the
/// [`LoopGuard`], the parse-failure cap, and the duplicate-tool-call cap.
#[expect(
    clippy::too_many_arguments,
    reason = "core entrypoint exposes all ports explicitly for adapter injection"
)]
pub(crate) async fn run_turn_with_extensions(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
    dispatch_mode: crate::DispatchMode,
    checkpoint_store: &dyn TurnCheckpointStorePort,
    artifact_store: &dyn ArtifactStorePort,
    cost_meter: &dyn CostMeterPort,
    context_compactor: &dyn ContextCompactorPort,
    journal: &dyn ToolJournalPort,
) -> Result<AgentRunResult, AgentError> {
    // Per-request overrides fall back to the scheduler-wide default model.
    let model = req.model.as_deref().unwrap_or(default_model);
    let cancel_token = req.cancel_token.clone();
    let system_instructions = resolve_system_instructions(&req);
    let selected_skills = resolve_selected_skills(&req);
    let tool_call_policy = resolve_tool_call_policy(&req);

    // A missing session starts fresh; a load *error* still aborts via `?`.
    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
    let tool_descriptors = tools.list_tools().await?;
    let mut guard = LoopGuard::new(policy.clone());

    // Progressive tool view: compact summaries for inactive tools, full schemas for activated.
    let mut tool_view = crate::progressive_tools::ProgressiveToolView::new(tool_descriptors);

    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
    if !selected_skills.is_empty() {
        events.emit(AgentEvent::SkillsSelected {
            session_id: req.session_id.clone(),
            skill_names: selected_skills.clone(),
        });
    }

    // The user's input becomes part of the durable session transcript.
    session.messages.push(Message::text(Role::User, req.input.clone()));

    let mut tool_transcript: Vec<ToolResult> = Vec::new();
    let mut total_usage = TokenUsage::default();
    let mut consecutive_parse_failures: u32 = 0;
    let mut consecutive_validation_failures: u32 = 0;
    let max_output_retries = req.max_output_retries;
    // Loop protection: cap repeated identical calls, while allowing non-consecutive repeats.
    let mut last_tool_call_signature: Option<String> = None;
    let mut same_tool_call_streak: u32 = 0;

    loop {
        // 1-based step number for events; saturating to avoid overflow at u32::MAX.
        let current_step = guard.steps.saturating_add(1);

        // Cooperative cancellation check at the top of every iteration.
        if let Some(ref token) = cancel_token &&
            token.is_cancelled()
        {
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: "Turn cancelled.",
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::Cancelled,
                },
            )
            .await;
        }

        // A blown budget surfaces as a hard error (`?`), not a graceful finish.
        cost_meter.check_budget(&req.session_id).await?;

        // Loop-guard limits end the turn gracefully with the guard's reason.
        if !guard.can_continue() {
            let reason = guard.reason();
            let msg = format!("Turn stopped: {reason:?}");
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: &msg,
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::GuardExceeded,
                },
            )
            .await;
        }

        // Build system instructions with progressive tool summary.
        let mut augmented_instructions = system_instructions.clone();
        let tool_summary = tool_view.summary_prompt();
        if !tool_summary.is_empty() {
            augmented_instructions.push('\n');
            augmented_instructions.push('\n');
            augmented_instructions.push_str(&tool_summary);
        }

        // Only activated tools get full schemas in the request.
        let active_tools = tool_view.activated_tools();
        let llm_request = crate::prompt::build_llm_request_with_options(
            model,
            &session,
            &active_tools,
            &augmented_instructions,
            prompt_options_for_mode(dispatch_mode, llm, req.output_schema.clone()),
            context_compactor,
        )
        .await;

        events.emit(AgentEvent::LlmCallStarted {
            session_id: req.session_id.clone(),
            step: current_step,
            model: model.to_string(),
        });

        // Race the LLM call against cancellation when a token is present.
        let llm_response = if let Some(ref token) = cancel_token {
            tokio::select! {
                result = llm.complete(llm_request.clone()) => result?,
                () = token.cancelled() => {
                    return finish_turn(
                        store, events, &req.session_id, &session,
                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
                    ).await;
                }
            }
        } else {
            llm.complete(llm_request).await?
        };

        // Account the step and usage both in the turn totals and the session.
        guard.record_step();
        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
        total_usage.completion_tokens += llm_response.usage.completion_tokens;
        session.total_usage.prompt_tokens =
            session.total_usage.prompt_tokens.saturating_add(llm_response.usage.prompt_tokens);
        session.total_usage.completion_tokens = session
            .total_usage
            .completion_tokens
            .saturating_add(llm_response.usage.completion_tokens);
        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;

        events.emit(AgentEvent::LlmCallCompleted {
            session_id: req.session_id.clone(),
            step: current_step,
            model: model.to_string(),
            usage: llm_response.usage.clone(),
        });
        // First structured tool call, kept to recover its `call_id` later.
        let native_tool_call = if llm.capabilities().native_tool_calling {
            llm_response.tool_calls.first().cloned()
        } else {
            None
        };

        // Scan LLM response for tool name hints and activate them.
        tool_view.activate_hints(&llm_response.content);

        let assistant_message = if llm_response.tool_calls.is_empty() {
            Message::text(Role::Assistant, llm_response.content.clone())
        } else {
            Message::assistant_tool_calls(
                llm_response.content.clone(),
                llm_response.tool_calls.clone(),
            )
        };
        session.messages.push(assistant_message);

        // Best-effort checkpoint after each LLM call; failures are ignored.
        let _ = checkpoint_store
            .save_checkpoint(&TurnCheckpoint {
                session_id: req.session_id.clone(),
                step: guard.steps,
                tool_calls: guard.tool_calls,
                usage: total_usage.clone(),
            })
            .await;

        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
            Ok(action) => {
                // Any parseable action clears the parse-failure streak.
                consecutive_parse_failures = 0;
                match action {
                    AgentAction::Final { content } => {
                        // Structured output validation if schema is set.
                        if let Some(ref schema) = req.output_schema {
                            match crate::output_validation::validate_output_str(&content, schema) {
                                Ok(_) => {}
                                Err(validation_err) => {
                                    consecutive_validation_failures += 1;
                                    if consecutive_validation_failures > max_output_retries {
                                        tracing::warn!(
                                            session_id = %req.session_id,
                                            "output schema validation failed after {} retries",
                                            max_output_retries,
                                        );
                                        // Accept the output despite validation failure.
                                    } else {
                                        // Re-prompt with the validation error and retry.
                                        let prompt =
                                            crate::output_validation::validation_error_prompt(
                                                &content,
                                                &validation_err,
                                            );
                                        session.messages.push(Message::text(Role::User, prompt));
                                        continue;
                                    }
                                }
                            }
                        }
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &content,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::AskUser { question } => {
                        // Questions back to the user end the turn like a final answer.
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &question,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::ToolCall { name, arguments } => {
                        // Reuse the native call's id only when it matches the parsed action.
                        let tool_call_id = native_tool_call
                            .as_ref()
                            .filter(|call| call.name == name && call.arguments == arguments)
                            .and_then(|call| call.call_id.clone());
                        // Activate the tool in progressive view for subsequent requests.
                        tool_view.activate(&name);

                        // Identical name+arguments define a duplicate for streak tracking.
                        let call_signature = format!(
                            "{}:{}",
                            name,
                            serde_json::to_string(&arguments).unwrap_or_default()
                        );
                        if last_tool_call_signature.as_ref() == Some(&call_signature) {
                            same_tool_call_streak = same_tool_call_streak.saturating_add(1);
                        } else {
                            same_tool_call_streak = 1;
                            last_tool_call_signature = Some(call_signature);
                        }

                        // Duplicate-call cap: synthesize an error result instead of
                        // executing, so the LLM sees why the call was skipped.
                        if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
                            events.emit(AgentEvent::ToolCallStarted {
                                session_id: req.session_id.clone(),
                                step: current_step,
                                name: name.clone(),
                            });
                            let dup_result = ToolResult {
                                name: name.clone(),
                                output: serde_json::json!({
                                    "error": format!(
                                        "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
                                    )
                                }),
                                is_error: true,
                            };
                            // Still counts against the guard's tool-call budget.
                            guard.record_tool_call();
                            let _ =
                                cost_meter.record_tool_result(&req.session_id, &dup_result).await;
                            let output_str =
                                serde_json::to_string(&dup_result.output).unwrap_or_default();
                            session.messages.push(Message::tool_result(
                                name.clone(),
                                tool_call_id.clone(),
                                output_str,
                            ));
                            events.emit(AgentEvent::ToolCallCompleted {
                                session_id: req.session_id.clone(),
                                step: current_step,
                                name: name.clone(),
                                is_error: true,
                            });
                            let _ = artifact_store
                                .put(ArtifactRecord {
                                    session_id: req.session_id.clone(),
                                    kind: "tool_result".to_string(),
                                    name: name.clone(),
                                    content: dup_result.output.clone(),
                                })
                                .await;
                            tool_transcript.push(dup_result);
                            continue;
                        }

                        events.emit(AgentEvent::ToolCallStarted {
                            session_id: req.session_id.clone(),
                            step: current_step,
                            name: name.clone(),
                        });
                        let approval_context = ApprovalContext {
                            session_id: req.session_id.clone(),
                            turn_step: guard.steps.max(1),
                            selected_skills: selected_skills.clone(),
                        };

                        // Journal lookup for idempotent replay.
                        let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
                        let tool_result = if let Ok(Some(cached)) =
                            journal.lookup(&req.session_id, &call_fingerprint).await
                        {
                            tracing::debug!(
                                session_id = %req.session_id,
                                tool = %name,
                                "replaying tool result from journal"
                            );
                            ToolResult {
                                name: cached.tool_name,
                                output: cached.result,
                                is_error: cached.is_error,
                            }
                        } else {
                            let result = execute_tool_call(
                                tools,
                                &mut guard,
                                ToolCall::new(name.clone(), arguments.clone()),
                                &tool_call_policy,
                                tool_policy_port,
                                approval_port,
                                &approval_context,
                                policy.tool_timeout_ms,
                            )
                            .await;
                            // Record in journal for future replay.
                            let _ = journal
                                .append(JournalEntry {
                                    session_id: req.session_id.clone(),
                                    call_fingerprint: call_fingerprint.clone(),
                                    tool_name: name.clone(),
                                    arguments: arguments.clone(),
                                    result: result.output.clone(),
                                    is_error: result.is_error,
                                    timestamp_ms: bob_core::tape::now_ms(),
                                })
                                .await;
                            result
                        };

                        guard.record_tool_call();
                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;

                        let is_error = tool_result.is_error;
                        events.emit(AgentEvent::ToolCallCompleted {
                            session_id: req.session_id.clone(),
                            step: current_step,
                            name: name.clone(),
                            is_error,
                        });

                        // Tool output joins the transcript as a serialized JSON string.
                        let output_str =
                            serde_json::to_string(&tool_result.output).unwrap_or_default();
                        session.messages.push(Message::tool_result(
                            name.clone(),
                            tool_call_id,
                            output_str,
                        ));

                        // Best-effort artifact persistence; failures are ignored.
                        let _ = artifact_store
                            .put(ArtifactRecord {
                                session_id: req.session_id.clone(),
                                kind: "tool_result".to_string(),
                                name: name.clone(),
                                content: tool_result.output.clone(),
                            })
                            .await;

                        tool_transcript.push(tool_result);
                    }
                }
            }
            Err(_parse_err) => {
                consecutive_parse_failures += 1;
                // A parse failure also breaks any identical-tool-call streak.
                last_tool_call_signature = None;
                same_tool_call_streak = 0;
                // One re-prompt is allowed; a second consecutive failure aborts
                // (session saved best-effort so the transcript survives).
                if consecutive_parse_failures >= 2 {
                    let _ = store.save(&req.session_id, &session).await;
                    return Err(AgentError::Internal(
                        "LLM produced invalid JSON after re-prompt".into(),
                    ));
                }
                session.messages.push(Message::text(
                    Role::User,
                    "Your response was not valid JSON. \
                     Please respond with exactly one JSON object \
                     matching the required schema.",
                ));
            }
        }
    }
}
798
/// Bundled data for building the final response (reduces argument count).
struct FinishResult<'a> {
    /// Final assistant text for the turn.
    content: &'a str,
    /// Results of the tool calls executed during the turn.
    tool_transcript: Vec<ToolResult>,
    /// Token usage reported for the turn.
    usage: TokenUsage,
    /// Why the turn ended (e.g. normal stop, guard trip).
    finish_reason: FinishReason,
}
806
807/// Helper: save session, emit `TurnCompleted`, and build the final response.
808async fn finish_turn(
809    store: &dyn SessionStore,
810    events: &dyn EventSink,
811    session_id: &bob_core::types::SessionId,
812    session: &bob_core::types::SessionState,
813    result: FinishResult<'_>,
814) -> Result<AgentRunResult, AgentError> {
815    store.save(session_id, session).await?;
816    events.emit(AgentEvent::TurnCompleted {
817        session_id: session_id.clone(),
818        finish_reason: result.finish_reason,
819        usage: result.usage.clone(),
820    });
821    Ok(AgentRunResult::Finished(AgentResponse {
822        content: result.content.to_string(),
823        tool_transcript: result.tool_transcript,
824        usage: result.usage,
825        finish_reason: result.finish_reason,
826    }))
827}
828
829/// Execute a single turn in streaming mode and return an event stream.
830pub async fn run_turn_stream(
831    llm: Arc<dyn LlmPort>,
832    tools: Arc<dyn ToolPort>,
833    store: Arc<dyn SessionStore>,
834    events: Arc<dyn EventSink>,
835    req: AgentRequest,
836    policy: TurnPolicy,
837    default_model: String,
838) -> Result<AgentEventStream, AgentError> {
839    let tool_policy: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
840    let approval: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
841    let checkpoint_store: Arc<dyn TurnCheckpointStorePort> =
842        Arc::new(crate::NoOpCheckpointStorePort);
843    let artifact_store: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
844    let cost_meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
845    let context_compactor: Arc<dyn ContextCompactorPort> =
846        Arc::new(crate::prompt::WindowContextCompactor::default());
847    let journal: Arc<dyn ToolJournalPort> = Arc::new(crate::NoOpToolJournalPort);
848    run_turn_stream_with_controls(
849        llm,
850        tools,
851        store,
852        events,
853        req,
854        policy,
855        default_model,
856        tool_policy,
857        approval,
858        crate::DispatchMode::NativePreferred,
859        checkpoint_store,
860        artifact_store,
861        cost_meter,
862        context_compactor,
863        journal,
864    )
865    .await
866}
867
868/// Execute a single turn in streaming mode with explicit policy/approval controls.
869#[expect(
870    clippy::too_many_arguments,
871    reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
872)]
873pub(crate) async fn run_turn_stream_with_controls(
874    llm: Arc<dyn LlmPort>,
875    tools: Arc<dyn ToolPort>,
876    store: Arc<dyn SessionStore>,
877    events: Arc<dyn EventSink>,
878    req: AgentRequest,
879    policy: TurnPolicy,
880    default_model: String,
881    tool_policy: Arc<dyn ToolPolicyPort>,
882    approval: Arc<dyn ApprovalPort>,
883    dispatch_mode: crate::DispatchMode,
884    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
885    artifact_store: Arc<dyn ArtifactStorePort>,
886    cost_meter: Arc<dyn CostMeterPort>,
887    context_compactor: Arc<dyn ContextCompactorPort>,
888    journal: Arc<dyn ToolJournalPort>,
889) -> Result<AgentEventStream, AgentError> {
890    let (tx, rx) = flume::bounded::<AgentStreamEvent>(STREAM_CHANNEL_CAPACITY);
891    let config = StreamRunConfig {
892        policy,
893        default_model,
894        tool_policy,
895        approval,
896        dispatch_mode,
897        checkpoint_store,
898        artifact_store,
899        cost_meter,
900        context_compactor,
901        journal,
902    };
903
904    tokio::spawn(async move {
905        if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
906        {
907            let _ = tx.send_async(AgentStreamEvent::Error { error: err.to_string() }).await;
908        }
909    });
910
911    Ok(Box::pin(rx.into_stream()))
912}
913
/// Per-run bundle of policy and port controls handed to the streaming worker
/// task; keeps the argument list of `run_turn_stream_inner` manageable.
struct StreamRunConfig {
    /// Loop-guard limits and timeouts for this turn.
    policy: TurnPolicy,
    /// Model used when the request does not specify one.
    default_model: String,
    /// Decides whether a tool is allowed at all.
    tool_policy: Arc<dyn ToolPolicyPort>,
    /// Approves or denies individual tool calls.
    approval: Arc<dyn ApprovalPort>,
    /// Native vs. JSON-parsed tool-call dispatch strategy.
    dispatch_mode: crate::DispatchMode,
    /// Receives a checkpoint after every LLM step.
    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
    /// Receives tool outputs as artifacts.
    artifact_store: Arc<dyn ArtifactStorePort>,
    /// Tracks token/tool spend and enforces budgets.
    cost_meter: Arc<dyn CostMeterPort>,
    /// Compacts the message history when building LLM requests.
    context_compactor: Arc<dyn ContextCompactorPort>,
    /// Idempotency journal for tool-call replay.
    journal: Arc<dyn ToolJournalPort>,
}
926
927async fn run_turn_stream_inner(
928    llm: Arc<dyn LlmPort>,
929    tools: Arc<dyn ToolPort>,
930    store: Arc<dyn SessionStore>,
931    events: Arc<dyn EventSink>,
932    req: AgentRequest,
933    config: &StreamRunConfig,
934    tx: &flume::Sender<AgentStreamEvent>,
935) -> Result<(), AgentError> {
936    let model = req.model.as_deref().unwrap_or(&config.default_model);
937    let cancel_token = req.cancel_token.clone();
938    let system_instructions = resolve_system_instructions(&req);
939    let selected_skills = resolve_selected_skills(&req);
940    let tool_call_policy = resolve_tool_call_policy(&req);
941
942    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
943    let tool_descriptors = tools.list_tools().await?;
944    let mut guard = LoopGuard::new(config.policy.clone());
945    let mut total_usage = TokenUsage::default();
946    let mut consecutive_parse_failures: u32 = 0;
947    let mut next_call_id: u64 = 0;
948    let mut last_tool_call_signature: Option<String> = None;
949    let mut same_tool_call_streak: u32 = 0;
950
951    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
952    if !selected_skills.is_empty() {
953        events.emit(AgentEvent::SkillsSelected {
954            session_id: req.session_id.clone(),
955            skill_names: selected_skills.clone(),
956        });
957    }
958    session.messages.push(Message::text(Role::User, req.input.clone()));
959
960    loop {
961        let current_step = guard.steps.saturating_add(1);
962
963        if let Some(ref token) = cancel_token &&
964            token.is_cancelled()
965        {
966            events.emit(AgentEvent::Error {
967                session_id: req.session_id.clone(),
968                step: Some(current_step),
969                error: "turn cancelled".to_string(),
970            });
971            events.emit(AgentEvent::TurnCompleted {
972                session_id: req.session_id.clone(),
973                finish_reason: FinishReason::Cancelled,
974                usage: total_usage.clone(),
975            });
976            store.save(&req.session_id, &session).await?;
977            let _ = tx
978                .send_async(AgentStreamEvent::Error { error: "turn cancelled".to_string() })
979                .await;
980            let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
981            return Ok(());
982        }
983
984        config.cost_meter.check_budget(&req.session_id).await?;
985
986        if !guard.can_continue() {
987            let reason = guard.reason();
988            let msg = format!("Turn stopped: {reason:?}");
989            events.emit(AgentEvent::Error {
990                session_id: req.session_id.clone(),
991                step: Some(current_step),
992                error: msg.clone(),
993            });
994            events.emit(AgentEvent::TurnCompleted {
995                session_id: req.session_id.clone(),
996                finish_reason: FinishReason::GuardExceeded,
997                usage: total_usage.clone(),
998            });
999            store.save(&req.session_id, &session).await?;
1000            let _ = tx.send_async(AgentStreamEvent::Error { error: msg }).await;
1001            let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
1002            return Ok(());
1003        }
1004
1005        let llm_request = crate::prompt::build_llm_request_with_options(
1006            model,
1007            &session,
1008            &tool_descriptors,
1009            &system_instructions,
1010            prompt_options_for_mode(config.dispatch_mode, llm.as_ref(), req.output_schema.clone()),
1011            config.context_compactor.as_ref(),
1012        );
1013        events.emit(AgentEvent::LlmCallStarted {
1014            session_id: req.session_id.clone(),
1015            step: current_step,
1016            model: model.to_string(),
1017        });
1018
1019        let mut assistant_content = String::new();
1020        let mut llm_usage = TokenUsage::default();
1021        let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
1022        let mut llm_finish_reason = FinishReason::Stop;
1023        let mut fallback_to_complete = false;
1024
1025        let llm_request = llm_request.await;
1026        if llm.capabilities().native_tool_calling {
1027            fallback_to_complete = true;
1028        } else {
1029            match llm.complete_stream(llm_request.clone()).await {
1030                Ok(mut stream) => {
1031                    while let Some(item) = stream.next().await {
1032                        match item {
1033                            Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
1034                                assistant_content.push_str(&delta);
1035                                let _ = tx
1036                                    .send_async(AgentStreamEvent::TextDelta { content: delta })
1037                                    .await;
1038                            }
1039                            Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
1040                                llm_usage = usage;
1041                            }
1042                            Err(err) => {
1043                                events.emit(AgentEvent::Error {
1044                                    session_id: req.session_id.clone(),
1045                                    step: Some(current_step),
1046                                    error: err.to_string(),
1047                                });
1048                                return Err(AgentError::Llm(err));
1049                            }
1050                        }
1051                    }
1052                }
1053                Err(err) => {
1054                    fallback_to_complete = true;
1055                    let _ = err;
1056                }
1057            }
1058        }
1059
1060        // Native tool-calling currently requires the non-streaming response path because the
1061        // internal stream interface does not expose structured tool-call chunks.
1062        // Providers that simply lack streaming support also fall back here.
1063        if fallback_to_complete {
1064            let llm_response = llm.complete(llm_request).await?;
1065            assistant_content = llm_response.content.clone();
1066            llm_usage = llm_response.usage;
1067            llm_finish_reason = llm_response.finish_reason;
1068            llm_tool_calls = llm_response.tool_calls;
1069            let _ =
1070                tx.send_async(AgentStreamEvent::TextDelta { content: llm_response.content }).await;
1071        }
1072
1073        guard.record_step();
1074        total_usage.prompt_tokens += llm_usage.prompt_tokens;
1075        total_usage.completion_tokens += llm_usage.completion_tokens;
1076        session.total_usage.prompt_tokens =
1077            session.total_usage.prompt_tokens.saturating_add(llm_usage.prompt_tokens);
1078        session.total_usage.completion_tokens =
1079            session.total_usage.completion_tokens.saturating_add(llm_usage.completion_tokens);
1080        config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
1081        events.emit(AgentEvent::LlmCallCompleted {
1082            session_id: req.session_id.clone(),
1083            step: current_step,
1084            model: model.to_string(),
1085            usage: llm_usage.clone(),
1086        });
1087        let native_tool_call = if llm.capabilities().native_tool_calling {
1088            llm_tool_calls.first().cloned()
1089        } else {
1090            None
1091        };
1092        let assistant_message = if llm_tool_calls.is_empty() {
1093            Message::text(Role::Assistant, assistant_content.clone())
1094        } else {
1095            Message::assistant_tool_calls(assistant_content.clone(), llm_tool_calls.clone())
1096        };
1097        session.messages.push(assistant_message);
1098
1099        let _ = config
1100            .checkpoint_store
1101            .save_checkpoint(&TurnCheckpoint {
1102                session_id: req.session_id.clone(),
1103                step: guard.steps,
1104                tool_calls: guard.tool_calls,
1105                usage: total_usage.clone(),
1106            })
1107            .await;
1108
1109        let response_for_dispatch = bob_core::types::LlmResponse {
1110            content: assistant_content.clone(),
1111            usage: llm_usage.clone(),
1112            finish_reason: llm_finish_reason,
1113            tool_calls: llm_tool_calls,
1114        };
1115
1116        if let Ok(action) =
1117            parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
1118        {
1119            consecutive_parse_failures = 0;
1120            match action {
1121                AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
1122                    store.save(&req.session_id, &session).await?;
1123                    events.emit(AgentEvent::TurnCompleted {
1124                        session_id: req.session_id.clone(),
1125                        finish_reason: FinishReason::Stop,
1126                        usage: total_usage.clone(),
1127                    });
1128                    let _ = tx
1129                        .send_async(AgentStreamEvent::Finished { usage: total_usage.clone() })
1130                        .await;
1131                    return Ok(());
1132                }
1133                AgentAction::ToolCall { name, arguments } => {
1134                    let tool_call_id = native_tool_call
1135                        .as_ref()
1136                        .filter(|call| call.name == name && call.arguments == arguments)
1137                        .and_then(|call| call.call_id.clone());
1138                    let call_signature = format!(
1139                        "{}:{}",
1140                        name,
1141                        serde_json::to_string(&arguments).unwrap_or_default()
1142                    );
1143                    if last_tool_call_signature.as_ref() == Some(&call_signature) {
1144                        same_tool_call_streak = same_tool_call_streak.saturating_add(1);
1145                    } else {
1146                        same_tool_call_streak = 1;
1147                        last_tool_call_signature = Some(call_signature);
1148                    }
1149                    if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
1150                        next_call_id += 1;
1151                        let call_id = format!("call-{next_call_id}");
1152                        events.emit(AgentEvent::ToolCallStarted {
1153                            session_id: req.session_id.clone(),
1154                            step: current_step,
1155                            name: name.clone(),
1156                        });
1157                        let _ = tx.send(AgentStreamEvent::ToolCallStarted {
1158                            name: name.clone(),
1159                            call_id: call_id.clone(),
1160                        });
1161                        guard.record_tool_call();
1162                        let duplicate_result = ToolResult {
1163                            name: name.clone(),
1164                            output: serde_json::json!({
1165                                "error": format!(
1166                                    "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
1167                                )
1168                            }),
1169                            is_error: true,
1170                        };
1171                        let _ = config
1172                            .cost_meter
1173                            .record_tool_result(&req.session_id, &duplicate_result)
1174                            .await;
1175                        events.emit(AgentEvent::ToolCallCompleted {
1176                            session_id: req.session_id.clone(),
1177                            step: current_step,
1178                            name: name.clone(),
1179                            is_error: true,
1180                        });
1181                        let output_str =
1182                            serde_json::to_string(&duplicate_result.output).unwrap_or_default();
1183                        session.messages.push(Message::tool_result(
1184                            name.clone(),
1185                            tool_call_id.clone(),
1186                            output_str,
1187                        ));
1188                        let _ = config
1189                            .artifact_store
1190                            .put(ArtifactRecord {
1191                                session_id: req.session_id.clone(),
1192                                kind: "tool_result".to_string(),
1193                                name: name.clone(),
1194                                content: duplicate_result.output.clone(),
1195                            })
1196                            .await;
1197                        let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
1198                            call_id,
1199                            result: duplicate_result,
1200                        });
1201                        continue;
1202                    }
1203
1204                    events.emit(AgentEvent::ToolCallStarted {
1205                        session_id: req.session_id.clone(),
1206                        step: current_step,
1207                        name: name.clone(),
1208                    });
1209                    next_call_id += 1;
1210                    let call_id = format!("call-{next_call_id}");
1211                    let _ = tx.send(AgentStreamEvent::ToolCallStarted {
1212                        name: name.clone(),
1213                        call_id: call_id.clone(),
1214                    });
1215                    let approval_context = ApprovalContext {
1216                        session_id: req.session_id.clone(),
1217                        turn_step: guard.steps.max(1),
1218                        selected_skills: selected_skills.clone(),
1219                    };
1220
1221                    // Journal lookup for idempotent replay.
1222                    let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
1223                    let tool_result = if let Ok(Some(cached)) =
1224                        config.journal.lookup(&req.session_id, &call_fingerprint).await
1225                    {
1226                        tracing::debug!(
1227                            session_id = %req.session_id,
1228                            tool = %name,
1229                            "replaying tool result from journal"
1230                        );
1231                        ToolResult {
1232                            name: cached.tool_name,
1233                            output: cached.result,
1234                            is_error: cached.is_error,
1235                        }
1236                    } else {
1237                        let result = execute_tool_call(
1238                            tools.as_ref(),
1239                            &mut guard,
1240                            ToolCall::new(name.clone(), arguments.clone()),
1241                            &tool_call_policy,
1242                            config.tool_policy.as_ref(),
1243                            config.approval.as_ref(),
1244                            &approval_context,
1245                            config.policy.tool_timeout_ms,
1246                        )
1247                        .await;
1248                        // Record in journal for future replay.
1249                        let _ = config
1250                            .journal
1251                            .append(JournalEntry {
1252                                session_id: req.session_id.clone(),
1253                                call_fingerprint: call_fingerprint.clone(),
1254                                tool_name: name.clone(),
1255                                arguments: arguments.clone(),
1256                                result: result.output.clone(),
1257                                is_error: result.is_error,
1258                                timestamp_ms: bob_core::tape::now_ms(),
1259                            })
1260                            .await;
1261                        result
1262                    };
1263
1264                    guard.record_tool_call();
1265                    let _ =
1266                        config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
1267                    let is_error = tool_result.is_error;
1268                    events.emit(AgentEvent::ToolCallCompleted {
1269                        session_id: req.session_id.clone(),
1270                        step: current_step,
1271                        name: name.clone(),
1272                        is_error,
1273                    });
1274                    let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
1275                        call_id,
1276                        result: tool_result.clone(),
1277                    });
1278
1279                    let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
1280                    session.messages.push(Message::tool_result(
1281                        name.clone(),
1282                        tool_call_id,
1283                        output_str,
1284                    ));
1285                    let _ = config
1286                        .artifact_store
1287                        .put(ArtifactRecord {
1288                            session_id: req.session_id.clone(),
1289                            kind: "tool_result".to_string(),
1290                            name: name.clone(),
1291                            content: tool_result.output.clone(),
1292                        })
1293                        .await;
1294                }
1295            }
1296        } else {
1297            consecutive_parse_failures += 1;
1298            last_tool_call_signature = None;
1299            same_tool_call_streak = 0;
1300            if consecutive_parse_failures >= 2 {
1301                store.save(&req.session_id, &session).await?;
1302                events.emit(AgentEvent::Error {
1303                    session_id: req.session_id.clone(),
1304                    step: Some(current_step),
1305                    error: "LLM produced invalid JSON after re-prompt".to_string(),
1306                });
1307                return Err(AgentError::Internal(
1308                    "LLM produced invalid JSON after re-prompt".into(),
1309                ));
1310            }
1311            session.messages.push(Message::text(
1312                Role::User,
1313                "Your response was not valid JSON. \
1314                 Please respond with exactly one JSON object \
1315                 matching the required schema.",
1316            ));
1317        }
1318    }
1319}
1320
1321#[cfg(test)]
1322mod tests {
1323    use super::*;
1324
1325    /// Small policy with tight limits for fast, deterministic tests.
1326    fn test_policy() -> TurnPolicy {
1327        TurnPolicy {
1328            max_steps: 3,
1329            max_tool_calls: 2,
1330            max_consecutive_errors: 2,
1331            turn_timeout_ms: 100,
1332            tool_timeout_ms: 50,
1333        }
1334    }
1335
1336    #[test]
1337    fn trips_on_max_steps() {
1338        let mut guard = LoopGuard::new(test_policy());
1339        assert!(guard.can_continue());
1340
1341        for _ in 0..3 {
1342            guard.record_step();
1343        }
1344
1345        assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
1346        assert_eq!(guard.reason(), GuardReason::MaxSteps);
1347    }
1348
1349    #[test]
1350    fn trips_on_max_tool_calls() {
1351        let mut guard = LoopGuard::new(test_policy());
1352        assert!(guard.can_continue());
1353
1354        for _ in 0..2 {
1355            guard.record_tool_call();
1356        }
1357
1358        assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
1359        assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
1360    }
1361
1362    #[test]
1363    fn trips_on_max_consecutive_errors() {
1364        let mut guard = LoopGuard::new(test_policy());
1365        assert!(guard.can_continue());
1366
1367        for _ in 0..2 {
1368            guard.record_error();
1369        }
1370
1371        assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
1372        assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
1373    }
1374
1375    #[tokio::test]
1376    async fn trips_on_timeout() {
1377        let guard = LoopGuard::new(test_policy());
1378        assert!(guard.can_continue());
1379        assert!(!guard.timed_out());
1380
1381        // Sleep past the 100 ms timeout.
1382        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
1383
1384        assert!(!guard.can_continue(), "guard should trip after timeout");
1385        assert!(guard.timed_out());
1386        assert_eq!(guard.reason(), GuardReason::TurnTimeout);
1387    }
1388
1389    #[test]
1390    fn reset_errors_clears_counter() {
1391        let mut guard = LoopGuard::new(test_policy());
1392
1393        guard.record_error();
1394        guard.reset_errors();
1395
1396        // After reset, a single error should NOT trip the guard.
1397        guard.record_error();
1398        assert!(guard.can_continue(), "single error after reset should not trip guard");
1399    }
1400
1401    // ── run_turn FSM tests ───────────────────────────────────────
1402
1403    use std::{
1404        collections::{HashMap, VecDeque},
1405        sync::{Arc, Mutex},
1406    };
1407
1408    use bob_core::{
1409        error::{CostError, LlmError, StoreError, ToolError},
1410        ports::{
1411            ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
1412            ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
1413        },
1414        types::{
1415            AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
1416            ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
1417            LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
1418            TurnCheckpoint,
1419        },
1420    };
1421    use futures_util::StreamExt;
1422
1423    // ── Mock ports ───────────────────────────────────────────────
1424
1425    /// LLM mock that returns queued responses in order.
1426    struct SequentialLlm {
1427        responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
1428    }
1429
1430    impl SequentialLlm {
1431        fn from_contents(contents: Vec<&str>) -> Self {
1432            let responses = contents
1433                .into_iter()
1434                .map(|c| {
1435                    Ok(LlmResponse {
1436                        content: c.to_string(),
1437                        usage: TokenUsage::default(),
1438                        finish_reason: FinishReason::Stop,
1439                        tool_calls: Vec::new(),
1440                    })
1441                })
1442                .collect();
1443            Self { responses: Mutex::new(responses) }
1444        }
1445
1446        fn from_responses(responses: Vec<LlmResponse>) -> Self {
1447            let queued = responses.into_iter().map(Ok).collect();
1448            Self { responses: Mutex::new(queued) }
1449        }
1450    }
1451
1452    #[async_trait::async_trait]
1453    impl LlmPort for SequentialLlm {
1454        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1455            let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1456            q.pop_front().unwrap_or_else(|| {
1457                Ok(LlmResponse {
1458                    content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
1459                    usage: TokenUsage::default(),
1460                    finish_reason: FinishReason::Stop,
1461                    tool_calls: Vec::new(),
1462                })
1463            })
1464        }
1465
1466        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1467            Err(LlmError::Provider("not implemented".into()))
1468        }
1469    }
1470
1471    /// Tool port mock with configurable tools and call results.
1472    struct MockToolPort {
1473        tools: Vec<ToolDescriptor>,
1474        call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
1475    }
1476
1477    impl MockToolPort {
1478        fn empty() -> Self {
1479            Self { tools: vec![], call_results: Mutex::new(VecDeque::new()) }
1480        }
1481
1482        fn with_tool_and_results(
1483            tool_name: &str,
1484            results: Vec<Result<ToolResult, ToolError>>,
1485        ) -> Self {
1486            Self {
1487                tools: vec![
1488                    ToolDescriptor::new(tool_name, format!("{tool_name} tool"))
1489                        .with_input_schema(serde_json::json!({"type": "object"})),
1490                ],
1491                call_results: Mutex::new(results.into()),
1492            }
1493        }
1494    }
1495
1496    #[async_trait::async_trait]
1497    impl ToolPort for MockToolPort {
1498        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1499            Ok(self.tools.clone())
1500        }
1501
1502        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
1503            let mut q = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
1504            q.pop_front().unwrap_or_else(|| {
1505                Ok(ToolResult {
1506                    name: call.name,
1507                    output: serde_json::json!({"result": "default"}),
1508                    is_error: false,
1509                })
1510            })
1511        }
1512    }
1513
1514    struct NoCallToolPort {
1515        tools: Vec<ToolDescriptor>,
1516    }
1517
1518    #[async_trait::async_trait]
1519    impl ToolPort for NoCallToolPort {
1520        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1521            Ok(self.tools.clone())
1522        }
1523
1524        async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
1525            Err(ToolError::Execution(
1526                "tool call should be blocked by policy before execution".to_string(),
1527            ))
1528        }
1529    }
1530
1531    struct AllowAllPolicyPort;
1532
1533    impl ToolPolicyPort for AllowAllPolicyPort {
1534        fn is_tool_allowed(
1535            &self,
1536            _tool: &str,
1537            _deny_tools: &[String],
1538            _allow_tools: Option<&[String]>,
1539        ) -> bool {
1540            true
1541        }
1542    }
1543
1544    struct DenySearchPolicyPort;
1545
1546    impl ToolPolicyPort for DenySearchPolicyPort {
1547        fn is_tool_allowed(
1548            &self,
1549            tool: &str,
1550            _deny_tools: &[String],
1551            _allow_tools: Option<&[String]>,
1552        ) -> bool {
1553            tool != "search"
1554        }
1555    }
1556
1557    struct AlwaysApprovePort;
1558
1559    #[async_trait::async_trait]
1560    impl ApprovalPort for AlwaysApprovePort {
1561        async fn approve_tool_call(
1562            &self,
1563            _call: &ToolCall,
1564            _context: &ApprovalContext,
1565        ) -> Result<ApprovalDecision, ToolError> {
1566            Ok(ApprovalDecision::Approved)
1567        }
1568    }
1569
1570    struct AlwaysDenyApprovalPort;
1571
1572    #[async_trait::async_trait]
1573    impl ApprovalPort for AlwaysDenyApprovalPort {
1574        async fn approve_tool_call(
1575            &self,
1576            _call: &ToolCall,
1577            _context: &ApprovalContext,
1578        ) -> Result<ApprovalDecision, ToolError> {
1579            Ok(ApprovalDecision::Denied {
1580                reason: "approval policy rejected tool call".to_string(),
1581            })
1582        }
1583    }
1584
1585    struct CountingCheckpointPort {
1586        saved: Mutex<Vec<TurnCheckpoint>>,
1587    }
1588
1589    impl CountingCheckpointPort {
1590        fn new() -> Self {
1591            Self { saved: Mutex::new(Vec::new()) }
1592        }
1593    }
1594
1595    #[async_trait::async_trait]
1596    impl TurnCheckpointStorePort for CountingCheckpointPort {
1597        async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
1598            self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(checkpoint.clone());
1599            Ok(())
1600        }
1601
1602        async fn load_latest(
1603            &self,
1604            _session_id: &SessionId,
1605        ) -> Result<Option<TurnCheckpoint>, StoreError> {
1606            Ok(None)
1607        }
1608    }
1609
1610    struct NoopArtifactStore;
1611
1612    #[async_trait::async_trait]
1613    impl ArtifactStorePort for NoopArtifactStore {
1614        async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
1615            Ok(())
1616        }
1617
1618        async fn list_by_session(
1619            &self,
1620            _session_id: &SessionId,
1621        ) -> Result<Vec<ArtifactRecord>, StoreError> {
1622            Ok(Vec::new())
1623        }
1624    }
1625
1626    struct CountingArtifactStore {
1627        saved: Mutex<Vec<ArtifactRecord>>,
1628    }
1629
1630    impl CountingArtifactStore {
1631        fn new() -> Self {
1632            Self { saved: Mutex::new(Vec::new()) }
1633        }
1634    }
1635
1636    #[async_trait::async_trait]
1637    impl ArtifactStorePort for CountingArtifactStore {
1638        async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError> {
1639            self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(artifact);
1640            Ok(())
1641        }
1642
1643        async fn list_by_session(
1644            &self,
1645            _session_id: &SessionId,
1646        ) -> Result<Vec<ArtifactRecord>, StoreError> {
1647            Ok(self.saved.lock().unwrap_or_else(|p| p.into_inner()).clone())
1648        }
1649    }
1650
1651    struct CountingCostMeter {
1652        llm_calls: Mutex<u32>,
1653        tool_results: Mutex<u32>,
1654    }
1655
1656    impl CountingCostMeter {
1657        fn new() -> Self {
1658            Self { llm_calls: Mutex::new(0), tool_results: Mutex::new(0) }
1659        }
1660    }
1661
1662    #[async_trait::async_trait]
1663    impl CostMeterPort for CountingCostMeter {
1664        async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
1665            Ok(())
1666        }
1667
1668        async fn record_llm_usage(
1669            &self,
1670            _session_id: &SessionId,
1671            _model: &str,
1672            _usage: &TokenUsage,
1673        ) -> Result<(), CostError> {
1674            let mut count = self.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
1675            *count += 1;
1676            Ok(())
1677        }
1678
1679        async fn record_tool_result(
1680            &self,
1681            _session_id: &SessionId,
1682            _tool_result: &ToolResult,
1683        ) -> Result<(), CostError> {
1684            let mut count = self.tool_results.lock().unwrap_or_else(|p| p.into_inner());
1685            *count += 1;
1686            Ok(())
1687        }
1688    }
1689
1690    struct MemoryStore {
1691        data: Mutex<HashMap<SessionId, SessionState>>,
1692    }
1693
1694    impl MemoryStore {
1695        fn new() -> Self {
1696            Self { data: Mutex::new(HashMap::new()) }
1697        }
1698    }
1699
1700    struct FailingSaveStore;
1701
1702    #[async_trait::async_trait]
1703    impl SessionStore for FailingSaveStore {
1704        async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1705            Ok(None)
1706        }
1707
1708        async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
1709            Err(StoreError::Backend("simulated save failure".into()))
1710        }
1711    }
1712
1713    #[async_trait::async_trait]
1714    impl SessionStore for MemoryStore {
1715        async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1716            let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1717            Ok(map.get(id).cloned())
1718        }
1719
1720        async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
1721            let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1722            map.insert(id.clone(), state.clone());
1723            Ok(())
1724        }
1725    }
1726
1727    struct CollectingSink {
1728        events: Mutex<Vec<AgentEvent>>,
1729    }
1730
1731    impl CollectingSink {
1732        fn new() -> Self {
1733            Self { events: Mutex::new(Vec::new()) }
1734        }
1735
1736        fn event_count(&self) -> usize {
1737            self.events.lock().unwrap_or_else(|p| p.into_inner()).len()
1738        }
1739
1740        fn all_events(&self) -> Vec<AgentEvent> {
1741            self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
1742        }
1743    }
1744
1745    impl EventSink for CollectingSink {
1746        fn emit(&self, event: AgentEvent) {
1747            self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
1748        }
1749    }
1750
1751    fn make_request(input: &str) -> AgentRequest {
1752        AgentRequest {
1753            input: input.into(),
1754            session_id: "test-session".into(),
1755            model: None,
1756            context: bob_core::types::RequestContext::default(),
1757            cancel_token: None,
1758            output_schema: None,
1759            max_output_retries: 0,
1760        }
1761    }
1762
1763    fn generous_policy() -> TurnPolicy {
1764        TurnPolicy {
1765            max_steps: 20,
1766            max_tool_calls: 10,
1767            max_consecutive_errors: 3,
1768            turn_timeout_ms: 30_000,
1769            tool_timeout_ms: 5_000,
1770        }
1771    }
1772
1773    struct StreamLlm {
1774        chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
1775    }
1776
1777    impl StreamLlm {
1778        fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
1779            Self { chunks: Mutex::new(chunks.into()) }
1780        }
1781    }
1782
1783    #[async_trait::async_trait]
1784    impl LlmPort for StreamLlm {
1785        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1786            Err(LlmError::Provider("complete() should not be called in stream test".into()))
1787        }
1788
1789        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1790            let mut chunks = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
1791            let items: Vec<Result<LlmStreamChunk, LlmError>> = chunks.drain(..).collect();
1792            Ok(Box::pin(futures_util::stream::iter(items)))
1793        }
1794    }
1795
1796    struct InspectingLlm {
1797        expected_substring: String,
1798    }
1799
1800    #[async_trait::async_trait]
1801    impl LlmPort for InspectingLlm {
1802        async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
1803            let system = req
1804                .messages
1805                .iter()
1806                .find(|m| m.role == Role::System)
1807                .map(|m| m.content.clone())
1808                .unwrap_or_default();
1809            if !system.contains(&self.expected_substring) {
1810                return Err(LlmError::Provider(format!(
1811                    "expected system prompt to include '{}', got: {}",
1812                    self.expected_substring, system
1813                )));
1814            }
1815            Ok(LlmResponse {
1816                content: r#"{"type": "final", "content": "ok"}"#.to_string(),
1817                usage: TokenUsage::default(),
1818                finish_reason: FinishReason::Stop,
1819                tool_calls: Vec::new(),
1820            })
1821        }
1822
1823        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1824            Err(LlmError::Provider("not used".into()))
1825        }
1826    }
1827
1828    // ── TC-01: Simple Final response ─────────────────────────────
1829
    #[tokio::test]
    async fn tc01_simple_final_response() {
        // Arrange: the LLM immediately returns a final action; no tools exist.
        let llm =
            SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
        let tools = MockToolPort::empty();
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        // Act: run one full turn through the FSM.
        let result = run_turn(
            &llm,
            &tools,
            &store,
            &sink,
            make_request("Hi"),
            &generous_policy(),
            "test-model",
        )
        .await;

        assert!(
            matches!(&result, Ok(AgentRunResult::Finished(_))),
            "expected Finished, got {result:?}"
        );
        // The `return` arm is unreachable once the assert above has passed.
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        // Assert: final content propagated, no tools ran, lifecycle events emitted.
        assert_eq!(resp.content, "Hello there!");
        assert_eq!(resp.finish_reason, FinishReason::Stop);
        assert!(resp.tool_transcript.is_empty());
        assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
    }
1863
1864    // ── TC-02: ToolCall → Final chain ────────────────────────────
1865
    #[tokio::test]
    async fn tc02_tool_call_then_final() {
        // Arrange: one tool_call action followed by a final action.
        let llm = SequentialLlm::from_contents(vec![
            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
            r#"{"type": "final", "content": "Found results."}"#,
        ]);
        // The mock serves exactly one successful search result.
        let tools = MockToolPort::with_tool_and_results(
            "search",
            vec![Ok(ToolResult {
                name: "search".into(),
                output: serde_json::json!({"hits": 42}),
                is_error: false,
            })],
        );
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        let result = run_turn(
            &llm,
            &tools,
            &store,
            &sink,
            make_request("Search for rust"),
            &generous_policy(),
            "test-model",
        )
        .await;

        assert!(
            matches!(&result, Ok(AgentRunResult::Finished(_))),
            "expected Finished, got {result:?}"
        );
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        // One successful tool invocation should appear in the transcript.
        assert_eq!(resp.content, "Found results.");
        assert_eq!(resp.finish_reason, FinishReason::Stop);
        assert_eq!(resp.tool_transcript.len(), 1);
        assert_eq!(resp.tool_transcript[0].name, "search");
        assert!(!resp.tool_transcript[0].is_error);
    }
1909
1910    // ── TC-03: Parse error → re-prompt → success ────────────────
1911
    #[tokio::test]
    async fn tc03_parse_error_reprompt_success() {
        // First reply is unparseable; the scheduler re-prompts and accepts the
        // second, valid final action.
        let llm = SequentialLlm::from_contents(vec![
            "This is not JSON at all.",
            r#"{"type": "final", "content": "Recovered"}"#,
        ]);
        let tools = MockToolPort::empty();
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        let result = run_turn(
            &llm,
            &tools,
            &store,
            &sink,
            make_request("Hi"),
            &generous_policy(),
            "test-model",
        )
        .await;

        assert!(
            matches!(&result, Ok(AgentRunResult::Finished(_))),
            "expected Finished after re-prompt, got {result:?}"
        );
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        assert_eq!(resp.content, "Recovered");
        assert_eq!(resp.finish_reason, FinishReason::Stop);
    }
1945
1946    // ── TC-04: Double parse error → AgentError ──────────────────
1947
    #[tokio::test]
    async fn tc04_double_parse_error() {
        // Two unparseable replies in a row: the re-prompt budget is exhausted
        // and the turn must surface an error mentioning the invalid JSON.
        let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
        let tools = MockToolPort::empty();
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        let result = run_turn(
            &llm,
            &tools,
            &store,
            &sink,
            make_request("Hi"),
            &generous_policy(),
            "test-model",
        )
        .await;

        assert!(result.is_err(), "should return error after two parse failures");
        // Ok arm produces a message that fails the contains() check below.
        let msg = match result {
            Err(err) => err.to_string(),
            Ok(value) => format!("unexpected success: {value:?}"),
        };
        assert!(msg.contains("invalid JSON"), "error message = {msg}");
    }
1973
1974    // ── TC-05: max_steps exhaustion → GuardExceeded ─────────────
1975
    #[tokio::test]
    async fn tc05_max_steps_exhaustion() {
        // LLM always returns tool calls — the guard should stop after max_steps.
        let llm = SequentialLlm::from_contents(vec![
            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
        ]);
        // Three queued successes; MockToolPort serves a default success for
        // any calls beyond what was queued.
        let tools = MockToolPort::with_tool_and_results(
            "t1",
            vec![
                Ok(ToolResult {
                    name: "t1".into(),
                    output: serde_json::json!(null),
                    is_error: false,
                }),
                Ok(ToolResult {
                    name: "t1".into(),
                    output: serde_json::json!(null),
                    is_error: false,
                }),
                Ok(ToolResult {
                    name: "t1".into(),
                    output: serde_json::json!(null),
                    is_error: false,
                }),
            ],
        );
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        // Tight step budget: only two LLM calls are allowed.
        let policy = TurnPolicy {
            max_steps: 2,
            max_tool_calls: 10,
            max_consecutive_errors: 5,
            turn_timeout_ms: 30_000,
            tool_timeout_ms: 5_000,
        };

        let result =
            run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
                .await;

        assert!(
            matches!(&result, Ok(AgentRunResult::Finished(_))),
            "expected Finished with GuardExceeded, got {result:?}"
        );
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        // The guard ends the turn gracefully and names the tripped limit.
        assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
        assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
    }
2032
2033    // ── TC-06: Cancellation mid-turn → Cancelled ────────────────
2034
    #[tokio::test]
    async fn tc06_cancellation() {
        // This reply must never be consumed if cancellation works.
        let llm = SequentialLlm::from_contents(vec![
            r#"{"type": "final", "content": "should not reach"}"#,
        ]);
        let tools = MockToolPort::empty();
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        let token = CancelToken::new();
        // Cancel before running.
        token.cancel();

        let mut req = make_request("Hi");
        req.cancel_token = Some(token);

        let result =
            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;

        // A pre-cancelled token should finish the turn with Cancelled rather
        // than producing an error.
        assert!(
            matches!(&result, Ok(AgentRunResult::Finished(_))),
            "expected Finished with Cancelled, got {result:?}"
        );
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        assert_eq!(resp.finish_reason, FinishReason::Cancelled);
    }
2065
2066    // ── TC-07: Tool error → is_error result → LLM sees error → Final ───
2067
    #[tokio::test]
    async fn tc07_tool_error_then_final() {
        // The tool fails once; the error is fed back to the LLM, which then
        // recovers with a final answer.
        let llm = SequentialLlm::from_contents(vec![
            r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
            r#"{"type": "final", "content": "Recovered from tool error."}"#,
        ]);
        let tools = MockToolPort::with_tool_and_results(
            "flaky_tool",
            vec![Err(ToolError::Execution("connection refused".into()))],
        );
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        let result = run_turn(
            &llm,
            &tools,
            &store,
            &sink,
            make_request("call flaky"),
            &generous_policy(),
            "test-model",
        )
        .await;

        assert!(
            matches!(&result, Ok(AgentRunResult::Finished(_))),
            "expected Finished, got {result:?}"
        );
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        // The failed call is recorded in the transcript as an error result.
        assert_eq!(resp.content, "Recovered from tool error.");
        assert_eq!(resp.tool_transcript.len(), 1);
        assert!(resp.tool_transcript[0].is_error);
    }
2105
    #[tokio::test]
    async fn tc08_save_failure_is_propagated() {
        // A failing session store must surface as AgentError::Store rather
        // than being swallowed by the scheduler.
        let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
        let tools = MockToolPort::empty();
        let store = FailingSaveStore;
        let sink = CollectingSink::new();

        let result = run_turn(
            &llm,
            &tools,
            &store,
            &sink,
            make_request("hello"),
            &generous_policy(),
            "test-model",
        )
        .await;

        assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
    }
2126
    #[tokio::test]
    async fn tc09_stream_turn_emits_text_and_finished() {
        // Two text deltas that together form a final-action JSON, then a Done
        // chunk carrying token usage.
        let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
            Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
            Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
            Ok(LlmStreamChunk::Done {
                usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
            }),
        ]));
        let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
        let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
        let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());

        let stream_result = run_turn_stream(
            llm,
            tools,
            store,
            sink,
            make_request("hello"),
            generous_policy(),
            "test-model".to_string(),
        )
        .await;
        assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
        let mut stream = match stream_result {
            Ok(stream) => stream,
            Err(_) => return,
        };

        // Drain the stream, noting whether text and completion events appeared.
        let mut saw_text = false;
        let mut saw_finished = false;
        while let Some(event) = stream.next().await {
            match event {
                AgentStreamEvent::TextDelta { content } => {
                    saw_text = saw_text || !content.is_empty();
                }
                AgentStreamEvent::Finished { usage } => {
                    saw_finished = true;
                    // Usage must match the Done chunk emitted by the mock.
                    assert_eq!(usage.prompt_tokens, 3);
                    assert_eq!(usage.completion_tokens, 4);
                }
                AgentStreamEvent::ToolCallStarted { .. } |
                AgentStreamEvent::ToolCallCompleted { .. } |
                AgentStreamEvent::Error { .. } => {}
            }
        }

        assert!(saw_text, "expected at least one text delta");
        assert!(saw_finished, "expected a finished event");
    }
2177
    #[tokio::test]
    async fn tc10_skills_prompt_context_is_injected() {
        // InspectingLlm errors unless the system prompt contains the expected
        // skill text, so a successful run proves the prompt was injected.
        let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
        let tools = MockToolPort::empty();
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        let mut req = make_request("review this code");
        req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());

        let result =
            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;

        assert!(result.is_ok(), "run should succeed when skills prompt is injected");
    }
2193
    #[tokio::test]
    async fn tc11_selected_skills_context_emits_event() {
        let llm =
            SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
        let tools = MockToolPort::empty();
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        // Selecting skills on the request should produce a SkillsSelected
        // event carrying the same names, in order.
        let mut req = make_request("review code");
        req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];

        let result =
            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
        assert!(result.is_ok(), "run should succeed");

        let events = sink.all_events();
        assert!(
            events.iter().any(|event| matches!(
                event,
                AgentEvent::SkillsSelected { skill_names, .. }
                    if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
            )),
            "skills.selected event should be emitted with context skill names"
        );
    }
2219
    #[tokio::test]
    async fn tc12_policy_deny_tool_blocks_execution() {
        let llm = SequentialLlm::from_contents(vec![
            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
            r#"{"type": "final", "content": "done"}"#,
        ]);
        // NoCallToolPort returns an error if the tool actually executes — the
        // deny list must block the call before execution.
        let tools = NoCallToolPort {
            tools: vec![
                ToolDescriptor::new("search", "search tool")
                    .with_input_schema(serde_json::json!({"type":"object"})),
            ],
        };
        let store = MemoryStore::new();
        let sink = CollectingSink::new();

        let mut req = make_request("search rust");
        req.context.tool_policy.deny_tools =
            vec!["search".to_string(), "local/shell_exec".to_string()];

        let result =
            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
        assert!(
            matches!(&result, Ok(AgentRunResult::Finished(_))),
            "expected finished response, got {result:?}"
        );
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        // The denial surfaces as an error tool result, not a turn failure.
        assert_eq!(resp.finish_reason, FinishReason::Stop);
        assert_eq!(resp.tool_transcript.len(), 1);
        assert!(resp.tool_transcript[0].is_error);
        assert!(
            resp.tool_transcript[0].output.to_string().contains("denied"),
            "tool error should explain policy denial"
        );
    }
2258
    #[tokio::test]
    async fn tc13_approval_denied_blocks_execution() {
        let llm = SequentialLlm::from_contents(vec![
            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
            r#"{"type": "final", "content": "done"}"#,
        ]);
        // NoCallToolPort errors if the tool is executed: the approval port's
        // denial must stop the call first.
        let tools = NoCallToolPort {
            tools: vec![
                ToolDescriptor::new("search", "search tool")
                    .with_input_schema(serde_json::json!({"type":"object"})),
            ],
        };
        let store = MemoryStore::new();
        let sink = CollectingSink::new();
        let req = make_request("search rust");
        // Policy allows everything; only the approval step denies.
        let tool_policy = AllowAllPolicyPort;
        let approval = AlwaysDenyApprovalPort;

        let result = run_turn_with_controls(
            &llm,
            &tools,
            &store,
            &sink,
            req,
            &generous_policy(),
            "test-model",
            &tool_policy,
            &approval,
        )
        .await;
        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        // The denial surfaces as an error tool result quoting the approval reason.
        assert_eq!(resp.tool_transcript.len(), 1);
        assert!(resp.tool_transcript[0].is_error);
        assert!(
            resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
            "tool error should explain approval denial"
        );
    }
2302
    #[tokio::test]
    async fn tc14_custom_policy_port_blocks_execution() {
        let llm = SequentialLlm::from_contents(vec![
            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
            r#"{"type": "final", "content": "done"}"#,
        ]);
        // NoCallToolPort errors if the tool is executed: the custom policy
        // port must block the call before it reaches the tool.
        let tools = NoCallToolPort {
            tools: vec![
                ToolDescriptor::new("search", "search tool")
                    .with_input_schema(serde_json::json!({"type":"object"})),
            ],
        };
        let store = MemoryStore::new();
        let sink = CollectingSink::new();
        let req = make_request("search rust");
        // Approval would pass; only the policy port denies `search`.
        let tool_policy = DenySearchPolicyPort;
        let approval = AlwaysApprovePort;

        let result = run_turn_with_controls(
            &llm,
            &tools,
            &store,
            &sink,
            req,
            &generous_policy(),
            "test-model",
            &tool_policy,
            &approval,
        )
        .await;
        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
        let resp = match result {
            Ok(AgentRunResult::Finished(r)) => r,
            _ => return,
        };

        // The denial surfaces as an error tool result, not a turn failure.
        assert_eq!(resp.tool_transcript.len(), 1);
        assert!(resp.tool_transcript[0].is_error);
        assert!(
            resp.tool_transcript[0].output.to_string().contains("denied"),
            "tool error should explain policy denial"
        );
    }
2346
2347    #[tokio::test]
2348    async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
2349        struct NativeToolLlm {
2350            responses: Mutex<VecDeque<LlmResponse>>,
2351        }
2352
2353        #[async_trait::async_trait]
2354        impl LlmPort for NativeToolLlm {
2355            fn capabilities(&self) -> bob_core::types::LlmCapabilities {
2356                bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
2357            }
2358
2359            async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2360                let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
2361                Ok(q.pop_front().unwrap_or(LlmResponse {
2362                    content: r#"{"type":"final","content":"fallback"}"#.to_string(),
2363                    usage: TokenUsage::default(),
2364                    finish_reason: FinishReason::Stop,
2365                    tool_calls: Vec::new(),
2366                }))
2367            }
2368
2369            async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2370                Err(LlmError::Provider("not used".into()))
2371            }
2372        }
2373
2374        let llm = NativeToolLlm {
2375            responses: Mutex::new(VecDeque::from(vec![
2376                LlmResponse {
2377                    content: "ignored".to_string(),
2378                    usage: TokenUsage::default(),
2379                    finish_reason: FinishReason::Stop,
2380                    tool_calls: vec![
2381                        ToolCall::new("search", serde_json::json!({"q":"rust"}))
2382                            .with_call_id("call-search-1"),
2383                    ],
2384                },
2385                LlmResponse {
2386                    content: r#"{"type":"final","content":"done"}"#.to_string(),
2387                    usage: TokenUsage::default(),
2388                    finish_reason: FinishReason::Stop,
2389                    tool_calls: Vec::new(),
2390                },
2391            ])),
2392        };
2393        let tools = MockToolPort::with_tool_and_results(
2394            "search",
2395            vec![Ok(ToolResult {
2396                name: "search".to_string(),
2397                output: serde_json::json!({"hits": 2}),
2398                is_error: false,
2399            })],
2400        );
2401        let store = MemoryStore::new();
2402        let sink = CollectingSink::new();
2403        let checkpoint = CountingCheckpointPort::new();
2404        let artifacts = NoopArtifactStore;
2405        let cost = CountingCostMeter::new();
2406        let policy = AllowAllPolicyPort;
2407        let approval = AlwaysApprovePort;
2408
2409        let result = run_turn_with_extensions(
2410            &llm,
2411            &tools,
2412            &store,
2413            &sink,
2414            make_request("search rust"),
2415            &generous_policy(),
2416            "test-model",
2417            &policy,
2418            &approval,
2419            crate::DispatchMode::NativePreferred,
2420            &checkpoint,
2421            &artifacts,
2422            &cost,
2423            &crate::prompt::WindowContextCompactor::default(),
2424            &crate::NoOpToolJournalPort,
2425        )
2426        .await;
2427
2428        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2429        let resp = match result {
2430            Ok(AgentRunResult::Finished(r)) => r,
2431            _ => return,
2432        };
2433        assert_eq!(resp.tool_transcript.len(), 1);
2434        assert_eq!(resp.tool_transcript[0].name, "search");
2435
2436        let saved = store.load(&"test-session".to_string()).await;
2437        let saved = match saved {
2438            Ok(Some(state)) => state,
2439            other => panic!("expected saved state, got {other:?}"),
2440        };
2441        assert!(
2442            saved.messages.iter().any(|message| {
2443                message.role == Role::Assistant &&
2444                    message.tool_calls.len() == 1 &&
2445                    message.tool_calls[0].call_id.as_deref() == Some("call-search-1")
2446            }),
2447            "assistant tool call should be persisted structurally",
2448        );
2449        assert!(
2450            saved.messages.iter().any(|message| {
2451                message.role == Role::Tool &&
2452                    message.tool_call_id.as_deref() == Some("call-search-1") &&
2453                    message.tool_name.as_deref() == Some("search")
2454            }),
2455            "tool result should retain tool metadata",
2456        );
2457    }
2458
2459    #[tokio::test]
2460    async fn tc16_checkpoint_and_cost_ports_are_invoked() {
2461        let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
2462        let tools = MockToolPort::empty();
2463        let store = MemoryStore::new();
2464        let sink = CollectingSink::new();
2465        let checkpoint = CountingCheckpointPort::new();
2466        let artifacts = NoopArtifactStore;
2467        let cost = CountingCostMeter::new();
2468        let policy = AllowAllPolicyPort;
2469        let approval = AlwaysApprovePort;
2470
2471        let result = run_turn_with_extensions(
2472            &llm,
2473            &tools,
2474            &store,
2475            &sink,
2476            make_request("hello"),
2477            &generous_policy(),
2478            "test-model",
2479            &policy,
2480            &approval,
2481            crate::DispatchMode::PromptGuided,
2482            &checkpoint,
2483            &artifacts,
2484            &cost,
2485            &crate::prompt::WindowContextCompactor::default(),
2486            &crate::NoOpToolJournalPort,
2487        )
2488        .await;
2489        assert!(result.is_ok(), "turn should succeed");
2490        let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2491        let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
2492        assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
2493        assert!(llm_calls >= 1, "cost meter should record llm usage");
2494    }
2495
2496    #[tokio::test]
2497    async fn tc17_session_usage_accumulates_and_persists() {
2498        let llm_first = SequentialLlm::from_responses(vec![LlmResponse {
2499            content: r#"{"type":"final","content":"first"}"#.to_string(),
2500            usage: TokenUsage { prompt_tokens: 10, completion_tokens: 5 },
2501            finish_reason: FinishReason::Stop,
2502            tool_calls: Vec::new(),
2503        }]);
2504        let llm_second = SequentialLlm::from_responses(vec![LlmResponse {
2505            content: r#"{"type":"final","content":"second"}"#.to_string(),
2506            usage: TokenUsage { prompt_tokens: 3, completion_tokens: 2 },
2507            finish_reason: FinishReason::Stop,
2508            tool_calls: Vec::new(),
2509        }]);
2510        let tools = MockToolPort::empty();
2511        let store = MemoryStore::new();
2512        let sink = CollectingSink::new();
2513
2514        let first = run_turn(
2515            &llm_first,
2516            &tools,
2517            &store,
2518            &sink,
2519            make_request("hello"),
2520            &generous_policy(),
2521            "test-model",
2522        )
2523        .await;
2524        assert!(first.is_ok(), "first run should succeed");
2525
2526        let second = run_turn(
2527            &llm_second,
2528            &tools,
2529            &store,
2530            &sink,
2531            make_request("again"),
2532            &generous_policy(),
2533            "test-model",
2534        )
2535        .await;
2536        assert!(second.is_ok(), "second run should succeed");
2537
2538        let loaded = store.load(&"test-session".to_string()).await;
2539        assert!(loaded.is_ok(), "session should be persisted");
2540        let state = loaded.ok().flatten();
2541        assert!(state.is_some(), "session state should exist");
2542        let state = state.unwrap_or_default();
2543        assert_eq!(state.total_usage.prompt_tokens, 13);
2544        assert_eq!(state.total_usage.completion_tokens, 7);
2545    }
2546
    /// Test double: an LLM whose streaming path always fails, forcing the
    /// runtime to fall back to `complete()`.
    struct FallbackOnlyLlm {
        // Canned response returned by every `complete()` call.
        response: LlmResponse,
    }
2550
2551    #[async_trait::async_trait]
2552    impl LlmPort for FallbackOnlyLlm {
2553        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2554            Ok(self.response.clone())
2555        }
2556
2557        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2558            Err(LlmError::Provider("streaming not available".to_string()))
2559        }
2560    }
2561
2562    #[tokio::test]
2563    async fn tc18_stream_fallback_does_not_emit_error_event() {
2564        let llm: Arc<dyn LlmPort> = Arc::new(FallbackOnlyLlm {
2565            response: LlmResponse {
2566                content: r#"{"type":"final","content":"done"}"#.to_string(),
2567                usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
2568                finish_reason: FinishReason::Stop,
2569                tool_calls: Vec::new(),
2570            },
2571        });
2572        let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2573        let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2574        let sink = Arc::new(CollectingSink::new());
2575        let sink_dyn: Arc<dyn EventSink> = sink.clone();
2576
2577        let stream_result = run_turn_stream(
2578            llm,
2579            tools,
2580            store,
2581            sink_dyn,
2582            make_request("hello"),
2583            generous_policy(),
2584            "test-model".to_string(),
2585        )
2586        .await;
2587        assert!(stream_result.is_ok(), "stream run should succeed with fallback");
2588        let mut stream = match stream_result {
2589            Ok(stream) => stream,
2590            Err(_) => return,
2591        };
2592
2593        while let Some(_event) = stream.next().await {}
2594
2595        let events = sink.all_events();
2596        assert!(
2597            !events.iter().any(|event| matches!(event, AgentEvent::Error { .. })),
2598            "fallback should not emit AgentEvent::Error when complete() succeeds"
2599        );
2600    }
2601
    /// Test double that counts how often each LLM entry point is invoked; its
    /// streaming path always errors so a test can prove the native
    /// tool-calling stream path never reaches `complete_stream()`.
    struct NativeStreamingBypassLlm {
        // Number of complete() invocations.
        complete_calls: std::sync::atomic::AtomicUsize,
        // Number of complete_stream() invocations.
        complete_stream_calls: std::sync::atomic::AtomicUsize,
        // Canned response returned by complete().
        response: LlmResponse,
    }
2607
2608    #[async_trait::async_trait]
2609    impl LlmPort for NativeStreamingBypassLlm {
2610        fn capabilities(&self) -> bob_core::types::LlmCapabilities {
2611            bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
2612        }
2613
2614        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2615            self.complete_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2616            Ok(self.response.clone())
2617        }
2618
2619        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2620            self.complete_stream_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2621            Err(LlmError::Provider(
2622                "complete_stream() should be bypassed for native tool calling".to_string(),
2623            ))
2624        }
2625    }
2626
2627    #[tokio::test]
2628    async fn tc19_stream_native_tool_calling_bypasses_complete_stream() {
2629        let llm_impl = Arc::new(NativeStreamingBypassLlm {
2630            complete_calls: std::sync::atomic::AtomicUsize::new(0),
2631            complete_stream_calls: std::sync::atomic::AtomicUsize::new(0),
2632            response: LlmResponse {
2633                content: r#"{"type":"final","content":"done"}"#.to_string(),
2634                usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
2635                finish_reason: FinishReason::Stop,
2636                tool_calls: Vec::new(),
2637            },
2638        });
2639        let llm: Arc<dyn LlmPort> = llm_impl.clone();
2640        let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2641        let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2642        let sink = Arc::new(CollectingSink::new());
2643        let sink_dyn: Arc<dyn EventSink> = sink.clone();
2644
2645        let stream_result = run_turn_stream(
2646            llm,
2647            tools,
2648            store,
2649            sink_dyn,
2650            make_request("hello"),
2651            generous_policy(),
2652            "test-model".to_string(),
2653        )
2654        .await;
2655        assert!(
2656            stream_result.is_ok(),
2657            "stream run should succeed through complete() for native tool calling"
2658        );
2659        let mut stream = match stream_result {
2660            Ok(stream) => stream,
2661            Err(_) => return,
2662        };
2663
2664        while let Some(_event) = stream.next().await {}
2665
2666        assert_eq!(
2667            llm_impl.complete_calls.load(std::sync::atomic::Ordering::SeqCst),
2668            1,
2669            "native tool-calling stream path should use complete()"
2670        );
2671        assert_eq!(
2672            llm_impl.complete_stream_calls.load(std::sync::atomic::Ordering::SeqCst),
2673            0,
2674            "native tool-calling stream path should bypass complete_stream()"
2675        );
2676        assert!(
2677            !sink.all_events().iter().any(|event| matches!(event, AgentEvent::Error { .. })),
2678            "native-tool fallback should stay on the success path"
2679        );
2680    }
2681
2682    #[tokio::test]
2683    async fn tc20_non_consecutive_duplicate_tool_calls_are_allowed() {
2684        let llm = SequentialLlm::from_contents(vec![
2685            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2686            r#"{"type":"tool_call","name":"tool_b","arguments":{"q":"docs"}}"#,
2687            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2688            r#"{"type":"final","content":"done"}"#,
2689        ]);
2690        let tools = MockToolPort::with_tool_and_results(
2691            "tool_a",
2692            vec![
2693                Ok(ToolResult {
2694                    name: "tool_a".to_string(),
2695                    output: serde_json::json!({"ok": 1}),
2696                    is_error: false,
2697                }),
2698                Ok(ToolResult {
2699                    name: "tool_b".to_string(),
2700                    output: serde_json::json!({"ok": 2}),
2701                    is_error: false,
2702                }),
2703                Ok(ToolResult {
2704                    name: "tool_a".to_string(),
2705                    output: serde_json::json!({"ok": 3}),
2706                    is_error: false,
2707                }),
2708            ],
2709        );
2710        let store = MemoryStore::new();
2711        let sink = CollectingSink::new();
2712
2713        let result = run_turn(
2714            &llm,
2715            &tools,
2716            &store,
2717            &sink,
2718            make_request("repeat searches"),
2719            &generous_policy(),
2720            "test-model",
2721        )
2722        .await;
2723        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2724        let resp = match result {
2725            Ok(AgentRunResult::Finished(resp)) => resp,
2726            _ => return,
2727        };
2728
2729        assert_eq!(resp.tool_transcript.len(), 3);
2730        assert!(resp.tool_transcript.iter().all(|entry| !entry.is_error));
2731    }
2732
2733    #[tokio::test]
2734    async fn tc21_excessive_consecutive_duplicate_tool_calls_are_blocked() {
2735        let llm = SequentialLlm::from_contents(vec![
2736            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2737            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2738            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2739            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2740            r#"{"type":"final","content":"done"}"#,
2741        ]);
2742        let tools = MockToolPort::with_tool_and_results(
2743            "tool_a",
2744            vec![
2745                Ok(ToolResult {
2746                    name: "tool_a".to_string(),
2747                    output: serde_json::json!({"ok": 1}),
2748                    is_error: false,
2749                }),
2750                Ok(ToolResult {
2751                    name: "tool_a".to_string(),
2752                    output: serde_json::json!({"ok": 2}),
2753                    is_error: false,
2754                }),
2755                Ok(ToolResult {
2756                    name: "tool_a".to_string(),
2757                    output: serde_json::json!({"ok": 3}),
2758                    is_error: false,
2759                }),
2760            ],
2761        );
2762        let store = MemoryStore::new();
2763        let sink = CollectingSink::new();
2764
2765        let result = run_turn(
2766            &llm,
2767            &tools,
2768            &store,
2769            &sink,
2770            make_request("poll repeatedly"),
2771            &generous_policy(),
2772            "test-model",
2773        )
2774        .await;
2775        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2776        let resp = match result {
2777            Ok(AgentRunResult::Finished(resp)) => resp,
2778            _ => return,
2779        };
2780
2781        assert_eq!(resp.tool_transcript.len(), 4);
2782        assert!(!resp.tool_transcript[2].is_error);
2783        assert!(resp.tool_transcript[3].is_error);
2784        assert!(
2785            resp.tool_transcript[3]
2786                .output
2787                .to_string()
2788                .contains("consecutive duplicate tool call limit reached"),
2789            "expected duplicate-call protection error in transcript"
2790        );
2791    }
2792
2793    #[tokio::test]
2794    async fn tc22_duplicate_block_path_records_artifact_and_cost() {
2795        let llm = SequentialLlm::from_contents(vec![
2796            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2797            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2798            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2799            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2800            r#"{"type":"final","content":"done"}"#,
2801        ]);
2802        let tools = MockToolPort::with_tool_and_results(
2803            "tool_a",
2804            vec![
2805                Ok(ToolResult {
2806                    name: "tool_a".to_string(),
2807                    output: serde_json::json!({"ok": 1}),
2808                    is_error: false,
2809                }),
2810                Ok(ToolResult {
2811                    name: "tool_a".to_string(),
2812                    output: serde_json::json!({"ok": 2}),
2813                    is_error: false,
2814                }),
2815                Ok(ToolResult {
2816                    name: "tool_a".to_string(),
2817                    output: serde_json::json!({"ok": 3}),
2818                    is_error: false,
2819                }),
2820            ],
2821        );
2822        let store = MemoryStore::new();
2823        let sink = CollectingSink::new();
2824        let checkpoint = CountingCheckpointPort::new();
2825        let artifacts = CountingArtifactStore::new();
2826        let cost = CountingCostMeter::new();
2827        let policy = AllowAllPolicyPort;
2828        let approval = AlwaysApprovePort;
2829
2830        let result = run_turn_with_extensions(
2831            &llm,
2832            &tools,
2833            &store,
2834            &sink,
2835            make_request("poll repeatedly"),
2836            &generous_policy(),
2837            "test-model",
2838            &policy,
2839            &approval,
2840            crate::DispatchMode::PromptGuided,
2841            &checkpoint,
2842            &artifacts,
2843            &cost,
2844            &crate::prompt::WindowContextCompactor::default(),
2845            &crate::NoOpToolJournalPort,
2846        )
2847        .await;
2848        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2849
2850        let tool_results = *cost.tool_results.lock().unwrap_or_else(|p| p.into_inner());
2851        assert_eq!(tool_results, 4, "cost meter should record all tool outcomes");
2852        let saved_artifacts = artifacts.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2853        assert_eq!(saved_artifacts, 4, "artifact store should record all tool outcomes");
2854    }
2855}