Skip to main content

bob_runtime/
scheduler.rs

//! # Scheduler
//!
//! Scheduler: loop guard and 6-state turn FSM.
//!
//! ## Overview
//!
//! The scheduler is the core orchestration component that manages agent turn execution.
//! It implements a finite state machine (FSM) with the following states:
//!
//! 1. **Start**: Initialize the turn and load the session
//! 2. **LLM Call**: Send the request to the LLM
//! 3. **Parse Action**: Parse the LLM response into a structured action
//! 4. **Execute Tool**: Run the tool if the action is a tool call
//! 5. **Update State**: Update the session state with the results
//! 6. **Loop/Finish**: Either continue with another step or complete the turn
//!
//! ## Loop Guard
//!
//! The [`LoopGuard`] ensures turn termination by tracking:
//! - Number of steps (LLM calls)
//! - Number of tool calls
//! - Consecutive errors
//! - Elapsed time
//!
//! As soon as any limit is reached, the turn is terminated with an appropriate [`GuardReason`].
//!
//! ## Example
//!
//! ```rust,ignore
//! use bob_runtime::scheduler::LoopGuard;
//! use bob_core::types::TurnPolicy;
//!
//! let policy = TurnPolicy::default();
//! let mut guard = LoopGuard::new(policy);
//!
//! while guard.can_continue() {
//!     guard.record_step();
//!     // Execute step...
//! }
//! ```
41
42use std::sync::Arc;
43
44use bob_core::{
45    error::AgentError,
46    normalize_tool_list,
47    ports::{
48        ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
49        ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
50    },
51    types::{
52        AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
53        AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
54        GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
55    },
56};
57use futures_util::StreamExt;
58use tokio::time::Instant;
59use tokio_stream::wrappers::UnboundedReceiverStream;
60
61/// Safety net that guarantees turn termination by tracking steps,
62/// tool calls, consecutive errors, and elapsed time against [`TurnPolicy`] limits.
63#[derive(Debug)]
64pub struct LoopGuard {
65    policy: TurnPolicy,
66    steps: u32,
67    tool_calls: u32,
68    consecutive_errors: u32,
69    start: Instant,
70}
71
72impl LoopGuard {
73    /// Create a new guard tied to the given policy.
74    #[must_use]
75    pub fn new(policy: TurnPolicy) -> Self {
76        Self { policy, steps: 0, tool_calls: 0, consecutive_errors: 0, start: Instant::now() }
77    }
78
79    /// Returns `true` if the turn may continue executing.
80    #[must_use]
81    pub fn can_continue(&self) -> bool {
82        self.steps < self.policy.max_steps &&
83            self.tool_calls < self.policy.max_tool_calls &&
84            self.consecutive_errors < self.policy.max_consecutive_errors &&
85            !self.timed_out()
86    }
87
88    /// Record one scheduler step.
89    pub fn record_step(&mut self) {
90        self.steps += 1;
91    }
92
93    /// Record one tool call.
94    pub fn record_tool_call(&mut self) {
95        self.tool_calls += 1;
96    }
97
98    /// Record a consecutive error.
99    pub fn record_error(&mut self) {
100        self.consecutive_errors += 1;
101    }
102
103    /// Reset the consecutive-error counter (e.g. after a successful call).
104    pub fn reset_errors(&mut self) {
105        self.consecutive_errors = 0;
106    }
107
108    /// The reason the guard stopped the turn.
109    ///
110    /// Only meaningful when [`can_continue`](Self::can_continue) returns `false`.
111    #[must_use]
112    pub fn reason(&self) -> GuardReason {
113        if self.steps >= self.policy.max_steps {
114            GuardReason::MaxSteps
115        } else if self.tool_calls >= self.policy.max_tool_calls {
116            GuardReason::MaxToolCalls
117        } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
118            GuardReason::MaxConsecutiveErrors
119        } else if self.timed_out() {
120            GuardReason::TurnTimeout
121        } else {
122            // Fallback — shouldn't be called when `can_continue()` is true.
123            GuardReason::Cancelled
124        }
125    }
126
127    /// Returns `true` if the turn has exceeded its time budget.
128    #[must_use]
129    pub fn timed_out(&self) -> bool {
130        self.start.elapsed().as_millis() >= u128::from(self.policy.turn_timeout_ms)
131    }
132}
133
// ── Default system instructions (v1) ─────────────────────────────────

