Skip to main content

bob_runtime/
scheduler.rs

//! # Scheduler
//!
//! Scheduler: loop guard and 6-state turn FSM.
//!
//! ## Overview
//!
//! The scheduler is the core orchestration component that manages agent turn execution.
//! It implements a finite state machine (FSM) with the following states:
//!
//! 1. **Start**: Initialize turn, load session
//! 2. **LLM Call**: Send request to LLM
//! 3. **Parse Action**: Parse LLM response into structured action
//! 4. **Execute Tool**: Run tool if action is a tool call
//! 5. **Update State**: Update session state with results
//! 6. **Loop/Finish**: Either continue or complete the turn
//!
//! ## Loop Guard
//!
//! The [`LoopGuard`] ensures turn termination by tracking:
//! - Number of steps (LLM calls)
//! - Number of tool calls
//! - Consecutive errors
//! - Elapsed time
//!
//! If any limit is exceeded, the turn is terminated with an appropriate [`GuardReason`].
//!
//! ## Example
//!
//! ```rust,ignore
//! use bob_runtime::scheduler::LoopGuard;
//! use bob_core::types::TurnPolicy;
//!
//! let policy = TurnPolicy::default();
//! let mut guard = LoopGuard::new(policy);
//!
//! while guard.can_continue() {
//!     guard.record_step();
//!     // Execute step...
//! }
//! ```

42use std::sync::Arc;
43
44use bob_core::{
45    error::AgentError,
46    journal::{JournalEntry, ToolJournalPort},
47    normalize_tool_list,
48    ports::{
49        ApprovalPort, ArtifactStorePort, ContextCompactorPort, CostMeterPort, EventSink, LlmPort,
50        SessionStore, ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
51    },
52    types::{
53        AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
54        AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
55        GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
56    },
57};
58use futures_util::StreamExt;
59use tokio::time::Instant;
60
61/// Default capacity for the bounded streaming channel.
62const STREAM_CHANNEL_CAPACITY: usize = 256;
63
64/// Safety net that guarantees turn termination by tracking steps,
65/// tool calls, consecutive errors, and elapsed time against [`TurnPolicy`] limits.
66#[derive(Debug)]
67pub struct LoopGuard {
68    policy: TurnPolicy,
69    steps: u32,
70    tool_calls: u32,
71    consecutive_errors: u32,
72    start: Instant,
73}
74
75impl LoopGuard {
76    /// Create a new guard tied to the given policy.
77    #[must_use]
78    pub fn new(policy: TurnPolicy) -> Self {
79        Self { policy, steps: 0, tool_calls: 0, consecutive_errors: 0, start: Instant::now() }
80    }
81
82    /// Returns `true` if the turn may continue executing.
83    #[must_use]
84    pub fn can_continue(&self) -> bool {
85        self.steps < self.policy.max_steps &&
86            self.tool_calls < self.policy.max_tool_calls &&
87            self.consecutive_errors < self.policy.max_consecutive_errors &&
88            !self.timed_out()
89    }
90
91    /// Record one scheduler step.
92    pub fn record_step(&mut self) {
93        self.steps += 1;
94    }
95
96    /// Record one tool call.
97    pub fn record_tool_call(&mut self) {
98        self.tool_calls += 1;
99    }
100
101    /// Record a consecutive error.
102    pub fn record_error(&mut self) {
103        self.consecutive_errors += 1;
104    }
105
106    /// Reset the consecutive-error counter (e.g. after a successful call).
107    pub fn reset_errors(&mut self) {
108        self.consecutive_errors = 0;
109    }
110
111    /// The reason the guard stopped the turn.
112    ///
113    /// Only meaningful when [`can_continue`](Self::can_continue) returns `false`.
114    #[must_use]
115    pub fn reason(&self) -> GuardReason {
116        if self.steps >= self.policy.max_steps {
117            GuardReason::MaxSteps
118        } else if self.tool_calls >= self.policy.max_tool_calls {
119            GuardReason::MaxToolCalls
120        } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
121            GuardReason::MaxConsecutiveErrors
122        } else if self.timed_out() {
123            GuardReason::TurnTimeout
124        } else {
125            // Fallback — shouldn't be called when `can_continue()` is true.
126            GuardReason::Cancelled
127        }
128    }
129
130    /// Returns `true` if the turn has exceeded its time budget.
131    #[must_use]
132    pub fn timed_out(&self) -> bool {
133        self.start.elapsed().as_millis() >= u128::from(self.policy.turn_timeout_ms)
134    }
135}
136
// ── Default system instructions (v1) ─────────────────────────────────

