1use std::sync::Arc;
43
44use bob_core::{
45 error::AgentError,
46 journal::{JournalEntry, ToolJournalPort},
47 normalize_tool_list,
48 ports::{
49 ApprovalPort, ArtifactStorePort, ContextCompactorPort, CostMeterPort, EventSink, LlmPort,
50 SessionStore, ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
51 },
52 types::{
53 AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
54 AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
55 GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
56 },
57};
58use futures_util::StreamExt;
59use tokio::time::Instant;
60
/// Capacity of the bounded channel that carries `AgentStreamEvent`s from a spawned
/// streaming turn to the stream handed back to the caller.
const STREAM_CHANNEL_CAPACITY: usize = 256;
63
64#[derive(Debug)]
67pub struct LoopGuard {
68 policy: TurnPolicy,
69 steps: u32,
70 tool_calls: u32,
71 consecutive_errors: u32,
72 start: Instant,
73}
74
75impl LoopGuard {
76 #[must_use]
78 pub fn new(policy: TurnPolicy) -> Self {
79 Self { policy, steps: 0, tool_calls: 0, consecutive_errors: 0, start: Instant::now() }
80 }
81
82 #[must_use]
84 pub fn can_continue(&self) -> bool {
85 self.steps < self.policy.max_steps &&
86 self.tool_calls < self.policy.max_tool_calls &&
87 self.consecutive_errors < self.policy.max_consecutive_errors &&
88 !self.timed_out()
89 }
90
91 pub fn record_step(&mut self) {
93 self.steps += 1;
94 }
95
96 pub fn record_tool_call(&mut self) {
98 self.tool_calls += 1;
99 }
100
101 pub fn record_error(&mut self) {
103 self.consecutive_errors += 1;
104 }
105
106 pub fn reset_errors(&mut self) {
108 self.consecutive_errors = 0;
109 }
110
111 #[must_use]
115 pub fn reason(&self) -> GuardReason {
116 if self.steps >= self.policy.max_steps {
117 GuardReason::MaxSteps
118 } else if self.tool_calls >= self.policy.max_tool_calls {
119 GuardReason::MaxToolCalls
120 } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
121 GuardReason::MaxConsecutiveErrors
122 } else if self.timed_out() {
123 GuardReason::TurnTimeout
124 } else {
125 GuardReason::Cancelled
127 }
128 }
129
130 #[must_use]
132 pub fn timed_out(&self) -> bool {
133 self.start.elapsed().as_millis() >= u128::from(self.policy.turn_timeout_ms)
134 }
135}
136
/// Base system prompt used for every turn. A non-blank request system prompt is
/// appended after it (see `resolve_system_instructions`).
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = "\
You are a helpful AI assistant. \
Think step by step before answering. \
When you need external information, use the available tools.";
/// Number of back-to-back identical tool calls (same name and JSON arguments) allowed
/// before further repeats are skipped with a synthetic error result.
const MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS: u32 = 3;
144
145fn resolve_system_instructions(req: &AgentRequest) -> String {
146 if let Some(skills_prompt) = req.context.system_prompt.as_deref() {
147 if skills_prompt.trim().is_empty() {
148 DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
149 } else {
150 format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
151 }
152 } else {
153 DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
154 }
155}
156
157fn resolve_system_instructions_with_memory(
158 req: &AgentRequest,
159 session: &bob_core::types::SessionState,
160) -> String {
161 let base_instructions = resolve_system_instructions(req);
162 crate::memory_context::inject_memory_prompt(
163 &base_instructions,
164 session.memory_summary.as_deref(),
165 )
166}
167
168fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
169 req.context.selected_skills.clone()
170}
171
172#[derive(Debug, Clone, Default)]
173struct ToolCallPolicy {
174 deny_tools: Vec<String>,
175 allow_tools: Option<Vec<String>>,
176}
177
178fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
179 let deny_tools =
180 normalize_tool_list(req.context.tool_policy.deny_tools.iter().map(String::as_str));
181 let allow_tools = req
182 .context
183 .tool_policy
184 .allow_tools
185 .as_ref()
186 .map(|tools| normalize_tool_list(tools.iter().map(String::as_str)));
187 ToolCallPolicy { deny_tools, allow_tools }
188}
189
190fn prompt_options_for_mode(
191 dispatch_mode: crate::DispatchMode,
192 llm: &dyn LlmPort,
193 output_schema: Option<serde_json::Value>,
194) -> crate::prompt::PromptBuildOptions {
195 let mut opts = match dispatch_mode {
196 crate::DispatchMode::PromptGuided => crate::prompt::PromptBuildOptions::default(),
197 crate::DispatchMode::NativePreferred => {
198 if llm.capabilities().native_tool_calling {
199 crate::prompt::PromptBuildOptions {
200 include_action_schema: false,
201 include_tool_schema: false,
202 ..Default::default()
203 }
204 } else {
205 crate::prompt::PromptBuildOptions::default()
206 }
207 }
208 };
209 opts.structured_output = output_schema;
210 opts
211}
212
213fn parse_action_for_mode(
214 dispatch_mode: crate::DispatchMode,
215 llm: &dyn LlmPort,
216 response: &bob_core::types::LlmResponse,
217) -> Result<AgentAction, crate::action::ActionParseError> {
218 match dispatch_mode {
219 crate::DispatchMode::PromptGuided => crate::action::parse_action(&response.content),
220 crate::DispatchMode::NativePreferred => {
221 if llm.capabilities().native_tool_calling &&
222 let Some(tool_call) = response.tool_calls.first()
223 {
224 return Ok(AgentAction::ToolCall {
225 name: tool_call.name.clone(),
226 arguments: tool_call.arguments.clone(),
227 });
228 }
229 crate::action::parse_action(&response.content)
230 }
231 }
232}
233
234#[expect(
235 clippy::too_many_arguments,
236 reason = "tool execution needs explicit policy, approval, and timeout dependencies"
237)]
238async fn execute_tool_call(
239 tools: &dyn ToolPort,
240 guard: &mut LoopGuard,
241 tool_call: ToolCall,
242 policy: &ToolCallPolicy,
243 tool_policy_port: &dyn ToolPolicyPort,
244 approval_port: &dyn ApprovalPort,
245 approval_context: &ApprovalContext,
246 timeout_ms: u64,
247) -> ToolResult {
248 if !tool_policy_port.is_tool_allowed(
249 &tool_call.name,
250 &policy.deny_tools,
251 policy.allow_tools.as_deref(),
252 ) {
253 guard.record_error();
254 return ToolResult {
255 name: tool_call.name.clone(),
256 output: serde_json::json!({
257 "error": format!("tool '{}' denied by policy", tool_call.name)
258 }),
259 is_error: true,
260 };
261 }
262
263 match approval_port.approve_tool_call(&tool_call, approval_context).await {
264 Ok(ApprovalDecision::Approved) => {}
265 Ok(ApprovalDecision::Denied { reason }) => {
266 guard.record_error();
267 return ToolResult {
268 name: tool_call.name.clone(),
269 output: serde_json::json!({"error": reason}),
270 is_error: true,
271 };
272 }
273 Err(err) => {
274 guard.record_error();
275 return ToolResult {
276 name: tool_call.name.clone(),
277 output: serde_json::json!({"error": err.to_string()}),
278 is_error: true,
279 };
280 }
281 }
282
283 match tokio::time::timeout(
284 std::time::Duration::from_millis(timeout_ms),
285 tools.call_tool(tool_call.clone()),
286 )
287 .await
288 {
289 Ok(Ok(result)) => {
290 guard.reset_errors();
291 result
292 }
293 Ok(Err(err)) => {
294 guard.record_error();
295 ToolResult {
296 name: tool_call.name,
297 output: serde_json::json!({"error": err.to_string()}),
298 is_error: true,
299 }
300 }
301 Err(_) => {
302 guard.record_error();
303 ToolResult {
304 name: tool_call.name,
305 output: serde_json::json!({"error": "tool call timed out"}),
306 is_error: true,
307 }
308 }
309 }
310}
311
312pub async fn run_turn(
319 llm: &dyn LlmPort,
320 tools: &dyn ToolPort,
321 store: &dyn SessionStore,
322 events: &dyn EventSink,
323 req: AgentRequest,
324 policy: &TurnPolicy,
325 default_model: &str,
326) -> Result<AgentRunResult, AgentError> {
327 let tool_policy = crate::DefaultToolPolicyPort;
328 let approval = crate::AllowAllApprovalPort;
329 let checkpoint_store = crate::NoOpCheckpointStorePort;
330 let artifact_store = crate::NoOpArtifactStorePort;
331 let cost_meter = crate::NoOpCostMeterPort;
332 let compactor = crate::prompt::WindowContextCompactor::default();
333 let journal = crate::NoOpToolJournalPort;
334 run_turn_with_extensions(
335 llm,
336 tools,
337 store,
338 events,
339 req,
340 policy,
341 default_model,
342 &tool_policy,
343 &approval,
344 crate::DispatchMode::NativePreferred,
345 &checkpoint_store,
346 &artifact_store,
347 &cost_meter,
348 &compactor,
349 &journal,
350 )
351 .await
352}
353
354#[cfg_attr(
356 not(test),
357 expect(
358 dead_code,
359 reason = "reserved wrapper for partial control injection in external integrations"
360 )
361)]
362#[expect(
363 clippy::too_many_arguments,
364 reason = "wrapper exposes explicit dependency ports for compatibility and testability"
365)]
366pub(crate) async fn run_turn_with_controls(
367 llm: &dyn LlmPort,
368 tools: &dyn ToolPort,
369 store: &dyn SessionStore,
370 events: &dyn EventSink,
371 req: AgentRequest,
372 policy: &TurnPolicy,
373 default_model: &str,
374 tool_policy_port: &dyn ToolPolicyPort,
375 approval_port: &dyn ApprovalPort,
376) -> Result<AgentRunResult, AgentError> {
377 let checkpoint_store = crate::NoOpCheckpointStorePort;
378 let artifact_store = crate::NoOpArtifactStorePort;
379 let cost_meter = crate::NoOpCostMeterPort;
380 let compactor = crate::prompt::WindowContextCompactor::default();
381 let journal = crate::NoOpToolJournalPort;
382 run_turn_with_extensions(
383 llm,
384 tools,
385 store,
386 events,
387 req,
388 policy,
389 default_model,
390 tool_policy_port,
391 approval_port,
392 crate::DispatchMode::PromptGuided,
393 &checkpoint_store,
394 &artifact_store,
395 &cost_meter,
396 &compactor,
397 &journal,
398 )
399 .await
400}
401
/// Core agent loop: runs one user turn to completion with every port injected explicitly.
///
/// The session is loaded from `store` and the user input is appended. Each iteration then:
/// 1. checks cancellation, the cost budget, and the [`LoopGuard`] limits;
/// 2. asks the LLM for the next action;
/// 3. either finishes the turn (final answer or question for the user) or executes a
///    single tool call and loops.
///
/// Finishing goes through `finish_turn`, which saves the session.
///
/// # Errors
///
/// Returns an error when:
/// - loading the session or listing tools fails;
/// - the budget check or LLM usage recording fails;
/// - the LLM call fails;
/// - the final session save fails;
/// - the model's response cannot be parsed as an action twice in a row.
#[expect(
    clippy::too_many_arguments,
    reason = "core entrypoint exposes all ports explicitly for adapter injection"
)]
pub(crate) async fn run_turn_with_extensions(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
    dispatch_mode: crate::DispatchMode,
    checkpoint_store: &dyn TurnCheckpointStorePort,
    artifact_store: &dyn ArtifactStorePort,
    cost_meter: &dyn CostMeterPort,
    context_compactor: &dyn ContextCompactorPort,
    journal: &dyn ToolJournalPort,
) -> Result<AgentRunResult, AgentError> {
    let model = req.model.as_deref().unwrap_or(default_model);
    let cancel_token = req.cancel_token.clone();
    let selected_skills = resolve_selected_skills(&req);
    let tool_call_policy = resolve_tool_call_policy(&req);

    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
    let system_instructions = resolve_system_instructions_with_memory(&req, &session);
    let tool_descriptors = tools.list_tools().await?;
    let mut guard = LoopGuard::new(policy.clone());

    // Progressive tool exposure: each request carries descriptors only for
    // `activated_tools()`, plus the `summary_prompt()` text appended to the instructions.
    let mut tool_view = crate::progressive_tools::ProgressiveToolView::new(tool_descriptors);

    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
    if !selected_skills.is_empty() {
        events.emit(AgentEvent::SkillsSelected {
            session_id: req.session_id.clone(),
            skill_names: selected_skills.clone(),
        });
    }

    session.messages.push(Message::text(Role::User, req.input.clone()));

    let mut tool_transcript: Vec<ToolResult> = Vec::new();
    let mut total_usage = TokenUsage::default();
    let mut consecutive_parse_failures: u32 = 0;
    let mut consecutive_validation_failures: u32 = 0;
    let max_output_retries = req.max_output_retries;
    // Loop detection: the "name:json-args" signature of the previous tool call and how
    // many times in a row it has been requested.
    let mut last_tool_call_signature: Option<String> = None;
    let mut same_tool_call_streak: u32 = 0;

    loop {
        let current_step = guard.steps.saturating_add(1);

        // Cooperative cancellation check before doing more work this step.
        if let Some(ref token) = cancel_token &&
            token.is_cancelled()
        {
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: "Turn cancelled.",
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::Cancelled,
                },
            )
            .await;
        }

        // NOTE(review): a budget error returns here without saving `session`, so messages
        // appended earlier in this turn are not persisted — confirm this is intended.
        cost_meter.check_budget(&req.session_id).await?;

        if !guard.can_continue() {
            let reason = guard.reason();
            let msg = format!("Turn stopped: {reason:?}");
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: &msg,
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::GuardExceeded,
                },
            )
            .await;
        }

        // Rebuilt every step because tools can be activated between steps.
        let mut augmented_instructions = system_instructions.clone();
        let tool_summary = tool_view.summary_prompt();
        if !tool_summary.is_empty() {
            augmented_instructions.push('\n');
            augmented_instructions.push('\n');
            augmented_instructions.push_str(&tool_summary);
        }

        let active_tools = tool_view.activated_tools();
        let llm_request = crate::prompt::build_llm_request_with_options(
            model,
            &session,
            &active_tools,
            &augmented_instructions,
            prompt_options_for_mode(dispatch_mode, llm, req.output_schema.clone()),
            context_compactor,
        )
        .await;

        events.emit(AgentEvent::LlmCallStarted {
            session_id: req.session_id.clone(),
            step: current_step,
            model: model.to_string(),
        });

        // With a cancel token, race the completion against cancellation so an in-flight
        // LLM call can be abandoned.
        let llm_response = if let Some(ref token) = cancel_token {
            tokio::select! {
                result = llm.complete(llm_request.clone()) => result?,
                () = token.cancelled() => {
                    return finish_turn(
                        store, events, &req.session_id, &session,
                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
                    ).await;
                }
            }
        } else {
            llm.complete(llm_request).await?
        };

        guard.record_step();
        // NOTE(review): the per-turn total uses plain `+=` while the session total below
        // saturates; consider `saturating_add` here as well for consistency.
        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
        total_usage.completion_tokens += llm_response.usage.completion_tokens;
        session.total_usage.prompt_tokens =
            session.total_usage.prompt_tokens.saturating_add(llm_response.usage.prompt_tokens);
        session.total_usage.completion_tokens = session
            .total_usage
            .completion_tokens
            .saturating_add(llm_response.usage.completion_tokens);
        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;

        events.emit(AgentEvent::LlmCallCompleted {
            session_id: req.session_id.clone(),
            step: current_step,
            model: model.to_string(),
            usage: llm_response.usage.clone(),
        });
        // Only the first native tool call is kept; it supplies the provider call id used
        // to correlate the tool-result message below.
        let native_tool_call = if llm.capabilities().native_tool_calling {
            llm_response.tool_calls.first().cloned()
        } else {
            None
        };

        // Presumably activates tools mentioned in the response text for later steps.
        tool_view.activate_hints(&llm_response.content);

        let assistant_message = if llm_response.tool_calls.is_empty() {
            Message::text(Role::Assistant, llm_response.content.clone())
        } else {
            // NOTE(review): every native tool call is recorded in the assistant message, but
            // only one action is executed and answered per step. Providers that require a
            // result for each call id may reject the next request — confirm.
            Message::assistant_tool_calls(
                llm_response.content.clone(),
                llm_response.tool_calls.clone(),
            )
        };
        session.messages.push(assistant_message);

        // Best-effort checkpoint; a failure is ignored and does not abort the turn.
        let _ = checkpoint_store
            .save_checkpoint(&TurnCheckpoint {
                session_id: req.session_id.clone(),
                step: guard.steps,
                tool_calls: guard.tool_calls,
                usage: total_usage.clone(),
            })
            .await;

        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
            Ok(action) => {
                consecutive_parse_failures = 0;
                match action {
                    AgentAction::Final { content } => {
                        if let Some(ref schema) = req.output_schema {
                            // Validate the final answer against the requested schema and
                            // re-prompt with the validation error until retries run out.
                            match crate::output_validation::validate_output_str(&content, schema) {
                                Ok(_) => {}
                                Err(validation_err) => {
                                    consecutive_validation_failures += 1;
                                    if consecutive_validation_failures > max_output_retries {
                                        // Retries exhausted: warn, then fall through and
                                        // return the unvalidated content as the final answer.
                                        tracing::warn!(
                                            session_id = %req.session_id,
                                            "output schema validation failed after {} retries",
                                            max_output_retries,
                                        );
                                    } else {
                                        // Feed the validation error back as a user message
                                        // and ask the model again.
                                        let prompt =
                                            crate::output_validation::validation_error_prompt(
                                                &content,
                                                &validation_err,
                                            );
                                        session.messages.push(Message::text(Role::User, prompt));
                                        continue;
                                    }
                                }
                            }
                        }
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &content,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::AskUser { question } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &question,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::ToolCall { name, arguments } => {
                        // Reuse the provider call id only when the parsed action matches the
                        // native tool call exactly.
                        let tool_call_id = native_tool_call
                            .as_ref()
                            .filter(|call| call.name == name && call.arguments == arguments)
                            .and_then(|call| call.call_id.clone());
                        tool_view.activate(&name);

                        // Loop detection: count consecutive identical (name + args) calls.
                        let call_signature = format!(
                            "{}:{}",
                            name,
                            serde_json::to_string(&arguments).unwrap_or_default()
                        );
                        if last_tool_call_signature.as_ref() == Some(&call_signature) {
                            same_tool_call_streak = same_tool_call_streak.saturating_add(1);
                        } else {
                            same_tool_call_streak = 1;
                            last_tool_call_signature = Some(call_signature);
                        }

                        // Streak above the limit: do not execute the tool; record a
                        // synthetic error result through the normal bookkeeping instead.
                        if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
                            events.emit(AgentEvent::ToolCallStarted {
                                session_id: req.session_id.clone(),
                                step: current_step,
                                name: name.clone(),
                            });
                            let dup_result = ToolResult {
                                name: name.clone(),
                                output: serde_json::json!({
                                    "error": format!(
                                        "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
                                    )
                                }),
                                is_error: true,
                            };
                            guard.record_tool_call();
                            let _ =
                                cost_meter.record_tool_result(&req.session_id, &dup_result).await;
                            let output_str =
                                serde_json::to_string(&dup_result.output).unwrap_or_default();
                            session.messages.push(Message::tool_result(
                                name.clone(),
                                tool_call_id.clone(),
                                output_str,
                            ));
                            events.emit(AgentEvent::ToolCallCompleted {
                                session_id: req.session_id.clone(),
                                step: current_step,
                                name: name.clone(),
                                is_error: true,
                            });
                            let _ = artifact_store
                                .put(ArtifactRecord {
                                    session_id: req.session_id.clone(),
                                    kind: "tool_result".to_string(),
                                    name: name.clone(),
                                    content: dup_result.output.clone(),
                                })
                                .await;
                            tool_transcript.push(dup_result);
                            continue;
                        }

                        events.emit(AgentEvent::ToolCallStarted {
                            session_id: req.session_id.clone(),
                            step: current_step,
                            name: name.clone(),
                        });
                        let approval_context = ApprovalContext {
                            session_id: req.session_id.clone(),
                            turn_step: guard.steps.max(1),
                            selected_skills: selected_skills.clone(),
                        };

                        // Journal replay: if a result with this call fingerprint is already
                        // journaled for the session, reuse it instead of executing again.
                        // A lookup error is treated like a miss.
                        // NOTE(review): replay skips the policy/approval checks and guard
                        // error bookkeeping of `execute_tool_call`. Error results are
                        // journaled as well, so they are replayed too — confirm intended.
                        let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
                        let tool_result = if let Ok(Some(cached)) =
                            journal.lookup(&req.session_id, &call_fingerprint).await
                        {
                            tracing::debug!(
                                session_id = %req.session_id,
                                tool = %name,
                                "replaying tool result from journal"
                            );
                            ToolResult {
                                name: cached.tool_name,
                                output: cached.result,
                                is_error: cached.is_error,
                            }
                        } else {
                            let result = execute_tool_call(
                                tools,
                                &mut guard,
                                ToolCall::new(name.clone(), arguments.clone()),
                                &tool_call_policy,
                                tool_policy_port,
                                approval_port,
                                &approval_context,
                                policy.tool_timeout_ms,
                            )
                            .await;
                            // Best-effort journal write; a failure is ignored.
                            let _ = journal
                                .append(JournalEntry {
                                    session_id: req.session_id.clone(),
                                    call_fingerprint: call_fingerprint.clone(),
                                    tool_name: name.clone(),
                                    arguments: arguments.clone(),
                                    result: result.output.clone(),
                                    is_error: result.is_error,
                                    timestamp_ms: bob_core::tape::now_ms(),
                                })
                                .await;
                            result
                        };

                        guard.record_tool_call();
                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;

                        let is_error = tool_result.is_error;
                        events.emit(AgentEvent::ToolCallCompleted {
                            session_id: req.session_id.clone(),
                            step: current_step,
                            name: name.clone(),
                            is_error,
                        });

                        let output_str =
                            serde_json::to_string(&tool_result.output).unwrap_or_default();
                        session.messages.push(Message::tool_result(
                            name.clone(),
                            tool_call_id,
                            output_str,
                        ));

                        let _ = artifact_store
                            .put(ArtifactRecord {
                                session_id: req.session_id.clone(),
                                kind: "tool_result".to_string(),
                                name: name.clone(),
                                content: tool_result.output.clone(),
                            })
                            .await;

                        tool_transcript.push(tool_result);
                    }
                }
            }
            Err(_parse_err) => {
                // Unparseable response: reset loop detection and re-prompt once. On the
                // second consecutive failure, save the session (best effort) and give up.
                consecutive_parse_failures += 1;
                last_tool_call_signature = None;
                same_tool_call_streak = 0;
                if consecutive_parse_failures >= 2 {
                    let _ = store.save(&req.session_id, &session).await;
                    return Err(AgentError::Internal(
                        "LLM produced invalid JSON after re-prompt".into(),
                    ));
                }
                session.messages.push(Message::text(
                    Role::User,
                    "Your response was not valid JSON. \
                     Please respond with exactly one JSON object \
                     matching the required schema.",
                ));
            }
        }
    }
}
809
810struct FinishResult<'a> {
812 content: &'a str,
813 tool_transcript: Vec<ToolResult>,
814 usage: TokenUsage,
815 finish_reason: FinishReason,
816}
817
818async fn finish_turn(
820 store: &dyn SessionStore,
821 events: &dyn EventSink,
822 session_id: &bob_core::types::SessionId,
823 session: &bob_core::types::SessionState,
824 result: FinishResult<'_>,
825) -> Result<AgentRunResult, AgentError> {
826 store.save(session_id, session).await?;
827 events.emit(AgentEvent::TurnCompleted {
828 session_id: session_id.clone(),
829 finish_reason: result.finish_reason,
830 usage: result.usage.clone(),
831 });
832 Ok(AgentRunResult::Finished(AgentResponse {
833 content: result.content.to_string(),
834 tool_transcript: result.tool_transcript,
835 usage: result.usage,
836 finish_reason: result.finish_reason,
837 }))
838}
839
840pub async fn run_turn_stream(
842 llm: Arc<dyn LlmPort>,
843 tools: Arc<dyn ToolPort>,
844 store: Arc<dyn SessionStore>,
845 events: Arc<dyn EventSink>,
846 req: AgentRequest,
847 policy: TurnPolicy,
848 default_model: String,
849) -> Result<AgentEventStream, AgentError> {
850 let tool_policy: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
851 let approval: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
852 let checkpoint_store: Arc<dyn TurnCheckpointStorePort> =
853 Arc::new(crate::NoOpCheckpointStorePort);
854 let artifact_store: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
855 let cost_meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
856 let context_compactor: Arc<dyn ContextCompactorPort> =
857 Arc::new(crate::prompt::WindowContextCompactor::default());
858 let journal: Arc<dyn ToolJournalPort> = Arc::new(crate::NoOpToolJournalPort);
859 run_turn_stream_with_controls(
860 llm,
861 tools,
862 store,
863 events,
864 req,
865 policy,
866 default_model,
867 tool_policy,
868 approval,
869 crate::DispatchMode::NativePreferred,
870 checkpoint_store,
871 artifact_store,
872 cost_meter,
873 context_compactor,
874 journal,
875 )
876 .await
877}
878
879#[expect(
881 clippy::too_many_arguments,
882 reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
883)]
884pub(crate) async fn run_turn_stream_with_controls(
885 llm: Arc<dyn LlmPort>,
886 tools: Arc<dyn ToolPort>,
887 store: Arc<dyn SessionStore>,
888 events: Arc<dyn EventSink>,
889 req: AgentRequest,
890 policy: TurnPolicy,
891 default_model: String,
892 tool_policy: Arc<dyn ToolPolicyPort>,
893 approval: Arc<dyn ApprovalPort>,
894 dispatch_mode: crate::DispatchMode,
895 checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
896 artifact_store: Arc<dyn ArtifactStorePort>,
897 cost_meter: Arc<dyn CostMeterPort>,
898 context_compactor: Arc<dyn ContextCompactorPort>,
899 journal: Arc<dyn ToolJournalPort>,
900) -> Result<AgentEventStream, AgentError> {
901 let (tx, rx) = flume::bounded::<AgentStreamEvent>(STREAM_CHANNEL_CAPACITY);
902 let config = StreamRunConfig {
903 policy,
904 default_model,
905 tool_policy,
906 approval,
907 dispatch_mode,
908 checkpoint_store,
909 artifact_store,
910 cost_meter,
911 context_compactor,
912 journal,
913 };
914
915 tokio::spawn(async move {
916 if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
917 {
918 let _ = tx.send_async(AgentStreamEvent::Error { error: err.to_string() }).await;
919 }
920 });
921
922 Ok(Box::pin(rx.into_stream()))
923}
924
925struct StreamRunConfig {
926 policy: TurnPolicy,
927 default_model: String,
928 tool_policy: Arc<dyn ToolPolicyPort>,
929 approval: Arc<dyn ApprovalPort>,
930 dispatch_mode: crate::DispatchMode,
931 checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
932 artifact_store: Arc<dyn ArtifactStorePort>,
933 cost_meter: Arc<dyn CostMeterPort>,
934 context_compactor: Arc<dyn ContextCompactorPort>,
935 journal: Arc<dyn ToolJournalPort>,
936}
937
938async fn run_turn_stream_inner(
939 llm: Arc<dyn LlmPort>,
940 tools: Arc<dyn ToolPort>,
941 store: Arc<dyn SessionStore>,
942 events: Arc<dyn EventSink>,
943 req: AgentRequest,
944 config: &StreamRunConfig,
945 tx: &flume::Sender<AgentStreamEvent>,
946) -> Result<(), AgentError> {
947 let model = req.model.as_deref().unwrap_or(&config.default_model);
948 let cancel_token = req.cancel_token.clone();
949 let selected_skills = resolve_selected_skills(&req);
950 let tool_call_policy = resolve_tool_call_policy(&req);
951
952 let mut session = store.load(&req.session_id).await?.unwrap_or_default();
953 let system_instructions = resolve_system_instructions_with_memory(&req, &session);
954 let tool_descriptors = tools.list_tools().await?;
955 let mut guard = LoopGuard::new(config.policy.clone());
956 let mut total_usage = TokenUsage::default();
957 let mut consecutive_parse_failures: u32 = 0;
958 let mut next_call_id: u64 = 0;
959 let mut last_tool_call_signature: Option<String> = None;
960 let mut same_tool_call_streak: u32 = 0;
961
962 events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
963 if !selected_skills.is_empty() {
964 events.emit(AgentEvent::SkillsSelected {
965 session_id: req.session_id.clone(),
966 skill_names: selected_skills.clone(),
967 });
968 }
969 session.messages.push(Message::text(Role::User, req.input.clone()));
970
971 loop {
972 let current_step = guard.steps.saturating_add(1);
973
974 if let Some(ref token) = cancel_token &&
975 token.is_cancelled()
976 {
977 events.emit(AgentEvent::Error {
978 session_id: req.session_id.clone(),
979 step: Some(current_step),
980 error: "turn cancelled".to_string(),
981 });
982 events.emit(AgentEvent::TurnCompleted {
983 session_id: req.session_id.clone(),
984 finish_reason: FinishReason::Cancelled,
985 usage: total_usage.clone(),
986 });
987 store.save(&req.session_id, &session).await?;
988 let _ = tx
989 .send_async(AgentStreamEvent::Error { error: "turn cancelled".to_string() })
990 .await;
991 let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
992 return Ok(());
993 }
994
995 config.cost_meter.check_budget(&req.session_id).await?;
996
997 if !guard.can_continue() {
998 let reason = guard.reason();
999 let msg = format!("Turn stopped: {reason:?}");
1000 events.emit(AgentEvent::Error {
1001 session_id: req.session_id.clone(),
1002 step: Some(current_step),
1003 error: msg.clone(),
1004 });
1005 events.emit(AgentEvent::TurnCompleted {
1006 session_id: req.session_id.clone(),
1007 finish_reason: FinishReason::GuardExceeded,
1008 usage: total_usage.clone(),
1009 });
1010 store.save(&req.session_id, &session).await?;
1011 let _ = tx.send_async(AgentStreamEvent::Error { error: msg }).await;
1012 let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
1013 return Ok(());
1014 }
1015
1016 let llm_request = crate::prompt::build_llm_request_with_options(
1017 model,
1018 &session,
1019 &tool_descriptors,
1020 &system_instructions,
1021 prompt_options_for_mode(config.dispatch_mode, llm.as_ref(), req.output_schema.clone()),
1022 config.context_compactor.as_ref(),
1023 );
1024 events.emit(AgentEvent::LlmCallStarted {
1025 session_id: req.session_id.clone(),
1026 step: current_step,
1027 model: model.to_string(),
1028 });
1029
1030 let mut assistant_content = String::new();
1031 let mut llm_usage = TokenUsage::default();
1032 let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
1033 let mut llm_finish_reason = FinishReason::Stop;
1034 let mut fallback_to_complete = false;
1035
1036 let llm_request = llm_request.await;
1037 if llm.capabilities().native_tool_calling {
1038 fallback_to_complete = true;
1039 } else {
1040 match llm.complete_stream(llm_request.clone()).await {
1041 Ok(mut stream) => {
1042 while let Some(item) = stream.next().await {
1043 match item {
1044 Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
1045 assistant_content.push_str(&delta);
1046 let _ = tx
1047 .send_async(AgentStreamEvent::TextDelta { content: delta })
1048 .await;
1049 }
1050 Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
1051 llm_usage = usage;
1052 }
1053 Err(err) => {
1054 events.emit(AgentEvent::Error {
1055 session_id: req.session_id.clone(),
1056 step: Some(current_step),
1057 error: err.to_string(),
1058 });
1059 return Err(AgentError::Llm(err));
1060 }
1061 }
1062 }
1063 }
1064 Err(err) => {
1065 fallback_to_complete = true;
1066 let _ = err;
1067 }
1068 }
1069 }
1070
1071 if fallback_to_complete {
1075 let llm_response = llm.complete(llm_request).await?;
1076 assistant_content = llm_response.content.clone();
1077 llm_usage = llm_response.usage;
1078 llm_finish_reason = llm_response.finish_reason;
1079 llm_tool_calls = llm_response.tool_calls;
1080 let _ =
1081 tx.send_async(AgentStreamEvent::TextDelta { content: llm_response.content }).await;
1082 }
1083
1084 guard.record_step();
1085 total_usage.prompt_tokens += llm_usage.prompt_tokens;
1086 total_usage.completion_tokens += llm_usage.completion_tokens;
1087 session.total_usage.prompt_tokens =
1088 session.total_usage.prompt_tokens.saturating_add(llm_usage.prompt_tokens);
1089 session.total_usage.completion_tokens =
1090 session.total_usage.completion_tokens.saturating_add(llm_usage.completion_tokens);
1091 config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
1092 events.emit(AgentEvent::LlmCallCompleted {
1093 session_id: req.session_id.clone(),
1094 step: current_step,
1095 model: model.to_string(),
1096 usage: llm_usage.clone(),
1097 });
1098 let native_tool_call = if llm.capabilities().native_tool_calling {
1099 llm_tool_calls.first().cloned()
1100 } else {
1101 None
1102 };
1103 let assistant_message = if llm_tool_calls.is_empty() {
1104 Message::text(Role::Assistant, assistant_content.clone())
1105 } else {
1106 Message::assistant_tool_calls(assistant_content.clone(), llm_tool_calls.clone())
1107 };
1108 session.messages.push(assistant_message);
1109
1110 let _ = config
1111 .checkpoint_store
1112 .save_checkpoint(&TurnCheckpoint {
1113 session_id: req.session_id.clone(),
1114 step: guard.steps,
1115 tool_calls: guard.tool_calls,
1116 usage: total_usage.clone(),
1117 })
1118 .await;
1119
1120 let response_for_dispatch = bob_core::types::LlmResponse {
1121 content: assistant_content.clone(),
1122 usage: llm_usage.clone(),
1123 finish_reason: llm_finish_reason,
1124 tool_calls: llm_tool_calls,
1125 };
1126
1127 if let Ok(action) =
1128 parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
1129 {
1130 consecutive_parse_failures = 0;
1131 match action {
1132 AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
1133 store.save(&req.session_id, &session).await?;
1134 events.emit(AgentEvent::TurnCompleted {
1135 session_id: req.session_id.clone(),
1136 finish_reason: FinishReason::Stop,
1137 usage: total_usage.clone(),
1138 });
1139 let _ = tx
1140 .send_async(AgentStreamEvent::Finished { usage: total_usage.clone() })
1141 .await;
1142 return Ok(());
1143 }
1144 AgentAction::ToolCall { name, arguments } => {
1145 let tool_call_id = native_tool_call
1146 .as_ref()
1147 .filter(|call| call.name == name && call.arguments == arguments)
1148 .and_then(|call| call.call_id.clone());
1149 let call_signature = format!(
1150 "{}:{}",
1151 name,
1152 serde_json::to_string(&arguments).unwrap_or_default()
1153 );
1154 if last_tool_call_signature.as_ref() == Some(&call_signature) {
1155 same_tool_call_streak = same_tool_call_streak.saturating_add(1);
1156 } else {
1157 same_tool_call_streak = 1;
1158 last_tool_call_signature = Some(call_signature);
1159 }
1160 if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
1161 next_call_id += 1;
1162 let call_id = format!("call-{next_call_id}");
1163 events.emit(AgentEvent::ToolCallStarted {
1164 session_id: req.session_id.clone(),
1165 step: current_step,
1166 name: name.clone(),
1167 });
1168 let _ = tx.send(AgentStreamEvent::ToolCallStarted {
1169 name: name.clone(),
1170 call_id: call_id.clone(),
1171 });
1172 guard.record_tool_call();
1173 let duplicate_result = ToolResult {
1174 name: name.clone(),
1175 output: serde_json::json!({
1176 "error": format!(
1177 "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
1178 )
1179 }),
1180 is_error: true,
1181 };
1182 let _ = config
1183 .cost_meter
1184 .record_tool_result(&req.session_id, &duplicate_result)
1185 .await;
1186 events.emit(AgentEvent::ToolCallCompleted {
1187 session_id: req.session_id.clone(),
1188 step: current_step,
1189 name: name.clone(),
1190 is_error: true,
1191 });
1192 let output_str =
1193 serde_json::to_string(&duplicate_result.output).unwrap_or_default();
1194 session.messages.push(Message::tool_result(
1195 name.clone(),
1196 tool_call_id.clone(),
1197 output_str,
1198 ));
1199 let _ = config
1200 .artifact_store
1201 .put(ArtifactRecord {
1202 session_id: req.session_id.clone(),
1203 kind: "tool_result".to_string(),
1204 name: name.clone(),
1205 content: duplicate_result.output.clone(),
1206 })
1207 .await;
1208 let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
1209 call_id,
1210 result: duplicate_result,
1211 });
1212 continue;
1213 }
1214
1215 events.emit(AgentEvent::ToolCallStarted {
1216 session_id: req.session_id.clone(),
1217 step: current_step,
1218 name: name.clone(),
1219 });
1220 next_call_id += 1;
1221 let call_id = format!("call-{next_call_id}");
1222 let _ = tx.send(AgentStreamEvent::ToolCallStarted {
1223 name: name.clone(),
1224 call_id: call_id.clone(),
1225 });
1226 let approval_context = ApprovalContext {
1227 session_id: req.session_id.clone(),
1228 turn_step: guard.steps.max(1),
1229 selected_skills: selected_skills.clone(),
1230 };
1231
1232 let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
1234 let tool_result = if let Ok(Some(cached)) =
1235 config.journal.lookup(&req.session_id, &call_fingerprint).await
1236 {
1237 tracing::debug!(
1238 session_id = %req.session_id,
1239 tool = %name,
1240 "replaying tool result from journal"
1241 );
1242 ToolResult {
1243 name: cached.tool_name,
1244 output: cached.result,
1245 is_error: cached.is_error,
1246 }
1247 } else {
1248 let result = execute_tool_call(
1249 tools.as_ref(),
1250 &mut guard,
1251 ToolCall::new(name.clone(), arguments.clone()),
1252 &tool_call_policy,
1253 config.tool_policy.as_ref(),
1254 config.approval.as_ref(),
1255 &approval_context,
1256 config.policy.tool_timeout_ms,
1257 )
1258 .await;
1259 let _ = config
1261 .journal
1262 .append(JournalEntry {
1263 session_id: req.session_id.clone(),
1264 call_fingerprint: call_fingerprint.clone(),
1265 tool_name: name.clone(),
1266 arguments: arguments.clone(),
1267 result: result.output.clone(),
1268 is_error: result.is_error,
1269 timestamp_ms: bob_core::tape::now_ms(),
1270 })
1271 .await;
1272 result
1273 };
1274
1275 guard.record_tool_call();
1276 let _ =
1277 config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
1278 let is_error = tool_result.is_error;
1279 events.emit(AgentEvent::ToolCallCompleted {
1280 session_id: req.session_id.clone(),
1281 step: current_step,
1282 name: name.clone(),
1283 is_error,
1284 });
1285 let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
1286 call_id,
1287 result: tool_result.clone(),
1288 });
1289
1290 let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
1291 session.messages.push(Message::tool_result(
1292 name.clone(),
1293 tool_call_id,
1294 output_str,
1295 ));
1296 let _ = config
1297 .artifact_store
1298 .put(ArtifactRecord {
1299 session_id: req.session_id.clone(),
1300 kind: "tool_result".to_string(),
1301 name: name.clone(),
1302 content: tool_result.output.clone(),
1303 })
1304 .await;
1305 }
1306 }
1307 } else {
1308 consecutive_parse_failures += 1;
1309 last_tool_call_signature = None;
1310 same_tool_call_streak = 0;
1311 if consecutive_parse_failures >= 2 {
1312 store.save(&req.session_id, &session).await?;
1313 events.emit(AgentEvent::Error {
1314 session_id: req.session_id.clone(),
1315 step: Some(current_step),
1316 error: "LLM produced invalid JSON after re-prompt".to_string(),
1317 });
1318 return Err(AgentError::Internal(
1319 "LLM produced invalid JSON after re-prompt".into(),
1320 ));
1321 }
1322 session.messages.push(Message::text(
1323 Role::User,
1324 "Your response was not valid JSON. \
1325 Please respond with exactly one JSON object \
1326 matching the required schema.",
1327 ));
1328 }
1329 }
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334 use super::*;
1335
1336 fn test_policy() -> TurnPolicy {
1338 TurnPolicy {
1339 max_steps: 3,
1340 max_tool_calls: 2,
1341 max_consecutive_errors: 2,
1342 turn_timeout_ms: 100,
1343 tool_timeout_ms: 50,
1344 }
1345 }
1346
1347 #[test]
1348 fn trips_on_max_steps() {
1349 let mut guard = LoopGuard::new(test_policy());
1350 assert!(guard.can_continue());
1351
1352 for _ in 0..3 {
1353 guard.record_step();
1354 }
1355
1356 assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
1357 assert_eq!(guard.reason(), GuardReason::MaxSteps);
1358 }
1359
1360 #[test]
1361 fn trips_on_max_tool_calls() {
1362 let mut guard = LoopGuard::new(test_policy());
1363 assert!(guard.can_continue());
1364
1365 for _ in 0..2 {
1366 guard.record_tool_call();
1367 }
1368
1369 assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
1370 assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
1371 }
1372
1373 #[test]
1374 fn trips_on_max_consecutive_errors() {
1375 let mut guard = LoopGuard::new(test_policy());
1376 assert!(guard.can_continue());
1377
1378 for _ in 0..2 {
1379 guard.record_error();
1380 }
1381
1382 assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
1383 assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
1384 }
1385
1386 #[tokio::test]
1387 async fn trips_on_timeout() {
1388 let guard = LoopGuard::new(test_policy());
1389 assert!(guard.can_continue());
1390 assert!(!guard.timed_out());
1391
1392 tokio::time::sleep(std::time::Duration::from_millis(150)).await;
1394
1395 assert!(!guard.can_continue(), "guard should trip after timeout");
1396 assert!(guard.timed_out());
1397 assert_eq!(guard.reason(), GuardReason::TurnTimeout);
1398 }
1399
1400 #[test]
1401 fn reset_errors_clears_counter() {
1402 let mut guard = LoopGuard::new(test_policy());
1403
1404 guard.record_error();
1405 guard.reset_errors();
1406
1407 guard.record_error();
1409 assert!(guard.can_continue(), "single error after reset should not trip guard");
1410 }
1411
1412 use std::{
1415 collections::{HashMap, VecDeque},
1416 sync::{Arc, Mutex},
1417 };
1418
1419 use bob_core::{
1420 error::{CostError, LlmError, StoreError, ToolError},
1421 ports::{
1422 ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
1423 ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
1424 },
1425 types::{
1426 AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
1427 ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
1428 LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
1429 TurnCheckpoint,
1430 },
1431 };
1432 use futures_util::StreamExt;
1433
1434 struct SequentialLlm {
1438 responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
1439 }
1440
1441 impl SequentialLlm {
1442 fn from_contents(contents: Vec<&str>) -> Self {
1443 let responses = contents
1444 .into_iter()
1445 .map(|c| {
1446 Ok(LlmResponse {
1447 content: c.to_string(),
1448 usage: TokenUsage::default(),
1449 finish_reason: FinishReason::Stop,
1450 tool_calls: Vec::new(),
1451 })
1452 })
1453 .collect();
1454 Self { responses: Mutex::new(responses) }
1455 }
1456
1457 fn from_responses(responses: Vec<LlmResponse>) -> Self {
1458 let queued = responses.into_iter().map(Ok).collect();
1459 Self { responses: Mutex::new(queued) }
1460 }
1461 }
1462
1463 #[async_trait::async_trait]
1464 impl LlmPort for SequentialLlm {
1465 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1466 let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1467 q.pop_front().unwrap_or_else(|| {
1468 Ok(LlmResponse {
1469 content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
1470 usage: TokenUsage::default(),
1471 finish_reason: FinishReason::Stop,
1472 tool_calls: Vec::new(),
1473 })
1474 })
1475 }
1476
1477 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1478 Err(LlmError::Provider("not implemented".into()))
1479 }
1480 }
1481
1482 struct MockToolPort {
1484 tools: Vec<ToolDescriptor>,
1485 call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
1486 }
1487
1488 impl MockToolPort {
1489 fn empty() -> Self {
1490 Self { tools: vec![], call_results: Mutex::new(VecDeque::new()) }
1491 }
1492
1493 fn with_tool_and_results(
1494 tool_name: &str,
1495 results: Vec<Result<ToolResult, ToolError>>,
1496 ) -> Self {
1497 Self {
1498 tools: vec![
1499 ToolDescriptor::new(tool_name, format!("{tool_name} tool"))
1500 .with_input_schema(serde_json::json!({"type": "object"})),
1501 ],
1502 call_results: Mutex::new(results.into()),
1503 }
1504 }
1505 }
1506
1507 #[async_trait::async_trait]
1508 impl ToolPort for MockToolPort {
1509 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1510 Ok(self.tools.clone())
1511 }
1512
1513 async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
1514 let mut q = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
1515 q.pop_front().unwrap_or_else(|| {
1516 Ok(ToolResult {
1517 name: call.name,
1518 output: serde_json::json!({"result": "default"}),
1519 is_error: false,
1520 })
1521 })
1522 }
1523 }
1524
1525 struct NoCallToolPort {
1526 tools: Vec<ToolDescriptor>,
1527 }
1528
1529 #[async_trait::async_trait]
1530 impl ToolPort for NoCallToolPort {
1531 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1532 Ok(self.tools.clone())
1533 }
1534
1535 async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
1536 Err(ToolError::Execution(
1537 "tool call should be blocked by policy before execution".to_string(),
1538 ))
1539 }
1540 }
1541
1542 struct AllowAllPolicyPort;
1543
1544 impl ToolPolicyPort for AllowAllPolicyPort {
1545 fn is_tool_allowed(
1546 &self,
1547 _tool: &str,
1548 _deny_tools: &[String],
1549 _allow_tools: Option<&[String]>,
1550 ) -> bool {
1551 true
1552 }
1553 }
1554
1555 struct DenySearchPolicyPort;
1556
1557 impl ToolPolicyPort for DenySearchPolicyPort {
1558 fn is_tool_allowed(
1559 &self,
1560 tool: &str,
1561 _deny_tools: &[String],
1562 _allow_tools: Option<&[String]>,
1563 ) -> bool {
1564 tool != "search"
1565 }
1566 }
1567
1568 struct AlwaysApprovePort;
1569
1570 #[async_trait::async_trait]
1571 impl ApprovalPort for AlwaysApprovePort {
1572 async fn approve_tool_call(
1573 &self,
1574 _call: &ToolCall,
1575 _context: &ApprovalContext,
1576 ) -> Result<ApprovalDecision, ToolError> {
1577 Ok(ApprovalDecision::Approved)
1578 }
1579 }
1580
1581 struct AlwaysDenyApprovalPort;
1582
1583 #[async_trait::async_trait]
1584 impl ApprovalPort for AlwaysDenyApprovalPort {
1585 async fn approve_tool_call(
1586 &self,
1587 _call: &ToolCall,
1588 _context: &ApprovalContext,
1589 ) -> Result<ApprovalDecision, ToolError> {
1590 Ok(ApprovalDecision::Denied {
1591 reason: "approval policy rejected tool call".to_string(),
1592 })
1593 }
1594 }
1595
1596 struct CountingCheckpointPort {
1597 saved: Mutex<Vec<TurnCheckpoint>>,
1598 }
1599
1600 impl CountingCheckpointPort {
1601 fn new() -> Self {
1602 Self { saved: Mutex::new(Vec::new()) }
1603 }
1604 }
1605
1606 #[async_trait::async_trait]
1607 impl TurnCheckpointStorePort for CountingCheckpointPort {
1608 async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
1609 self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(checkpoint.clone());
1610 Ok(())
1611 }
1612
1613 async fn load_latest(
1614 &self,
1615 _session_id: &SessionId,
1616 ) -> Result<Option<TurnCheckpoint>, StoreError> {
1617 Ok(None)
1618 }
1619 }
1620
1621 struct NoopArtifactStore;
1622
1623 #[async_trait::async_trait]
1624 impl ArtifactStorePort for NoopArtifactStore {
1625 async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
1626 Ok(())
1627 }
1628
1629 async fn list_by_session(
1630 &self,
1631 _session_id: &SessionId,
1632 ) -> Result<Vec<ArtifactRecord>, StoreError> {
1633 Ok(Vec::new())
1634 }
1635 }
1636
1637 struct CountingArtifactStore {
1638 saved: Mutex<Vec<ArtifactRecord>>,
1639 }
1640
1641 impl CountingArtifactStore {
1642 fn new() -> Self {
1643 Self { saved: Mutex::new(Vec::new()) }
1644 }
1645 }
1646
1647 #[async_trait::async_trait]
1648 impl ArtifactStorePort for CountingArtifactStore {
1649 async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError> {
1650 self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(artifact);
1651 Ok(())
1652 }
1653
1654 async fn list_by_session(
1655 &self,
1656 _session_id: &SessionId,
1657 ) -> Result<Vec<ArtifactRecord>, StoreError> {
1658 Ok(self.saved.lock().unwrap_or_else(|p| p.into_inner()).clone())
1659 }
1660 }
1661
1662 struct CountingCostMeter {
1663 llm_calls: Mutex<u32>,
1664 tool_results: Mutex<u32>,
1665 }
1666
1667 impl CountingCostMeter {
1668 fn new() -> Self {
1669 Self { llm_calls: Mutex::new(0), tool_results: Mutex::new(0) }
1670 }
1671 }
1672
1673 #[async_trait::async_trait]
1674 impl CostMeterPort for CountingCostMeter {
1675 async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
1676 Ok(())
1677 }
1678
1679 async fn record_llm_usage(
1680 &self,
1681 _session_id: &SessionId,
1682 _model: &str,
1683 _usage: &TokenUsage,
1684 ) -> Result<(), CostError> {
1685 let mut count = self.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
1686 *count += 1;
1687 Ok(())
1688 }
1689
1690 async fn record_tool_result(
1691 &self,
1692 _session_id: &SessionId,
1693 _tool_result: &ToolResult,
1694 ) -> Result<(), CostError> {
1695 let mut count = self.tool_results.lock().unwrap_or_else(|p| p.into_inner());
1696 *count += 1;
1697 Ok(())
1698 }
1699 }
1700
1701 struct MemoryStore {
1702 data: Mutex<HashMap<SessionId, SessionState>>,
1703 }
1704
1705 impl MemoryStore {
1706 fn new() -> Self {
1707 Self { data: Mutex::new(HashMap::new()) }
1708 }
1709 }
1710
1711 struct FailingSaveStore;
1712
1713 #[async_trait::async_trait]
1714 impl SessionStore for FailingSaveStore {
1715 async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1716 Ok(None)
1717 }
1718
1719 async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
1720 Err(StoreError::Backend("simulated save failure".into()))
1721 }
1722 }
1723
1724 #[async_trait::async_trait]
1725 impl SessionStore for MemoryStore {
1726 async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1727 let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1728 Ok(map.get(id).cloned())
1729 }
1730
1731 async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
1732 let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1733 map.insert(id.clone(), state.clone());
1734 Ok(())
1735 }
1736 }
1737
1738 struct CollectingSink {
1739 events: Mutex<Vec<AgentEvent>>,
1740 }
1741
1742 impl CollectingSink {
1743 fn new() -> Self {
1744 Self { events: Mutex::new(Vec::new()) }
1745 }
1746
1747 fn event_count(&self) -> usize {
1748 self.events.lock().unwrap_or_else(|p| p.into_inner()).len()
1749 }
1750
1751 fn all_events(&self) -> Vec<AgentEvent> {
1752 self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
1753 }
1754 }
1755
1756 impl EventSink for CollectingSink {
1757 fn emit(&self, event: AgentEvent) {
1758 self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
1759 }
1760 }
1761
1762 fn make_request(input: &str) -> AgentRequest {
1763 AgentRequest {
1764 input: input.into(),
1765 session_id: "test-session".into(),
1766 model: None,
1767 context: bob_core::types::RequestContext::default(),
1768 cancel_token: None,
1769 output_schema: None,
1770 max_output_retries: 0,
1771 }
1772 }
1773
1774 fn generous_policy() -> TurnPolicy {
1775 TurnPolicy {
1776 max_steps: 20,
1777 max_tool_calls: 10,
1778 max_consecutive_errors: 3,
1779 turn_timeout_ms: 30_000,
1780 tool_timeout_ms: 5_000,
1781 }
1782 }
1783
1784 struct StreamLlm {
1785 chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
1786 }
1787
1788 impl StreamLlm {
1789 fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
1790 Self { chunks: Mutex::new(chunks.into()) }
1791 }
1792 }
1793
1794 #[async_trait::async_trait]
1795 impl LlmPort for StreamLlm {
1796 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1797 Err(LlmError::Provider("complete() should not be called in stream test".into()))
1798 }
1799
1800 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1801 let mut chunks = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
1802 let items: Vec<Result<LlmStreamChunk, LlmError>> = chunks.drain(..).collect();
1803 Ok(Box::pin(futures_util::stream::iter(items)))
1804 }
1805 }
1806
1807 struct InspectingLlm {
1808 expected_substring: String,
1809 }
1810
1811 #[async_trait::async_trait]
1812 impl LlmPort for InspectingLlm {
1813 async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
1814 let system = req
1815 .messages
1816 .iter()
1817 .find(|m| m.role == Role::System)
1818 .map(|m| m.content.clone())
1819 .unwrap_or_default();
1820 if !system.contains(&self.expected_substring) {
1821 return Err(LlmError::Provider(format!(
1822 "expected system prompt to include '{}', got: {}",
1823 self.expected_substring, system
1824 )));
1825 }
1826 Ok(LlmResponse {
1827 content: r#"{"type": "final", "content": "ok"}"#.to_string(),
1828 usage: TokenUsage::default(),
1829 finish_reason: FinishReason::Stop,
1830 tool_calls: Vec::new(),
1831 })
1832 }
1833
1834 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1835 Err(LlmError::Provider("not used".into()))
1836 }
1837 }
1838
1839 #[tokio::test]
1842 async fn tc01_simple_final_response() {
1843 let llm =
1844 SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
1845 let tools = MockToolPort::empty();
1846 let store = MemoryStore::new();
1847 let sink = CollectingSink::new();
1848
1849 let result = run_turn(
1850 &llm,
1851 &tools,
1852 &store,
1853 &sink,
1854 make_request("Hi"),
1855 &generous_policy(),
1856 "test-model",
1857 )
1858 .await;
1859
1860 assert!(
1861 matches!(&result, Ok(AgentRunResult::Finished(_))),
1862 "expected Finished, got {result:?}"
1863 );
1864 let resp = match result {
1865 Ok(AgentRunResult::Finished(r)) => r,
1866 _ => return,
1867 };
1868
1869 assert_eq!(resp.content, "Hello there!");
1870 assert_eq!(resp.finish_reason, FinishReason::Stop);
1871 assert!(resp.tool_transcript.is_empty());
1872 assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
1873 }
1874
1875 #[tokio::test]
1878 async fn tc02_tool_call_then_final() {
1879 let llm = SequentialLlm::from_contents(vec![
1880 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1881 r#"{"type": "final", "content": "Found results."}"#,
1882 ]);
1883 let tools = MockToolPort::with_tool_and_results(
1884 "search",
1885 vec![Ok(ToolResult {
1886 name: "search".into(),
1887 output: serde_json::json!({"hits": 42}),
1888 is_error: false,
1889 })],
1890 );
1891 let store = MemoryStore::new();
1892 let sink = CollectingSink::new();
1893
1894 let result = run_turn(
1895 &llm,
1896 &tools,
1897 &store,
1898 &sink,
1899 make_request("Search for rust"),
1900 &generous_policy(),
1901 "test-model",
1902 )
1903 .await;
1904
1905 assert!(
1906 matches!(&result, Ok(AgentRunResult::Finished(_))),
1907 "expected Finished, got {result:?}"
1908 );
1909 let resp = match result {
1910 Ok(AgentRunResult::Finished(r)) => r,
1911 _ => return,
1912 };
1913
1914 assert_eq!(resp.content, "Found results.");
1915 assert_eq!(resp.finish_reason, FinishReason::Stop);
1916 assert_eq!(resp.tool_transcript.len(), 1);
1917 assert_eq!(resp.tool_transcript[0].name, "search");
1918 assert!(!resp.tool_transcript[0].is_error);
1919 }
1920
1921 #[tokio::test]
1924 async fn tc03_parse_error_reprompt_success() {
1925 let llm = SequentialLlm::from_contents(vec![
1926 "This is not JSON at all.",
1927 r#"{"type": "final", "content": "Recovered"}"#,
1928 ]);
1929 let tools = MockToolPort::empty();
1930 let store = MemoryStore::new();
1931 let sink = CollectingSink::new();
1932
1933 let result = run_turn(
1934 &llm,
1935 &tools,
1936 &store,
1937 &sink,
1938 make_request("Hi"),
1939 &generous_policy(),
1940 "test-model",
1941 )
1942 .await;
1943
1944 assert!(
1945 matches!(&result, Ok(AgentRunResult::Finished(_))),
1946 "expected Finished after re-prompt, got {result:?}"
1947 );
1948 let resp = match result {
1949 Ok(AgentRunResult::Finished(r)) => r,
1950 _ => return,
1951 };
1952
1953 assert_eq!(resp.content, "Recovered");
1954 assert_eq!(resp.finish_reason, FinishReason::Stop);
1955 }
1956
1957 #[tokio::test]
1960 async fn tc04_double_parse_error() {
1961 let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
1962 let tools = MockToolPort::empty();
1963 let store = MemoryStore::new();
1964 let sink = CollectingSink::new();
1965
1966 let result = run_turn(
1967 &llm,
1968 &tools,
1969 &store,
1970 &sink,
1971 make_request("Hi"),
1972 &generous_policy(),
1973 "test-model",
1974 )
1975 .await;
1976
1977 assert!(result.is_err(), "should return error after two parse failures");
1978 let msg = match result {
1979 Err(err) => err.to_string(),
1980 Ok(value) => format!("unexpected success: {value:?}"),
1981 };
1982 assert!(msg.contains("invalid JSON"), "error message = {msg}");
1983 }
1984
1985 #[tokio::test]
1988 async fn tc05_max_steps_exhaustion() {
1989 let llm = SequentialLlm::from_contents(vec![
1991 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1992 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1993 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1994 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1995 ]);
1996 let tools = MockToolPort::with_tool_and_results(
1997 "t1",
1998 vec![
1999 Ok(ToolResult {
2000 name: "t1".into(),
2001 output: serde_json::json!(null),
2002 is_error: false,
2003 }),
2004 Ok(ToolResult {
2005 name: "t1".into(),
2006 output: serde_json::json!(null),
2007 is_error: false,
2008 }),
2009 Ok(ToolResult {
2010 name: "t1".into(),
2011 output: serde_json::json!(null),
2012 is_error: false,
2013 }),
2014 ],
2015 );
2016 let store = MemoryStore::new();
2017 let sink = CollectingSink::new();
2018
2019 let policy = TurnPolicy {
2020 max_steps: 2,
2021 max_tool_calls: 10,
2022 max_consecutive_errors: 5,
2023 turn_timeout_ms: 30_000,
2024 tool_timeout_ms: 5_000,
2025 };
2026
2027 let result =
2028 run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
2029 .await;
2030
2031 assert!(
2032 matches!(&result, Ok(AgentRunResult::Finished(_))),
2033 "expected Finished with GuardExceeded, got {result:?}"
2034 );
2035 let resp = match result {
2036 Ok(AgentRunResult::Finished(r)) => r,
2037 _ => return,
2038 };
2039
2040 assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
2041 assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
2042 }
2043
2044 #[tokio::test]
2047 async fn tc06_cancellation() {
2048 let llm = SequentialLlm::from_contents(vec![
2049 r#"{"type": "final", "content": "should not reach"}"#,
2050 ]);
2051 let tools = MockToolPort::empty();
2052 let store = MemoryStore::new();
2053 let sink = CollectingSink::new();
2054
2055 let token = CancelToken::new();
2056 token.cancel();
2058
2059 let mut req = make_request("Hi");
2060 req.cancel_token = Some(token);
2061
2062 let result =
2063 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2064
2065 assert!(
2066 matches!(&result, Ok(AgentRunResult::Finished(_))),
2067 "expected Finished with Cancelled, got {result:?}"
2068 );
2069 let resp = match result {
2070 Ok(AgentRunResult::Finished(r)) => r,
2071 _ => return,
2072 };
2073
2074 assert_eq!(resp.finish_reason, FinishReason::Cancelled);
2075 }
2076
2077 #[tokio::test]
2080 async fn tc07_tool_error_then_final() {
2081 let llm = SequentialLlm::from_contents(vec![
2082 r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
2083 r#"{"type": "final", "content": "Recovered from tool error."}"#,
2084 ]);
2085 let tools = MockToolPort::with_tool_and_results(
2086 "flaky_tool",
2087 vec![Err(ToolError::Execution("connection refused".into()))],
2088 );
2089 let store = MemoryStore::new();
2090 let sink = CollectingSink::new();
2091
2092 let result = run_turn(
2093 &llm,
2094 &tools,
2095 &store,
2096 &sink,
2097 make_request("call flaky"),
2098 &generous_policy(),
2099 "test-model",
2100 )
2101 .await;
2102
2103 assert!(
2104 matches!(&result, Ok(AgentRunResult::Finished(_))),
2105 "expected Finished, got {result:?}"
2106 );
2107 let resp = match result {
2108 Ok(AgentRunResult::Finished(r)) => r,
2109 _ => return,
2110 };
2111
2112 assert_eq!(resp.content, "Recovered from tool error.");
2113 assert_eq!(resp.tool_transcript.len(), 1);
2114 assert!(resp.tool_transcript[0].is_error);
2115 }
2116
2117 #[tokio::test]
2118 async fn tc08_save_failure_is_propagated() {
2119 let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
2120 let tools = MockToolPort::empty();
2121 let store = FailingSaveStore;
2122 let sink = CollectingSink::new();
2123
2124 let result = run_turn(
2125 &llm,
2126 &tools,
2127 &store,
2128 &sink,
2129 make_request("hello"),
2130 &generous_policy(),
2131 "test-model",
2132 )
2133 .await;
2134
2135 assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
2136 }
2137
2138 #[tokio::test]
2139 async fn tc09_stream_turn_emits_text_and_finished() {
2140 let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
2141 Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
2142 Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
2143 Ok(LlmStreamChunk::Done {
2144 usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
2145 }),
2146 ]));
2147 let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2148 let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2149 let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());
2150
2151 let stream_result = run_turn_stream(
2152 llm,
2153 tools,
2154 store,
2155 sink,
2156 make_request("hello"),
2157 generous_policy(),
2158 "test-model".to_string(),
2159 )
2160 .await;
2161 assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
2162 let mut stream = match stream_result {
2163 Ok(stream) => stream,
2164 Err(_) => return,
2165 };
2166
2167 let mut saw_text = false;
2168 let mut saw_finished = false;
2169 while let Some(event) = stream.next().await {
2170 match event {
2171 AgentStreamEvent::TextDelta { content } => {
2172 saw_text = saw_text || !content.is_empty();
2173 }
2174 AgentStreamEvent::Finished { usage } => {
2175 saw_finished = true;
2176 assert_eq!(usage.prompt_tokens, 3);
2177 assert_eq!(usage.completion_tokens, 4);
2178 }
2179 AgentStreamEvent::ToolCallStarted { .. } |
2180 AgentStreamEvent::ToolCallCompleted { .. } |
2181 AgentStreamEvent::Error { .. } => {}
2182 }
2183 }
2184
2185 assert!(saw_text, "expected at least one text delta");
2186 assert!(saw_finished, "expected a finished event");
2187 }
2188
2189 #[tokio::test]
2190 async fn tc10_skills_prompt_context_is_injected() {
2191 let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
2192 let tools = MockToolPort::empty();
2193 let store = MemoryStore::new();
2194 let sink = CollectingSink::new();
2195
2196 let mut req = make_request("review this code");
2197 req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());
2198
2199 let result =
2200 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2201
2202 assert!(result.is_ok(), "run should succeed when skills prompt is injected");
2203 }
2204
2205 #[tokio::test]
2206 async fn tc11_selected_skills_context_emits_event() {
2207 let llm =
2208 SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
2209 let tools = MockToolPort::empty();
2210 let store = MemoryStore::new();
2211 let sink = CollectingSink::new();
2212
2213 let mut req = make_request("review code");
2214 req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];
2215
2216 let result =
2217 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2218 assert!(result.is_ok(), "run should succeed");
2219
2220 let events = sink.all_events();
2221 assert!(
2222 events.iter().any(|event| matches!(
2223 event,
2224 AgentEvent::SkillsSelected { skill_names, .. }
2225 if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
2226 )),
2227 "skills.selected event should be emitted with context skill names"
2228 );
2229 }
2230
2231 #[tokio::test]
2232 async fn tc12_policy_deny_tool_blocks_execution() {
2233 let llm = SequentialLlm::from_contents(vec![
2234 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2235 r#"{"type": "final", "content": "done"}"#,
2236 ]);
2237 let tools = NoCallToolPort {
2238 tools: vec![
2239 ToolDescriptor::new("search", "search tool")
2240 .with_input_schema(serde_json::json!({"type":"object"})),
2241 ],
2242 };
2243 let store = MemoryStore::new();
2244 let sink = CollectingSink::new();
2245
2246 let mut req = make_request("search rust");
2247 req.context.tool_policy.deny_tools =
2248 vec!["search".to_string(), "local/shell_exec".to_string()];
2249
2250 let result =
2251 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2252 assert!(
2253 matches!(&result, Ok(AgentRunResult::Finished(_))),
2254 "expected finished response, got {result:?}"
2255 );
2256 let resp = match result {
2257 Ok(AgentRunResult::Finished(r)) => r,
2258 _ => return,
2259 };
2260
2261 assert_eq!(resp.finish_reason, FinishReason::Stop);
2262 assert_eq!(resp.tool_transcript.len(), 1);
2263 assert!(resp.tool_transcript[0].is_error);
2264 assert!(
2265 resp.tool_transcript[0].output.to_string().contains("denied"),
2266 "tool error should explain policy denial"
2267 );
2268 }
2269
2270 #[tokio::test]
2271 async fn tc13_approval_denied_blocks_execution() {
2272 let llm = SequentialLlm::from_contents(vec![
2273 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2274 r#"{"type": "final", "content": "done"}"#,
2275 ]);
2276 let tools = NoCallToolPort {
2277 tools: vec![
2278 ToolDescriptor::new("search", "search tool")
2279 .with_input_schema(serde_json::json!({"type":"object"})),
2280 ],
2281 };
2282 let store = MemoryStore::new();
2283 let sink = CollectingSink::new();
2284 let req = make_request("search rust");
2285 let tool_policy = AllowAllPolicyPort;
2286 let approval = AlwaysDenyApprovalPort;
2287
2288 let result = run_turn_with_controls(
2289 &llm,
2290 &tools,
2291 &store,
2292 &sink,
2293 req,
2294 &generous_policy(),
2295 "test-model",
2296 &tool_policy,
2297 &approval,
2298 )
2299 .await;
2300 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
2301 let resp = match result {
2302 Ok(AgentRunResult::Finished(r)) => r,
2303 _ => return,
2304 };
2305
2306 assert_eq!(resp.tool_transcript.len(), 1);
2307 assert!(resp.tool_transcript[0].is_error);
2308 assert!(
2309 resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
2310 "tool error should explain approval denial"
2311 );
2312 }
2313
2314 #[tokio::test]
2315 async fn tc14_custom_policy_port_blocks_execution() {
2316 let llm = SequentialLlm::from_contents(vec![
2317 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2318 r#"{"type": "final", "content": "done"}"#,
2319 ]);
2320 let tools = NoCallToolPort {
2321 tools: vec![
2322 ToolDescriptor::new("search", "search tool")
2323 .with_input_schema(serde_json::json!({"type":"object"})),
2324 ],
2325 };
2326 let store = MemoryStore::new();
2327 let sink = CollectingSink::new();
2328 let req = make_request("search rust");
2329 let tool_policy = DenySearchPolicyPort;
2330 let approval = AlwaysApprovePort;
2331
2332 let result = run_turn_with_controls(
2333 &llm,
2334 &tools,
2335 &store,
2336 &sink,
2337 req,
2338 &generous_policy(),
2339 "test-model",
2340 &tool_policy,
2341 &approval,
2342 )
2343 .await;
2344 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
2345 let resp = match result {
2346 Ok(AgentRunResult::Finished(r)) => r,
2347 _ => return,
2348 };
2349
2350 assert_eq!(resp.tool_transcript.len(), 1);
2351 assert!(resp.tool_transcript[0].is_error);
2352 assert!(
2353 resp.tool_transcript[0].output.to_string().contains("denied"),
2354 "tool error should explain policy denial"
2355 );
2356 }
2357
2358 #[tokio::test]
2359 async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
2360 struct NativeToolLlm {
2361 responses: Mutex<VecDeque<LlmResponse>>,
2362 }
2363
2364 #[async_trait::async_trait]
2365 impl LlmPort for NativeToolLlm {
2366 fn capabilities(&self) -> bob_core::types::LlmCapabilities {
2367 bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
2368 }
2369
2370 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2371 let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
2372 Ok(q.pop_front().unwrap_or(LlmResponse {
2373 content: r#"{"type":"final","content":"fallback"}"#.to_string(),
2374 usage: TokenUsage::default(),
2375 finish_reason: FinishReason::Stop,
2376 tool_calls: Vec::new(),
2377 }))
2378 }
2379
2380 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2381 Err(LlmError::Provider("not used".into()))
2382 }
2383 }
2384
2385 let llm = NativeToolLlm {
2386 responses: Mutex::new(VecDeque::from(vec![
2387 LlmResponse {
2388 content: "ignored".to_string(),
2389 usage: TokenUsage::default(),
2390 finish_reason: FinishReason::Stop,
2391 tool_calls: vec![
2392 ToolCall::new("search", serde_json::json!({"q":"rust"}))
2393 .with_call_id("call-search-1"),
2394 ],
2395 },
2396 LlmResponse {
2397 content: r#"{"type":"final","content":"done"}"#.to_string(),
2398 usage: TokenUsage::default(),
2399 finish_reason: FinishReason::Stop,
2400 tool_calls: Vec::new(),
2401 },
2402 ])),
2403 };
2404 let tools = MockToolPort::with_tool_and_results(
2405 "search",
2406 vec![Ok(ToolResult {
2407 name: "search".to_string(),
2408 output: serde_json::json!({"hits": 2}),
2409 is_error: false,
2410 })],
2411 );
2412 let store = MemoryStore::new();
2413 let sink = CollectingSink::new();
2414 let checkpoint = CountingCheckpointPort::new();
2415 let artifacts = NoopArtifactStore;
2416 let cost = CountingCostMeter::new();
2417 let policy = AllowAllPolicyPort;
2418 let approval = AlwaysApprovePort;
2419
2420 let result = run_turn_with_extensions(
2421 &llm,
2422 &tools,
2423 &store,
2424 &sink,
2425 make_request("search rust"),
2426 &generous_policy(),
2427 "test-model",
2428 &policy,
2429 &approval,
2430 crate::DispatchMode::NativePreferred,
2431 &checkpoint,
2432 &artifacts,
2433 &cost,
2434 &crate::prompt::WindowContextCompactor::default(),
2435 &crate::NoOpToolJournalPort,
2436 )
2437 .await;
2438
2439 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2440 let resp = match result {
2441 Ok(AgentRunResult::Finished(r)) => r,
2442 _ => return,
2443 };
2444 assert_eq!(resp.tool_transcript.len(), 1);
2445 assert_eq!(resp.tool_transcript[0].name, "search");
2446
2447 let saved = store.load(&"test-session".to_string()).await;
2448 let saved = match saved {
2449 Ok(Some(state)) => state,
2450 other => panic!("expected saved state, got {other:?}"),
2451 };
2452 assert!(
2453 saved.messages.iter().any(|message| {
2454 message.role == Role::Assistant &&
2455 message.tool_calls.len() == 1 &&
2456 message.tool_calls[0].call_id.as_deref() == Some("call-search-1")
2457 }),
2458 "assistant tool call should be persisted structurally",
2459 );
2460 assert!(
2461 saved.messages.iter().any(|message| {
2462 message.role == Role::Tool &&
2463 message.tool_call_id.as_deref() == Some("call-search-1") &&
2464 message.tool_name.as_deref() == Some("search")
2465 }),
2466 "tool result should retain tool metadata",
2467 );
2468 }
2469
2470 #[tokio::test]
2471 async fn tc16_checkpoint_and_cost_ports_are_invoked() {
2472 let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
2473 let tools = MockToolPort::empty();
2474 let store = MemoryStore::new();
2475 let sink = CollectingSink::new();
2476 let checkpoint = CountingCheckpointPort::new();
2477 let artifacts = NoopArtifactStore;
2478 let cost = CountingCostMeter::new();
2479 let policy = AllowAllPolicyPort;
2480 let approval = AlwaysApprovePort;
2481
2482 let result = run_turn_with_extensions(
2483 &llm,
2484 &tools,
2485 &store,
2486 &sink,
2487 make_request("hello"),
2488 &generous_policy(),
2489 "test-model",
2490 &policy,
2491 &approval,
2492 crate::DispatchMode::PromptGuided,
2493 &checkpoint,
2494 &artifacts,
2495 &cost,
2496 &crate::prompt::WindowContextCompactor::default(),
2497 &crate::NoOpToolJournalPort,
2498 )
2499 .await;
2500 assert!(result.is_ok(), "turn should succeed");
2501 let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2502 let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
2503 assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
2504 assert!(llm_calls >= 1, "cost meter should record llm usage");
2505 }
2506
2507 #[tokio::test]
2508 async fn tc17_session_usage_accumulates_and_persists() {
2509 let llm_first = SequentialLlm::from_responses(vec![LlmResponse {
2510 content: r#"{"type":"final","content":"first"}"#.to_string(),
2511 usage: TokenUsage { prompt_tokens: 10, completion_tokens: 5 },
2512 finish_reason: FinishReason::Stop,
2513 tool_calls: Vec::new(),
2514 }]);
2515 let llm_second = SequentialLlm::from_responses(vec![LlmResponse {
2516 content: r#"{"type":"final","content":"second"}"#.to_string(),
2517 usage: TokenUsage { prompt_tokens: 3, completion_tokens: 2 },
2518 finish_reason: FinishReason::Stop,
2519 tool_calls: Vec::new(),
2520 }]);
2521 let tools = MockToolPort::empty();
2522 let store = MemoryStore::new();
2523 let sink = CollectingSink::new();
2524
2525 let first = run_turn(
2526 &llm_first,
2527 &tools,
2528 &store,
2529 &sink,
2530 make_request("hello"),
2531 &generous_policy(),
2532 "test-model",
2533 )
2534 .await;
2535 assert!(first.is_ok(), "first run should succeed");
2536
2537 let second = run_turn(
2538 &llm_second,
2539 &tools,
2540 &store,
2541 &sink,
2542 make_request("again"),
2543 &generous_policy(),
2544 "test-model",
2545 )
2546 .await;
2547 assert!(second.is_ok(), "second run should succeed");
2548
2549 let loaded = store.load(&"test-session".to_string()).await;
2550 assert!(loaded.is_ok(), "session should be persisted");
2551 let state = loaded.ok().flatten();
2552 assert!(state.is_some(), "session state should exist");
2553 let state = state.unwrap_or_default();
2554 assert_eq!(state.total_usage.prompt_tokens, 13);
2555 assert_eq!(state.total_usage.completion_tokens, 7);
2556 }
2557
2558 struct FallbackOnlyLlm {
2559 response: LlmResponse,
2560 }
2561
2562 #[async_trait::async_trait]
2563 impl LlmPort for FallbackOnlyLlm {
2564 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2565 Ok(self.response.clone())
2566 }
2567
2568 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2569 Err(LlmError::Provider("streaming not available".to_string()))
2570 }
2571 }
2572
2573 #[tokio::test]
2574 async fn tc18_stream_fallback_does_not_emit_error_event() {
2575 let llm: Arc<dyn LlmPort> = Arc::new(FallbackOnlyLlm {
2576 response: LlmResponse {
2577 content: r#"{"type":"final","content":"done"}"#.to_string(),
2578 usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
2579 finish_reason: FinishReason::Stop,
2580 tool_calls: Vec::new(),
2581 },
2582 });
2583 let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2584 let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2585 let sink = Arc::new(CollectingSink::new());
2586 let sink_dyn: Arc<dyn EventSink> = sink.clone();
2587
2588 let stream_result = run_turn_stream(
2589 llm,
2590 tools,
2591 store,
2592 sink_dyn,
2593 make_request("hello"),
2594 generous_policy(),
2595 "test-model".to_string(),
2596 )
2597 .await;
2598 assert!(stream_result.is_ok(), "stream run should succeed with fallback");
2599 let mut stream = match stream_result {
2600 Ok(stream) => stream,
2601 Err(_) => return,
2602 };
2603
2604 while let Some(_event) = stream.next().await {}
2605
2606 let events = sink.all_events();
2607 assert!(
2608 !events.iter().any(|event| matches!(event, AgentEvent::Error { .. })),
2609 "fallback should not emit AgentEvent::Error when complete() succeeds"
2610 );
2611 }
2612
2613 struct NativeStreamingBypassLlm {
2614 complete_calls: std::sync::atomic::AtomicUsize,
2615 complete_stream_calls: std::sync::atomic::AtomicUsize,
2616 response: LlmResponse,
2617 }
2618
2619 #[async_trait::async_trait]
2620 impl LlmPort for NativeStreamingBypassLlm {
2621 fn capabilities(&self) -> bob_core::types::LlmCapabilities {
2622 bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
2623 }
2624
2625 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2626 self.complete_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2627 Ok(self.response.clone())
2628 }
2629
2630 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2631 self.complete_stream_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2632 Err(LlmError::Provider(
2633 "complete_stream() should be bypassed for native tool calling".to_string(),
2634 ))
2635 }
2636 }
2637
2638 #[tokio::test]
2639 async fn tc19_stream_native_tool_calling_bypasses_complete_stream() {
2640 let llm_impl = Arc::new(NativeStreamingBypassLlm {
2641 complete_calls: std::sync::atomic::AtomicUsize::new(0),
2642 complete_stream_calls: std::sync::atomic::AtomicUsize::new(0),
2643 response: LlmResponse {
2644 content: r#"{"type":"final","content":"done"}"#.to_string(),
2645 usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
2646 finish_reason: FinishReason::Stop,
2647 tool_calls: Vec::new(),
2648 },
2649 });
2650 let llm: Arc<dyn LlmPort> = llm_impl.clone();
2651 let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2652 let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2653 let sink = Arc::new(CollectingSink::new());
2654 let sink_dyn: Arc<dyn EventSink> = sink.clone();
2655
2656 let stream_result = run_turn_stream(
2657 llm,
2658 tools,
2659 store,
2660 sink_dyn,
2661 make_request("hello"),
2662 generous_policy(),
2663 "test-model".to_string(),
2664 )
2665 .await;
2666 assert!(
2667 stream_result.is_ok(),
2668 "stream run should succeed through complete() for native tool calling"
2669 );
2670 let mut stream = match stream_result {
2671 Ok(stream) => stream,
2672 Err(_) => return,
2673 };
2674
2675 while let Some(_event) = stream.next().await {}
2676
2677 assert_eq!(
2678 llm_impl.complete_calls.load(std::sync::atomic::Ordering::SeqCst),
2679 1,
2680 "native tool-calling stream path should use complete()"
2681 );
2682 assert_eq!(
2683 llm_impl.complete_stream_calls.load(std::sync::atomic::Ordering::SeqCst),
2684 0,
2685 "native tool-calling stream path should bypass complete_stream()"
2686 );
2687 assert!(
2688 !sink.all_events().iter().any(|event| matches!(event, AgentEvent::Error { .. })),
2689 "native-tool fallback should stay on the success path"
2690 );
2691 }
2692
2693 #[tokio::test]
2694 async fn tc20_non_consecutive_duplicate_tool_calls_are_allowed() {
2695 let llm = SequentialLlm::from_contents(vec![
2696 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2697 r#"{"type":"tool_call","name":"tool_b","arguments":{"q":"docs"}}"#,
2698 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2699 r#"{"type":"final","content":"done"}"#,
2700 ]);
2701 let tools = MockToolPort::with_tool_and_results(
2702 "tool_a",
2703 vec![
2704 Ok(ToolResult {
2705 name: "tool_a".to_string(),
2706 output: serde_json::json!({"ok": 1}),
2707 is_error: false,
2708 }),
2709 Ok(ToolResult {
2710 name: "tool_b".to_string(),
2711 output: serde_json::json!({"ok": 2}),
2712 is_error: false,
2713 }),
2714 Ok(ToolResult {
2715 name: "tool_a".to_string(),
2716 output: serde_json::json!({"ok": 3}),
2717 is_error: false,
2718 }),
2719 ],
2720 );
2721 let store = MemoryStore::new();
2722 let sink = CollectingSink::new();
2723
2724 let result = run_turn(
2725 &llm,
2726 &tools,
2727 &store,
2728 &sink,
2729 make_request("repeat searches"),
2730 &generous_policy(),
2731 "test-model",
2732 )
2733 .await;
2734 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2735 let resp = match result {
2736 Ok(AgentRunResult::Finished(resp)) => resp,
2737 _ => return,
2738 };
2739
2740 assert_eq!(resp.tool_transcript.len(), 3);
2741 assert!(resp.tool_transcript.iter().all(|entry| !entry.is_error));
2742 }
2743
2744 #[tokio::test]
2745 async fn tc21_excessive_consecutive_duplicate_tool_calls_are_blocked() {
2746 let llm = SequentialLlm::from_contents(vec![
2747 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2748 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2749 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2750 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2751 r#"{"type":"final","content":"done"}"#,
2752 ]);
2753 let tools = MockToolPort::with_tool_and_results(
2754 "tool_a",
2755 vec![
2756 Ok(ToolResult {
2757 name: "tool_a".to_string(),
2758 output: serde_json::json!({"ok": 1}),
2759 is_error: false,
2760 }),
2761 Ok(ToolResult {
2762 name: "tool_a".to_string(),
2763 output: serde_json::json!({"ok": 2}),
2764 is_error: false,
2765 }),
2766 Ok(ToolResult {
2767 name: "tool_a".to_string(),
2768 output: serde_json::json!({"ok": 3}),
2769 is_error: false,
2770 }),
2771 ],
2772 );
2773 let store = MemoryStore::new();
2774 let sink = CollectingSink::new();
2775
2776 let result = run_turn(
2777 &llm,
2778 &tools,
2779 &store,
2780 &sink,
2781 make_request("poll repeatedly"),
2782 &generous_policy(),
2783 "test-model",
2784 )
2785 .await;
2786 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2787 let resp = match result {
2788 Ok(AgentRunResult::Finished(resp)) => resp,
2789 _ => return,
2790 };
2791
2792 assert_eq!(resp.tool_transcript.len(), 4);
2793 assert!(!resp.tool_transcript[2].is_error);
2794 assert!(resp.tool_transcript[3].is_error);
2795 assert!(
2796 resp.tool_transcript[3]
2797 .output
2798 .to_string()
2799 .contains("consecutive duplicate tool call limit reached"),
2800 "expected duplicate-call protection error in transcript"
2801 );
2802 }
2803
2804 #[tokio::test]
2805 async fn tc22_duplicate_block_path_records_artifact_and_cost() {
2806 let llm = SequentialLlm::from_contents(vec![
2807 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2808 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2809 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2810 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2811 r#"{"type":"final","content":"done"}"#,
2812 ]);
2813 let tools = MockToolPort::with_tool_and_results(
2814 "tool_a",
2815 vec![
2816 Ok(ToolResult {
2817 name: "tool_a".to_string(),
2818 output: serde_json::json!({"ok": 1}),
2819 is_error: false,
2820 }),
2821 Ok(ToolResult {
2822 name: "tool_a".to_string(),
2823 output: serde_json::json!({"ok": 2}),
2824 is_error: false,
2825 }),
2826 Ok(ToolResult {
2827 name: "tool_a".to_string(),
2828 output: serde_json::json!({"ok": 3}),
2829 is_error: false,
2830 }),
2831 ],
2832 );
2833 let store = MemoryStore::new();
2834 let sink = CollectingSink::new();
2835 let checkpoint = CountingCheckpointPort::new();
2836 let artifacts = CountingArtifactStore::new();
2837 let cost = CountingCostMeter::new();
2838 let policy = AllowAllPolicyPort;
2839 let approval = AlwaysApprovePort;
2840
2841 let result = run_turn_with_extensions(
2842 &llm,
2843 &tools,
2844 &store,
2845 &sink,
2846 make_request("poll repeatedly"),
2847 &generous_policy(),
2848 "test-model",
2849 &policy,
2850 &approval,
2851 crate::DispatchMode::PromptGuided,
2852 &checkpoint,
2853 &artifacts,
2854 &cost,
2855 &crate::prompt::WindowContextCompactor::default(),
2856 &crate::NoOpToolJournalPort,
2857 )
2858 .await;
2859 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2860
2861 let tool_results = *cost.tool_results.lock().unwrap_or_else(|p| p.into_inner());
2862 assert_eq!(tool_results, 4, "cost meter should record all tool outcomes");
2863 let saved_artifacts = artifacts.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2864 assert_eq!(saved_artifacts, 4, "artifact store should record all tool outcomes");
2865 }
2866}