Skip to main content

bob_runtime/
scheduler.rs

1//! # Scheduler
2//!
3//! Scheduler: loop guard and 6-state turn FSM.
4//!
5//! ## Overview
6//!
7//! The scheduler is the core orchestration component that manages agent turn execution.
8//! It implements a finite state machine (FSM) with the following states:
9//!
10//! 1. **Start**: Initialize turn, load session
11//! 2. **LLM Call**: Send request to LLM
12//! 3. **Parse Action**: Parse LLM response into structured action
13//! 4. **Execute Tool**: Run tool if action is a tool call
14//! 5. **Update State**: Update session state with results
15//! 6. **Loop/Finish**: Either continue or complete the turn
16//!
17//! ## Loop Guard
18//!
19//! The [`LoopGuard`] ensures turn termination by tracking:
20//! - Number of steps (LLM calls)
21//! - Number of tool calls
22//! - Consecutive errors
23//! - Elapsed time
24//!
25//! If any limit is exceeded, the turn is terminated with an appropriate [`GuardReason`].
26//!
27//! ## Example
28//!
29//! ```rust,ignore
30//! use bob_runtime::scheduler::LoopGuard;
31//! use bob_core::types::TurnPolicy;
32//!
33//! let policy = TurnPolicy::default();
34//! let mut guard = LoopGuard::new(policy);
35//!
36//! while guard.can_continue() {
37//!     guard.record_step();
38//!     // Execute step...
39//! }
40//! ```
41
42use std::sync::Arc;
43
44use bob_core::{
45    error::AgentError,
46    normalize_tool_list,
47    ports::{
48        ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
49        ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
50    },
51    types::{
52        AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
53        AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
54        GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
55    },
56};
57use futures_util::StreamExt;
58use tokio::time::Instant;
59use tokio_stream::wrappers::UnboundedReceiverStream;
60
/// Safety net that guarantees turn termination by tracking steps,
/// tool calls, consecutive errors, and elapsed time against [`TurnPolicy`] limits.
///
/// The scheduler loops in this module check [`LoopGuard::can_continue`] before
/// every LLM call and stop the turn with [`LoopGuard::reason`] once it fails.
#[derive(Debug)]
pub struct LoopGuard {
    /// Limits enforced by this guard.
    policy: TurnPolicy,
    /// Steps recorded via [`LoopGuard::record_step`]; the scheduler records one per LLM call.
    steps: u32,
    /// Tool invocations recorded via [`LoopGuard::record_tool_call`].
    tool_calls: u32,
    /// Errors recorded since the last [`LoopGuard::reset_errors`].
    consecutive_errors: u32,
    /// Creation time of the guard; [`LoopGuard::timed_out`] measures elapsed time from here.
    start: Instant,
}
71
72impl LoopGuard {
73    /// Create a new guard tied to the given policy.
74    #[must_use]
75    pub fn new(policy: TurnPolicy) -> Self {
76        Self { policy, steps: 0, tool_calls: 0, consecutive_errors: 0, start: Instant::now() }
77    }
78
79    /// Returns `true` if the turn may continue executing.
80    #[must_use]
81    pub fn can_continue(&self) -> bool {
82        self.steps < self.policy.max_steps &&
83            self.tool_calls < self.policy.max_tool_calls &&
84            self.consecutive_errors < self.policy.max_consecutive_errors &&
85            !self.timed_out()
86    }
87
88    /// Record one scheduler step.
89    pub fn record_step(&mut self) {
90        self.steps += 1;
91    }
92
93    /// Record one tool call.
94    pub fn record_tool_call(&mut self) {
95        self.tool_calls += 1;
96    }
97
98    /// Record a consecutive error.
99    pub fn record_error(&mut self) {
100        self.consecutive_errors += 1;
101    }
102
103    /// Reset the consecutive-error counter (e.g. after a successful call).
104    pub fn reset_errors(&mut self) {
105        self.consecutive_errors = 0;
106    }
107
108    /// The reason the guard stopped the turn.
109    ///
110    /// Only meaningful when [`can_continue`](Self::can_continue) returns `false`.
111    #[must_use]
112    pub fn reason(&self) -> GuardReason {
113        if self.steps >= self.policy.max_steps {
114            GuardReason::MaxSteps
115        } else if self.tool_calls >= self.policy.max_tool_calls {
116            GuardReason::MaxToolCalls
117        } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
118            GuardReason::MaxConsecutiveErrors
119        } else if self.timed_out() {
120            GuardReason::TurnTimeout
121        } else {
122            // Fallback — shouldn't be called when `can_continue()` is true.
123            GuardReason::Cancelled
124        }
125    }
126
127    /// Returns `true` if the turn has exceeded its time budget.
128    #[must_use]
129    pub fn timed_out(&self) -> bool {
130        self.start.elapsed().as_millis() >= u128::from(self.policy.turn_timeout_ms)
131    }
132}
133
134// ── Default system instructions (v1) ─────────────────────────────────
135
/// Baseline system prompt used for every turn; `resolve_system_instructions`
/// appends any request-supplied prompt after it.
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = concat!(
    "You are a helpful AI assistant. ",
    "Think step by step before answering. ",
    "When you need external information, use the available tools."
);
140
141fn resolve_system_instructions(req: &AgentRequest) -> String {
142    let Some(skills_prompt) = req.context.system_prompt.as_deref() else {
143        return DEFAULT_SYSTEM_INSTRUCTIONS.to_string();
144    };
145
146    if skills_prompt.trim().is_empty() {
147        DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
148    } else {
149        format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
150    }
151}
152
153fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
154    req.context.selected_skills.clone()
155}
156
/// Per-request tool allow/deny lists, built by `resolve_tool_call_policy` and
/// handed to `ToolPolicyPort::is_tool_allowed` by `execute_tool_call`.
#[derive(Debug, Clone, Default)]
struct ToolCallPolicy {
    /// Normalized names of tools the request denies.
    deny_tools: Vec<String>,
    /// Normalized allow-list, or `None` when the request supplied none.
    /// How `None` is interpreted is up to the `ToolPolicyPort` implementation.
    allow_tools: Option<Vec<String>>,
}
162
163fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
164    let deny_tools =
165        normalize_tool_list(req.context.tool_policy.deny_tools.iter().map(String::as_str));
166    let allow_tools = req
167        .context
168        .tool_policy
169        .allow_tools
170        .as_ref()
171        .map(|tools| normalize_tool_list(tools.iter().map(String::as_str)));
172    ToolCallPolicy { deny_tools, allow_tools }
173}
174
175fn prompt_options_for_mode(
176    dispatch_mode: crate::DispatchMode,
177    llm: &dyn LlmPort,
178) -> crate::prompt::PromptBuildOptions {
179    match dispatch_mode {
180        crate::DispatchMode::PromptGuided => crate::prompt::PromptBuildOptions::default(),
181        crate::DispatchMode::NativePreferred => {
182            if llm.capabilities().native_tool_calling {
183                crate::prompt::PromptBuildOptions {
184                    include_action_schema: false,
185                    include_tool_schema: false,
186                }
187            } else {
188                crate::prompt::PromptBuildOptions::default()
189            }
190        }
191    }
192}
193
194fn parse_action_for_mode(
195    dispatch_mode: crate::DispatchMode,
196    llm: &dyn LlmPort,
197    response: &bob_core::types::LlmResponse,
198) -> Result<AgentAction, crate::action::ActionParseError> {
199    match dispatch_mode {
200        crate::DispatchMode::PromptGuided => crate::action::parse_action(&response.content),
201        crate::DispatchMode::NativePreferred => {
202            if llm.capabilities().native_tool_calling &&
203                let Some(tool_call) = response.tool_calls.first()
204            {
205                return Ok(AgentAction::ToolCall {
206                    name: tool_call.name.clone(),
207                    arguments: tool_call.arguments.clone(),
208                });
209            }
210            crate::action::parse_action(&response.content)
211        }
212    }
213}
214
215#[expect(
216    clippy::too_many_arguments,
217    reason = "tool execution needs explicit policy, approval, and timeout dependencies"
218)]
219async fn execute_tool_call(
220    tools: &dyn ToolPort,
221    guard: &mut LoopGuard,
222    tool_call: ToolCall,
223    policy: &ToolCallPolicy,
224    tool_policy_port: &dyn ToolPolicyPort,
225    approval_port: &dyn ApprovalPort,
226    approval_context: &ApprovalContext,
227    timeout_ms: u64,
228) -> ToolResult {
229    if !tool_policy_port.is_tool_allowed(
230        &tool_call.name,
231        &policy.deny_tools,
232        policy.allow_tools.as_deref(),
233    ) {
234        guard.record_error();
235        return ToolResult {
236            name: tool_call.name.clone(),
237            output: serde_json::json!({
238                "error": format!("tool '{}' denied by policy", tool_call.name)
239            }),
240            is_error: true,
241        };
242    }
243
244    match approval_port.approve_tool_call(&tool_call, approval_context).await {
245        Ok(ApprovalDecision::Approved) => {}
246        Ok(ApprovalDecision::Denied { reason }) => {
247            guard.record_error();
248            return ToolResult {
249                name: tool_call.name.clone(),
250                output: serde_json::json!({"error": reason}),
251                is_error: true,
252            };
253        }
254        Err(err) => {
255            guard.record_error();
256            return ToolResult {
257                name: tool_call.name.clone(),
258                output: serde_json::json!({"error": err.to_string()}),
259                is_error: true,
260            };
261        }
262    }
263
264    match tokio::time::timeout(
265        std::time::Duration::from_millis(timeout_ms),
266        tools.call_tool(tool_call.clone()),
267    )
268    .await
269    {
270        Ok(Ok(result)) => {
271            guard.reset_errors();
272            result
273        }
274        Ok(Err(err)) => {
275            guard.record_error();
276            ToolResult {
277                name: tool_call.name,
278                output: serde_json::json!({"error": err.to_string()}),
279                is_error: true,
280            }
281        }
282        Err(_) => {
283            guard.record_error();
284            ToolResult {
285                name: tool_call.name,
286                output: serde_json::json!({"error": "tool call timed out"}),
287                is_error: true,
288            }
289        }
290    }
291}
292
293// ── Turn Loop FSM ────────────────────────────────────────────────────
294
295/// Execute a single agent turn as a 6-state FSM.
296///
297/// States: Start → BuildPrompt → LlmInfer → ParseAction → CallTool → Done.
298/// The loop guard guarantees termination under all conditions.
299pub async fn run_turn(
300    llm: &dyn LlmPort,
301    tools: &dyn ToolPort,
302    store: &dyn SessionStore,
303    events: &dyn EventSink,
304    req: AgentRequest,
305    policy: &TurnPolicy,
306    default_model: &str,
307) -> Result<AgentRunResult, AgentError> {
308    let tool_policy = crate::DefaultToolPolicyPort;
309    let approval = crate::AllowAllApprovalPort;
310    let checkpoint_store = crate::NoOpCheckpointStorePort;
311    let artifact_store = crate::NoOpArtifactStorePort;
312    let cost_meter = crate::NoOpCostMeterPort;
313    run_turn_with_extensions(
314        llm,
315        tools,
316        store,
317        events,
318        req,
319        policy,
320        default_model,
321        &tool_policy,
322        &approval,
323        crate::DispatchMode::NativePreferred,
324        &checkpoint_store,
325        &artifact_store,
326        &cost_meter,
327    )
328    .await
329}
330
331/// Execute a single turn with explicit policy/approval controls.
332#[cfg_attr(
333    not(test),
334    expect(
335        dead_code,
336        reason = "reserved wrapper for partial control injection in external integrations"
337    )
338)]
339#[expect(
340    clippy::too_many_arguments,
341    reason = "wrapper exposes explicit dependency ports for compatibility and testability"
342)]
343pub(crate) async fn run_turn_with_controls(
344    llm: &dyn LlmPort,
345    tools: &dyn ToolPort,
346    store: &dyn SessionStore,
347    events: &dyn EventSink,
348    req: AgentRequest,
349    policy: &TurnPolicy,
350    default_model: &str,
351    tool_policy_port: &dyn ToolPolicyPort,
352    approval_port: &dyn ApprovalPort,
353) -> Result<AgentRunResult, AgentError> {
354    let checkpoint_store = crate::NoOpCheckpointStorePort;
355    let artifact_store = crate::NoOpArtifactStorePort;
356    let cost_meter = crate::NoOpCostMeterPort;
357    run_turn_with_extensions(
358        llm,
359        tools,
360        store,
361        events,
362        req,
363        policy,
364        default_model,
365        tool_policy_port,
366        approval_port,
367        crate::DispatchMode::PromptGuided,
368        &checkpoint_store,
369        &artifact_store,
370        &cost_meter,
371    )
372    .await
373}
374
/// Execute a single turn with all extensibility controls injected.
///
/// Flow, as implemented below:
/// 1. Resolve the model (`req.model`, falling back to `default_model`), the
///    system instructions, the selected skills, and the tool allow/deny lists
///    from the request.
/// 2. Load the session (or start from `SessionState::default()`) and list the
///    tools. Emit `TurnStarted`, plus `SkillsSelected` when skills were chosen.
///    Append the user's input as a `Role::User` message.
/// 3. Loop:
///    - check cancellation, the cost budget, and the [`LoopGuard`];
///    - build the prompt from the progressive tool view;
///    - call the LLM, racing the cancel token when one is present;
///    - record usage and append the assistant message;
///    - save a best-effort checkpoint;
///    - parse and dispatch the action.
/// 4. `Final` and `AskUser` finish the turn with `FinishReason::Stop`. `ToolCall`
///    runs the tool (or short-circuits an exact duplicate) and loops again.
///
/// Cancellation and guard exhaustion finish the turn normally (with
/// `Cancelled` / `GuardExceeded`) rather than returning an error.
///
/// # Errors
///
/// Propagates errors from session load/save, tool listing, the cost meter's
/// `check_budget` / `record_llm_usage`, and the LLM call. Returns
/// `AgentError::Internal` when the response fails to parse twice in a row.
/// Failures of checkpoint saving, artifact storage, and `record_tool_result`
/// are ignored.
#[expect(
    clippy::too_many_arguments,
    reason = "core entrypoint exposes all ports explicitly for adapter injection"
)]
pub(crate) async fn run_turn_with_extensions(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
    dispatch_mode: crate::DispatchMode,
    checkpoint_store: &dyn TurnCheckpointStorePort,
    artifact_store: &dyn ArtifactStorePort,
    cost_meter: &dyn CostMeterPort,
) -> Result<AgentRunResult, AgentError> {
    let model = req.model.as_deref().unwrap_or(default_model);
    let cancel_token = req.cancel_token.clone();
    let system_instructions = resolve_system_instructions(&req);
    let selected_skills = resolve_selected_skills(&req);
    let tool_call_policy = resolve_tool_call_policy(&req);

    // A missing session starts empty; load/list failures abort before any event is emitted.
    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
    let tool_descriptors = tools.list_tools().await?;
    let mut guard = LoopGuard::new(policy.clone());

    // Progressive tool view: compact summaries for inactive tools, full schemas for activated.
    let mut tool_view = crate::progressive_tools::ProgressiveToolView::new(tool_descriptors);

    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
    if !selected_skills.is_empty() {
        events.emit(AgentEvent::SkillsSelected { skill_names: selected_skills.clone() });
    }

    session.messages.push(Message { role: Role::User, content: req.input.clone() });

    let mut tool_transcript: Vec<ToolResult> = Vec::new();
    let mut total_usage = TokenUsage::default();
    // Reset to 0 on every successful parse; two failures in a row abort the turn.
    let mut consecutive_parse_failures: u32 = 0;
    // Tool call dedup: detect infinite loops from repeated identical tool calls.
    let mut seen_tool_calls: std::collections::HashSet<String> = std::collections::HashSet::new();

    loop {
        // Cancellation requested before this iteration: persist and finish as `Cancelled`.
        if let Some(ref token) = cancel_token &&
            token.is_cancelled()
        {
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: "Turn cancelled.",
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::Cancelled,
                },
            )
            .await;
        }

        // A budget error aborts the turn via `?` without saving the session.
        cost_meter.check_budget(&req.session_id).await?;

        // Guard exhausted: finish normally with `GuardExceeded` and a message naming the reason.
        if !guard.can_continue() {
            let reason = guard.reason();
            let msg = format!("Turn stopped: {reason:?}");
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: &msg,
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::GuardExceeded,
                },
            )
            .await;
        }

        // Build system instructions with progressive tool summary.
        let mut augmented_instructions = system_instructions.clone();
        let tool_summary = tool_view.summary_prompt();
        if !tool_summary.is_empty() {
            augmented_instructions.push('\n');
            augmented_instructions.push('\n');
            augmented_instructions.push_str(&tool_summary);
        }

        // Only tools activated so far are passed with their full descriptors.
        let active_tools = tool_view.activated_tools();
        let llm_request = crate::prompt::build_llm_request_with_options(
            model,
            &session,
            &active_tools,
            &augmented_instructions,
            prompt_options_for_mode(dispatch_mode, llm),
        );

        events.emit(AgentEvent::LlmCallStarted { model: model.to_string() });

        // With a cancel token, race the LLM call against cancellation so a cancel
        // abandons the in-flight request. LLM errors propagate via `?` without
        // saving the session.
        let llm_response = if let Some(ref token) = cancel_token {
            tokio::select! {
                result = llm.complete(llm_request) => result?,
                () = token.cancelled() => {
                    return finish_turn(
                        store, events, &req.session_id, &session,
                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
                    ).await;
                }
            }
        } else {
            llm.complete(llm_request).await?
        };

        // One guard step per completed LLM call.
        guard.record_step();
        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
        total_usage.completion_tokens += llm_response.usage.completion_tokens;
        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;

        events.emit(AgentEvent::LlmCallCompleted { usage: llm_response.usage.clone() });

        // Scan LLM response for tool name hints and activate them.
        tool_view.activate_hints(&llm_response.content);

        session
            .messages
            .push(Message { role: Role::Assistant, content: llm_response.content.clone() });

        // Best-effort: checkpoint failures are deliberately ignored.
        let _ = checkpoint_store
            .save_checkpoint(&TurnCheckpoint {
                session_id: req.session_id.clone(),
                step: guard.steps,
                tool_calls: guard.tool_calls,
                usage: total_usage.clone(),
            })
            .await;

        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
            Ok(action) => {
                consecutive_parse_failures = 0;
                match action {
                    AgentAction::Final { content } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &content,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    // Asking the user ends the turn; the question is returned as the content.
                    AgentAction::AskUser { question } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &question,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::ToolCall { name, arguments } => {
                        // Activate the tool in progressive view for subsequent requests.
                        tool_view.activate(&name);

                        // Deduplicate identical tool calls to prevent infinite loops.
                        let call_signature = format!(
                            "{}:{}",
                            name,
                            serde_json::to_string(&arguments).unwrap_or_default()
                        );
                        // Duplicate: feed an error back to the model instead of re-running the
                        // tool. It counts toward `max_tool_calls` but not the consecutive-error
                        // budget, and no `ToolCallStarted` event is emitted for it.
                        if !seen_tool_calls.insert(call_signature) {
                            let dup_result = ToolResult {
                                name: name.clone(),
                                output: serde_json::json!({
                                    "error": "duplicate tool call detected (same name + arguments) — skipping to prevent loop"
                                }),
                                is_error: true,
                            };
                            guard.record_tool_call();
                            let output_str =
                                serde_json::to_string(&dup_result.output).unwrap_or_default();
                            session
                                .messages
                                .push(Message { role: Role::Tool, content: output_str });
                            events.emit(AgentEvent::ToolCallCompleted { name, is_error: true });
                            tool_transcript.push(dup_result);
                            continue;
                        }

                        events.emit(AgentEvent::ToolCallStarted { name: name.clone() });
                        // `max(1)` keeps the reported step at least 1.
                        let approval_context = ApprovalContext {
                            session_id: req.session_id.clone(),
                            turn_step: guard.steps.max(1),
                            selected_skills: selected_skills.clone(),
                        };

                        // Policy, approval, and timeout failures come back as error results, not `Err`.
                        let tool_result = execute_tool_call(
                            tools,
                            &mut guard,
                            ToolCall { name: name.clone(), arguments },
                            &tool_call_policy,
                            tool_policy_port,
                            approval_port,
                            &approval_context,
                            policy.tool_timeout_ms,
                        )
                        .await;

                        guard.record_tool_call();
                        // Best-effort cost accounting for the tool result.
                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;

                        let is_error = tool_result.is_error;
                        events.emit(AgentEvent::ToolCallCompleted { name: name.clone(), is_error });

                        // The tool output is fed back to the model as a JSON-serialized `Role::Tool` message.
                        let output_str =
                            serde_json::to_string(&tool_result.output).unwrap_or_default();
                        session.messages.push(Message { role: Role::Tool, content: output_str });

                        // Best-effort: artifact storage failures are ignored.
                        let _ = artifact_store
                            .put(ArtifactRecord {
                                session_id: req.session_id.clone(),
                                kind: "tool_result".to_string(),
                                name: name.clone(),
                                content: tool_result.output.clone(),
                            })
                            .await;

                        tool_transcript.push(tool_result);
                    }
                }
            }
            Err(_parse_err) => {
                consecutive_parse_failures += 1;
                // A second consecutive failure aborts the turn. The session is saved
                // best-effort first; no `TurnCompleted` event is emitted on this path.
                if consecutive_parse_failures >= 2 {
                    let _ = store.save(&req.session_id, &session).await;
                    return Err(AgentError::Internal(
                        "LLM produced invalid JSON after re-prompt".into(),
                    ));
                }
                // First failure: re-prompt once with a corrective user message.
                session.messages.push(Message {
                    role: Role::User,
                    content: "Your response was not valid JSON. \
                              Please respond with exactly one JSON object \
                              matching the required schema."
                        .into(),
                });
            }
        }
    }
}
641
/// Bundled data for building the final response (reduces argument count).
///
/// Consumed by `finish_turn`.
struct FinishResult<'a> {
    /// Response text; copied into an owned `String` in the final response.
    content: &'a str,
    /// Tool results accumulated during the turn.
    tool_transcript: Vec<ToolResult>,
    /// Prompt/completion token totals accumulated across the turn's LLM calls.
    usage: TokenUsage,
    /// Reported both in the `TurnCompleted` event and in the response.
    finish_reason: FinishReason,
}
649
650/// Helper: save session, emit `TurnCompleted`, and build the final response.
651async fn finish_turn(
652    store: &dyn SessionStore,
653    events: &dyn EventSink,
654    session_id: &bob_core::types::SessionId,
655    session: &bob_core::types::SessionState,
656    result: FinishResult<'_>,
657) -> Result<AgentRunResult, AgentError> {
658    store.save(session_id, session).await?;
659    events.emit(AgentEvent::TurnCompleted { finish_reason: result.finish_reason });
660    Ok(AgentRunResult::Finished(AgentResponse {
661        content: result.content.to_string(),
662        tool_transcript: result.tool_transcript,
663        usage: result.usage,
664        finish_reason: result.finish_reason,
665    }))
666}
667
668/// Execute a single turn in streaming mode and return an event stream.
669pub async fn run_turn_stream(
670    llm: Arc<dyn LlmPort>,
671    tools: Arc<dyn ToolPort>,
672    store: Arc<dyn SessionStore>,
673    events: Arc<dyn EventSink>,
674    req: AgentRequest,
675    policy: TurnPolicy,
676    default_model: String,
677) -> Result<AgentEventStream, AgentError> {
678    let tool_policy: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
679    let approval: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
680    let checkpoint_store: Arc<dyn TurnCheckpointStorePort> =
681        Arc::new(crate::NoOpCheckpointStorePort);
682    let artifact_store: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
683    let cost_meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
684    run_turn_stream_with_controls(
685        llm,
686        tools,
687        store,
688        events,
689        req,
690        policy,
691        default_model,
692        tool_policy,
693        approval,
694        crate::DispatchMode::NativePreferred,
695        checkpoint_store,
696        artifact_store,
697        cost_meter,
698    )
699    .await
700}
701
702/// Execute a single turn in streaming mode with explicit policy/approval controls.
703#[expect(
704    clippy::too_many_arguments,
705    reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
706)]
707pub(crate) async fn run_turn_stream_with_controls(
708    llm: Arc<dyn LlmPort>,
709    tools: Arc<dyn ToolPort>,
710    store: Arc<dyn SessionStore>,
711    events: Arc<dyn EventSink>,
712    req: AgentRequest,
713    policy: TurnPolicy,
714    default_model: String,
715    tool_policy: Arc<dyn ToolPolicyPort>,
716    approval: Arc<dyn ApprovalPort>,
717    dispatch_mode: crate::DispatchMode,
718    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
719    artifact_store: Arc<dyn ArtifactStorePort>,
720    cost_meter: Arc<dyn CostMeterPort>,
721) -> Result<AgentEventStream, AgentError> {
722    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<AgentStreamEvent>();
723    let config = StreamRunConfig {
724        policy,
725        default_model,
726        tool_policy,
727        approval,
728        dispatch_mode,
729        checkpoint_store,
730        artifact_store,
731        cost_meter,
732    };
733
734    tokio::spawn(async move {
735        if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
736        {
737            let _ = tx.send(AgentStreamEvent::Error { error: err.to_string() });
738        }
739    });
740
741    Ok(Box::pin(UnboundedReceiverStream::new(rx)))
742}
743
/// Owned bundle of policy and port handles that `run_turn_stream_with_controls`
/// moves into the spawned streaming task and lends to `run_turn_stream_inner`.
struct StreamRunConfig {
    /// Turn limits; cloned into the turn's `LoopGuard`.
    policy: TurnPolicy,
    /// Model used when the request does not name one.
    default_model: String,
    /// Port that decides whether a tool name is allowed.
    tool_policy: Arc<dyn ToolPolicyPort>,
    /// Port consulted to approve or deny individual tool calls.
    approval: Arc<dyn ApprovalPort>,
    /// Prompt-guided vs. native-preferred tool dispatch.
    dispatch_mode: crate::DispatchMode,
    /// Destination for per-step turn checkpoints.
    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
    /// Destination for stored artifacts.
    artifact_store: Arc<dyn ArtifactStorePort>,
    /// Budget checks and LLM usage accounting.
    cost_meter: Arc<dyn CostMeterPort>,
}
754
755async fn run_turn_stream_inner(
756    llm: Arc<dyn LlmPort>,
757    tools: Arc<dyn ToolPort>,
758    store: Arc<dyn SessionStore>,
759    events: Arc<dyn EventSink>,
760    req: AgentRequest,
761    config: &StreamRunConfig,
762    tx: &tokio::sync::mpsc::UnboundedSender<AgentStreamEvent>,
763) -> Result<(), AgentError> {
764    let model = req.model.as_deref().unwrap_or(&config.default_model);
765    let cancel_token = req.cancel_token.clone();
766    let system_instructions = resolve_system_instructions(&req);
767    let selected_skills = resolve_selected_skills(&req);
768    let tool_call_policy = resolve_tool_call_policy(&req);
769
770    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
771    let tool_descriptors = tools.list_tools().await?;
772    let mut guard = LoopGuard::new(config.policy.clone());
773    let mut total_usage = TokenUsage::default();
774    let mut consecutive_parse_failures: u32 = 0;
775    let mut next_call_id: u64 = 0;
776
777    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
778    if !selected_skills.is_empty() {
779        events.emit(AgentEvent::SkillsSelected { skill_names: selected_skills.clone() });
780    }
781    session.messages.push(Message { role: Role::User, content: req.input.clone() });
782
783    loop {
784        if let Some(ref token) = cancel_token &&
785            token.is_cancelled()
786        {
787            events.emit(AgentEvent::Error { error: "turn cancelled".to_string() });
788            events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::Cancelled });
789            store.save(&req.session_id, &session).await?;
790            let _ = tx.send(AgentStreamEvent::Error { error: "turn cancelled".to_string() });
791            let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
792            return Ok(());
793        }
794
795        config.cost_meter.check_budget(&req.session_id).await?;
796
797        if !guard.can_continue() {
798            let reason = guard.reason();
799            let msg = format!("Turn stopped: {reason:?}");
800            events.emit(AgentEvent::Error { error: msg.clone() });
801            events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::GuardExceeded });
802            store.save(&req.session_id, &session).await?;
803            let _ = tx.send(AgentStreamEvent::Error { error: msg });
804            let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
805            return Ok(());
806        }
807
808        let llm_request = crate::prompt::build_llm_request_with_options(
809            model,
810            &session,
811            &tool_descriptors,
812            &system_instructions,
813            prompt_options_for_mode(config.dispatch_mode, llm.as_ref()),
814        );
815        events.emit(AgentEvent::LlmCallStarted { model: model.to_string() });
816
817        let mut assistant_content = String::new();
818        let mut llm_usage = TokenUsage::default();
819        let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
820        let mut fallback_to_complete = false;
821
822        match llm.complete_stream(llm_request.clone()).await {
823            Ok(mut stream) => {
824                while let Some(item) = stream.next().await {
825                    match item {
826                        Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
827                            assistant_content.push_str(&delta);
828                            let _ = tx.send(AgentStreamEvent::TextDelta { content: delta });
829                        }
830                        Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
831                            llm_usage = usage;
832                        }
833                        Err(err) => {
834                            events.emit(AgentEvent::Error { error: err.to_string() });
835                            return Err(AgentError::Llm(err));
836                        }
837                    }
838                }
839            }
840            Err(err) => {
841                fallback_to_complete = true;
842                events.emit(AgentEvent::Error { error: err.to_string() });
843            }
844        }
845
846        // Provider may not support streaming — fall back to non-streaming complete.
847        if fallback_to_complete {
848            let llm_response = llm.complete(llm_request).await?;
849            assistant_content = llm_response.content.clone();
850            llm_usage = llm_response.usage;
851            llm_tool_calls = llm_response.tool_calls;
852            let _ = tx.send(AgentStreamEvent::TextDelta { content: llm_response.content });
853        }
854
855        guard.record_step();
856        total_usage.prompt_tokens += llm_usage.prompt_tokens;
857        total_usage.completion_tokens += llm_usage.completion_tokens;
858        config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
859        events.emit(AgentEvent::LlmCallCompleted { usage: llm_usage.clone() });
860        session
861            .messages
862            .push(Message { role: Role::Assistant, content: assistant_content.clone() });
863
864        let _ = config
865            .checkpoint_store
866            .save_checkpoint(&TurnCheckpoint {
867                session_id: req.session_id.clone(),
868                step: guard.steps,
869                tool_calls: guard.tool_calls,
870                usage: total_usage.clone(),
871            })
872            .await;
873
874        let response_for_dispatch = bob_core::types::LlmResponse {
875            content: assistant_content.clone(),
876            usage: llm_usage.clone(),
877            finish_reason: FinishReason::Stop,
878            tool_calls: llm_tool_calls,
879        };
880
881        if let Ok(action) =
882            parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
883        {
884            consecutive_parse_failures = 0;
885            match action {
886                AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
887                    store.save(&req.session_id, &session).await?;
888                    events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::Stop });
889                    let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
890                    return Ok(());
891                }
892                AgentAction::ToolCall { name, arguments } => {
893                    events.emit(AgentEvent::ToolCallStarted { name: name.clone() });
894                    next_call_id += 1;
895                    let call_id = format!("call-{next_call_id}");
896                    let _ = tx.send(AgentStreamEvent::ToolCallStarted {
897                        name: name.clone(),
898                        call_id: call_id.clone(),
899                    });
900                    let approval_context = ApprovalContext {
901                        session_id: req.session_id.clone(),
902                        turn_step: guard.steps.max(1),
903                        selected_skills: selected_skills.clone(),
904                    };
905
906                    let tool_result = execute_tool_call(
907                        tools.as_ref(),
908                        &mut guard,
909                        ToolCall { name: name.clone(), arguments },
910                        &tool_call_policy,
911                        config.tool_policy.as_ref(),
912                        config.approval.as_ref(),
913                        &approval_context,
914                        config.policy.tool_timeout_ms,
915                    )
916                    .await;
917
918                    guard.record_tool_call();
919                    let _ =
920                        config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
921                    let is_error = tool_result.is_error;
922                    events.emit(AgentEvent::ToolCallCompleted { name: name.clone(), is_error });
923                    let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
924                        call_id,
925                        result: tool_result.clone(),
926                    });
927
928                    let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
929                    session.messages.push(Message { role: Role::Tool, content: output_str });
930                    let _ = config
931                        .artifact_store
932                        .put(ArtifactRecord {
933                            session_id: req.session_id.clone(),
934                            kind: "tool_result".to_string(),
935                            name: name.clone(),
936                            content: tool_result.output.clone(),
937                        })
938                        .await;
939                }
940            }
941        } else {
942            consecutive_parse_failures += 1;
943            if consecutive_parse_failures >= 2 {
944                store.save(&req.session_id, &session).await?;
945                events.emit(AgentEvent::Error {
946                    error: "LLM produced invalid JSON after re-prompt".to_string(),
947                });
948                return Err(AgentError::Internal(
949                    "LLM produced invalid JSON after re-prompt".into(),
950                ));
951            }
952            session.messages.push(Message {
953                role: Role::User,
954                content: "Your response was not valid JSON. \
955                          Please respond with exactly one JSON object \
956                          matching the required schema."
957                    .into(),
958            });
959        }
960    }
961}
962
963#[cfg(test)]
964mod tests {
965    use super::*;
966
967    /// Small policy with tight limits for fast, deterministic tests.
968    fn test_policy() -> TurnPolicy {
969        TurnPolicy {
970            max_steps: 3,
971            max_tool_calls: 2,
972            max_consecutive_errors: 2,
973            turn_timeout_ms: 100,
974            tool_timeout_ms: 50,
975        }
976    }
977
978    #[test]
979    fn trips_on_max_steps() {
980        let mut guard = LoopGuard::new(test_policy());
981        assert!(guard.can_continue());
982
983        for _ in 0..3 {
984            guard.record_step();
985        }
986
987        assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
988        assert_eq!(guard.reason(), GuardReason::MaxSteps);
989    }
990
991    #[test]
992    fn trips_on_max_tool_calls() {
993        let mut guard = LoopGuard::new(test_policy());
994        assert!(guard.can_continue());
995
996        for _ in 0..2 {
997            guard.record_tool_call();
998        }
999
1000        assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
1001        assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
1002    }
1003
1004    #[test]
1005    fn trips_on_max_consecutive_errors() {
1006        let mut guard = LoopGuard::new(test_policy());
1007        assert!(guard.can_continue());
1008
1009        for _ in 0..2 {
1010            guard.record_error();
1011        }
1012
1013        assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
1014        assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
1015    }
1016
1017    #[tokio::test]
1018    async fn trips_on_timeout() {
1019        let guard = LoopGuard::new(test_policy());
1020        assert!(guard.can_continue());
1021        assert!(!guard.timed_out());
1022
1023        // Sleep past the 100 ms timeout.
1024        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
1025
1026        assert!(!guard.can_continue(), "guard should trip after timeout");
1027        assert!(guard.timed_out());
1028        assert_eq!(guard.reason(), GuardReason::TurnTimeout);
1029    }
1030
1031    #[test]
1032    fn reset_errors_clears_counter() {
1033        let mut guard = LoopGuard::new(test_policy());
1034
1035        guard.record_error();
1036        guard.reset_errors();
1037
1038        // After reset, a single error should NOT trip the guard.
1039        guard.record_error();
1040        assert!(guard.can_continue(), "single error after reset should not trip guard");
1041    }
1042
1043    // ── run_turn FSM tests ───────────────────────────────────────
1044
1045    use std::{
1046        collections::{HashMap, VecDeque},
1047        sync::{Arc, Mutex},
1048    };
1049
1050    use bob_core::{
1051        error::{CostError, LlmError, StoreError, ToolError},
1052        ports::{
1053            ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
1054            ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
1055        },
1056        types::{
1057            AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
1058            ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
1059            LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
1060            ToolSource, TurnCheckpoint,
1061        },
1062    };
1063    use futures_util::StreamExt;
1064
1065    // ── Mock ports ───────────────────────────────────────────────
1066
1067    /// LLM mock that returns queued responses in order.
1068    struct SequentialLlm {
1069        responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
1070    }
1071
1072    impl SequentialLlm {
1073        fn from_contents(contents: Vec<&str>) -> Self {
1074            let responses = contents
1075                .into_iter()
1076                .map(|c| {
1077                    Ok(LlmResponse {
1078                        content: c.to_string(),
1079                        usage: TokenUsage::default(),
1080                        finish_reason: FinishReason::Stop,
1081                        tool_calls: Vec::new(),
1082                    })
1083                })
1084                .collect();
1085            Self { responses: Mutex::new(responses) }
1086        }
1087    }
1088
1089    #[async_trait::async_trait]
1090    impl LlmPort for SequentialLlm {
1091        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1092            let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1093            q.pop_front().unwrap_or_else(|| {
1094                Ok(LlmResponse {
1095                    content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
1096                    usage: TokenUsage::default(),
1097                    finish_reason: FinishReason::Stop,
1098                    tool_calls: Vec::new(),
1099                })
1100            })
1101        }
1102
1103        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1104            Err(LlmError::Provider("not implemented".into()))
1105        }
1106    }
1107
1108    /// Tool port mock with configurable tools and call results.
1109    struct MockToolPort {
1110        tools: Vec<ToolDescriptor>,
1111        call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
1112    }
1113
1114    impl MockToolPort {
1115        fn empty() -> Self {
1116            Self { tools: vec![], call_results: Mutex::new(VecDeque::new()) }
1117        }
1118
1119        fn with_tool_and_results(
1120            tool_name: &str,
1121            results: Vec<Result<ToolResult, ToolError>>,
1122        ) -> Self {
1123            Self {
1124                tools: vec![ToolDescriptor {
1125                    id: tool_name.to_string(),
1126                    description: format!("{tool_name} tool"),
1127                    input_schema: serde_json::json!({"type": "object"}),
1128                    source: ToolSource::Local,
1129                }],
1130                call_results: Mutex::new(results.into()),
1131            }
1132        }
1133    }
1134
1135    #[async_trait::async_trait]
1136    impl ToolPort for MockToolPort {
1137        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1138            Ok(self.tools.clone())
1139        }
1140
1141        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
1142            let mut q = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
1143            q.pop_front().unwrap_or_else(|| {
1144                Ok(ToolResult {
1145                    name: call.name,
1146                    output: serde_json::json!({"result": "default"}),
1147                    is_error: false,
1148                })
1149            })
1150        }
1151    }
1152
1153    struct NoCallToolPort {
1154        tools: Vec<ToolDescriptor>,
1155    }
1156
1157    #[async_trait::async_trait]
1158    impl ToolPort for NoCallToolPort {
1159        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1160            Ok(self.tools.clone())
1161        }
1162
1163        async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
1164            Err(ToolError::Execution(
1165                "tool call should be blocked by policy before execution".to_string(),
1166            ))
1167        }
1168    }
1169
1170    struct AllowAllPolicyPort;
1171
1172    impl ToolPolicyPort for AllowAllPolicyPort {
1173        fn is_tool_allowed(
1174            &self,
1175            _tool: &str,
1176            _deny_tools: &[String],
1177            _allow_tools: Option<&[String]>,
1178        ) -> bool {
1179            true
1180        }
1181    }
1182
1183    struct DenySearchPolicyPort;
1184
1185    impl ToolPolicyPort for DenySearchPolicyPort {
1186        fn is_tool_allowed(
1187            &self,
1188            tool: &str,
1189            _deny_tools: &[String],
1190            _allow_tools: Option<&[String]>,
1191        ) -> bool {
1192            tool != "search"
1193        }
1194    }
1195
1196    struct AlwaysApprovePort;
1197
1198    #[async_trait::async_trait]
1199    impl ApprovalPort for AlwaysApprovePort {
1200        async fn approve_tool_call(
1201            &self,
1202            _call: &ToolCall,
1203            _context: &ApprovalContext,
1204        ) -> Result<ApprovalDecision, ToolError> {
1205            Ok(ApprovalDecision::Approved)
1206        }
1207    }
1208
1209    struct AlwaysDenyApprovalPort;
1210
1211    #[async_trait::async_trait]
1212    impl ApprovalPort for AlwaysDenyApprovalPort {
1213        async fn approve_tool_call(
1214            &self,
1215            _call: &ToolCall,
1216            _context: &ApprovalContext,
1217        ) -> Result<ApprovalDecision, ToolError> {
1218            Ok(ApprovalDecision::Denied {
1219                reason: "approval policy rejected tool call".to_string(),
1220            })
1221        }
1222    }
1223
1224    struct CountingCheckpointPort {
1225        saved: Mutex<Vec<TurnCheckpoint>>,
1226    }
1227
1228    impl CountingCheckpointPort {
1229        fn new() -> Self {
1230            Self { saved: Mutex::new(Vec::new()) }
1231        }
1232    }
1233
1234    #[async_trait::async_trait]
1235    impl TurnCheckpointStorePort for CountingCheckpointPort {
1236        async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
1237            self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(checkpoint.clone());
1238            Ok(())
1239        }
1240
1241        async fn load_latest(
1242            &self,
1243            _session_id: &SessionId,
1244        ) -> Result<Option<TurnCheckpoint>, StoreError> {
1245            Ok(None)
1246        }
1247    }
1248
1249    struct NoopArtifactStore;
1250
1251    #[async_trait::async_trait]
1252    impl ArtifactStorePort for NoopArtifactStore {
1253        async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
1254            Ok(())
1255        }
1256
1257        async fn list_by_session(
1258            &self,
1259            _session_id: &SessionId,
1260        ) -> Result<Vec<ArtifactRecord>, StoreError> {
1261            Ok(Vec::new())
1262        }
1263    }
1264
1265    struct CountingCostMeter {
1266        llm_calls: Mutex<u32>,
1267    }
1268
1269    impl CountingCostMeter {
1270        fn new() -> Self {
1271            Self { llm_calls: Mutex::new(0) }
1272        }
1273    }
1274
1275    #[async_trait::async_trait]
1276    impl CostMeterPort for CountingCostMeter {
1277        async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
1278            Ok(())
1279        }
1280
1281        async fn record_llm_usage(
1282            &self,
1283            _session_id: &SessionId,
1284            _model: &str,
1285            _usage: &TokenUsage,
1286        ) -> Result<(), CostError> {
1287            let mut count = self.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
1288            *count += 1;
1289            Ok(())
1290        }
1291
1292        async fn record_tool_result(
1293            &self,
1294            _session_id: &SessionId,
1295            _tool_result: &ToolResult,
1296        ) -> Result<(), CostError> {
1297            Ok(())
1298        }
1299    }
1300
1301    struct MemoryStore {
1302        data: Mutex<HashMap<SessionId, SessionState>>,
1303    }
1304
1305    impl MemoryStore {
1306        fn new() -> Self {
1307            Self { data: Mutex::new(HashMap::new()) }
1308        }
1309    }
1310
1311    struct FailingSaveStore;
1312
1313    #[async_trait::async_trait]
1314    impl SessionStore for FailingSaveStore {
1315        async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1316            Ok(None)
1317        }
1318
1319        async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
1320            Err(StoreError::Backend("simulated save failure".into()))
1321        }
1322    }
1323
1324    #[async_trait::async_trait]
1325    impl SessionStore for MemoryStore {
1326        async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1327            let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1328            Ok(map.get(id).cloned())
1329        }
1330
1331        async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
1332            let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1333            map.insert(id.clone(), state.clone());
1334            Ok(())
1335        }
1336    }
1337
1338    struct CollectingSink {
1339        events: Mutex<Vec<AgentEvent>>,
1340    }
1341
1342    impl CollectingSink {
1343        fn new() -> Self {
1344            Self { events: Mutex::new(Vec::new()) }
1345        }
1346
1347        fn event_count(&self) -> usize {
1348            self.events.lock().unwrap_or_else(|p| p.into_inner()).len()
1349        }
1350
1351        fn all_events(&self) -> Vec<AgentEvent> {
1352            self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
1353        }
1354    }
1355
1356    impl EventSink for CollectingSink {
1357        fn emit(&self, event: AgentEvent) {
1358            self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
1359        }
1360    }
1361
1362    fn make_request(input: &str) -> AgentRequest {
1363        AgentRequest {
1364            input: input.into(),
1365            session_id: "test-session".into(),
1366            model: None,
1367            context: bob_core::types::RequestContext::default(),
1368            cancel_token: None,
1369        }
1370    }
1371
1372    fn generous_policy() -> TurnPolicy {
1373        TurnPolicy {
1374            max_steps: 20,
1375            max_tool_calls: 10,
1376            max_consecutive_errors: 3,
1377            turn_timeout_ms: 30_000,
1378            tool_timeout_ms: 5_000,
1379        }
1380    }
1381
1382    struct StreamLlm {
1383        chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
1384    }
1385
1386    impl StreamLlm {
1387        fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
1388            Self { chunks: Mutex::new(chunks.into()) }
1389        }
1390    }
1391
1392    #[async_trait::async_trait]
1393    impl LlmPort for StreamLlm {
1394        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1395            Err(LlmError::Provider("complete() should not be called in stream test".into()))
1396        }
1397
1398        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1399            let mut chunks = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
1400            let items: Vec<Result<LlmStreamChunk, LlmError>> = chunks.drain(..).collect();
1401            Ok(Box::pin(futures_util::stream::iter(items)))
1402        }
1403    }
1404
1405    struct InspectingLlm {
1406        expected_substring: String,
1407    }
1408
1409    #[async_trait::async_trait]
1410    impl LlmPort for InspectingLlm {
1411        async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
1412            let system = req
1413                .messages
1414                .iter()
1415                .find(|m| m.role == Role::System)
1416                .map(|m| m.content.clone())
1417                .unwrap_or_default();
1418            if !system.contains(&self.expected_substring) {
1419                return Err(LlmError::Provider(format!(
1420                    "expected system prompt to include '{}', got: {}",
1421                    self.expected_substring, system
1422                )));
1423            }
1424            Ok(LlmResponse {
1425                content: r#"{"type": "final", "content": "ok"}"#.to_string(),
1426                usage: TokenUsage::default(),
1427                finish_reason: FinishReason::Stop,
1428                tool_calls: Vec::new(),
1429            })
1430        }
1431
1432        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1433            Err(LlmError::Provider("not used".into()))
1434        }
1435    }
1436
1437    // ── TC-01: Simple Final response ─────────────────────────────
1438
1439    #[tokio::test]
1440    async fn tc01_simple_final_response() {
1441        let llm =
1442            SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
1443        let tools = MockToolPort::empty();
1444        let store = MemoryStore::new();
1445        let sink = CollectingSink::new();
1446
1447        let result = run_turn(
1448            &llm,
1449            &tools,
1450            &store,
1451            &sink,
1452            make_request("Hi"),
1453            &generous_policy(),
1454            "test-model",
1455        )
1456        .await;
1457
1458        assert!(
1459            matches!(&result, Ok(AgentRunResult::Finished(_))),
1460            "expected Finished, got {result:?}"
1461        );
1462        let resp = match result {
1463            Ok(AgentRunResult::Finished(r)) => r,
1464            _ => return,
1465        };
1466
1467        assert_eq!(resp.content, "Hello there!");
1468        assert_eq!(resp.finish_reason, FinishReason::Stop);
1469        assert!(resp.tool_transcript.is_empty());
1470        assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
1471    }
1472
1473    // ── TC-02: ToolCall → Final chain ────────────────────────────
1474
1475    #[tokio::test]
1476    async fn tc02_tool_call_then_final() {
1477        let llm = SequentialLlm::from_contents(vec![
1478            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1479            r#"{"type": "final", "content": "Found results."}"#,
1480        ]);
1481        let tools = MockToolPort::with_tool_and_results(
1482            "search",
1483            vec![Ok(ToolResult {
1484                name: "search".into(),
1485                output: serde_json::json!({"hits": 42}),
1486                is_error: false,
1487            })],
1488        );
1489        let store = MemoryStore::new();
1490        let sink = CollectingSink::new();
1491
1492        let result = run_turn(
1493            &llm,
1494            &tools,
1495            &store,
1496            &sink,
1497            make_request("Search for rust"),
1498            &generous_policy(),
1499            "test-model",
1500        )
1501        .await;
1502
1503        assert!(
1504            matches!(&result, Ok(AgentRunResult::Finished(_))),
1505            "expected Finished, got {result:?}"
1506        );
1507        let resp = match result {
1508            Ok(AgentRunResult::Finished(r)) => r,
1509            _ => return,
1510        };
1511
1512        assert_eq!(resp.content, "Found results.");
1513        assert_eq!(resp.finish_reason, FinishReason::Stop);
1514        assert_eq!(resp.tool_transcript.len(), 1);
1515        assert_eq!(resp.tool_transcript[0].name, "search");
1516        assert!(!resp.tool_transcript[0].is_error);
1517    }
1518
1519    // ── TC-03: Parse error → re-prompt → success ────────────────
1520
1521    #[tokio::test]
1522    async fn tc03_parse_error_reprompt_success() {
1523        let llm = SequentialLlm::from_contents(vec![
1524            "This is not JSON at all.",
1525            r#"{"type": "final", "content": "Recovered"}"#,
1526        ]);
1527        let tools = MockToolPort::empty();
1528        let store = MemoryStore::new();
1529        let sink = CollectingSink::new();
1530
1531        let result = run_turn(
1532            &llm,
1533            &tools,
1534            &store,
1535            &sink,
1536            make_request("Hi"),
1537            &generous_policy(),
1538            "test-model",
1539        )
1540        .await;
1541
1542        assert!(
1543            matches!(&result, Ok(AgentRunResult::Finished(_))),
1544            "expected Finished after re-prompt, got {result:?}"
1545        );
1546        let resp = match result {
1547            Ok(AgentRunResult::Finished(r)) => r,
1548            _ => return,
1549        };
1550
1551        assert_eq!(resp.content, "Recovered");
1552        assert_eq!(resp.finish_reason, FinishReason::Stop);
1553    }
1554
1555    // ── TC-04: Double parse error → AgentError ──────────────────
1556
1557    #[tokio::test]
1558    async fn tc04_double_parse_error() {
1559        let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
1560        let tools = MockToolPort::empty();
1561        let store = MemoryStore::new();
1562        let sink = CollectingSink::new();
1563
1564        let result = run_turn(
1565            &llm,
1566            &tools,
1567            &store,
1568            &sink,
1569            make_request("Hi"),
1570            &generous_policy(),
1571            "test-model",
1572        )
1573        .await;
1574
1575        assert!(result.is_err(), "should return error after two parse failures");
1576        let msg = match result {
1577            Err(err) => err.to_string(),
1578            Ok(value) => format!("unexpected success: {value:?}"),
1579        };
1580        assert!(msg.contains("invalid JSON"), "error message = {msg}");
1581    }
1582
1583    // ── TC-05: max_steps exhaustion → GuardExceeded ─────────────
1584
1585    #[tokio::test]
1586    async fn tc05_max_steps_exhaustion() {
1587        // LLM always returns tool calls — the guard should stop after max_steps.
1588        let llm = SequentialLlm::from_contents(vec![
1589            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1590            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1591            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1592            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1593        ]);
1594        let tools = MockToolPort::with_tool_and_results(
1595            "t1",
1596            vec![
1597                Ok(ToolResult {
1598                    name: "t1".into(),
1599                    output: serde_json::json!(null),
1600                    is_error: false,
1601                }),
1602                Ok(ToolResult {
1603                    name: "t1".into(),
1604                    output: serde_json::json!(null),
1605                    is_error: false,
1606                }),
1607                Ok(ToolResult {
1608                    name: "t1".into(),
1609                    output: serde_json::json!(null),
1610                    is_error: false,
1611                }),
1612            ],
1613        );
1614        let store = MemoryStore::new();
1615        let sink = CollectingSink::new();
1616
1617        let policy = TurnPolicy {
1618            max_steps: 2,
1619            max_tool_calls: 10,
1620            max_consecutive_errors: 5,
1621            turn_timeout_ms: 30_000,
1622            tool_timeout_ms: 5_000,
1623        };
1624
1625        let result =
1626            run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
1627                .await;
1628
1629        assert!(
1630            matches!(&result, Ok(AgentRunResult::Finished(_))),
1631            "expected Finished with GuardExceeded, got {result:?}"
1632        );
1633        let resp = match result {
1634            Ok(AgentRunResult::Finished(r)) => r,
1635            _ => return,
1636        };
1637
1638        assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
1639        assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
1640    }
1641
1642    // ── TC-06: Cancellation mid-turn → Cancelled ────────────────
1643
1644    #[tokio::test]
1645    async fn tc06_cancellation() {
1646        let llm = SequentialLlm::from_contents(vec![
1647            r#"{"type": "final", "content": "should not reach"}"#,
1648        ]);
1649        let tools = MockToolPort::empty();
1650        let store = MemoryStore::new();
1651        let sink = CollectingSink::new();
1652
1653        let token = CancelToken::new();
1654        // Cancel before running.
1655        token.cancel();
1656
1657        let mut req = make_request("Hi");
1658        req.cancel_token = Some(token);
1659
1660        let result =
1661            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1662
1663        assert!(
1664            matches!(&result, Ok(AgentRunResult::Finished(_))),
1665            "expected Finished with Cancelled, got {result:?}"
1666        );
1667        let resp = match result {
1668            Ok(AgentRunResult::Finished(r)) => r,
1669            _ => return,
1670        };
1671
1672        assert_eq!(resp.finish_reason, FinishReason::Cancelled);
1673    }
1674
1675    // ── TC-07: Tool error → is_error result → LLM sees error → Final ───
1676
1677    #[tokio::test]
1678    async fn tc07_tool_error_then_final() {
1679        let llm = SequentialLlm::from_contents(vec![
1680            r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
1681            r#"{"type": "final", "content": "Recovered from tool error."}"#,
1682        ]);
1683        let tools = MockToolPort::with_tool_and_results(
1684            "flaky_tool",
1685            vec![Err(ToolError::Execution("connection refused".into()))],
1686        );
1687        let store = MemoryStore::new();
1688        let sink = CollectingSink::new();
1689
1690        let result = run_turn(
1691            &llm,
1692            &tools,
1693            &store,
1694            &sink,
1695            make_request("call flaky"),
1696            &generous_policy(),
1697            "test-model",
1698        )
1699        .await;
1700
1701        assert!(
1702            matches!(&result, Ok(AgentRunResult::Finished(_))),
1703            "expected Finished, got {result:?}"
1704        );
1705        let resp = match result {
1706            Ok(AgentRunResult::Finished(r)) => r,
1707            _ => return,
1708        };
1709
1710        assert_eq!(resp.content, "Recovered from tool error.");
1711        assert_eq!(resp.tool_transcript.len(), 1);
1712        assert!(resp.tool_transcript[0].is_error);
1713    }
1714
1715    #[tokio::test]
1716    async fn tc08_save_failure_is_propagated() {
1717        let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
1718        let tools = MockToolPort::empty();
1719        let store = FailingSaveStore;
1720        let sink = CollectingSink::new();
1721
1722        let result = run_turn(
1723            &llm,
1724            &tools,
1725            &store,
1726            &sink,
1727            make_request("hello"),
1728            &generous_policy(),
1729            "test-model",
1730        )
1731        .await;
1732
1733        assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
1734    }
1735
1736    #[tokio::test]
1737    async fn tc09_stream_turn_emits_text_and_finished() {
1738        let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
1739            Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
1740            Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
1741            Ok(LlmStreamChunk::Done {
1742                usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
1743            }),
1744        ]));
1745        let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
1746        let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
1747        let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());
1748
1749        let stream_result = run_turn_stream(
1750            llm,
1751            tools,
1752            store,
1753            sink,
1754            make_request("hello"),
1755            generous_policy(),
1756            "test-model".to_string(),
1757        )
1758        .await;
1759        assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
1760        let mut stream = match stream_result {
1761            Ok(stream) => stream,
1762            Err(_) => return,
1763        };
1764
1765        let mut saw_text = false;
1766        let mut saw_finished = false;
1767        while let Some(event) = stream.next().await {
1768            match event {
1769                AgentStreamEvent::TextDelta { content } => {
1770                    saw_text = saw_text || !content.is_empty();
1771                }
1772                AgentStreamEvent::Finished { usage } => {
1773                    saw_finished = true;
1774                    assert_eq!(usage.prompt_tokens, 3);
1775                    assert_eq!(usage.completion_tokens, 4);
1776                }
1777                AgentStreamEvent::ToolCallStarted { .. } |
1778                AgentStreamEvent::ToolCallCompleted { .. } |
1779                AgentStreamEvent::Error { .. } => {}
1780            }
1781        }
1782
1783        assert!(saw_text, "expected at least one text delta");
1784        assert!(saw_finished, "expected a finished event");
1785    }
1786
1787    #[tokio::test]
1788    async fn tc10_skills_prompt_context_is_injected() {
1789        let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
1790        let tools = MockToolPort::empty();
1791        let store = MemoryStore::new();
1792        let sink = CollectingSink::new();
1793
1794        let mut req = make_request("review this code");
1795        req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());
1796
1797        let result =
1798            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1799
1800        assert!(result.is_ok(), "run should succeed when skills prompt is injected");
1801    }
1802
1803    #[tokio::test]
1804    async fn tc11_selected_skills_context_emits_event() {
1805        let llm =
1806            SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
1807        let tools = MockToolPort::empty();
1808        let store = MemoryStore::new();
1809        let sink = CollectingSink::new();
1810
1811        let mut req = make_request("review code");
1812        req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];
1813
1814        let result =
1815            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1816        assert!(result.is_ok(), "run should succeed");
1817
1818        let events = sink.all_events();
1819        assert!(
1820            events.iter().any(|event| matches!(
1821                event,
1822                AgentEvent::SkillsSelected { skill_names }
1823                    if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
1824            )),
1825            "skills.selected event should be emitted with context skill names"
1826        );
1827    }
1828
1829    #[tokio::test]
1830    async fn tc12_policy_deny_tool_blocks_execution() {
1831        let llm = SequentialLlm::from_contents(vec![
1832            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1833            r#"{"type": "final", "content": "done"}"#,
1834        ]);
1835        let tools = NoCallToolPort {
1836            tools: vec![ToolDescriptor {
1837                id: "search".to_string(),
1838                description: "search tool".to_string(),
1839                input_schema: serde_json::json!({"type":"object"}),
1840                source: ToolSource::Local,
1841            }],
1842        };
1843        let store = MemoryStore::new();
1844        let sink = CollectingSink::new();
1845
1846        let mut req = make_request("search rust");
1847        req.context.tool_policy.deny_tools =
1848            vec!["search".to_string(), "local/shell_exec".to_string()];
1849
1850        let result =
1851            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1852        assert!(
1853            matches!(&result, Ok(AgentRunResult::Finished(_))),
1854            "expected finished response, got {result:?}"
1855        );
1856        let resp = match result {
1857            Ok(AgentRunResult::Finished(r)) => r,
1858            _ => return,
1859        };
1860
1861        assert_eq!(resp.finish_reason, FinishReason::Stop);
1862        assert_eq!(resp.tool_transcript.len(), 1);
1863        assert!(resp.tool_transcript[0].is_error);
1864        assert!(
1865            resp.tool_transcript[0].output.to_string().contains("denied"),
1866            "tool error should explain policy denial"
1867        );
1868    }
1869
1870    #[tokio::test]
1871    async fn tc13_approval_denied_blocks_execution() {
1872        let llm = SequentialLlm::from_contents(vec![
1873            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1874            r#"{"type": "final", "content": "done"}"#,
1875        ]);
1876        let tools = NoCallToolPort {
1877            tools: vec![ToolDescriptor {
1878                id: "search".to_string(),
1879                description: "search tool".to_string(),
1880                input_schema: serde_json::json!({"type":"object"}),
1881                source: ToolSource::Local,
1882            }],
1883        };
1884        let store = MemoryStore::new();
1885        let sink = CollectingSink::new();
1886        let req = make_request("search rust");
1887        let tool_policy = AllowAllPolicyPort;
1888        let approval = AlwaysDenyApprovalPort;
1889
1890        let result = run_turn_with_controls(
1891            &llm,
1892            &tools,
1893            &store,
1894            &sink,
1895            req,
1896            &generous_policy(),
1897            "test-model",
1898            &tool_policy,
1899            &approval,
1900        )
1901        .await;
1902        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
1903        let resp = match result {
1904            Ok(AgentRunResult::Finished(r)) => r,
1905            _ => return,
1906        };
1907
1908        assert_eq!(resp.tool_transcript.len(), 1);
1909        assert!(resp.tool_transcript[0].is_error);
1910        assert!(
1911            resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
1912            "tool error should explain approval denial"
1913        );
1914    }
1915
1916    #[tokio::test]
1917    async fn tc14_custom_policy_port_blocks_execution() {
1918        let llm = SequentialLlm::from_contents(vec![
1919            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1920            r#"{"type": "final", "content": "done"}"#,
1921        ]);
1922        let tools = NoCallToolPort {
1923            tools: vec![ToolDescriptor {
1924                id: "search".to_string(),
1925                description: "search tool".to_string(),
1926                input_schema: serde_json::json!({"type":"object"}),
1927                source: ToolSource::Local,
1928            }],
1929        };
1930        let store = MemoryStore::new();
1931        let sink = CollectingSink::new();
1932        let req = make_request("search rust");
1933        let tool_policy = DenySearchPolicyPort;
1934        let approval = AlwaysApprovePort;
1935
1936        let result = run_turn_with_controls(
1937            &llm,
1938            &tools,
1939            &store,
1940            &sink,
1941            req,
1942            &generous_policy(),
1943            "test-model",
1944            &tool_policy,
1945            &approval,
1946        )
1947        .await;
1948        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
1949        let resp = match result {
1950            Ok(AgentRunResult::Finished(r)) => r,
1951            _ => return,
1952        };
1953
1954        assert_eq!(resp.tool_transcript.len(), 1);
1955        assert!(resp.tool_transcript[0].is_error);
1956        assert!(
1957            resp.tool_transcript[0].output.to_string().contains("denied"),
1958            "tool error should explain policy denial"
1959        );
1960    }
1961
1962    #[tokio::test]
1963    async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
1964        struct NativeToolLlm {
1965            responses: Mutex<VecDeque<LlmResponse>>,
1966        }
1967
1968        #[async_trait::async_trait]
1969        impl LlmPort for NativeToolLlm {
1970            fn capabilities(&self) -> bob_core::types::LlmCapabilities {
1971                bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
1972            }
1973
1974            async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1975                let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1976                Ok(q.pop_front().unwrap_or(LlmResponse {
1977                    content: r#"{"type":"final","content":"fallback"}"#.to_string(),
1978                    usage: TokenUsage::default(),
1979                    finish_reason: FinishReason::Stop,
1980                    tool_calls: Vec::new(),
1981                }))
1982            }
1983
1984            async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1985                Err(LlmError::Provider("not used".into()))
1986            }
1987        }
1988
1989        let llm = NativeToolLlm {
1990            responses: Mutex::new(VecDeque::from(vec![
1991                LlmResponse {
1992                    content: "ignored".to_string(),
1993                    usage: TokenUsage::default(),
1994                    finish_reason: FinishReason::Stop,
1995                    tool_calls: vec![ToolCall {
1996                        name: "search".to_string(),
1997                        arguments: serde_json::json!({"q":"rust"}),
1998                    }],
1999                },
2000                LlmResponse {
2001                    content: r#"{"type":"final","content":"done"}"#.to_string(),
2002                    usage: TokenUsage::default(),
2003                    finish_reason: FinishReason::Stop,
2004                    tool_calls: Vec::new(),
2005                },
2006            ])),
2007        };
2008        let tools = MockToolPort::with_tool_and_results(
2009            "search",
2010            vec![Ok(ToolResult {
2011                name: "search".to_string(),
2012                output: serde_json::json!({"hits": 2}),
2013                is_error: false,
2014            })],
2015        );
2016        let store = MemoryStore::new();
2017        let sink = CollectingSink::new();
2018        let checkpoint = CountingCheckpointPort::new();
2019        let artifacts = NoopArtifactStore;
2020        let cost = CountingCostMeter::new();
2021        let policy = AllowAllPolicyPort;
2022        let approval = AlwaysApprovePort;
2023
2024        let result = run_turn_with_extensions(
2025            &llm,
2026            &tools,
2027            &store,
2028            &sink,
2029            make_request("search rust"),
2030            &generous_policy(),
2031            "test-model",
2032            &policy,
2033            &approval,
2034            crate::DispatchMode::NativePreferred,
2035            &checkpoint,
2036            &artifacts,
2037            &cost,
2038        )
2039        .await;
2040
2041        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2042        let resp = match result {
2043            Ok(AgentRunResult::Finished(r)) => r,
2044            _ => return,
2045        };
2046        assert_eq!(resp.tool_transcript.len(), 1);
2047        assert_eq!(resp.tool_transcript[0].name, "search");
2048    }
2049
2050    #[tokio::test]
2051    async fn tc16_checkpoint_and_cost_ports_are_invoked() {
2052        let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
2053        let tools = MockToolPort::empty();
2054        let store = MemoryStore::new();
2055        let sink = CollectingSink::new();
2056        let checkpoint = CountingCheckpointPort::new();
2057        let artifacts = NoopArtifactStore;
2058        let cost = CountingCostMeter::new();
2059        let policy = AllowAllPolicyPort;
2060        let approval = AlwaysApprovePort;
2061
2062        let result = run_turn_with_extensions(
2063            &llm,
2064            &tools,
2065            &store,
2066            &sink,
2067            make_request("hello"),
2068            &generous_policy(),
2069            "test-model",
2070            &policy,
2071            &approval,
2072            crate::DispatchMode::PromptGuided,
2073            &checkpoint,
2074            &artifacts,
2075            &cost,
2076        )
2077        .await;
2078        assert!(result.is_ok(), "turn should succeed");
2079        let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2080        let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
2081        assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
2082        assert!(llm_calls >= 1, "cost meter should record llm usage");
2083    }
2084}