/// Base system prompt applied to every turn.
///
/// A non-blank request-level system prompt is appended after this text
/// (separated by a blank line) rather than replacing it.
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = "\
You are a helpful AI assistant. \
Think step by step before answering. \
When you need external information, use the available tools.";
/// How many back-to-back identical tool calls (same name and arguments) are
/// executed before further repeats are answered with an error result instead.
const MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS: u32 = 3;
144
145fn resolve_system_instructions(req: &AgentRequest) -> String {
146    if let Some(skills_prompt) = req.context.system_prompt.as_deref() {
147        if skills_prompt.trim().is_empty() {
148            DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
149        } else {
150            format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
151        }
152    } else {
153        DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
154    }
155}
156
157fn resolve_system_instructions_with_memory(
158    req: &AgentRequest,
159    session: &bob_core::types::SessionState,
160) -> String {
161    let base_instructions = resolve_system_instructions(req);
162    crate::memory_context::inject_memory_prompt(
163        &base_instructions,
164        session.memory_summary.as_deref(),
165    )
166}
167
168fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
169    req.context.selected_skills.clone()
170}
171
/// Normalized per-request tool allow/deny lists, as handed to the
/// [`ToolPolicyPort`] when deciding whether a tool call may run.
#[derive(Debug, Clone, Default)]
struct ToolCallPolicy {
    /// Normalized names from the request's deny list.
    deny_tools: Vec<String>,
    /// Normalized names from the request's allow list; `None` when the
    /// request did not supply one.
    allow_tools: Option<Vec<String>>,
}
177
178fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
179    let deny_tools =
180        normalize_tool_list(req.context.tool_policy.deny_tools.iter().map(String::as_str));
181    let allow_tools = req
182        .context
183        .tool_policy
184        .allow_tools
185        .as_ref()
186        .map(|tools| normalize_tool_list(tools.iter().map(String::as_str)));
187    ToolCallPolicy { deny_tools, allow_tools }
188}
189
190fn prompt_options_for_mode(
191    dispatch_mode: crate::DispatchMode,
192    llm: &dyn LlmPort,
193    output_schema: Option<serde_json::Value>,
194) -> crate::prompt::PromptBuildOptions {
195    let mut opts = match dispatch_mode {
196        crate::DispatchMode::PromptGuided => crate::prompt::PromptBuildOptions::default(),
197        crate::DispatchMode::NativePreferred => {
198            if llm.capabilities().native_tool_calling {
199                crate::prompt::PromptBuildOptions {
200                    include_action_schema: false,
201                    include_tool_schema: false,
202                    ..Default::default()
203                }
204            } else {
205                crate::prompt::PromptBuildOptions::default()
206            }
207        }
208    };
209    opts.structured_output = output_schema;
210    opts
211}
212
213fn parse_action_for_mode(
214    dispatch_mode: crate::DispatchMode,
215    llm: &dyn LlmPort,
216    response: &bob_core::types::LlmResponse,
217) -> Result<AgentAction, crate::action::ActionParseError> {
218    match dispatch_mode {
219        crate::DispatchMode::PromptGuided => crate::action::parse_action(&response.content),
220        crate::DispatchMode::NativePreferred => {
221            if llm.capabilities().native_tool_calling &&
222                let Some(tool_call) = response.tool_calls.first()
223            {
224                return Ok(AgentAction::ToolCall {
225                    name: tool_call.name.clone(),
226                    arguments: tool_call.arguments.clone(),
227                });
228            }
229            crate::action::parse_action(&response.content)
230        }
231    }
232}
233
234#[expect(
235    clippy::too_many_arguments,
236    reason = "tool execution needs explicit policy, approval, and timeout dependencies"
237)]
238async fn execute_tool_call(
239    tools: &dyn ToolPort,
240    guard: &mut LoopGuard,
241    tool_call: ToolCall,
242    policy: &ToolCallPolicy,
243    tool_policy_port: &dyn ToolPolicyPort,
244    approval_port: &dyn ApprovalPort,
245    approval_context: &ApprovalContext,
246    timeout_ms: u64,
247) -> ToolResult {
248    if !tool_policy_port.is_tool_allowed(
249        &tool_call.name,
250        &policy.deny_tools,
251        policy.allow_tools.as_deref(),
252    ) {
253        guard.record_error();
254        return ToolResult {
255            name: tool_call.name.clone(),
256            output: serde_json::json!({
257                "error": format!("tool '{}' denied by policy", tool_call.name)
258            }),
259            is_error: true,
260        };
261    }
262
263    match approval_port.approve_tool_call(&tool_call, approval_context).await {
264        Ok(ApprovalDecision::Approved) => {}
265        Ok(ApprovalDecision::Denied { reason }) => {
266            guard.record_error();
267            return ToolResult {
268                name: tool_call.name.clone(),
269                output: serde_json::json!({"error": reason}),
270                is_error: true,
271            };
272        }
273        Err(err) => {
274            guard.record_error();
275            return ToolResult {
276                name: tool_call.name.clone(),
277                output: serde_json::json!({"error": err.to_string()}),
278                is_error: true,
279            };
280        }
281    }
282
283    match tokio::time::timeout(
284        std::time::Duration::from_millis(timeout_ms),
285        tools.call_tool(tool_call.clone()),
286    )
287    .await
288    {
289        Ok(Ok(result)) => {
290            guard.reset_errors();
291            result
292        }
293        Ok(Err(err)) => {
294            guard.record_error();
295            ToolResult {
296                name: tool_call.name,
297                output: serde_json::json!({"error": err.to_string()}),
298                is_error: true,
299            }
300        }
301        Err(_) => {
302            guard.record_error();
303            ToolResult {
304                name: tool_call.name,
305                output: serde_json::json!({"error": "tool call timed out"}),
306                is_error: true,
307            }
308        }
309    }
310}
311
312// ── Turn Loop FSM ────────────────────────────────────────────────────
313
314/// Execute a single agent turn as a 6-state FSM.
315///
316/// States: Start → BuildPrompt → LlmInfer → ParseAction → CallTool → Done.
317/// The loop guard guarantees termination under all conditions.
318pub async fn run_turn(
319    llm: &dyn LlmPort,
320    tools: &dyn ToolPort,
321    store: &dyn SessionStore,
322    events: &dyn EventSink,
323    req: AgentRequest,
324    policy: &TurnPolicy,
325    default_model: &str,
326) -> Result<AgentRunResult, AgentError> {
327    let tool_policy = crate::DefaultToolPolicyPort;
328    let approval = crate::AllowAllApprovalPort;
329    let checkpoint_store = crate::NoOpCheckpointStorePort;
330    let artifact_store = crate::NoOpArtifactStorePort;
331    let cost_meter = crate::NoOpCostMeterPort;
332    let compactor = crate::prompt::WindowContextCompactor::default();
333    let journal = crate::NoOpToolJournalPort;
334    run_turn_with_extensions(
335        llm,
336        tools,
337        store,
338        events,
339        req,
340        policy,
341        default_model,
342        &tool_policy,
343        &approval,
344        crate::DispatchMode::NativePreferred,
345        &checkpoint_store,
346        &artifact_store,
347        &cost_meter,
348        &compactor,
349        &journal,
350    )
351    .await
352}
353
354/// Execute a single turn with explicit policy/approval controls.
355#[cfg_attr(
356    not(test),
357    expect(
358        dead_code,
359        reason = "reserved wrapper for partial control injection in external integrations"
360    )
361)]
362#[expect(
363    clippy::too_many_arguments,
364    reason = "wrapper exposes explicit dependency ports for compatibility and testability"
365)]
366pub(crate) async fn run_turn_with_controls(
367    llm: &dyn LlmPort,
368    tools: &dyn ToolPort,
369    store: &dyn SessionStore,
370    events: &dyn EventSink,
371    req: AgentRequest,
372    policy: &TurnPolicy,
373    default_model: &str,
374    tool_policy_port: &dyn ToolPolicyPort,
375    approval_port: &dyn ApprovalPort,
376) -> Result<AgentRunResult, AgentError> {
377    let checkpoint_store = crate::NoOpCheckpointStorePort;
378    let artifact_store = crate::NoOpArtifactStorePort;
379    let cost_meter = crate::NoOpCostMeterPort;
380    let compactor = crate::prompt::WindowContextCompactor::default();
381    let journal = crate::NoOpToolJournalPort;
382    run_turn_with_extensions(
383        llm,
384        tools,
385        store,
386        events,
387        req,
388        policy,
389        default_model,
390        tool_policy_port,
391        approval_port,
392        crate::DispatchMode::PromptGuided,
393        &checkpoint_store,
394        &artifact_store,
395        &cost_meter,
396        &compactor,
397        &journal,
398    )
399    .await
400}
401
402/// Execute a single turn with all extensibility controls injected.
403#[expect(
404    clippy::too_many_arguments,
405    reason = "core entrypoint exposes all ports explicitly for adapter injection"
406)]
407pub(crate) async fn run_turn_with_extensions(
408    llm: &dyn LlmPort,
409    tools: &dyn ToolPort,
410    store: &dyn SessionStore,
411    events: &dyn EventSink,
412    req: AgentRequest,
413    policy: &TurnPolicy,
414    default_model: &str,
415    tool_policy_port: &dyn ToolPolicyPort,
416    approval_port: &dyn ApprovalPort,
417    dispatch_mode: crate::DispatchMode,
418    checkpoint_store: &dyn TurnCheckpointStorePort,
419    artifact_store: &dyn ArtifactStorePort,
420    cost_meter: &dyn CostMeterPort,
421    context_compactor: &dyn ContextCompactorPort,
422    journal: &dyn ToolJournalPort,
423) -> Result<AgentRunResult, AgentError> {
424    let model = req.model.as_deref().unwrap_or(default_model);
425    let cancel_token = req.cancel_token.clone();
426    let selected_skills = resolve_selected_skills(&req);
427    let tool_call_policy = resolve_tool_call_policy(&req);
428
429    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
430    let system_instructions = resolve_system_instructions_with_memory(&req, &session);
431    let tool_descriptors = tools.list_tools().await?;
432    let mut guard = LoopGuard::new(policy.clone());
433
434    // Progressive tool view: compact summaries for inactive tools, full schemas for activated.
435    let mut tool_view = crate::progressive_tools::ProgressiveToolView::new(tool_descriptors);
436
437    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
438    if !selected_skills.is_empty() {
439        events.emit(AgentEvent::SkillsSelected {
440            session_id: req.session_id.clone(),
441            skill_names: selected_skills.clone(),
442        });
443    }
444
445    session.messages.push(Message::text(Role::User, req.input.clone()));
446
447    let mut tool_transcript: Vec<ToolResult> = Vec::new();
448    let mut total_usage = TokenUsage::default();
449    let mut consecutive_parse_failures: u32 = 0;
450    let mut consecutive_validation_failures: u32 = 0;
451    let max_output_retries = req.max_output_retries;
452    // Loop protection: cap repeated identical calls, while allowing non-consecutive repeats.
453    let mut last_tool_call_signature: Option<String> = None;
454    let mut same_tool_call_streak: u32 = 0;
455
456    loop {
457        let current_step = guard.steps.saturating_add(1);
458
459        if let Some(ref token) = cancel_token &&
460            token.is_cancelled()
461        {
462            return finish_turn(
463                store,
464                events,
465                &req.session_id,
466                &session,
467                FinishResult {
468                    content: "Turn cancelled.",
469                    tool_transcript,
470                    usage: total_usage,
471                    finish_reason: FinishReason::Cancelled,
472                },
473            )
474            .await;
475        }
476
477        cost_meter.check_budget(&req.session_id).await?;
478
479        if !guard.can_continue() {
480            let reason = guard.reason();
481            let msg = format!("Turn stopped: {reason:?}");
482            return finish_turn(
483                store,
484                events,
485                &req.session_id,
486                &session,
487                FinishResult {
488                    content: &msg,
489                    tool_transcript,
490                    usage: total_usage,
491                    finish_reason: FinishReason::GuardExceeded,
492                },
493            )
494            .await;
495        }
496
497        // Build system instructions with progressive tool summary.
498        let mut augmented_instructions = system_instructions.clone();
499        let tool_summary = tool_view.summary_prompt();
500        if !tool_summary.is_empty() {
501            augmented_instructions.push('\n');
502            augmented_instructions.push('\n');
503            augmented_instructions.push_str(&tool_summary);
504        }
505
506        let active_tools = tool_view.activated_tools();
507        let llm_request = crate::prompt::build_llm_request_with_options(
508            model,
509            &session,
510            &active_tools,
511            &augmented_instructions,
512            prompt_options_for_mode(dispatch_mode, llm, req.output_schema.clone()),
513            context_compactor,
514        )
515        .await;
516
517        events.emit(AgentEvent::LlmCallStarted {
518            session_id: req.session_id.clone(),
519            step: current_step,
520            model: model.to_string(),
521        });
522
523        let llm_response = if let Some(ref token) = cancel_token {
524            tokio::select! {
525                result = llm.complete(llm_request.clone()) => result?,
526                () = token.cancelled() => {
527                    return finish_turn(
528                        store, events, &req.session_id, &session,
529                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
530                    ).await;
531                }
532            }
533        } else {
534            llm.complete(llm_request).await?
535        };
536
537        guard.record_step();
538        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
539        total_usage.completion_tokens += llm_response.usage.completion_tokens;
540        session.total_usage.prompt_tokens =
541            session.total_usage.prompt_tokens.saturating_add(llm_response.usage.prompt_tokens);
542        session.total_usage.completion_tokens = session
543            .total_usage
544            .completion_tokens
545            .saturating_add(llm_response.usage.completion_tokens);
546        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;
547
548        events.emit(AgentEvent::LlmCallCompleted {
549            session_id: req.session_id.clone(),
550            step: current_step,
551            model: model.to_string(),
552            usage: llm_response.usage.clone(),
553        });
554        let native_tool_call = if llm.capabilities().native_tool_calling {
555            llm_response.tool_calls.first().cloned()
556        } else {
557            None
558        };
559
560        // Scan LLM response for tool name hints and activate them.
561        tool_view.activate_hints(&llm_response.content);
562
563        let assistant_message = if llm_response.tool_calls.is_empty() {
564            Message::text(Role::Assistant, llm_response.content.clone())
565        } else {
566            Message::assistant_tool_calls(
567                llm_response.content.clone(),
568                llm_response.tool_calls.clone(),
569            )
570        };
571        session.messages.push(assistant_message);
572
573        let _ = checkpoint_store
574            .save_checkpoint(&TurnCheckpoint {
575                session_id: req.session_id.clone(),
576                step: guard.steps,
577                tool_calls: guard.tool_calls,
578                usage: total_usage.clone(),
579            })
580            .await;
581
582        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
583            Ok(action) => {
584                consecutive_parse_failures = 0;
585                match action {
586                    AgentAction::Final { content } => {
587                        // Structured output validation if schema is set.
588                        if let Some(ref schema) = req.output_schema {
589                            match crate::output_validation::validate_output_str(&content, schema) {
590                                Ok(_) => {}
591                                Err(validation_err) => {
592                                    consecutive_validation_failures += 1;
593                                    if consecutive_validation_failures > max_output_retries {
594                                        tracing::warn!(
595                                            session_id = %req.session_id,
596                                            "output schema validation failed after {} retries",
597                                            max_output_retries,
598                                        );
599                                        // Accept the output despite validation failure.
600                                    } else {
601                                        let prompt =
602                                            crate::output_validation::validation_error_prompt(
603                                                &content,
604                                                &validation_err,
605                                            );
606                                        session.messages.push(Message::text(Role::User, prompt));
607                                        continue;
608                                    }
609                                }
610                            }
611                        }
612                        return finish_turn(
613                            store,
614                            events,
615                            &req.session_id,
616                            &session,
617                            FinishResult {
618                                content: &content,
619                                tool_transcript,
620                                usage: total_usage,
621                                finish_reason: FinishReason::Stop,
622                            },
623                        )
624                        .await;
625                    }
626                    AgentAction::AskUser { question } => {
627                        return finish_turn(
628                            store,
629                            events,
630                            &req.session_id,
631                            &session,
632                            FinishResult {
633                                content: &question,
634                                tool_transcript,
635                                usage: total_usage,
636                                finish_reason: FinishReason::Stop,
637                            },
638                        )
639                        .await;
640                    }
641                    AgentAction::ToolCall { name, arguments } => {
642                        let tool_call_id = native_tool_call
643                            .as_ref()
644                            .filter(|call| call.name == name && call.arguments == arguments)
645                            .and_then(|call| call.call_id.clone());
646                        // Activate the tool in progressive view for subsequent requests.
647                        tool_view.activate(&name);
648
649                        let call_signature = format!(
650                            "{}:{}",
651                            name,
652                            serde_json::to_string(&arguments).unwrap_or_default()
653                        );
654                        if last_tool_call_signature.as_ref() == Some(&call_signature) {
655                            same_tool_call_streak = same_tool_call_streak.saturating_add(1);
656                        } else {
657                            same_tool_call_streak = 1;
658                            last_tool_call_signature = Some(call_signature);
659                        }
660
661                        if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
662                            events.emit(AgentEvent::ToolCallStarted {
663                                session_id: req.session_id.clone(),
664                                step: current_step,
665                                name: name.clone(),
666                            });
667                            let dup_result = ToolResult {
668                                name: name.clone(),
669                                output: serde_json::json!({
670                                    "error": format!(
671                                        "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
672                                    )
673                                }),
674                                is_error: true,
675                            };
676                            guard.record_tool_call();
677                            let _ =
678                                cost_meter.record_tool_result(&req.session_id, &dup_result).await;
679                            let output_str =
680                                serde_json::to_string(&dup_result.output).unwrap_or_default();
681                            session.messages.push(Message::tool_result(
682                                name.clone(),
683                                tool_call_id.clone(),
684                                output_str,
685                            ));
686                            events.emit(AgentEvent::ToolCallCompleted {
687                                session_id: req.session_id.clone(),
688                                step: current_step,
689                                name: name.clone(),
690                                is_error: true,
691                            });
692                            let _ = artifact_store
693                                .put(ArtifactRecord {
694                                    session_id: req.session_id.clone(),
695                                    kind: "tool_result".to_string(),
696                                    name: name.clone(),
697                                    content: dup_result.output.clone(),
698                                })
699                                .await;
700                            tool_transcript.push(dup_result);
701                            continue;
702                        }
703
704                        events.emit(AgentEvent::ToolCallStarted {
705                            session_id: req.session_id.clone(),
706                            step: current_step,
707                            name: name.clone(),
708                        });
709                        let approval_context = ApprovalContext {
710                            session_id: req.session_id.clone(),
711                            turn_step: guard.steps.max(1),
712                            selected_skills: selected_skills.clone(),
713                        };
714
715                        // Journal lookup for idempotent replay.
716                        let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
717                        let tool_result = if let Ok(Some(cached)) =
718                            journal.lookup(&req.session_id, &call_fingerprint).await
719                        {
720                            tracing::debug!(
721                                session_id = %req.session_id,
722                                tool = %name,
723                                "replaying tool result from journal"
724                            );
725                            ToolResult {
726                                name: cached.tool_name,
727                                output: cached.result,
728                                is_error: cached.is_error,
729                            }
730                        } else {
731                            let result = execute_tool_call(
732                                tools,
733                                &mut guard,
734                                ToolCall::new(name.clone(), arguments.clone()),
735                                &tool_call_policy,
736                                tool_policy_port,
737                                approval_port,
738                                &approval_context,
739                                policy.tool_timeout_ms,
740                            )
741                            .await;
742                            // Record in journal for future replay.
743                            let _ = journal
744                                .append(JournalEntry {
745                                    session_id: req.session_id.clone(),
746                                    call_fingerprint: call_fingerprint.clone(),
747                                    tool_name: name.clone(),
748                                    arguments: arguments.clone(),
749                                    result: result.output.clone(),
750                                    is_error: result.is_error,
751                                    timestamp_ms: bob_core::tape::now_ms(),
752                                })
753                                .await;
754                            result
755                        };
756
757                        guard.record_tool_call();
758                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;
759
760                        let is_error = tool_result.is_error;
761                        events.emit(AgentEvent::ToolCallCompleted {
762                            session_id: req.session_id.clone(),
763                            step: current_step,
764                            name: name.clone(),
765                            is_error,
766                        });
767
768                        let output_str =
769                            serde_json::to_string(&tool_result.output).unwrap_or_default();
770                        session.messages.push(Message::tool_result(
771                            name.clone(),
772                            tool_call_id,
773                            output_str,
774                        ));
775
776                        let _ = artifact_store
777                            .put(ArtifactRecord {
778                                session_id: req.session_id.clone(),
779                                kind: "tool_result".to_string(),
780                                name: name.clone(),
781                                content: tool_result.output.clone(),
782                            })
783                            .await;
784
785                        tool_transcript.push(tool_result);
786                    }
787                }
788            }
789            Err(_parse_err) => {
790                consecutive_parse_failures += 1;
791                last_tool_call_signature = None;
792                same_tool_call_streak = 0;
793                if consecutive_parse_failures >= 2 {
794                    let _ = store.save(&req.session_id, &session).await;
795                    return Err(AgentError::Internal(
796                        "LLM produced invalid JSON after re-prompt".into(),
797                    ));
798                }
799                session.messages.push(Message::text(
800                    Role::User,
801                    "Your response was not valid JSON. \
802                     Please respond with exactly one JSON object \
803                     matching the required schema.",
804                ));
805            }
806        }
807    }
808}
809
/// Bundled data for building the final response (reduces argument count).
///
/// Consumed by `finish_turn`, which copies these values into the emitted
/// `AgentEvent::TurnCompleted` event and the returned `AgentResponse`.
struct FinishResult<'a> {
    /// Final response text; `finish_turn` converts it into an owned `String`.
    content: &'a str,
    /// Tool results to attach to the response; moved into it unchanged.
    tool_transcript: Vec<ToolResult>,
    /// Token usage reported in both the completion event and the response.
    usage: TokenUsage,
    /// Why the turn ended; reported in both the completion event and the response.
    finish_reason: FinishReason,
}
817
818/// Helper: save session, emit `TurnCompleted`, and build the final response.
819async fn finish_turn(
820    store: &dyn SessionStore,
821    events: &dyn EventSink,
822    session_id: &bob_core::types::SessionId,
823    session: &bob_core::types::SessionState,
824    result: FinishResult<'_>,
825) -> Result<AgentRunResult, AgentError> {
826    store.save(session_id, session).await?;
827    events.emit(AgentEvent::TurnCompleted {
828        session_id: session_id.clone(),
829        finish_reason: result.finish_reason,
830        usage: result.usage.clone(),
831    });
832    Ok(AgentRunResult::Finished(AgentResponse {
833        content: result.content.to_string(),
834        tool_transcript: result.tool_transcript,
835        usage: result.usage,
836        finish_reason: result.finish_reason,
837    }))
838}
839
840/// Execute a single turn in streaming mode and return an event stream.
841pub async fn run_turn_stream(
842    llm: Arc<dyn LlmPort>,
843    tools: Arc<dyn ToolPort>,
844    store: Arc<dyn SessionStore>,
845    events: Arc<dyn EventSink>,
846    req: AgentRequest,
847    policy: TurnPolicy,
848    default_model: String,
849) -> Result<AgentEventStream, AgentError> {
850    let tool_policy: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
851    let approval: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
852    let checkpoint_store: Arc<dyn TurnCheckpointStorePort> =
853        Arc::new(crate::NoOpCheckpointStorePort);
854    let artifact_store: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
855    let cost_meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
856    let context_compactor: Arc<dyn ContextCompactorPort> =
857        Arc::new(crate::prompt::WindowContextCompactor::default());
858    let journal: Arc<dyn ToolJournalPort> = Arc::new(crate::NoOpToolJournalPort);
859    run_turn_stream_with_controls(
860        llm,
861        tools,
862        store,
863        events,
864        req,
865        policy,
866        default_model,
867        tool_policy,
868        approval,
869        crate::DispatchMode::NativePreferred,
870        checkpoint_store,
871        artifact_store,
872        cost_meter,
873        context_compactor,
874        journal,
875    )
876    .await
877}
878
879/// Execute a single turn in streaming mode with explicit policy/approval controls.
880#[expect(
881    clippy::too_many_arguments,
882    reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
883)]
884pub(crate) async fn run_turn_stream_with_controls(
885    llm: Arc<dyn LlmPort>,
886    tools: Arc<dyn ToolPort>,
887    store: Arc<dyn SessionStore>,
888    events: Arc<dyn EventSink>,
889    req: AgentRequest,
890    policy: TurnPolicy,
891    default_model: String,
892    tool_policy: Arc<dyn ToolPolicyPort>,
893    approval: Arc<dyn ApprovalPort>,
894    dispatch_mode: crate::DispatchMode,
895    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
896    artifact_store: Arc<dyn ArtifactStorePort>,
897    cost_meter: Arc<dyn CostMeterPort>,
898    context_compactor: Arc<dyn ContextCompactorPort>,
899    journal: Arc<dyn ToolJournalPort>,
900) -> Result<AgentEventStream, AgentError> {
901    let (tx, rx) = flume::bounded::<AgentStreamEvent>(STREAM_CHANNEL_CAPACITY);
902    let config = StreamRunConfig {
903        policy,
904        default_model,
905        tool_policy,
906        approval,
907        dispatch_mode,
908        checkpoint_store,
909        artifact_store,
910        cost_meter,
911        context_compactor,
912        journal,
913    };
914
915    tokio::spawn(async move {
916        if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
917        {
918            let _ = tx.send_async(AgentStreamEvent::Error { error: err.to_string() }).await;
919        }
920    });
921
922    Ok(Box::pin(rx.into_stream()))
923}
924
/// Policy, model default, and auxiliary ports handed to `run_turn_stream_inner`.
///
/// Built once per streaming turn by `run_turn_stream_with_controls` and moved
/// into the spawned task, where it is borrowed for the whole turn.
struct StreamRunConfig {
    /// Turn limits; cloned into the `LoopGuard` and the source of `tool_timeout_ms`.
    policy: TurnPolicy,
    /// Model used when the request does not name one.
    default_model: String,
    /// Tool policy port passed to `execute_tool_call`.
    tool_policy: Arc<dyn ToolPolicyPort>,
    /// Approval port passed to `execute_tool_call`.
    approval: Arc<dyn ApprovalPort>,
    /// Dispatch mode used for both prompt options and action parsing.
    dispatch_mode: crate::DispatchMode,
    /// Receives a `TurnCheckpoint` after every LLM call (failures are ignored).
    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
    /// Receives a `"tool_result"` artifact for every tool result (failures are ignored).
    artifact_store: Arc<dyn ArtifactStorePort>,
    /// Budget check before each step plus LLM/tool usage recording.
    cost_meter: Arc<dyn CostMeterPort>,
    /// Compactor handed to the prompt builder when assembling each LLM request.
    context_compactor: Arc<dyn ContextCompactorPort>,
    /// Tool journal consulted before executing a tool and appended to afterwards.
    journal: Arc<dyn ToolJournalPort>,
}
937
938async fn run_turn_stream_inner(
939    llm: Arc<dyn LlmPort>,
940    tools: Arc<dyn ToolPort>,
941    store: Arc<dyn SessionStore>,
942    events: Arc<dyn EventSink>,
943    req: AgentRequest,
944    config: &StreamRunConfig,
945    tx: &flume::Sender<AgentStreamEvent>,
946) -> Result<(), AgentError> {
947    let model = req.model.as_deref().unwrap_or(&config.default_model);
948    let cancel_token = req.cancel_token.clone();
949    let selected_skills = resolve_selected_skills(&req);
950    let tool_call_policy = resolve_tool_call_policy(&req);
951
952    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
953    let system_instructions = resolve_system_instructions_with_memory(&req, &session);
954    let tool_descriptors = tools.list_tools().await?;
955    let mut guard = LoopGuard::new(config.policy.clone());
956    let mut total_usage = TokenUsage::default();
957    let mut consecutive_parse_failures: u32 = 0;
958    let mut next_call_id: u64 = 0;
959    let mut last_tool_call_signature: Option<String> = None;
960    let mut same_tool_call_streak: u32 = 0;
961
962    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
963    if !selected_skills.is_empty() {
964        events.emit(AgentEvent::SkillsSelected {
965            session_id: req.session_id.clone(),
966            skill_names: selected_skills.clone(),
967        });
968    }
969    session.messages.push(Message::text(Role::User, req.input.clone()));
970
971    loop {
972        let current_step = guard.steps.saturating_add(1);
973
974        if let Some(ref token) = cancel_token &&
975            token.is_cancelled()
976        {
977            events.emit(AgentEvent::Error {
978                session_id: req.session_id.clone(),
979                step: Some(current_step),
980                error: "turn cancelled".to_string(),
981            });
982            events.emit(AgentEvent::TurnCompleted {
983                session_id: req.session_id.clone(),
984                finish_reason: FinishReason::Cancelled,
985                usage: total_usage.clone(),
986            });
987            store.save(&req.session_id, &session).await?;
988            let _ = tx
989                .send_async(AgentStreamEvent::Error { error: "turn cancelled".to_string() })
990                .await;
991            let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
992            return Ok(());
993        }
994
995        config.cost_meter.check_budget(&req.session_id).await?;
996
997        if !guard.can_continue() {
998            let reason = guard.reason();
999            let msg = format!("Turn stopped: {reason:?}");
1000            events.emit(AgentEvent::Error {
1001                session_id: req.session_id.clone(),
1002                step: Some(current_step),
1003                error: msg.clone(),
1004            });
1005            events.emit(AgentEvent::TurnCompleted {
1006                session_id: req.session_id.clone(),
1007                finish_reason: FinishReason::GuardExceeded,
1008                usage: total_usage.clone(),
1009            });
1010            store.save(&req.session_id, &session).await?;
1011            let _ = tx.send_async(AgentStreamEvent::Error { error: msg }).await;
1012            let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
1013            return Ok(());
1014        }
1015
1016        let llm_request = crate::prompt::build_llm_request_with_options(
1017            model,
1018            &session,
1019            &tool_descriptors,
1020            &system_instructions,
1021            prompt_options_for_mode(config.dispatch_mode, llm.as_ref(), req.output_schema.clone()),
1022            config.context_compactor.as_ref(),
1023        );
1024        events.emit(AgentEvent::LlmCallStarted {
1025            session_id: req.session_id.clone(),
1026            step: current_step,
1027            model: model.to_string(),
1028        });
1029
1030        let mut assistant_content = String::new();
1031        let mut llm_usage = TokenUsage::default();
1032        let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
1033        let mut llm_finish_reason = FinishReason::Stop;
1034        let mut fallback_to_complete = false;
1035
1036        let llm_request = llm_request.await;
1037        if llm.capabilities().native_tool_calling {
1038            fallback_to_complete = true;
1039        } else {
1040            match llm.complete_stream(llm_request.clone()).await {
1041                Ok(mut stream) => {
1042                    while let Some(item) = stream.next().await {
1043                        match item {
1044                            Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
1045                                assistant_content.push_str(&delta);
1046                                let _ = tx
1047                                    .send_async(AgentStreamEvent::TextDelta { content: delta })
1048                                    .await;
1049                            }
1050                            Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
1051                                llm_usage = usage;
1052                            }
1053                            Err(err) => {
1054                                events.emit(AgentEvent::Error {
1055                                    session_id: req.session_id.clone(),
1056                                    step: Some(current_step),
1057                                    error: err.to_string(),
1058                                });
1059                                return Err(AgentError::Llm(err));
1060                            }
1061                        }
1062                    }
1063                }
1064                Err(err) => {
1065                    fallback_to_complete = true;
1066                    let _ = err;
1067                }
1068            }
1069        }
1070
1071        // Native tool-calling currently requires the non-streaming response path because the
1072        // internal stream interface does not expose structured tool-call chunks.
1073        // Providers that simply lack streaming support also fall back here.
1074        if fallback_to_complete {
1075            let llm_response = llm.complete(llm_request).await?;
1076            assistant_content = llm_response.content.clone();
1077            llm_usage = llm_response.usage;
1078            llm_finish_reason = llm_response.finish_reason;
1079            llm_tool_calls = llm_response.tool_calls;
1080            let _ =
1081                tx.send_async(AgentStreamEvent::TextDelta { content: llm_response.content }).await;
1082        }
1083
1084        guard.record_step();
1085        total_usage.prompt_tokens += llm_usage.prompt_tokens;
1086        total_usage.completion_tokens += llm_usage.completion_tokens;
1087        session.total_usage.prompt_tokens =
1088            session.total_usage.prompt_tokens.saturating_add(llm_usage.prompt_tokens);
1089        session.total_usage.completion_tokens =
1090            session.total_usage.completion_tokens.saturating_add(llm_usage.completion_tokens);
1091        config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
1092        events.emit(AgentEvent::LlmCallCompleted {
1093            session_id: req.session_id.clone(),
1094            step: current_step,
1095            model: model.to_string(),
1096            usage: llm_usage.clone(),
1097        });
1098        let native_tool_call = if llm.capabilities().native_tool_calling {
1099            llm_tool_calls.first().cloned()
1100        } else {
1101            None
1102        };
1103        let assistant_message = if llm_tool_calls.is_empty() {
1104            Message::text(Role::Assistant, assistant_content.clone())
1105        } else {
1106            Message::assistant_tool_calls(assistant_content.clone(), llm_tool_calls.clone())
1107        };
1108        session.messages.push(assistant_message);
1109
1110        let _ = config
1111            .checkpoint_store
1112            .save_checkpoint(&TurnCheckpoint {
1113                session_id: req.session_id.clone(),
1114                step: guard.steps,
1115                tool_calls: guard.tool_calls,
1116                usage: total_usage.clone(),
1117            })
1118            .await;
1119
1120        let response_for_dispatch = bob_core::types::LlmResponse {
1121            content: assistant_content.clone(),
1122            usage: llm_usage.clone(),
1123            finish_reason: llm_finish_reason,
1124            tool_calls: llm_tool_calls,
1125        };
1126
1127        if let Ok(action) =
1128            parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
1129        {
1130            consecutive_parse_failures = 0;
1131            match action {
1132                AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
1133                    store.save(&req.session_id, &session).await?;
1134                    events.emit(AgentEvent::TurnCompleted {
1135                        session_id: req.session_id.clone(),
1136                        finish_reason: FinishReason::Stop,
1137                        usage: total_usage.clone(),
1138                    });
1139                    let _ = tx
1140                        .send_async(AgentStreamEvent::Finished { usage: total_usage.clone() })
1141                        .await;
1142                    return Ok(());
1143                }
1144                AgentAction::ToolCall { name, arguments } => {
1145                    let tool_call_id = native_tool_call
1146                        .as_ref()
1147                        .filter(|call| call.name == name && call.arguments == arguments)
1148                        .and_then(|call| call.call_id.clone());
1149                    let call_signature = format!(
1150                        "{}:{}",
1151                        name,
1152                        serde_json::to_string(&arguments).unwrap_or_default()
1153                    );
1154                    if last_tool_call_signature.as_ref() == Some(&call_signature) {
1155                        same_tool_call_streak = same_tool_call_streak.saturating_add(1);
1156                    } else {
1157                        same_tool_call_streak = 1;
1158                        last_tool_call_signature = Some(call_signature);
1159                    }
1160                    if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
1161                        next_call_id += 1;
1162                        let call_id = format!("call-{next_call_id}");
1163                        events.emit(AgentEvent::ToolCallStarted {
1164                            session_id: req.session_id.clone(),
1165                            step: current_step,
1166                            name: name.clone(),
1167                        });
1168                        let _ = tx.send(AgentStreamEvent::ToolCallStarted {
1169                            name: name.clone(),
1170                            call_id: call_id.clone(),
1171                        });
1172                        guard.record_tool_call();
1173                        let duplicate_result = ToolResult {
1174                            name: name.clone(),
1175                            output: serde_json::json!({
1176                                "error": format!(
1177                                    "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
1178                                )
1179                            }),
1180                            is_error: true,
1181                        };
1182                        let _ = config
1183                            .cost_meter
1184                            .record_tool_result(&req.session_id, &duplicate_result)
1185                            .await;
1186                        events.emit(AgentEvent::ToolCallCompleted {
1187                            session_id: req.session_id.clone(),
1188                            step: current_step,
1189                            name: name.clone(),
1190                            is_error: true,
1191                        });
1192                        let output_str =
1193                            serde_json::to_string(&duplicate_result.output).unwrap_or_default();
1194                        session.messages.push(Message::tool_result(
1195                            name.clone(),
1196                            tool_call_id.clone(),
1197                            output_str,
1198                        ));
1199                        let _ = config
1200                            .artifact_store
1201                            .put(ArtifactRecord {
1202                                session_id: req.session_id.clone(),
1203                                kind: "tool_result".to_string(),
1204                                name: name.clone(),
1205                                content: duplicate_result.output.clone(),
1206                            })
1207                            .await;
1208                        let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
1209                            call_id,
1210                            result: duplicate_result,
1211                        });
1212                        continue;
1213                    }
1214
1215                    events.emit(AgentEvent::ToolCallStarted {
1216                        session_id: req.session_id.clone(),
1217                        step: current_step,
1218                        name: name.clone(),
1219                    });
1220                    next_call_id += 1;
1221                    let call_id = format!("call-{next_call_id}");
1222                    let _ = tx.send(AgentStreamEvent::ToolCallStarted {
1223                        name: name.clone(),
1224                        call_id: call_id.clone(),
1225                    });
1226                    let approval_context = ApprovalContext {
1227                        session_id: req.session_id.clone(),
1228                        turn_step: guard.steps.max(1),
1229                        selected_skills: selected_skills.clone(),
1230                    };
1231
1232                    // Journal lookup for idempotent replay.
1233                    let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
1234                    let tool_result = if let Ok(Some(cached)) =
1235                        config.journal.lookup(&req.session_id, &call_fingerprint).await
1236                    {
1237                        tracing::debug!(
1238                            session_id = %req.session_id,
1239                            tool = %name,
1240                            "replaying tool result from journal"
1241                        );
1242                        ToolResult {
1243                            name: cached.tool_name,
1244                            output: cached.result,
1245                            is_error: cached.is_error,
1246                        }
1247                    } else {
1248                        let result = execute_tool_call(
1249                            tools.as_ref(),
1250                            &mut guard,
1251                            ToolCall::new(name.clone(), arguments.clone()),
1252                            &tool_call_policy,
1253                            config.tool_policy.as_ref(),
1254                            config.approval.as_ref(),
1255                            &approval_context,
1256                            config.policy.tool_timeout_ms,
1257                        )
1258                        .await;
1259                        // Record in journal for future replay.
1260                        let _ = config
1261                            .journal
1262                            .append(JournalEntry {
1263                                session_id: req.session_id.clone(),
1264                                call_fingerprint: call_fingerprint.clone(),
1265                                tool_name: name.clone(),
1266                                arguments: arguments.clone(),
1267                                result: result.output.clone(),
1268                                is_error: result.is_error,
1269                                timestamp_ms: bob_core::tape::now_ms(),
1270                            })
1271                            .await;
1272                        result
1273                    };
1274
1275                    guard.record_tool_call();
1276                    let _ =
1277                        config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
1278                    let is_error = tool_result.is_error;
1279                    events.emit(AgentEvent::ToolCallCompleted {
1280                        session_id: req.session_id.clone(),
1281                        step: current_step,
1282                        name: name.clone(),
1283                        is_error,
1284                    });
1285                    let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
1286                        call_id,
1287                        result: tool_result.clone(),
1288                    });
1289
1290                    let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
1291                    session.messages.push(Message::tool_result(
1292                        name.clone(),
1293                        tool_call_id,
1294                        output_str,
1295                    ));
1296                    let _ = config
1297                        .artifact_store
1298                        .put(ArtifactRecord {
1299                            session_id: req.session_id.clone(),
1300                            kind: "tool_result".to_string(),
1301                            name: name.clone(),
1302                            content: tool_result.output.clone(),
1303                        })
1304                        .await;
1305                }
1306            }
1307        } else {
1308            consecutive_parse_failures += 1;
1309            last_tool_call_signature = None;
1310            same_tool_call_streak = 0;
1311            if consecutive_parse_failures >= 2 {
1312                store.save(&req.session_id, &session).await?;
1313                events.emit(AgentEvent::Error {
1314                    session_id: req.session_id.clone(),
1315                    step: Some(current_step),
1316                    error: "LLM produced invalid JSON after re-prompt".to_string(),
1317                });
1318                return Err(AgentError::Internal(
1319                    "LLM produced invalid JSON after re-prompt".into(),
1320                ));
1321            }
1322            session.messages.push(Message::text(
1323                Role::User,
1324                "Your response was not valid JSON. \
1325                 Please respond with exactly one JSON object \
1326                 matching the required schema.",
1327            ));
1328        }
1329    }
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334    use super::*;
1335
1336    /// Small policy with tight limits for fast, deterministic tests.
1337    fn test_policy() -> TurnPolicy {
1338        TurnPolicy {
1339            max_steps: 3,
1340            max_tool_calls: 2,
1341            max_consecutive_errors: 2,
1342            turn_timeout_ms: 100,
1343            tool_timeout_ms: 50,
1344        }
1345    }
1346
1347    #[test]
1348    fn trips_on_max_steps() {
1349        let mut guard = LoopGuard::new(test_policy());
1350        assert!(guard.can_continue());
1351
1352        for _ in 0..3 {
1353            guard.record_step();
1354        }
1355
1356        assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
1357        assert_eq!(guard.reason(), GuardReason::MaxSteps);
1358    }
1359
1360    #[test]
1361    fn trips_on_max_tool_calls() {
1362        let mut guard = LoopGuard::new(test_policy());
1363        assert!(guard.can_continue());
1364
1365        for _ in 0..2 {
1366            guard.record_tool_call();
1367        }
1368
1369        assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
1370        assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
1371    }
1372
1373    #[test]
1374    fn trips_on_max_consecutive_errors() {
1375        let mut guard = LoopGuard::new(test_policy());
1376        assert!(guard.can_continue());
1377
1378        for _ in 0..2 {
1379            guard.record_error();
1380        }
1381
1382        assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
1383        assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
1384    }
1385
1386    #[tokio::test]
1387    async fn trips_on_timeout() {
1388        let guard = LoopGuard::new(test_policy());
1389        assert!(guard.can_continue());
1390        assert!(!guard.timed_out());
1391
1392        // Sleep past the 100 ms timeout.
1393        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
1394
1395        assert!(!guard.can_continue(), "guard should trip after timeout");
1396        assert!(guard.timed_out());
1397        assert_eq!(guard.reason(), GuardReason::TurnTimeout);
1398    }
1399
1400    #[test]
1401    fn reset_errors_clears_counter() {
1402        let mut guard = LoopGuard::new(test_policy());
1403
1404        guard.record_error();
1405        guard.reset_errors();
1406
1407        // After reset, a single error should NOT trip the guard.
1408        guard.record_error();
1409        assert!(guard.can_continue(), "single error after reset should not trip guard");
1410    }
1411
1412    // ── run_turn FSM tests ───────────────────────────────────────
1413
1414    use std::{
1415        collections::{HashMap, VecDeque},
1416        sync::{Arc, Mutex},
1417    };
1418
1419    use bob_core::{
1420        error::{CostError, LlmError, StoreError, ToolError},
1421        ports::{
1422            ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
1423            ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
1424        },
1425        types::{
1426            AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
1427            ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
1428            LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
1429            TurnCheckpoint,
1430        },
1431    };
1432    use futures_util::StreamExt;
1433
1434    // ── Mock ports ───────────────────────────────────────────────
1435
1436    /// LLM mock that returns queued responses in order.
1437    struct SequentialLlm {
1438        responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
1439    }
1440
1441    impl SequentialLlm {
1442        fn from_contents(contents: Vec<&str>) -> Self {
1443            let responses = contents
1444                .into_iter()
1445                .map(|c| {
1446                    Ok(LlmResponse {
1447                        content: c.to_string(),
1448                        usage: TokenUsage::default(),
1449                        finish_reason: FinishReason::Stop,
1450                        tool_calls: Vec::new(),
1451                    })
1452                })
1453                .collect();
1454            Self { responses: Mutex::new(responses) }
1455        }
1456
1457        fn from_responses(responses: Vec<LlmResponse>) -> Self {
1458            let queued = responses.into_iter().map(Ok).collect();
1459            Self { responses: Mutex::new(queued) }
1460        }
1461    }
1462
1463    #[async_trait::async_trait]
1464    impl LlmPort for SequentialLlm {
1465        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1466            let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1467            q.pop_front().unwrap_or_else(|| {
1468                Ok(LlmResponse {
1469                    content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
1470                    usage: TokenUsage::default(),
1471                    finish_reason: FinishReason::Stop,
1472                    tool_calls: Vec::new(),
1473                })
1474            })
1475        }
1476
1477        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1478            Err(LlmError::Provider("not implemented".into()))
1479        }
1480    }
1481
1482    /// Tool port mock with configurable tools and call results.
1483    struct MockToolPort {
1484        tools: Vec<ToolDescriptor>,
1485        call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
1486    }
1487
1488    impl MockToolPort {
1489        fn empty() -> Self {
1490            Self { tools: vec![], call_results: Mutex::new(VecDeque::new()) }
1491        }
1492
1493        fn with_tool_and_results(
1494            tool_name: &str,
1495            results: Vec<Result<ToolResult, ToolError>>,
1496        ) -> Self {
1497            Self {
1498                tools: vec![
1499                    ToolDescriptor::new(tool_name, format!("{tool_name} tool"))
1500                        .with_input_schema(serde_json::json!({"type": "object"})),
1501                ],
1502                call_results: Mutex::new(results.into()),
1503            }
1504        }
1505    }
1506
1507    #[async_trait::async_trait]
1508    impl ToolPort for MockToolPort {
1509        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1510            Ok(self.tools.clone())
1511        }
1512
1513        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
1514            let mut q = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
1515            q.pop_front().unwrap_or_else(|| {
1516                Ok(ToolResult {
1517                    name: call.name,
1518                    output: serde_json::json!({"result": "default"}),
1519                    is_error: false,
1520                })
1521            })
1522        }
1523    }
1524
1525    struct NoCallToolPort {
1526        tools: Vec<ToolDescriptor>,
1527    }
1528
1529    #[async_trait::async_trait]
1530    impl ToolPort for NoCallToolPort {
1531        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1532            Ok(self.tools.clone())
1533        }
1534
1535        async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
1536            Err(ToolError::Execution(
1537                "tool call should be blocked by policy before execution".to_string(),
1538            ))
1539        }
1540    }
1541
1542    struct AllowAllPolicyPort;
1543
1544    impl ToolPolicyPort for AllowAllPolicyPort {
1545        fn is_tool_allowed(
1546            &self,
1547            _tool: &str,
1548            _deny_tools: &[String],
1549            _allow_tools: Option<&[String]>,
1550        ) -> bool {
1551            true
1552        }
1553    }
1554
1555    struct DenySearchPolicyPort;
1556
1557    impl ToolPolicyPort for DenySearchPolicyPort {
1558        fn is_tool_allowed(
1559            &self,
1560            tool: &str,
1561            _deny_tools: &[String],
1562            _allow_tools: Option<&[String]>,
1563        ) -> bool {
1564            tool != "search"
1565        }
1566    }
1567
1568    struct AlwaysApprovePort;
1569
1570    #[async_trait::async_trait]
1571    impl ApprovalPort for AlwaysApprovePort {
1572        async fn approve_tool_call(
1573            &self,
1574            _call: &ToolCall,
1575            _context: &ApprovalContext,
1576        ) -> Result<ApprovalDecision, ToolError> {
1577            Ok(ApprovalDecision::Approved)
1578        }
1579    }
1580
1581    struct AlwaysDenyApprovalPort;
1582
1583    #[async_trait::async_trait]
1584    impl ApprovalPort for AlwaysDenyApprovalPort {
1585        async fn approve_tool_call(
1586            &self,
1587            _call: &ToolCall,
1588            _context: &ApprovalContext,
1589        ) -> Result<ApprovalDecision, ToolError> {
1590            Ok(ApprovalDecision::Denied {
1591                reason: "approval policy rejected tool call".to_string(),
1592            })
1593        }
1594    }
1595
1596    struct CountingCheckpointPort {
1597        saved: Mutex<Vec<TurnCheckpoint>>,
1598    }
1599
1600    impl CountingCheckpointPort {
1601        fn new() -> Self {
1602            Self { saved: Mutex::new(Vec::new()) }
1603        }
1604    }
1605
1606    #[async_trait::async_trait]
1607    impl TurnCheckpointStorePort for CountingCheckpointPort {
1608        async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
1609            self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(checkpoint.clone());
1610            Ok(())
1611        }
1612
1613        async fn load_latest(
1614            &self,
1615            _session_id: &SessionId,
1616        ) -> Result<Option<TurnCheckpoint>, StoreError> {
1617            Ok(None)
1618        }
1619    }
1620
1621    struct NoopArtifactStore;
1622
1623    #[async_trait::async_trait]
1624    impl ArtifactStorePort for NoopArtifactStore {
1625        async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
1626            Ok(())
1627        }
1628
1629        async fn list_by_session(
1630            &self,
1631            _session_id: &SessionId,
1632        ) -> Result<Vec<ArtifactRecord>, StoreError> {
1633            Ok(Vec::new())
1634        }
1635    }
1636
1637    struct CountingArtifactStore {
1638        saved: Mutex<Vec<ArtifactRecord>>,
1639    }
1640
1641    impl CountingArtifactStore {
1642        fn new() -> Self {
1643            Self { saved: Mutex::new(Vec::new()) }
1644        }
1645    }
1646
1647    #[async_trait::async_trait]
1648    impl ArtifactStorePort for CountingArtifactStore {
1649        async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError> {
1650            self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(artifact);
1651            Ok(())
1652        }
1653
1654        async fn list_by_session(
1655            &self,
1656            _session_id: &SessionId,
1657        ) -> Result<Vec<ArtifactRecord>, StoreError> {
1658            Ok(self.saved.lock().unwrap_or_else(|p| p.into_inner()).clone())
1659        }
1660    }
1661
1662    struct CountingCostMeter {
1663        llm_calls: Mutex<u32>,
1664        tool_results: Mutex<u32>,
1665    }
1666
1667    impl CountingCostMeter {
1668        fn new() -> Self {
1669            Self { llm_calls: Mutex::new(0), tool_results: Mutex::new(0) }
1670        }
1671    }
1672
1673    #[async_trait::async_trait]
1674    impl CostMeterPort for CountingCostMeter {
1675        async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
1676            Ok(())
1677        }
1678
1679        async fn record_llm_usage(
1680            &self,
1681            _session_id: &SessionId,
1682            _model: &str,
1683            _usage: &TokenUsage,
1684        ) -> Result<(), CostError> {
1685            let mut count = self.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
1686            *count += 1;
1687            Ok(())
1688        }
1689
1690        async fn record_tool_result(
1691            &self,
1692            _session_id: &SessionId,
1693            _tool_result: &ToolResult,
1694        ) -> Result<(), CostError> {
1695            let mut count = self.tool_results.lock().unwrap_or_else(|p| p.into_inner());
1696            *count += 1;
1697            Ok(())
1698        }
1699    }
1700
1701    struct MemoryStore {
1702        data: Mutex<HashMap<SessionId, SessionState>>,
1703    }
1704
1705    impl MemoryStore {
1706        fn new() -> Self {
1707            Self { data: Mutex::new(HashMap::new()) }
1708        }
1709    }
1710
1711    struct FailingSaveStore;
1712
1713    #[async_trait::async_trait]
1714    impl SessionStore for FailingSaveStore {
1715        async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1716            Ok(None)
1717        }
1718
1719        async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
1720            Err(StoreError::Backend("simulated save failure".into()))
1721        }
1722    }
1723
1724    #[async_trait::async_trait]
1725    impl SessionStore for MemoryStore {
1726        async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1727            let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1728            Ok(map.get(id).cloned())
1729        }
1730
1731        async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
1732            let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1733            map.insert(id.clone(), state.clone());
1734            Ok(())
1735        }
1736    }
1737
1738    struct CollectingSink {
1739        events: Mutex<Vec<AgentEvent>>,
1740    }
1741
1742    impl CollectingSink {
1743        fn new() -> Self {
1744            Self { events: Mutex::new(Vec::new()) }
1745        }
1746
1747        fn event_count(&self) -> usize {
1748            self.events.lock().unwrap_or_else(|p| p.into_inner()).len()
1749        }
1750
1751        fn all_events(&self) -> Vec<AgentEvent> {
1752            self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
1753        }
1754    }
1755
1756    impl EventSink for CollectingSink {
1757        fn emit(&self, event: AgentEvent) {
1758            self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
1759        }
1760    }
1761
1762    fn make_request(input: &str) -> AgentRequest {
1763        AgentRequest {
1764            input: input.into(),
1765            session_id: "test-session".into(),
1766            model: None,
1767            context: bob_core::types::RequestContext::default(),
1768            cancel_token: None,
1769            output_schema: None,
1770            max_output_retries: 0,
1771        }
1772    }
1773
1774    fn generous_policy() -> TurnPolicy {
1775        TurnPolicy {
1776            max_steps: 20,
1777            max_tool_calls: 10,
1778            max_consecutive_errors: 3,
1779            turn_timeout_ms: 30_000,
1780            tool_timeout_ms: 5_000,
1781        }
1782    }
1783
1784    struct StreamLlm {
1785        chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
1786    }
1787
1788    impl StreamLlm {
1789        fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
1790            Self { chunks: Mutex::new(chunks.into()) }
1791        }
1792    }
1793
1794    #[async_trait::async_trait]
1795    impl LlmPort for StreamLlm {
1796        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1797            Err(LlmError::Provider("complete() should not be called in stream test".into()))
1798        }
1799
1800        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1801            let mut chunks = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
1802            let items: Vec<Result<LlmStreamChunk, LlmError>> = chunks.drain(..).collect();
1803            Ok(Box::pin(futures_util::stream::iter(items)))
1804        }
1805    }
1806
1807    struct InspectingLlm {
1808        expected_substring: String,
1809    }
1810
1811    #[async_trait::async_trait]
1812    impl LlmPort for InspectingLlm {
1813        async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
1814            let system = req
1815                .messages
1816                .iter()
1817                .find(|m| m.role == Role::System)
1818                .map(|m| m.content.clone())
1819                .unwrap_or_default();
1820            if !system.contains(&self.expected_substring) {
1821                return Err(LlmError::Provider(format!(
1822                    "expected system prompt to include '{}', got: {}",
1823                    self.expected_substring, system
1824                )));
1825            }
1826            Ok(LlmResponse {
1827                content: r#"{"type": "final", "content": "ok"}"#.to_string(),
1828                usage: TokenUsage::default(),
1829                finish_reason: FinishReason::Stop,
1830                tool_calls: Vec::new(),
1831            })
1832        }
1833
1834        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1835            Err(LlmError::Provider("not used".into()))
1836        }
1837    }
1838
1839    // ── TC-01: Simple Final response ─────────────────────────────
1840
1841    #[tokio::test]
1842    async fn tc01_simple_final_response() {
1843        let llm =
1844            SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
1845        let tools = MockToolPort::empty();
1846        let store = MemoryStore::new();
1847        let sink = CollectingSink::new();
1848
1849        let result = run_turn(
1850            &llm,
1851            &tools,
1852            &store,
1853            &sink,
1854            make_request("Hi"),
1855            &generous_policy(),
1856            "test-model",
1857        )
1858        .await;
1859
1860        assert!(
1861            matches!(&result, Ok(AgentRunResult::Finished(_))),
1862            "expected Finished, got {result:?}"
1863        );
1864        let resp = match result {
1865            Ok(AgentRunResult::Finished(r)) => r,
1866            _ => return,
1867        };
1868
1869        assert_eq!(resp.content, "Hello there!");
1870        assert_eq!(resp.finish_reason, FinishReason::Stop);
1871        assert!(resp.tool_transcript.is_empty());
1872        assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
1873    }
1874
1875    // ── TC-02: ToolCall → Final chain ────────────────────────────
1876
1877    #[tokio::test]
1878    async fn tc02_tool_call_then_final() {
1879        let llm = SequentialLlm::from_contents(vec![
1880            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1881            r#"{"type": "final", "content": "Found results."}"#,
1882        ]);
1883        let tools = MockToolPort::with_tool_and_results(
1884            "search",
1885            vec![Ok(ToolResult {
1886                name: "search".into(),
1887                output: serde_json::json!({"hits": 42}),
1888                is_error: false,
1889            })],
1890        );
1891        let store = MemoryStore::new();
1892        let sink = CollectingSink::new();
1893
1894        let result = run_turn(
1895            &llm,
1896            &tools,
1897            &store,
1898            &sink,
1899            make_request("Search for rust"),
1900            &generous_policy(),
1901            "test-model",
1902        )
1903        .await;
1904
1905        assert!(
1906            matches!(&result, Ok(AgentRunResult::Finished(_))),
1907            "expected Finished, got {result:?}"
1908        );
1909        let resp = match result {
1910            Ok(AgentRunResult::Finished(r)) => r,
1911            _ => return,
1912        };
1913
1914        assert_eq!(resp.content, "Found results.");
1915        assert_eq!(resp.finish_reason, FinishReason::Stop);
1916        assert_eq!(resp.tool_transcript.len(), 1);
1917        assert_eq!(resp.tool_transcript[0].name, "search");
1918        assert!(!resp.tool_transcript[0].is_error);
1919    }
1920
1921    // ── TC-03: Parse error → re-prompt → success ────────────────
1922
1923    #[tokio::test]
1924    async fn tc03_parse_error_reprompt_success() {
1925        let llm = SequentialLlm::from_contents(vec![
1926            "This is not JSON at all.",
1927            r#"{"type": "final", "content": "Recovered"}"#,
1928        ]);
1929        let tools = MockToolPort::empty();
1930        let store = MemoryStore::new();
1931        let sink = CollectingSink::new();
1932
1933        let result = run_turn(
1934            &llm,
1935            &tools,
1936            &store,
1937            &sink,
1938            make_request("Hi"),
1939            &generous_policy(),
1940            "test-model",
1941        )
1942        .await;
1943
1944        assert!(
1945            matches!(&result, Ok(AgentRunResult::Finished(_))),
1946            "expected Finished after re-prompt, got {result:?}"
1947        );
1948        let resp = match result {
1949            Ok(AgentRunResult::Finished(r)) => r,
1950            _ => return,
1951        };
1952
1953        assert_eq!(resp.content, "Recovered");
1954        assert_eq!(resp.finish_reason, FinishReason::Stop);
1955    }
1956
1957    // ── TC-04: Double parse error → AgentError ──────────────────
1958
1959    #[tokio::test]
1960    async fn tc04_double_parse_error() {
1961        let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
1962        let tools = MockToolPort::empty();
1963        let store = MemoryStore::new();
1964        let sink = CollectingSink::new();
1965
1966        let result = run_turn(
1967            &llm,
1968            &tools,
1969            &store,
1970            &sink,
1971            make_request("Hi"),
1972            &generous_policy(),
1973            "test-model",
1974        )
1975        .await;
1976
1977        assert!(result.is_err(), "should return error after two parse failures");
1978        let msg = match result {
1979            Err(err) => err.to_string(),
1980            Ok(value) => format!("unexpected success: {value:?}"),
1981        };
1982        assert!(msg.contains("invalid JSON"), "error message = {msg}");
1983    }
1984
1985    // ── TC-05: max_steps exhaustion → GuardExceeded ─────────────
1986
1987    #[tokio::test]
1988    async fn tc05_max_steps_exhaustion() {
1989        // LLM always returns tool calls — the guard should stop after max_steps.
1990        let llm = SequentialLlm::from_contents(vec![
1991            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1992            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1993            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1994            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1995        ]);
1996        let tools = MockToolPort::with_tool_and_results(
1997            "t1",
1998            vec![
1999                Ok(ToolResult {
2000                    name: "t1".into(),
2001                    output: serde_json::json!(null),
2002                    is_error: false,
2003                }),
2004                Ok(ToolResult {
2005                    name: "t1".into(),
2006                    output: serde_json::json!(null),
2007                    is_error: false,
2008                }),
2009                Ok(ToolResult {
2010                    name: "t1".into(),
2011                    output: serde_json::json!(null),
2012                    is_error: false,
2013                }),
2014            ],
2015        );
2016        let store = MemoryStore::new();
2017        let sink = CollectingSink::new();
2018
2019        let policy = TurnPolicy {
2020            max_steps: 2,
2021            max_tool_calls: 10,
2022            max_consecutive_errors: 5,
2023            turn_timeout_ms: 30_000,
2024            tool_timeout_ms: 5_000,
2025        };
2026
2027        let result =
2028            run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
2029                .await;
2030
2031        assert!(
2032            matches!(&result, Ok(AgentRunResult::Finished(_))),
2033            "expected Finished with GuardExceeded, got {result:?}"
2034        );
2035        let resp = match result {
2036            Ok(AgentRunResult::Finished(r)) => r,
2037            _ => return,
2038        };
2039
2040        assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
2041        assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
2042    }
2043
2044    // ── TC-06: Cancellation mid-turn → Cancelled ────────────────
2045
2046    #[tokio::test]
2047    async fn tc06_cancellation() {
2048        let llm = SequentialLlm::from_contents(vec![
2049            r#"{"type": "final", "content": "should not reach"}"#,
2050        ]);
2051        let tools = MockToolPort::empty();
2052        let store = MemoryStore::new();
2053        let sink = CollectingSink::new();
2054
2055        let token = CancelToken::new();
2056        // Cancel before running.
2057        token.cancel();
2058
2059        let mut req = make_request("Hi");
2060        req.cancel_token = Some(token);
2061
2062        let result =
2063            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2064
2065        assert!(
2066            matches!(&result, Ok(AgentRunResult::Finished(_))),
2067            "expected Finished with Cancelled, got {result:?}"
2068        );
2069        let resp = match result {
2070            Ok(AgentRunResult::Finished(r)) => r,
2071            _ => return,
2072        };
2073
2074        assert_eq!(resp.finish_reason, FinishReason::Cancelled);
2075    }
2076
2077    // ── TC-07: Tool error → is_error result → LLM sees error → Final ───
2078
2079    #[tokio::test]
2080    async fn tc07_tool_error_then_final() {
2081        let llm = SequentialLlm::from_contents(vec![
2082            r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
2083            r#"{"type": "final", "content": "Recovered from tool error."}"#,
2084        ]);
2085        let tools = MockToolPort::with_tool_and_results(
2086            "flaky_tool",
2087            vec![Err(ToolError::Execution("connection refused".into()))],
2088        );
2089        let store = MemoryStore::new();
2090        let sink = CollectingSink::new();
2091
2092        let result = run_turn(
2093            &llm,
2094            &tools,
2095            &store,
2096            &sink,
2097            make_request("call flaky"),
2098            &generous_policy(),
2099            "test-model",
2100        )
2101        .await;
2102
2103        assert!(
2104            matches!(&result, Ok(AgentRunResult::Finished(_))),
2105            "expected Finished, got {result:?}"
2106        );
2107        let resp = match result {
2108            Ok(AgentRunResult::Finished(r)) => r,
2109            _ => return,
2110        };
2111
2112        assert_eq!(resp.content, "Recovered from tool error.");
2113        assert_eq!(resp.tool_transcript.len(), 1);
2114        assert!(resp.tool_transcript[0].is_error);
2115    }
2116
2117    #[tokio::test]
2118    async fn tc08_save_failure_is_propagated() {
2119        let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
2120        let tools = MockToolPort::empty();
2121        let store = FailingSaveStore;
2122        let sink = CollectingSink::new();
2123
2124        let result = run_turn(
2125            &llm,
2126            &tools,
2127            &store,
2128            &sink,
2129            make_request("hello"),
2130            &generous_policy(),
2131            "test-model",
2132        )
2133        .await;
2134
2135        assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
2136    }
2137
2138    #[tokio::test]
2139    async fn tc09_stream_turn_emits_text_and_finished() {
2140        let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
2141            Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
2142            Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
2143            Ok(LlmStreamChunk::Done {
2144                usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
2145            }),
2146        ]));
2147        let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2148        let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2149        let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());
2150
2151        let stream_result = run_turn_stream(
2152            llm,
2153            tools,
2154            store,
2155            sink,
2156            make_request("hello"),
2157            generous_policy(),
2158            "test-model".to_string(),
2159        )
2160        .await;
2161        assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
2162        let mut stream = match stream_result {
2163            Ok(stream) => stream,
2164            Err(_) => return,
2165        };
2166
2167        let mut saw_text = false;
2168        let mut saw_finished = false;
2169        while let Some(event) = stream.next().await {
2170            match event {
2171                AgentStreamEvent::TextDelta { content } => {
2172                    saw_text = saw_text || !content.is_empty();
2173                }
2174                AgentStreamEvent::Finished { usage } => {
2175                    saw_finished = true;
2176                    assert_eq!(usage.prompt_tokens, 3);
2177                    assert_eq!(usage.completion_tokens, 4);
2178                }
2179                AgentStreamEvent::ToolCallStarted { .. } |
2180                AgentStreamEvent::ToolCallCompleted { .. } |
2181                AgentStreamEvent::Error { .. } => {}
2182            }
2183        }
2184
2185        assert!(saw_text, "expected at least one text delta");
2186        assert!(saw_finished, "expected a finished event");
2187    }
2188
2189    #[tokio::test]
2190    async fn tc10_skills_prompt_context_is_injected() {
2191        let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
2192        let tools = MockToolPort::empty();
2193        let store = MemoryStore::new();
2194        let sink = CollectingSink::new();
2195
2196        let mut req = make_request("review this code");
2197        req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());
2198
2199        let result =
2200            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2201
2202        assert!(result.is_ok(), "run should succeed when skills prompt is injected");
2203    }
2204
2205    #[tokio::test]
2206    async fn tc11_selected_skills_context_emits_event() {
2207        let llm =
2208            SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
2209        let tools = MockToolPort::empty();
2210        let store = MemoryStore::new();
2211        let sink = CollectingSink::new();
2212
2213        let mut req = make_request("review code");
2214        req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];
2215
2216        let result =
2217            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2218        assert!(result.is_ok(), "run should succeed");
2219
2220        let events = sink.all_events();
2221        assert!(
2222            events.iter().any(|event| matches!(
2223                event,
2224                AgentEvent::SkillsSelected { skill_names, .. }
2225                    if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
2226            )),
2227            "skills.selected event should be emitted with context skill names"
2228        );
2229    }
2230
2231    #[tokio::test]
2232    async fn tc12_policy_deny_tool_blocks_execution() {
2233        let llm = SequentialLlm::from_contents(vec![
2234            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2235            r#"{"type": "final", "content": "done"}"#,
2236        ]);
2237        let tools = NoCallToolPort {
2238            tools: vec![
2239                ToolDescriptor::new("search", "search tool")
2240                    .with_input_schema(serde_json::json!({"type":"object"})),
2241            ],
2242        };
2243        let store = MemoryStore::new();
2244        let sink = CollectingSink::new();
2245
2246        let mut req = make_request("search rust");
2247        req.context.tool_policy.deny_tools =
2248            vec!["search".to_string(), "local/shell_exec".to_string()];
2249
2250        let result =
2251            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2252        assert!(
2253            matches!(&result, Ok(AgentRunResult::Finished(_))),
2254            "expected finished response, got {result:?}"
2255        );
2256        let resp = match result {
2257            Ok(AgentRunResult::Finished(r)) => r,
2258            _ => return,
2259        };
2260
2261        assert_eq!(resp.finish_reason, FinishReason::Stop);
2262        assert_eq!(resp.tool_transcript.len(), 1);
2263        assert!(resp.tool_transcript[0].is_error);
2264        assert!(
2265            resp.tool_transcript[0].output.to_string().contains("denied"),
2266            "tool error should explain policy denial"
2267        );
2268    }
2269
2270    #[tokio::test]
2271    async fn tc13_approval_denied_blocks_execution() {
2272        let llm = SequentialLlm::from_contents(vec![
2273            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2274            r#"{"type": "final", "content": "done"}"#,
2275        ]);
2276        let tools = NoCallToolPort {
2277            tools: vec![
2278                ToolDescriptor::new("search", "search tool")
2279                    .with_input_schema(serde_json::json!({"type":"object"})),
2280            ],
2281        };
2282        let store = MemoryStore::new();
2283        let sink = CollectingSink::new();
2284        let req = make_request("search rust");
2285        let tool_policy = AllowAllPolicyPort;
2286        let approval = AlwaysDenyApprovalPort;
2287
2288        let result = run_turn_with_controls(
2289            &llm,
2290            &tools,
2291            &store,
2292            &sink,
2293            req,
2294            &generous_policy(),
2295            "test-model",
2296            &tool_policy,
2297            &approval,
2298        )
2299        .await;
2300        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
2301        let resp = match result {
2302            Ok(AgentRunResult::Finished(r)) => r,
2303            _ => return,
2304        };
2305
2306        assert_eq!(resp.tool_transcript.len(), 1);
2307        assert!(resp.tool_transcript[0].is_error);
2308        assert!(
2309            resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
2310            "tool error should explain approval denial"
2311        );
2312    }
2313
2314    #[tokio::test]
2315    async fn tc14_custom_policy_port_blocks_execution() {
2316        let llm = SequentialLlm::from_contents(vec![
2317            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2318            r#"{"type": "final", "content": "done"}"#,
2319        ]);
2320        let tools = NoCallToolPort {
2321            tools: vec![
2322                ToolDescriptor::new("search", "search tool")
2323                    .with_input_schema(serde_json::json!({"type":"object"})),
2324            ],
2325        };
2326        let store = MemoryStore::new();
2327        let sink = CollectingSink::new();
2328        let req = make_request("search rust");
2329        let tool_policy = DenySearchPolicyPort;
2330        let approval = AlwaysApprovePort;
2331
2332        let result = run_turn_with_controls(
2333            &llm,
2334            &tools,
2335            &store,
2336            &sink,
2337            req,
2338            &generous_policy(),
2339            "test-model",
2340            &tool_policy,
2341            &approval,
2342        )
2343        .await;
2344        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
2345        let resp = match result {
2346            Ok(AgentRunResult::Finished(r)) => r,
2347            _ => return,
2348        };
2349
2350        assert_eq!(resp.tool_transcript.len(), 1);
2351        assert!(resp.tool_transcript[0].is_error);
2352        assert!(
2353            resp.tool_transcript[0].output.to_string().contains("denied"),
2354            "tool error should explain policy denial"
2355        );
2356    }
2357
2358    #[tokio::test]
2359    async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
2360        struct NativeToolLlm {
2361            responses: Mutex<VecDeque<LlmResponse>>,
2362        }
2363
2364        #[async_trait::async_trait]
2365        impl LlmPort for NativeToolLlm {
2366            fn capabilities(&self) -> bob_core::types::LlmCapabilities {
2367                bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
2368            }
2369
2370            async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2371                let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
2372                Ok(q.pop_front().unwrap_or(LlmResponse {
2373                    content: r#"{"type":"final","content":"fallback"}"#.to_string(),
2374                    usage: TokenUsage::default(),
2375                    finish_reason: FinishReason::Stop,
2376                    tool_calls: Vec::new(),
2377                }))
2378            }
2379
2380            async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2381                Err(LlmError::Provider("not used".into()))
2382            }
2383        }
2384
2385        let llm = NativeToolLlm {
2386            responses: Mutex::new(VecDeque::from(vec![
2387                LlmResponse {
2388                    content: "ignored".to_string(),
2389                    usage: TokenUsage::default(),
2390                    finish_reason: FinishReason::Stop,
2391                    tool_calls: vec![
2392                        ToolCall::new("search", serde_json::json!({"q":"rust"}))
2393                            .with_call_id("call-search-1"),
2394                    ],
2395                },
2396                LlmResponse {
2397                    content: r#"{"type":"final","content":"done"}"#.to_string(),
2398                    usage: TokenUsage::default(),
2399                    finish_reason: FinishReason::Stop,
2400                    tool_calls: Vec::new(),
2401                },
2402            ])),
2403        };
2404        let tools = MockToolPort::with_tool_and_results(
2405            "search",
2406            vec![Ok(ToolResult {
2407                name: "search".to_string(),
2408                output: serde_json::json!({"hits": 2}),
2409                is_error: false,
2410            })],
2411        );
2412        let store = MemoryStore::new();
2413        let sink = CollectingSink::new();
2414        let checkpoint = CountingCheckpointPort::new();
2415        let artifacts = NoopArtifactStore;
2416        let cost = CountingCostMeter::new();
2417        let policy = AllowAllPolicyPort;
2418        let approval = AlwaysApprovePort;
2419
2420        let result = run_turn_with_extensions(
2421            &llm,
2422            &tools,
2423            &store,
2424            &sink,
2425            make_request("search rust"),
2426            &generous_policy(),
2427            "test-model",
2428            &policy,
2429            &approval,
2430            crate::DispatchMode::NativePreferred,
2431            &checkpoint,
2432            &artifacts,
2433            &cost,
2434            &crate::prompt::WindowContextCompactor::default(),
2435            &crate::NoOpToolJournalPort,
2436        )
2437        .await;
2438
2439        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2440        let resp = match result {
2441            Ok(AgentRunResult::Finished(r)) => r,
2442            _ => return,
2443        };
2444        assert_eq!(resp.tool_transcript.len(), 1);
2445        assert_eq!(resp.tool_transcript[0].name, "search");
2446
2447        let saved = store.load(&"test-session".to_string()).await;
2448        let saved = match saved {
2449            Ok(Some(state)) => state,
2450            other => panic!("expected saved state, got {other:?}"),
2451        };
2452        assert!(
2453            saved.messages.iter().any(|message| {
2454                message.role == Role::Assistant &&
2455                    message.tool_calls.len() == 1 &&
2456                    message.tool_calls[0].call_id.as_deref() == Some("call-search-1")
2457            }),
2458            "assistant tool call should be persisted structurally",
2459        );
2460        assert!(
2461            saved.messages.iter().any(|message| {
2462                message.role == Role::Tool &&
2463                    message.tool_call_id.as_deref() == Some("call-search-1") &&
2464                    message.tool_name.as_deref() == Some("search")
2465            }),
2466            "tool result should retain tool metadata",
2467        );
2468    }
2469
2470    #[tokio::test]
2471    async fn tc16_checkpoint_and_cost_ports_are_invoked() {
2472        let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
2473        let tools = MockToolPort::empty();
2474        let store = MemoryStore::new();
2475        let sink = CollectingSink::new();
2476        let checkpoint = CountingCheckpointPort::new();
2477        let artifacts = NoopArtifactStore;
2478        let cost = CountingCostMeter::new();
2479        let policy = AllowAllPolicyPort;
2480        let approval = AlwaysApprovePort;
2481
2482        let result = run_turn_with_extensions(
2483            &llm,
2484            &tools,
2485            &store,
2486            &sink,
2487            make_request("hello"),
2488            &generous_policy(),
2489            "test-model",
2490            &policy,
2491            &approval,
2492            crate::DispatchMode::PromptGuided,
2493            &checkpoint,
2494            &artifacts,
2495            &cost,
2496            &crate::prompt::WindowContextCompactor::default(),
2497            &crate::NoOpToolJournalPort,
2498        )
2499        .await;
2500        assert!(result.is_ok(), "turn should succeed");
2501        let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2502        let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
2503        assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
2504        assert!(llm_calls >= 1, "cost meter should record llm usage");
2505    }
2506
2507    #[tokio::test]
2508    async fn tc17_session_usage_accumulates_and_persists() {
2509        let llm_first = SequentialLlm::from_responses(vec![LlmResponse {
2510            content: r#"{"type":"final","content":"first"}"#.to_string(),
2511            usage: TokenUsage { prompt_tokens: 10, completion_tokens: 5 },
2512            finish_reason: FinishReason::Stop,
2513            tool_calls: Vec::new(),
2514        }]);
2515        let llm_second = SequentialLlm::from_responses(vec![LlmResponse {
2516            content: r#"{"type":"final","content":"second"}"#.to_string(),
2517            usage: TokenUsage { prompt_tokens: 3, completion_tokens: 2 },
2518            finish_reason: FinishReason::Stop,
2519            tool_calls: Vec::new(),
2520        }]);
2521        let tools = MockToolPort::empty();
2522        let store = MemoryStore::new();
2523        let sink = CollectingSink::new();
2524
2525        let first = run_turn(
2526            &llm_first,
2527            &tools,
2528            &store,
2529            &sink,
2530            make_request("hello"),
2531            &generous_policy(),
2532            "test-model",
2533        )
2534        .await;
2535        assert!(first.is_ok(), "first run should succeed");
2536
2537        let second = run_turn(
2538            &llm_second,
2539            &tools,
2540            &store,
2541            &sink,
2542            make_request("again"),
2543            &generous_policy(),
2544            "test-model",
2545        )
2546        .await;
2547        assert!(second.is_ok(), "second run should succeed");
2548
2549        let loaded = store.load(&"test-session".to_string()).await;
2550        assert!(loaded.is_ok(), "session should be persisted");
2551        let state = loaded.ok().flatten();
2552        assert!(state.is_some(), "session state should exist");
2553        let state = state.unwrap_or_default();
2554        assert_eq!(state.total_usage.prompt_tokens, 13);
2555        assert_eq!(state.total_usage.completion_tokens, 7);
2556    }
2557
2558    struct FallbackOnlyLlm {
2559        response: LlmResponse,
2560    }
2561
2562    #[async_trait::async_trait]
2563    impl LlmPort for FallbackOnlyLlm {
2564        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2565            Ok(self.response.clone())
2566        }
2567
2568        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2569            Err(LlmError::Provider("streaming not available".to_string()))
2570        }
2571    }
2572
2573    #[tokio::test]
2574    async fn tc18_stream_fallback_does_not_emit_error_event() {
2575        let llm: Arc<dyn LlmPort> = Arc::new(FallbackOnlyLlm {
2576            response: LlmResponse {
2577                content: r#"{"type":"final","content":"done"}"#.to_string(),
2578                usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
2579                finish_reason: FinishReason::Stop,
2580                tool_calls: Vec::new(),
2581            },
2582        });
2583        let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2584        let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2585        let sink = Arc::new(CollectingSink::new());
2586        let sink_dyn: Arc<dyn EventSink> = sink.clone();
2587
2588        let stream_result = run_turn_stream(
2589            llm,
2590            tools,
2591            store,
2592            sink_dyn,
2593            make_request("hello"),
2594            generous_policy(),
2595            "test-model".to_string(),
2596        )
2597        .await;
2598        assert!(stream_result.is_ok(), "stream run should succeed with fallback");
2599        let mut stream = match stream_result {
2600            Ok(stream) => stream,
2601            Err(_) => return,
2602        };
2603
2604        while let Some(_event) = stream.next().await {}
2605
2606        let events = sink.all_events();
2607        assert!(
2608            !events.iter().any(|event| matches!(event, AgentEvent::Error { .. })),
2609            "fallback should not emit AgentEvent::Error when complete() succeeds"
2610        );
2611    }
2612
2613    struct NativeStreamingBypassLlm {
2614        complete_calls: std::sync::atomic::AtomicUsize,
2615        complete_stream_calls: std::sync::atomic::AtomicUsize,
2616        response: LlmResponse,
2617    }
2618
2619    #[async_trait::async_trait]
2620    impl LlmPort for NativeStreamingBypassLlm {
2621        fn capabilities(&self) -> bob_core::types::LlmCapabilities {
2622            bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
2623        }
2624
2625        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2626            self.complete_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2627            Ok(self.response.clone())
2628        }
2629
2630        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2631            self.complete_stream_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2632            Err(LlmError::Provider(
2633                "complete_stream() should be bypassed for native tool calling".to_string(),
2634            ))
2635        }
2636    }
2637
2638    #[tokio::test]
2639    async fn tc19_stream_native_tool_calling_bypasses_complete_stream() {
2640        let llm_impl = Arc::new(NativeStreamingBypassLlm {
2641            complete_calls: std::sync::atomic::AtomicUsize::new(0),
2642            complete_stream_calls: std::sync::atomic::AtomicUsize::new(0),
2643            response: LlmResponse {
2644                content: r#"{"type":"final","content":"done"}"#.to_string(),
2645                usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
2646                finish_reason: FinishReason::Stop,
2647                tool_calls: Vec::new(),
2648            },
2649        });
2650        let llm: Arc<dyn LlmPort> = llm_impl.clone();
2651        let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2652        let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2653        let sink = Arc::new(CollectingSink::new());
2654        let sink_dyn: Arc<dyn EventSink> = sink.clone();
2655
2656        let stream_result = run_turn_stream(
2657            llm,
2658            tools,
2659            store,
2660            sink_dyn,
2661            make_request("hello"),
2662            generous_policy(),
2663            "test-model".to_string(),
2664        )
2665        .await;
2666        assert!(
2667            stream_result.is_ok(),
2668            "stream run should succeed through complete() for native tool calling"
2669        );
2670        let mut stream = match stream_result {
2671            Ok(stream) => stream,
2672            Err(_) => return,
2673        };
2674
2675        while let Some(_event) = stream.next().await {}
2676
2677        assert_eq!(
2678            llm_impl.complete_calls.load(std::sync::atomic::Ordering::SeqCst),
2679            1,
2680            "native tool-calling stream path should use complete()"
2681        );
2682        assert_eq!(
2683            llm_impl.complete_stream_calls.load(std::sync::atomic::Ordering::SeqCst),
2684            0,
2685            "native tool-calling stream path should bypass complete_stream()"
2686        );
2687        assert!(
2688            !sink.all_events().iter().any(|event| matches!(event, AgentEvent::Error { .. })),
2689            "native-tool fallback should stay on the success path"
2690        );
2691    }
2692
2693    #[tokio::test]
2694    async fn tc20_non_consecutive_duplicate_tool_calls_are_allowed() {
2695        let llm = SequentialLlm::from_contents(vec![
2696            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2697            r#"{"type":"tool_call","name":"tool_b","arguments":{"q":"docs"}}"#,
2698            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2699            r#"{"type":"final","content":"done"}"#,
2700        ]);
2701        let tools = MockToolPort::with_tool_and_results(
2702            "tool_a",
2703            vec![
2704                Ok(ToolResult {
2705                    name: "tool_a".to_string(),
2706                    output: serde_json::json!({"ok": 1}),
2707                    is_error: false,
2708                }),
2709                Ok(ToolResult {
2710                    name: "tool_b".to_string(),
2711                    output: serde_json::json!({"ok": 2}),
2712                    is_error: false,
2713                }),
2714                Ok(ToolResult {
2715                    name: "tool_a".to_string(),
2716                    output: serde_json::json!({"ok": 3}),
2717                    is_error: false,
2718                }),
2719            ],
2720        );
2721        let store = MemoryStore::new();
2722        let sink = CollectingSink::new();
2723
2724        let result = run_turn(
2725            &llm,
2726            &tools,
2727            &store,
2728            &sink,
2729            make_request("repeat searches"),
2730            &generous_policy(),
2731            "test-model",
2732        )
2733        .await;
2734        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2735        let resp = match result {
2736            Ok(AgentRunResult::Finished(resp)) => resp,
2737            _ => return,
2738        };
2739
2740        assert_eq!(resp.tool_transcript.len(), 3);
2741        assert!(resp.tool_transcript.iter().all(|entry| !entry.is_error));
2742    }
2743
2744    #[tokio::test]
2745    async fn tc21_excessive_consecutive_duplicate_tool_calls_are_blocked() {
2746        let llm = SequentialLlm::from_contents(vec![
2747            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2748            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2749            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2750            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2751            r#"{"type":"final","content":"done"}"#,
2752        ]);
2753        let tools = MockToolPort::with_tool_and_results(
2754            "tool_a",
2755            vec![
2756                Ok(ToolResult {
2757                    name: "tool_a".to_string(),
2758                    output: serde_json::json!({"ok": 1}),
2759                    is_error: false,
2760                }),
2761                Ok(ToolResult {
2762                    name: "tool_a".to_string(),
2763                    output: serde_json::json!({"ok": 2}),
2764                    is_error: false,
2765                }),
2766                Ok(ToolResult {
2767                    name: "tool_a".to_string(),
2768                    output: serde_json::json!({"ok": 3}),
2769                    is_error: false,
2770                }),
2771            ],
2772        );
2773        let store = MemoryStore::new();
2774        let sink = CollectingSink::new();
2775
2776        let result = run_turn(
2777            &llm,
2778            &tools,
2779            &store,
2780            &sink,
2781            make_request("poll repeatedly"),
2782            &generous_policy(),
2783            "test-model",
2784        )
2785        .await;
2786        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2787        let resp = match result {
2788            Ok(AgentRunResult::Finished(resp)) => resp,
2789            _ => return,
2790        };
2791
2792        assert_eq!(resp.tool_transcript.len(), 4);
2793        assert!(!resp.tool_transcript[2].is_error);
2794        assert!(resp.tool_transcript[3].is_error);
2795        assert!(
2796            resp.tool_transcript[3]
2797                .output
2798                .to_string()
2799                .contains("consecutive duplicate tool call limit reached"),
2800            "expected duplicate-call protection error in transcript"
2801        );
2802    }
2803
2804    #[tokio::test]
2805    async fn tc22_duplicate_block_path_records_artifact_and_cost() {
2806        let llm = SequentialLlm::from_contents(vec![
2807            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2808            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2809            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2810            r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2811            r#"{"type":"final","content":"done"}"#,
2812        ]);
2813        let tools = MockToolPort::with_tool_and_results(
2814            "tool_a",
2815            vec![
2816                Ok(ToolResult {
2817                    name: "tool_a".to_string(),
2818                    output: serde_json::json!({"ok": 1}),
2819                    is_error: false,
2820                }),
2821                Ok(ToolResult {
2822                    name: "tool_a".to_string(),
2823                    output: serde_json::json!({"ok": 2}),
2824                    is_error: false,
2825                }),
2826                Ok(ToolResult {
2827                    name: "tool_a".to_string(),
2828                    output: serde_json::json!({"ok": 3}),
2829                    is_error: false,
2830                }),
2831            ],
2832        );
2833        let store = MemoryStore::new();
2834        let sink = CollectingSink::new();
2835        let checkpoint = CountingCheckpointPort::new();
2836        let artifacts = CountingArtifactStore::new();
2837        let cost = CountingCostMeter::new();
2838        let policy = AllowAllPolicyPort;
2839        let approval = AlwaysApprovePort;
2840
2841        let result = run_turn_with_extensions(
2842            &llm,
2843            &tools,
2844            &store,
2845            &sink,
2846            make_request("poll repeatedly"),
2847            &generous_policy(),
2848            "test-model",
2849            &policy,
2850            &approval,
2851            crate::DispatchMode::PromptGuided,
2852            &checkpoint,
2853            &artifacts,
2854            &cost,
2855            &crate::prompt::WindowContextCompactor::default(),
2856            &crate::NoOpToolJournalPort,
2857        )
2858        .await;
2859        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2860
2861        let tool_results = *cost.tool_results.lock().unwrap_or_else(|p| p.into_inner());
2862        assert_eq!(tool_results, 4, "cost meter should record all tool outcomes");
2863        let saved_artifacts = artifacts.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2864        assert_eq!(saved_artifacts, 4, "artifact store should record all tool outcomes");
2865    }
2866}