/// Baseline system instructions used for every turn; `resolve_system_instructions`
/// appends the request's own system prompt after this text when one is supplied.
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = concat!(
    "You are a helpful AI assistant. ",
    "Think step by step before answering. ",
    "When you need external information, use the available tools.",
);
140
141fn resolve_system_instructions(req: &AgentRequest) -> String {
142    let Some(skills_prompt) = req.context.system_prompt.as_deref() else {
143        return DEFAULT_SYSTEM_INSTRUCTIONS.to_string();
144    };
145
146    if skills_prompt.trim().is_empty() {
147        DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
148    } else {
149        format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
150    }
151}
152
153fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
154    req.context.selected_skills.clone()
155}
156
157#[derive(Debug, Clone, Default)]
158struct ToolCallPolicy {
159    deny_tools: Vec<String>,
160    allow_tools: Option<Vec<String>>,
161}
162
163fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
164    let deny_tools =
165        normalize_tool_list(req.context.tool_policy.deny_tools.iter().map(String::as_str));
166    let allow_tools = req
167        .context
168        .tool_policy
169        .allow_tools
170        .as_ref()
171        .map(|tools| normalize_tool_list(tools.iter().map(String::as_str)));
172    ToolCallPolicy { deny_tools, allow_tools }
173}
174
175fn prompt_options_for_mode(
176    dispatch_mode: crate::DispatchMode,
177    llm: &dyn LlmPort,
178) -> crate::prompt::PromptBuildOptions {
179    match dispatch_mode {
180        crate::DispatchMode::PromptGuided => crate::prompt::PromptBuildOptions::default(),
181        crate::DispatchMode::NativePreferred => {
182            if llm.capabilities().native_tool_calling {
183                crate::prompt::PromptBuildOptions {
184                    include_action_schema: false,
185                    include_tool_schema: false,
186                }
187            } else {
188                crate::prompt::PromptBuildOptions::default()
189            }
190        }
191    }
192}
193
194fn parse_action_for_mode(
195    dispatch_mode: crate::DispatchMode,
196    llm: &dyn LlmPort,
197    response: &bob_core::types::LlmResponse,
198) -> Result<AgentAction, crate::action::ActionParseError> {
199    match dispatch_mode {
200        crate::DispatchMode::PromptGuided => crate::action::parse_action(&response.content),
201        crate::DispatchMode::NativePreferred => {
202            if llm.capabilities().native_tool_calling &&
203                let Some(tool_call) = response.tool_calls.first()
204            {
205                return Ok(AgentAction::ToolCall {
206                    name: tool_call.name.clone(),
207                    arguments: tool_call.arguments.clone(),
208                });
209            }
210            crate::action::parse_action(&response.content)
211        }
212    }
213}
214
215#[expect(
216    clippy::too_many_arguments,
217    reason = "tool execution needs explicit policy, approval, and timeout dependencies"
218)]
219async fn execute_tool_call(
220    tools: &dyn ToolPort,
221    guard: &mut LoopGuard,
222    tool_call: ToolCall,
223    policy: &ToolCallPolicy,
224    tool_policy_port: &dyn ToolPolicyPort,
225    approval_port: &dyn ApprovalPort,
226    approval_context: &ApprovalContext,
227    timeout_ms: u64,
228) -> ToolResult {
229    if !tool_policy_port.is_tool_allowed(
230        &tool_call.name,
231        &policy.deny_tools,
232        policy.allow_tools.as_deref(),
233    ) {
234        guard.record_error();
235        return ToolResult {
236            name: tool_call.name.clone(),
237            output: serde_json::json!({
238                "error": format!("tool '{}' denied by policy", tool_call.name)
239            }),
240            is_error: true,
241        };
242    }
243
244    match approval_port.approve_tool_call(&tool_call, approval_context).await {
245        Ok(ApprovalDecision::Approved) => {}
246        Ok(ApprovalDecision::Denied { reason }) => {
247            guard.record_error();
248            return ToolResult {
249                name: tool_call.name.clone(),
250                output: serde_json::json!({"error": reason}),
251                is_error: true,
252            };
253        }
254        Err(err) => {
255            guard.record_error();
256            return ToolResult {
257                name: tool_call.name.clone(),
258                output: serde_json::json!({"error": err.to_string()}),
259                is_error: true,
260            };
261        }
262    }
263
264    match tokio::time::timeout(
265        std::time::Duration::from_millis(timeout_ms),
266        tools.call_tool(tool_call.clone()),
267    )
268    .await
269    {
270        Ok(Ok(result)) => {
271            guard.reset_errors();
272            result
273        }
274        Ok(Err(err)) => {
275            guard.record_error();
276            ToolResult {
277                name: tool_call.name,
278                output: serde_json::json!({"error": err.to_string()}),
279                is_error: true,
280            }
281        }
282        Err(_) => {
283            guard.record_error();
284            ToolResult {
285                name: tool_call.name,
286                output: serde_json::json!({"error": "tool call timed out"}),
287                is_error: true,
288            }
289        }
290    }
291}
292
293// ── Turn Loop FSM ────────────────────────────────────────────────────
294
295/// Execute a single agent turn as a 6-state FSM.
296///
297/// States: Start → BuildPrompt → LlmInfer → ParseAction → CallTool → Done.
298/// The loop guard guarantees termination under all conditions.
299pub async fn run_turn(
300    llm: &dyn LlmPort,
301    tools: &dyn ToolPort,
302    store: &dyn SessionStore,
303    events: &dyn EventSink,
304    req: AgentRequest,
305    policy: &TurnPolicy,
306    default_model: &str,
307) -> Result<AgentRunResult, AgentError> {
308    let tool_policy = crate::DefaultToolPolicyPort;
309    let approval = crate::AllowAllApprovalPort;
310    let checkpoint_store = crate::NoOpCheckpointStorePort;
311    let artifact_store = crate::NoOpArtifactStorePort;
312    let cost_meter = crate::NoOpCostMeterPort;
313    run_turn_with_extensions(
314        llm,
315        tools,
316        store,
317        events,
318        req,
319        policy,
320        default_model,
321        &tool_policy,
322        &approval,
323        crate::DispatchMode::NativePreferred,
324        &checkpoint_store,
325        &artifact_store,
326        &cost_meter,
327    )
328    .await
329}
330
331/// Execute a single turn with explicit policy/approval controls.
332#[cfg_attr(
333    not(test),
334    expect(
335        dead_code,
336        reason = "reserved wrapper for partial control injection in external integrations"
337    )
338)]
339#[expect(
340    clippy::too_many_arguments,
341    reason = "wrapper exposes explicit dependency ports for compatibility and testability"
342)]
343pub(crate) async fn run_turn_with_controls(
344    llm: &dyn LlmPort,
345    tools: &dyn ToolPort,
346    store: &dyn SessionStore,
347    events: &dyn EventSink,
348    req: AgentRequest,
349    policy: &TurnPolicy,
350    default_model: &str,
351    tool_policy_port: &dyn ToolPolicyPort,
352    approval_port: &dyn ApprovalPort,
353) -> Result<AgentRunResult, AgentError> {
354    let checkpoint_store = crate::NoOpCheckpointStorePort;
355    let artifact_store = crate::NoOpArtifactStorePort;
356    let cost_meter = crate::NoOpCostMeterPort;
357    run_turn_with_extensions(
358        llm,
359        tools,
360        store,
361        events,
362        req,
363        policy,
364        default_model,
365        tool_policy_port,
366        approval_port,
367        crate::DispatchMode::PromptGuided,
368        &checkpoint_store,
369        &artifact_store,
370        &cost_meter,
371    )
372    .await
373}
374
375/// Execute a single turn with all extensibility controls injected.
376#[expect(
377    clippy::too_many_arguments,
378    reason = "core entrypoint exposes all ports explicitly for adapter injection"
379)]
380pub(crate) async fn run_turn_with_extensions(
381    llm: &dyn LlmPort,
382    tools: &dyn ToolPort,
383    store: &dyn SessionStore,
384    events: &dyn EventSink,
385    req: AgentRequest,
386    policy: &TurnPolicy,
387    default_model: &str,
388    tool_policy_port: &dyn ToolPolicyPort,
389    approval_port: &dyn ApprovalPort,
390    dispatch_mode: crate::DispatchMode,
391    checkpoint_store: &dyn TurnCheckpointStorePort,
392    artifact_store: &dyn ArtifactStorePort,
393    cost_meter: &dyn CostMeterPort,
394) -> Result<AgentRunResult, AgentError> {
395    let model = req.model.as_deref().unwrap_or(default_model);
396    let cancel_token = req.cancel_token.clone();
397    let system_instructions = resolve_system_instructions(&req);
398    let selected_skills = resolve_selected_skills(&req);
399    let tool_call_policy = resolve_tool_call_policy(&req);
400
401    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
402    let tool_descriptors = tools.list_tools().await?;
403    let mut guard = LoopGuard::new(policy.clone());
404
405    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
406    if !selected_skills.is_empty() {
407        events.emit(AgentEvent::SkillsSelected { skill_names: selected_skills.clone() });
408    }
409
410    session.messages.push(Message { role: Role::User, content: req.input.clone() });
411
412    let mut tool_transcript: Vec<ToolResult> = Vec::new();
413    let mut total_usage = TokenUsage::default();
414    let mut consecutive_parse_failures: u32 = 0;
415
416    loop {
417        if let Some(ref token) = cancel_token &&
418            token.is_cancelled()
419        {
420            return finish_turn(
421                store,
422                events,
423                &req.session_id,
424                &session,
425                FinishResult {
426                    content: "Turn cancelled.",
427                    tool_transcript,
428                    usage: total_usage,
429                    finish_reason: FinishReason::Cancelled,
430                },
431            )
432            .await;
433        }
434
435        cost_meter.check_budget(&req.session_id).await?;
436
437        if !guard.can_continue() {
438            let reason = guard.reason();
439            let msg = format!("Turn stopped: {reason:?}");
440            return finish_turn(
441                store,
442                events,
443                &req.session_id,
444                &session,
445                FinishResult {
446                    content: &msg,
447                    tool_transcript,
448                    usage: total_usage,
449                    finish_reason: FinishReason::GuardExceeded,
450                },
451            )
452            .await;
453        }
454
455        let llm_request = crate::prompt::build_llm_request_with_options(
456            model,
457            &session,
458            &tool_descriptors,
459            &system_instructions,
460            prompt_options_for_mode(dispatch_mode, llm),
461        );
462
463        events.emit(AgentEvent::LlmCallStarted { model: model.to_string() });
464
465        let llm_response = if let Some(ref token) = cancel_token {
466            tokio::select! {
467                result = llm.complete(llm_request) => result?,
468                () = token.cancelled() => {
469                    return finish_turn(
470                        store, events, &req.session_id, &session,
471                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
472                    ).await;
473                }
474            }
475        } else {
476            llm.complete(llm_request).await?
477        };
478
479        guard.record_step();
480        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
481        total_usage.completion_tokens += llm_response.usage.completion_tokens;
482        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;
483
484        events.emit(AgentEvent::LlmCallCompleted { usage: llm_response.usage.clone() });
485
486        session
487            .messages
488            .push(Message { role: Role::Assistant, content: llm_response.content.clone() });
489
490        let _ = checkpoint_store
491            .save_checkpoint(&TurnCheckpoint {
492                session_id: req.session_id.clone(),
493                step: guard.steps,
494                tool_calls: guard.tool_calls,
495                usage: total_usage.clone(),
496            })
497            .await;
498
499        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
500            Ok(action) => {
501                consecutive_parse_failures = 0;
502                match action {
503                    AgentAction::Final { content } => {
504                        return finish_turn(
505                            store,
506                            events,
507                            &req.session_id,
508                            &session,
509                            FinishResult {
510                                content: &content,
511                                tool_transcript,
512                                usage: total_usage,
513                                finish_reason: FinishReason::Stop,
514                            },
515                        )
516                        .await;
517                    }
518                    AgentAction::AskUser { question } => {
519                        return finish_turn(
520                            store,
521                            events,
522                            &req.session_id,
523                            &session,
524                            FinishResult {
525                                content: &question,
526                                tool_transcript,
527                                usage: total_usage,
528                                finish_reason: FinishReason::Stop,
529                            },
530                        )
531                        .await;
532                    }
533                    AgentAction::ToolCall { name, arguments } => {
534                        events.emit(AgentEvent::ToolCallStarted { name: name.clone() });
535                        let approval_context = ApprovalContext {
536                            session_id: req.session_id.clone(),
537                            turn_step: guard.steps.max(1),
538                            selected_skills: selected_skills.clone(),
539                        };
540
541                        let tool_result = execute_tool_call(
542                            tools,
543                            &mut guard,
544                            ToolCall { name: name.clone(), arguments },
545                            &tool_call_policy,
546                            tool_policy_port,
547                            approval_port,
548                            &approval_context,
549                            policy.tool_timeout_ms,
550                        )
551                        .await;
552
553                        guard.record_tool_call();
554                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;
555
556                        let is_error = tool_result.is_error;
557                        events.emit(AgentEvent::ToolCallCompleted { name: name.clone(), is_error });
558
559                        let output_str =
560                            serde_json::to_string(&tool_result.output).unwrap_or_default();
561                        session.messages.push(Message { role: Role::Tool, content: output_str });
562
563                        let _ = artifact_store
564                            .put(ArtifactRecord {
565                                session_id: req.session_id.clone(),
566                                kind: "tool_result".to_string(),
567                                name: name.clone(),
568                                content: tool_result.output.clone(),
569                            })
570                            .await;
571
572                        tool_transcript.push(tool_result);
573                    }
574                }
575            }
576            Err(_parse_err) => {
577                consecutive_parse_failures += 1;
578                if consecutive_parse_failures >= 2 {
579                    let _ = store.save(&req.session_id, &session).await;
580                    return Err(AgentError::Internal(
581                        "LLM produced invalid JSON after re-prompt".into(),
582                    ));
583                }
584                session.messages.push(Message {
585                    role: Role::User,
586                    content: "Your response was not valid JSON. \
587                              Please respond with exactly one JSON object \
588                              matching the required schema."
589                        .into(),
590                });
591            }
592        }
593    }
594}
595
596/// Bundled data for building the final response (reduces argument count).
597struct FinishResult<'a> {
598    content: &'a str,
599    tool_transcript: Vec<ToolResult>,
600    usage: TokenUsage,
601    finish_reason: FinishReason,
602}
603
604/// Helper: save session, emit `TurnCompleted`, and build the final response.
605async fn finish_turn(
606    store: &dyn SessionStore,
607    events: &dyn EventSink,
608    session_id: &bob_core::types::SessionId,
609    session: &bob_core::types::SessionState,
610    result: FinishResult<'_>,
611) -> Result<AgentRunResult, AgentError> {
612    store.save(session_id, session).await?;
613    events.emit(AgentEvent::TurnCompleted { finish_reason: result.finish_reason });
614    Ok(AgentRunResult::Finished(AgentResponse {
615        content: result.content.to_string(),
616        tool_transcript: result.tool_transcript,
617        usage: result.usage,
618        finish_reason: result.finish_reason,
619    }))
620}
621
622/// Execute a single turn in streaming mode and return an event stream.
623pub async fn run_turn_stream(
624    llm: Arc<dyn LlmPort>,
625    tools: Arc<dyn ToolPort>,
626    store: Arc<dyn SessionStore>,
627    events: Arc<dyn EventSink>,
628    req: AgentRequest,
629    policy: TurnPolicy,
630    default_model: String,
631) -> Result<AgentEventStream, AgentError> {
632    let tool_policy: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
633    let approval: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
634    let checkpoint_store: Arc<dyn TurnCheckpointStorePort> =
635        Arc::new(crate::NoOpCheckpointStorePort);
636    let artifact_store: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
637    let cost_meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
638    run_turn_stream_with_controls(
639        llm,
640        tools,
641        store,
642        events,
643        req,
644        policy,
645        default_model,
646        tool_policy,
647        approval,
648        crate::DispatchMode::NativePreferred,
649        checkpoint_store,
650        artifact_store,
651        cost_meter,
652    )
653    .await
654}
655
656/// Execute a single turn in streaming mode with explicit policy/approval controls.
657#[expect(
658    clippy::too_many_arguments,
659    reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
660)]
661pub(crate) async fn run_turn_stream_with_controls(
662    llm: Arc<dyn LlmPort>,
663    tools: Arc<dyn ToolPort>,
664    store: Arc<dyn SessionStore>,
665    events: Arc<dyn EventSink>,
666    req: AgentRequest,
667    policy: TurnPolicy,
668    default_model: String,
669    tool_policy: Arc<dyn ToolPolicyPort>,
670    approval: Arc<dyn ApprovalPort>,
671    dispatch_mode: crate::DispatchMode,
672    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
673    artifact_store: Arc<dyn ArtifactStorePort>,
674    cost_meter: Arc<dyn CostMeterPort>,
675) -> Result<AgentEventStream, AgentError> {
676    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<AgentStreamEvent>();
677    let config = StreamRunConfig {
678        policy,
679        default_model,
680        tool_policy,
681        approval,
682        dispatch_mode,
683        checkpoint_store,
684        artifact_store,
685        cost_meter,
686    };
687
688    tokio::spawn(async move {
689        if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
690        {
691            let _ = tx.send(AgentStreamEvent::Error { error: err.to_string() });
692        }
693    });
694
695    Ok(Box::pin(UnboundedReceiverStream::new(rx)))
696}
697
698struct StreamRunConfig {
699    policy: TurnPolicy,
700    default_model: String,
701    tool_policy: Arc<dyn ToolPolicyPort>,
702    approval: Arc<dyn ApprovalPort>,
703    dispatch_mode: crate::DispatchMode,
704    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
705    artifact_store: Arc<dyn ArtifactStorePort>,
706    cost_meter: Arc<dyn CostMeterPort>,
707}
708
709async fn run_turn_stream_inner(
710    llm: Arc<dyn LlmPort>,
711    tools: Arc<dyn ToolPort>,
712    store: Arc<dyn SessionStore>,
713    events: Arc<dyn EventSink>,
714    req: AgentRequest,
715    config: &StreamRunConfig,
716    tx: &tokio::sync::mpsc::UnboundedSender<AgentStreamEvent>,
717) -> Result<(), AgentError> {
718    let model = req.model.as_deref().unwrap_or(&config.default_model);
719    let cancel_token = req.cancel_token.clone();
720    let system_instructions = resolve_system_instructions(&req);
721    let selected_skills = resolve_selected_skills(&req);
722    let tool_call_policy = resolve_tool_call_policy(&req);
723
724    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
725    let tool_descriptors = tools.list_tools().await?;
726    let mut guard = LoopGuard::new(config.policy.clone());
727    let mut total_usage = TokenUsage::default();
728    let mut consecutive_parse_failures: u32 = 0;
729    let mut next_call_id: u64 = 0;
730
731    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
732    if !selected_skills.is_empty() {
733        events.emit(AgentEvent::SkillsSelected { skill_names: selected_skills.clone() });
734    }
735    session.messages.push(Message { role: Role::User, content: req.input.clone() });
736
737    loop {
738        if let Some(ref token) = cancel_token &&
739            token.is_cancelled()
740        {
741            events.emit(AgentEvent::Error { error: "turn cancelled".to_string() });
742            events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::Cancelled });
743            store.save(&req.session_id, &session).await?;
744            let _ = tx.send(AgentStreamEvent::Error { error: "turn cancelled".to_string() });
745            let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
746            return Ok(());
747        }
748
749        config.cost_meter.check_budget(&req.session_id).await?;
750
751        if !guard.can_continue() {
752            let reason = guard.reason();
753            let msg = format!("Turn stopped: {reason:?}");
754            events.emit(AgentEvent::Error { error: msg.clone() });
755            events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::GuardExceeded });
756            store.save(&req.session_id, &session).await?;
757            let _ = tx.send(AgentStreamEvent::Error { error: msg });
758            let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
759            return Ok(());
760        }
761
762        let llm_request = crate::prompt::build_llm_request_with_options(
763            model,
764            &session,
765            &tool_descriptors,
766            &system_instructions,
767            prompt_options_for_mode(config.dispatch_mode, llm.as_ref()),
768        );
769        events.emit(AgentEvent::LlmCallStarted { model: model.to_string() });
770
771        let mut assistant_content = String::new();
772        let mut llm_usage = TokenUsage::default();
773        let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
774        let mut fallback_to_complete = false;
775
776        match llm.complete_stream(llm_request.clone()).await {
777            Ok(mut stream) => {
778                while let Some(item) = stream.next().await {
779                    match item {
780                        Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
781                            assistant_content.push_str(&delta);
782                            let _ = tx.send(AgentStreamEvent::TextDelta { content: delta });
783                        }
784                        Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
785                            llm_usage = usage;
786                        }
787                        Err(err) => {
788                            events.emit(AgentEvent::Error { error: err.to_string() });
789                            return Err(AgentError::Llm(err));
790                        }
791                    }
792                }
793            }
794            Err(err) => {
795                fallback_to_complete = true;
796                events.emit(AgentEvent::Error { error: err.to_string() });
797            }
798        }
799
800        // Provider may not support streaming — fall back to non-streaming complete.
801        if fallback_to_complete {
802            let llm_response = llm.complete(llm_request).await?;
803            assistant_content = llm_response.content.clone();
804            llm_usage = llm_response.usage;
805            llm_tool_calls = llm_response.tool_calls;
806            let _ = tx.send(AgentStreamEvent::TextDelta { content: llm_response.content });
807        }
808
809        guard.record_step();
810        total_usage.prompt_tokens += llm_usage.prompt_tokens;
811        total_usage.completion_tokens += llm_usage.completion_tokens;
812        config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
813        events.emit(AgentEvent::LlmCallCompleted { usage: llm_usage.clone() });
814        session
815            .messages
816            .push(Message { role: Role::Assistant, content: assistant_content.clone() });
817
818        let _ = config
819            .checkpoint_store
820            .save_checkpoint(&TurnCheckpoint {
821                session_id: req.session_id.clone(),
822                step: guard.steps,
823                tool_calls: guard.tool_calls,
824                usage: total_usage.clone(),
825            })
826            .await;
827
828        let response_for_dispatch = bob_core::types::LlmResponse {
829            content: assistant_content.clone(),
830            usage: llm_usage.clone(),
831            finish_reason: FinishReason::Stop,
832            tool_calls: llm_tool_calls,
833        };
834
835        if let Ok(action) =
836            parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
837        {
838            consecutive_parse_failures = 0;
839            match action {
840                AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
841                    store.save(&req.session_id, &session).await?;
842                    events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::Stop });
843                    let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
844                    return Ok(());
845                }
846                AgentAction::ToolCall { name, arguments } => {
847                    events.emit(AgentEvent::ToolCallStarted { name: name.clone() });
848                    next_call_id += 1;
849                    let call_id = format!("call-{next_call_id}");
850                    let _ = tx.send(AgentStreamEvent::ToolCallStarted {
851                        name: name.clone(),
852                        call_id: call_id.clone(),
853                    });
854                    let approval_context = ApprovalContext {
855                        session_id: req.session_id.clone(),
856                        turn_step: guard.steps.max(1),
857                        selected_skills: selected_skills.clone(),
858                    };
859
860                    let tool_result = execute_tool_call(
861                        tools.as_ref(),
862                        &mut guard,
863                        ToolCall { name: name.clone(), arguments },
864                        &tool_call_policy,
865                        config.tool_policy.as_ref(),
866                        config.approval.as_ref(),
867                        &approval_context,
868                        config.policy.tool_timeout_ms,
869                    )
870                    .await;
871
872                    guard.record_tool_call();
873                    let _ =
874                        config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
875                    let is_error = tool_result.is_error;
876                    events.emit(AgentEvent::ToolCallCompleted { name: name.clone(), is_error });
877                    let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
878                        call_id,
879                        result: tool_result.clone(),
880                    });
881
882                    let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
883                    session.messages.push(Message { role: Role::Tool, content: output_str });
884                    let _ = config
885                        .artifact_store
886                        .put(ArtifactRecord {
887                            session_id: req.session_id.clone(),
888                            kind: "tool_result".to_string(),
889                            name: name.clone(),
890                            content: tool_result.output.clone(),
891                        })
892                        .await;
893                }
894            }
895        } else {
896            consecutive_parse_failures += 1;
897            if consecutive_parse_failures >= 2 {
898                store.save(&req.session_id, &session).await?;
899                events.emit(AgentEvent::Error {
900                    error: "LLM produced invalid JSON after re-prompt".to_string(),
901                });
902                return Err(AgentError::Internal(
903                    "LLM produced invalid JSON after re-prompt".into(),
904                ));
905            }
906            session.messages.push(Message {
907                role: Role::User,
908                content: "Your response was not valid JSON. \
909                          Please respond with exactly one JSON object \
910                          matching the required schema."
911                    .into(),
912            });
913        }
914    }
915}
916
917#[cfg(test)]
918mod tests {
919    use super::*;
920
921    /// Small policy with tight limits for fast, deterministic tests.
922    fn test_policy() -> TurnPolicy {
923        TurnPolicy {
924            max_steps: 3,
925            max_tool_calls: 2,
926            max_consecutive_errors: 2,
927            turn_timeout_ms: 100,
928            tool_timeout_ms: 50,
929        }
930    }
931
932    #[test]
933    fn trips_on_max_steps() {
934        let mut guard = LoopGuard::new(test_policy());
935        assert!(guard.can_continue());
936
937        for _ in 0..3 {
938            guard.record_step();
939        }
940
941        assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
942        assert_eq!(guard.reason(), GuardReason::MaxSteps);
943    }
944
945    #[test]
946    fn trips_on_max_tool_calls() {
947        let mut guard = LoopGuard::new(test_policy());
948        assert!(guard.can_continue());
949
950        for _ in 0..2 {
951            guard.record_tool_call();
952        }
953
954        assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
955        assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
956    }
957
958    #[test]
959    fn trips_on_max_consecutive_errors() {
960        let mut guard = LoopGuard::new(test_policy());
961        assert!(guard.can_continue());
962
963        for _ in 0..2 {
964            guard.record_error();
965        }
966
967        assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
968        assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
969    }
970
971    #[tokio::test]
972    async fn trips_on_timeout() {
973        let guard = LoopGuard::new(test_policy());
974        assert!(guard.can_continue());
975        assert!(!guard.timed_out());
976
977        // Sleep past the 100 ms timeout.
978        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
979
980        assert!(!guard.can_continue(), "guard should trip after timeout");
981        assert!(guard.timed_out());
982        assert_eq!(guard.reason(), GuardReason::TurnTimeout);
983    }
984
985    #[test]
986    fn reset_errors_clears_counter() {
987        let mut guard = LoopGuard::new(test_policy());
988
989        guard.record_error();
990        guard.reset_errors();
991
992        // After reset, a single error should NOT trip the guard.
993        guard.record_error();
994        assert!(guard.can_continue(), "single error after reset should not trip guard");
995    }
996
997    // ── run_turn FSM tests ───────────────────────────────────────
998
999    use std::{
1000        collections::{HashMap, VecDeque},
1001        sync::{Arc, Mutex},
1002    };
1003
1004    use bob_core::{
1005        error::{CostError, LlmError, StoreError, ToolError},
1006        ports::{
1007            ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
1008            ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
1009        },
1010        types::{
1011            AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
1012            ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
1013            LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
1014            ToolSource, TurnCheckpoint,
1015        },
1016    };
1017    use futures_util::StreamExt;
1018
1019    // ── Mock ports ───────────────────────────────────────────────
1020
1021    /// LLM mock that returns queued responses in order.
1022    struct SequentialLlm {
1023        responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
1024    }
1025
1026    impl SequentialLlm {
1027        fn from_contents(contents: Vec<&str>) -> Self {
1028            let responses = contents
1029                .into_iter()
1030                .map(|c| {
1031                    Ok(LlmResponse {
1032                        content: c.to_string(),
1033                        usage: TokenUsage::default(),
1034                        finish_reason: FinishReason::Stop,
1035                        tool_calls: Vec::new(),
1036                    })
1037                })
1038                .collect();
1039            Self { responses: Mutex::new(responses) }
1040        }
1041    }
1042
1043    #[async_trait::async_trait]
1044    impl LlmPort for SequentialLlm {
1045        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1046            let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1047            q.pop_front().unwrap_or_else(|| {
1048                Ok(LlmResponse {
1049                    content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
1050                    usage: TokenUsage::default(),
1051                    finish_reason: FinishReason::Stop,
1052                    tool_calls: Vec::new(),
1053                })
1054            })
1055        }
1056
1057        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1058            Err(LlmError::Provider("not implemented".into()))
1059        }
1060    }
1061
1062    /// Tool port mock with configurable tools and call results.
1063    struct MockToolPort {
1064        tools: Vec<ToolDescriptor>,
1065        call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
1066    }
1067
1068    impl MockToolPort {
1069        fn empty() -> Self {
1070            Self { tools: vec![], call_results: Mutex::new(VecDeque::new()) }
1071        }
1072
1073        fn with_tool_and_results(
1074            tool_name: &str,
1075            results: Vec<Result<ToolResult, ToolError>>,
1076        ) -> Self {
1077            Self {
1078                tools: vec![ToolDescriptor {
1079                    id: tool_name.to_string(),
1080                    description: format!("{tool_name} tool"),
1081                    input_schema: serde_json::json!({"type": "object"}),
1082                    source: ToolSource::Local,
1083                }],
1084                call_results: Mutex::new(results.into()),
1085            }
1086        }
1087    }
1088
1089    #[async_trait::async_trait]
1090    impl ToolPort for MockToolPort {
1091        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1092            Ok(self.tools.clone())
1093        }
1094
1095        async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
1096            let mut q = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
1097            q.pop_front().unwrap_or_else(|| {
1098                Ok(ToolResult {
1099                    name: call.name,
1100                    output: serde_json::json!({"result": "default"}),
1101                    is_error: false,
1102                })
1103            })
1104        }
1105    }
1106
1107    struct NoCallToolPort {
1108        tools: Vec<ToolDescriptor>,
1109    }
1110
1111    #[async_trait::async_trait]
1112    impl ToolPort for NoCallToolPort {
1113        async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1114            Ok(self.tools.clone())
1115        }
1116
1117        async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
1118            Err(ToolError::Execution(
1119                "tool call should be blocked by policy before execution".to_string(),
1120            ))
1121        }
1122    }
1123
1124    struct AllowAllPolicyPort;
1125
1126    impl ToolPolicyPort for AllowAllPolicyPort {
1127        fn is_tool_allowed(
1128            &self,
1129            _tool: &str,
1130            _deny_tools: &[String],
1131            _allow_tools: Option<&[String]>,
1132        ) -> bool {
1133            true
1134        }
1135    }
1136
1137    struct DenySearchPolicyPort;
1138
1139    impl ToolPolicyPort for DenySearchPolicyPort {
1140        fn is_tool_allowed(
1141            &self,
1142            tool: &str,
1143            _deny_tools: &[String],
1144            _allow_tools: Option<&[String]>,
1145        ) -> bool {
1146            tool != "search"
1147        }
1148    }
1149
1150    struct AlwaysApprovePort;
1151
1152    #[async_trait::async_trait]
1153    impl ApprovalPort for AlwaysApprovePort {
1154        async fn approve_tool_call(
1155            &self,
1156            _call: &ToolCall,
1157            _context: &ApprovalContext,
1158        ) -> Result<ApprovalDecision, ToolError> {
1159            Ok(ApprovalDecision::Approved)
1160        }
1161    }
1162
1163    struct AlwaysDenyApprovalPort;
1164
1165    #[async_trait::async_trait]
1166    impl ApprovalPort for AlwaysDenyApprovalPort {
1167        async fn approve_tool_call(
1168            &self,
1169            _call: &ToolCall,
1170            _context: &ApprovalContext,
1171        ) -> Result<ApprovalDecision, ToolError> {
1172            Ok(ApprovalDecision::Denied {
1173                reason: "approval policy rejected tool call".to_string(),
1174            })
1175        }
1176    }
1177
1178    struct CountingCheckpointPort {
1179        saved: Mutex<Vec<TurnCheckpoint>>,
1180    }
1181
1182    impl CountingCheckpointPort {
1183        fn new() -> Self {
1184            Self { saved: Mutex::new(Vec::new()) }
1185        }
1186    }
1187
1188    #[async_trait::async_trait]
1189    impl TurnCheckpointStorePort for CountingCheckpointPort {
1190        async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
1191            self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(checkpoint.clone());
1192            Ok(())
1193        }
1194
1195        async fn load_latest(
1196            &self,
1197            _session_id: &SessionId,
1198        ) -> Result<Option<TurnCheckpoint>, StoreError> {
1199            Ok(None)
1200        }
1201    }
1202
1203    struct NoopArtifactStore;
1204
1205    #[async_trait::async_trait]
1206    impl ArtifactStorePort for NoopArtifactStore {
1207        async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
1208            Ok(())
1209        }
1210
1211        async fn list_by_session(
1212            &self,
1213            _session_id: &SessionId,
1214        ) -> Result<Vec<ArtifactRecord>, StoreError> {
1215            Ok(Vec::new())
1216        }
1217    }
1218
1219    struct CountingCostMeter {
1220        llm_calls: Mutex<u32>,
1221    }
1222
1223    impl CountingCostMeter {
1224        fn new() -> Self {
1225            Self { llm_calls: Mutex::new(0) }
1226        }
1227    }
1228
1229    #[async_trait::async_trait]
1230    impl CostMeterPort for CountingCostMeter {
1231        async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
1232            Ok(())
1233        }
1234
1235        async fn record_llm_usage(
1236            &self,
1237            _session_id: &SessionId,
1238            _model: &str,
1239            _usage: &TokenUsage,
1240        ) -> Result<(), CostError> {
1241            let mut count = self.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
1242            *count += 1;
1243            Ok(())
1244        }
1245
1246        async fn record_tool_result(
1247            &self,
1248            _session_id: &SessionId,
1249            _tool_result: &ToolResult,
1250        ) -> Result<(), CostError> {
1251            Ok(())
1252        }
1253    }
1254
1255    struct MemoryStore {
1256        data: Mutex<HashMap<SessionId, SessionState>>,
1257    }
1258
1259    impl MemoryStore {
1260        fn new() -> Self {
1261            Self { data: Mutex::new(HashMap::new()) }
1262        }
1263    }
1264
1265    struct FailingSaveStore;
1266
1267    #[async_trait::async_trait]
1268    impl SessionStore for FailingSaveStore {
1269        async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1270            Ok(None)
1271        }
1272
1273        async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
1274            Err(StoreError::Backend("simulated save failure".into()))
1275        }
1276    }
1277
1278    #[async_trait::async_trait]
1279    impl SessionStore for MemoryStore {
1280        async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1281            let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1282            Ok(map.get(id).cloned())
1283        }
1284
1285        async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
1286            let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1287            map.insert(id.clone(), state.clone());
1288            Ok(())
1289        }
1290    }
1291
1292    struct CollectingSink {
1293        events: Mutex<Vec<AgentEvent>>,
1294    }
1295
1296    impl CollectingSink {
1297        fn new() -> Self {
1298            Self { events: Mutex::new(Vec::new()) }
1299        }
1300
1301        fn event_count(&self) -> usize {
1302            self.events.lock().unwrap_or_else(|p| p.into_inner()).len()
1303        }
1304
1305        fn all_events(&self) -> Vec<AgentEvent> {
1306            self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
1307        }
1308    }
1309
1310    impl EventSink for CollectingSink {
1311        fn emit(&self, event: AgentEvent) {
1312            self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
1313        }
1314    }
1315
1316    fn make_request(input: &str) -> AgentRequest {
1317        AgentRequest {
1318            input: input.into(),
1319            session_id: "test-session".into(),
1320            model: None,
1321            context: bob_core::types::RequestContext::default(),
1322            cancel_token: None,
1323        }
1324    }
1325
1326    fn generous_policy() -> TurnPolicy {
1327        TurnPolicy {
1328            max_steps: 20,
1329            max_tool_calls: 10,
1330            max_consecutive_errors: 3,
1331            turn_timeout_ms: 30_000,
1332            tool_timeout_ms: 5_000,
1333        }
1334    }
1335
1336    struct StreamLlm {
1337        chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
1338    }
1339
1340    impl StreamLlm {
1341        fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
1342            Self { chunks: Mutex::new(chunks.into()) }
1343        }
1344    }
1345
1346    #[async_trait::async_trait]
1347    impl LlmPort for StreamLlm {
1348        async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1349            Err(LlmError::Provider("complete() should not be called in stream test".into()))
1350        }
1351
1352        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1353            let mut chunks = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
1354            let items: Vec<Result<LlmStreamChunk, LlmError>> = chunks.drain(..).collect();
1355            Ok(Box::pin(futures_util::stream::iter(items)))
1356        }
1357    }
1358
1359    struct InspectingLlm {
1360        expected_substring: String,
1361    }
1362
1363    #[async_trait::async_trait]
1364    impl LlmPort for InspectingLlm {
1365        async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
1366            let system = req
1367                .messages
1368                .iter()
1369                .find(|m| m.role == Role::System)
1370                .map(|m| m.content.clone())
1371                .unwrap_or_default();
1372            if !system.contains(&self.expected_substring) {
1373                return Err(LlmError::Provider(format!(
1374                    "expected system prompt to include '{}', got: {}",
1375                    self.expected_substring, system
1376                )));
1377            }
1378            Ok(LlmResponse {
1379                content: r#"{"type": "final", "content": "ok"}"#.to_string(),
1380                usage: TokenUsage::default(),
1381                finish_reason: FinishReason::Stop,
1382                tool_calls: Vec::new(),
1383            })
1384        }
1385
1386        async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1387            Err(LlmError::Provider("not used".into()))
1388        }
1389    }
1390
1391    // ── TC-01: Simple Final response ─────────────────────────────
1392
1393    #[tokio::test]
1394    async fn tc01_simple_final_response() {
1395        let llm =
1396            SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
1397        let tools = MockToolPort::empty();
1398        let store = MemoryStore::new();
1399        let sink = CollectingSink::new();
1400
1401        let result = run_turn(
1402            &llm,
1403            &tools,
1404            &store,
1405            &sink,
1406            make_request("Hi"),
1407            &generous_policy(),
1408            "test-model",
1409        )
1410        .await;
1411
1412        assert!(
1413            matches!(&result, Ok(AgentRunResult::Finished(_))),
1414            "expected Finished, got {result:?}"
1415        );
1416        let resp = match result {
1417            Ok(AgentRunResult::Finished(r)) => r,
1418            _ => return,
1419        };
1420
1421        assert_eq!(resp.content, "Hello there!");
1422        assert_eq!(resp.finish_reason, FinishReason::Stop);
1423        assert!(resp.tool_transcript.is_empty());
1424        assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
1425    }
1426
1427    // ── TC-02: ToolCall → Final chain ────────────────────────────
1428
1429    #[tokio::test]
1430    async fn tc02_tool_call_then_final() {
1431        let llm = SequentialLlm::from_contents(vec![
1432            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1433            r#"{"type": "final", "content": "Found results."}"#,
1434        ]);
1435        let tools = MockToolPort::with_tool_and_results(
1436            "search",
1437            vec![Ok(ToolResult {
1438                name: "search".into(),
1439                output: serde_json::json!({"hits": 42}),
1440                is_error: false,
1441            })],
1442        );
1443        let store = MemoryStore::new();
1444        let sink = CollectingSink::new();
1445
1446        let result = run_turn(
1447            &llm,
1448            &tools,
1449            &store,
1450            &sink,
1451            make_request("Search for rust"),
1452            &generous_policy(),
1453            "test-model",
1454        )
1455        .await;
1456
1457        assert!(
1458            matches!(&result, Ok(AgentRunResult::Finished(_))),
1459            "expected Finished, got {result:?}"
1460        );
1461        let resp = match result {
1462            Ok(AgentRunResult::Finished(r)) => r,
1463            _ => return,
1464        };
1465
1466        assert_eq!(resp.content, "Found results.");
1467        assert_eq!(resp.finish_reason, FinishReason::Stop);
1468        assert_eq!(resp.tool_transcript.len(), 1);
1469        assert_eq!(resp.tool_transcript[0].name, "search");
1470        assert!(!resp.tool_transcript[0].is_error);
1471    }
1472
1473    // ── TC-03: Parse error → re-prompt → success ────────────────
1474
1475    #[tokio::test]
1476    async fn tc03_parse_error_reprompt_success() {
1477        let llm = SequentialLlm::from_contents(vec![
1478            "This is not JSON at all.",
1479            r#"{"type": "final", "content": "Recovered"}"#,
1480        ]);
1481        let tools = MockToolPort::empty();
1482        let store = MemoryStore::new();
1483        let sink = CollectingSink::new();
1484
1485        let result = run_turn(
1486            &llm,
1487            &tools,
1488            &store,
1489            &sink,
1490            make_request("Hi"),
1491            &generous_policy(),
1492            "test-model",
1493        )
1494        .await;
1495
1496        assert!(
1497            matches!(&result, Ok(AgentRunResult::Finished(_))),
1498            "expected Finished after re-prompt, got {result:?}"
1499        );
1500        let resp = match result {
1501            Ok(AgentRunResult::Finished(r)) => r,
1502            _ => return,
1503        };
1504
1505        assert_eq!(resp.content, "Recovered");
1506        assert_eq!(resp.finish_reason, FinishReason::Stop);
1507    }
1508
1509    // ── TC-04: Double parse error → AgentError ──────────────────
1510
1511    #[tokio::test]
1512    async fn tc04_double_parse_error() {
1513        let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
1514        let tools = MockToolPort::empty();
1515        let store = MemoryStore::new();
1516        let sink = CollectingSink::new();
1517
1518        let result = run_turn(
1519            &llm,
1520            &tools,
1521            &store,
1522            &sink,
1523            make_request("Hi"),
1524            &generous_policy(),
1525            "test-model",
1526        )
1527        .await;
1528
1529        assert!(result.is_err(), "should return error after two parse failures");
1530        let msg = match result {
1531            Err(err) => err.to_string(),
1532            Ok(value) => format!("unexpected success: {value:?}"),
1533        };
1534        assert!(msg.contains("invalid JSON"), "error message = {msg}");
1535    }
1536
1537    // ── TC-05: max_steps exhaustion → GuardExceeded ─────────────
1538
1539    #[tokio::test]
1540    async fn tc05_max_steps_exhaustion() {
1541        // LLM always returns tool calls — the guard should stop after max_steps.
1542        let llm = SequentialLlm::from_contents(vec![
1543            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1544            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1545            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1546            r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1547        ]);
1548        let tools = MockToolPort::with_tool_and_results(
1549            "t1",
1550            vec![
1551                Ok(ToolResult {
1552                    name: "t1".into(),
1553                    output: serde_json::json!(null),
1554                    is_error: false,
1555                }),
1556                Ok(ToolResult {
1557                    name: "t1".into(),
1558                    output: serde_json::json!(null),
1559                    is_error: false,
1560                }),
1561                Ok(ToolResult {
1562                    name: "t1".into(),
1563                    output: serde_json::json!(null),
1564                    is_error: false,
1565                }),
1566            ],
1567        );
1568        let store = MemoryStore::new();
1569        let sink = CollectingSink::new();
1570
1571        let policy = TurnPolicy {
1572            max_steps: 2,
1573            max_tool_calls: 10,
1574            max_consecutive_errors: 5,
1575            turn_timeout_ms: 30_000,
1576            tool_timeout_ms: 5_000,
1577        };
1578
1579        let result =
1580            run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
1581                .await;
1582
1583        assert!(
1584            matches!(&result, Ok(AgentRunResult::Finished(_))),
1585            "expected Finished with GuardExceeded, got {result:?}"
1586        );
1587        let resp = match result {
1588            Ok(AgentRunResult::Finished(r)) => r,
1589            _ => return,
1590        };
1591
1592        assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
1593        assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
1594    }
1595
1596    // ── TC-06: Cancellation mid-turn → Cancelled ────────────────
1597
1598    #[tokio::test]
1599    async fn tc06_cancellation() {
1600        let llm = SequentialLlm::from_contents(vec![
1601            r#"{"type": "final", "content": "should not reach"}"#,
1602        ]);
1603        let tools = MockToolPort::empty();
1604        let store = MemoryStore::new();
1605        let sink = CollectingSink::new();
1606
1607        let token = CancelToken::new();
1608        // Cancel before running.
1609        token.cancel();
1610
1611        let mut req = make_request("Hi");
1612        req.cancel_token = Some(token);
1613
1614        let result =
1615            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1616
1617        assert!(
1618            matches!(&result, Ok(AgentRunResult::Finished(_))),
1619            "expected Finished with Cancelled, got {result:?}"
1620        );
1621        let resp = match result {
1622            Ok(AgentRunResult::Finished(r)) => r,
1623            _ => return,
1624        };
1625
1626        assert_eq!(resp.finish_reason, FinishReason::Cancelled);
1627    }
1628
1629    // ── TC-07: Tool error → is_error result → LLM sees error → Final ───
1630
1631    #[tokio::test]
1632    async fn tc07_tool_error_then_final() {
1633        let llm = SequentialLlm::from_contents(vec![
1634            r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
1635            r#"{"type": "final", "content": "Recovered from tool error."}"#,
1636        ]);
1637        let tools = MockToolPort::with_tool_and_results(
1638            "flaky_tool",
1639            vec![Err(ToolError::Execution("connection refused".into()))],
1640        );
1641        let store = MemoryStore::new();
1642        let sink = CollectingSink::new();
1643
1644        let result = run_turn(
1645            &llm,
1646            &tools,
1647            &store,
1648            &sink,
1649            make_request("call flaky"),
1650            &generous_policy(),
1651            "test-model",
1652        )
1653        .await;
1654
1655        assert!(
1656            matches!(&result, Ok(AgentRunResult::Finished(_))),
1657            "expected Finished, got {result:?}"
1658        );
1659        let resp = match result {
1660            Ok(AgentRunResult::Finished(r)) => r,
1661            _ => return,
1662        };
1663
1664        assert_eq!(resp.content, "Recovered from tool error.");
1665        assert_eq!(resp.tool_transcript.len(), 1);
1666        assert!(resp.tool_transcript[0].is_error);
1667    }
1668
1669    #[tokio::test]
1670    async fn tc08_save_failure_is_propagated() {
1671        let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
1672        let tools = MockToolPort::empty();
1673        let store = FailingSaveStore;
1674        let sink = CollectingSink::new();
1675
1676        let result = run_turn(
1677            &llm,
1678            &tools,
1679            &store,
1680            &sink,
1681            make_request("hello"),
1682            &generous_policy(),
1683            "test-model",
1684        )
1685        .await;
1686
1687        assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
1688    }
1689
1690    #[tokio::test]
1691    async fn tc09_stream_turn_emits_text_and_finished() {
1692        let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
1693            Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
1694            Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
1695            Ok(LlmStreamChunk::Done {
1696                usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
1697            }),
1698        ]));
1699        let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
1700        let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
1701        let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());
1702
1703        let stream_result = run_turn_stream(
1704            llm,
1705            tools,
1706            store,
1707            sink,
1708            make_request("hello"),
1709            generous_policy(),
1710            "test-model".to_string(),
1711        )
1712        .await;
1713        assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
1714        let mut stream = match stream_result {
1715            Ok(stream) => stream,
1716            Err(_) => return,
1717        };
1718
1719        let mut saw_text = false;
1720        let mut saw_finished = false;
1721        while let Some(event) = stream.next().await {
1722            match event {
1723                AgentStreamEvent::TextDelta { content } => {
1724                    saw_text = saw_text || !content.is_empty();
1725                }
1726                AgentStreamEvent::Finished { usage } => {
1727                    saw_finished = true;
1728                    assert_eq!(usage.prompt_tokens, 3);
1729                    assert_eq!(usage.completion_tokens, 4);
1730                }
1731                AgentStreamEvent::ToolCallStarted { .. } |
1732                AgentStreamEvent::ToolCallCompleted { .. } |
1733                AgentStreamEvent::Error { .. } => {}
1734            }
1735        }
1736
1737        assert!(saw_text, "expected at least one text delta");
1738        assert!(saw_finished, "expected a finished event");
1739    }
1740
1741    #[tokio::test]
1742    async fn tc10_skills_prompt_context_is_injected() {
1743        let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
1744        let tools = MockToolPort::empty();
1745        let store = MemoryStore::new();
1746        let sink = CollectingSink::new();
1747
1748        let mut req = make_request("review this code");
1749        req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());
1750
1751        let result =
1752            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1753
1754        assert!(result.is_ok(), "run should succeed when skills prompt is injected");
1755    }
1756
1757    #[tokio::test]
1758    async fn tc11_selected_skills_context_emits_event() {
1759        let llm =
1760            SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
1761        let tools = MockToolPort::empty();
1762        let store = MemoryStore::new();
1763        let sink = CollectingSink::new();
1764
1765        let mut req = make_request("review code");
1766        req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];
1767
1768        let result =
1769            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1770        assert!(result.is_ok(), "run should succeed");
1771
1772        let events = sink.all_events();
1773        assert!(
1774            events.iter().any(|event| matches!(
1775                event,
1776                AgentEvent::SkillsSelected { skill_names }
1777                    if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
1778            )),
1779            "skills.selected event should be emitted with context skill names"
1780        );
1781    }
1782
1783    #[tokio::test]
1784    async fn tc12_policy_deny_tool_blocks_execution() {
1785        let llm = SequentialLlm::from_contents(vec![
1786            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1787            r#"{"type": "final", "content": "done"}"#,
1788        ]);
1789        let tools = NoCallToolPort {
1790            tools: vec![ToolDescriptor {
1791                id: "search".to_string(),
1792                description: "search tool".to_string(),
1793                input_schema: serde_json::json!({"type":"object"}),
1794                source: ToolSource::Local,
1795            }],
1796        };
1797        let store = MemoryStore::new();
1798        let sink = CollectingSink::new();
1799
1800        let mut req = make_request("search rust");
1801        req.context.tool_policy.deny_tools =
1802            vec!["search".to_string(), "local/shell_exec".to_string()];
1803
1804        let result =
1805            run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1806        assert!(
1807            matches!(&result, Ok(AgentRunResult::Finished(_))),
1808            "expected finished response, got {result:?}"
1809        );
1810        let resp = match result {
1811            Ok(AgentRunResult::Finished(r)) => r,
1812            _ => return,
1813        };
1814
1815        assert_eq!(resp.finish_reason, FinishReason::Stop);
1816        assert_eq!(resp.tool_transcript.len(), 1);
1817        assert!(resp.tool_transcript[0].is_error);
1818        assert!(
1819            resp.tool_transcript[0].output.to_string().contains("denied"),
1820            "tool error should explain policy denial"
1821        );
1822    }
1823
1824    #[tokio::test]
1825    async fn tc13_approval_denied_blocks_execution() {
1826        let llm = SequentialLlm::from_contents(vec![
1827            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1828            r#"{"type": "final", "content": "done"}"#,
1829        ]);
1830        let tools = NoCallToolPort {
1831            tools: vec![ToolDescriptor {
1832                id: "search".to_string(),
1833                description: "search tool".to_string(),
1834                input_schema: serde_json::json!({"type":"object"}),
1835                source: ToolSource::Local,
1836            }],
1837        };
1838        let store = MemoryStore::new();
1839        let sink = CollectingSink::new();
1840        let req = make_request("search rust");
1841        let tool_policy = AllowAllPolicyPort;
1842        let approval = AlwaysDenyApprovalPort;
1843
1844        let result = run_turn_with_controls(
1845            &llm,
1846            &tools,
1847            &store,
1848            &sink,
1849            req,
1850            &generous_policy(),
1851            "test-model",
1852            &tool_policy,
1853            &approval,
1854        )
1855        .await;
1856        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
1857        let resp = match result {
1858            Ok(AgentRunResult::Finished(r)) => r,
1859            _ => return,
1860        };
1861
1862        assert_eq!(resp.tool_transcript.len(), 1);
1863        assert!(resp.tool_transcript[0].is_error);
1864        assert!(
1865            resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
1866            "tool error should explain approval denial"
1867        );
1868    }
1869
1870    #[tokio::test]
1871    async fn tc14_custom_policy_port_blocks_execution() {
1872        let llm = SequentialLlm::from_contents(vec![
1873            r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1874            r#"{"type": "final", "content": "done"}"#,
1875        ]);
1876        let tools = NoCallToolPort {
1877            tools: vec![ToolDescriptor {
1878                id: "search".to_string(),
1879                description: "search tool".to_string(),
1880                input_schema: serde_json::json!({"type":"object"}),
1881                source: ToolSource::Local,
1882            }],
1883        };
1884        let store = MemoryStore::new();
1885        let sink = CollectingSink::new();
1886        let req = make_request("search rust");
1887        let tool_policy = DenySearchPolicyPort;
1888        let approval = AlwaysApprovePort;
1889
1890        let result = run_turn_with_controls(
1891            &llm,
1892            &tools,
1893            &store,
1894            &sink,
1895            req,
1896            &generous_policy(),
1897            "test-model",
1898            &tool_policy,
1899            &approval,
1900        )
1901        .await;
1902        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
1903        let resp = match result {
1904            Ok(AgentRunResult::Finished(r)) => r,
1905            _ => return,
1906        };
1907
1908        assert_eq!(resp.tool_transcript.len(), 1);
1909        assert!(resp.tool_transcript[0].is_error);
1910        assert!(
1911            resp.tool_transcript[0].output.to_string().contains("denied"),
1912            "tool error should explain policy denial"
1913        );
1914    }
1915
1916    #[tokio::test]
1917    async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
1918        struct NativeToolLlm {
1919            responses: Mutex<VecDeque<LlmResponse>>,
1920        }
1921
1922        #[async_trait::async_trait]
1923        impl LlmPort for NativeToolLlm {
1924            fn capabilities(&self) -> bob_core::types::LlmCapabilities {
1925                bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
1926            }
1927
1928            async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1929                let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1930                Ok(q.pop_front().unwrap_or(LlmResponse {
1931                    content: r#"{"type":"final","content":"fallback"}"#.to_string(),
1932                    usage: TokenUsage::default(),
1933                    finish_reason: FinishReason::Stop,
1934                    tool_calls: Vec::new(),
1935                }))
1936            }
1937
1938            async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1939                Err(LlmError::Provider("not used".into()))
1940            }
1941        }
1942
1943        let llm = NativeToolLlm {
1944            responses: Mutex::new(VecDeque::from(vec![
1945                LlmResponse {
1946                    content: "ignored".to_string(),
1947                    usage: TokenUsage::default(),
1948                    finish_reason: FinishReason::Stop,
1949                    tool_calls: vec![ToolCall {
1950                        name: "search".to_string(),
1951                        arguments: serde_json::json!({"q":"rust"}),
1952                    }],
1953                },
1954                LlmResponse {
1955                    content: r#"{"type":"final","content":"done"}"#.to_string(),
1956                    usage: TokenUsage::default(),
1957                    finish_reason: FinishReason::Stop,
1958                    tool_calls: Vec::new(),
1959                },
1960            ])),
1961        };
1962        let tools = MockToolPort::with_tool_and_results(
1963            "search",
1964            vec![Ok(ToolResult {
1965                name: "search".to_string(),
1966                output: serde_json::json!({"hits": 2}),
1967                is_error: false,
1968            })],
1969        );
1970        let store = MemoryStore::new();
1971        let sink = CollectingSink::new();
1972        let checkpoint = CountingCheckpointPort::new();
1973        let artifacts = NoopArtifactStore;
1974        let cost = CountingCostMeter::new();
1975        let policy = AllowAllPolicyPort;
1976        let approval = AlwaysApprovePort;
1977
1978        let result = run_turn_with_extensions(
1979            &llm,
1980            &tools,
1981            &store,
1982            &sink,
1983            make_request("search rust"),
1984            &generous_policy(),
1985            "test-model",
1986            &policy,
1987            &approval,
1988            crate::DispatchMode::NativePreferred,
1989            &checkpoint,
1990            &artifacts,
1991            &cost,
1992        )
1993        .await;
1994
1995        assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
1996        let resp = match result {
1997            Ok(AgentRunResult::Finished(r)) => r,
1998            _ => return,
1999        };
2000        assert_eq!(resp.tool_transcript.len(), 1);
2001        assert_eq!(resp.tool_transcript[0].name, "search");
2002    }
2003
2004    #[tokio::test]
2005    async fn tc16_checkpoint_and_cost_ports_are_invoked() {
2006        let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
2007        let tools = MockToolPort::empty();
2008        let store = MemoryStore::new();
2009        let sink = CollectingSink::new();
2010        let checkpoint = CountingCheckpointPort::new();
2011        let artifacts = NoopArtifactStore;
2012        let cost = CountingCostMeter::new();
2013        let policy = AllowAllPolicyPort;
2014        let approval = AlwaysApprovePort;
2015
2016        let result = run_turn_with_extensions(
2017            &llm,
2018            &tools,
2019            &store,
2020            &sink,
2021            make_request("hello"),
2022            &generous_policy(),
2023            "test-model",
2024            &policy,
2025            &approval,
2026            crate::DispatchMode::PromptGuided,
2027            &checkpoint,
2028            &artifacts,
2029            &cost,
2030        )
2031        .await;
2032        assert!(result.is_ok(), "turn should succeed");
2033        let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2034        let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
2035        assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
2036        assert!(llm_calls >= 1, "cost meter should record llm usage");
2037    }
2038}