1use std::sync::Arc;
43
44use bob_core::{
45 error::AgentError,
46 journal::{JournalEntry, ToolJournalPort},
47 normalize_tool_list,
48 ports::{
49 ApprovalPort, ArtifactStorePort, ContextCompactorPort, CostMeterPort, EventSink, LlmPort,
50 SessionStore, ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
51 },
52 types::{
53 AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
54 AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
55 GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
56 },
57};
58use futures_util::StreamExt;
59use tokio::time::Instant;
60
/// Capacity of the bounded `flume` channel that carries stream events from the
/// spawned turn task to the stream handed back to the caller.
const STREAM_CHANNEL_CAPACITY: usize = 1 << 8;
63
64#[derive(Debug)]
67pub struct LoopGuard {
68 policy: TurnPolicy,
69 steps: u32,
70 tool_calls: u32,
71 consecutive_errors: u32,
72 start: Instant,
73}
74
75impl LoopGuard {
76 #[must_use]
78 pub fn new(policy: TurnPolicy) -> Self {
79 Self { policy, steps: 0, tool_calls: 0, consecutive_errors: 0, start: Instant::now() }
80 }
81
82 #[must_use]
84 pub fn can_continue(&self) -> bool {
85 self.steps < self.policy.max_steps &&
86 self.tool_calls < self.policy.max_tool_calls &&
87 self.consecutive_errors < self.policy.max_consecutive_errors &&
88 !self.timed_out()
89 }
90
91 pub fn record_step(&mut self) {
93 self.steps += 1;
94 }
95
96 pub fn record_tool_call(&mut self) {
98 self.tool_calls += 1;
99 }
100
101 pub fn record_error(&mut self) {
103 self.consecutive_errors += 1;
104 }
105
106 pub fn reset_errors(&mut self) {
108 self.consecutive_errors = 0;
109 }
110
111 #[must_use]
115 pub fn reason(&self) -> GuardReason {
116 if self.steps >= self.policy.max_steps {
117 GuardReason::MaxSteps
118 } else if self.tool_calls >= self.policy.max_tool_calls {
119 GuardReason::MaxToolCalls
120 } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
121 GuardReason::MaxConsecutiveErrors
122 } else if self.timed_out() {
123 GuardReason::TurnTimeout
124 } else {
125 GuardReason::Cancelled
127 }
128 }
129
130 #[must_use]
132 pub fn timed_out(&self) -> bool {
133 self.start.elapsed().as_millis() >= u128::from(self.policy.turn_timeout_ms)
134 }
135}
136
/// Base system prompt for every turn. A non-blank skills prompt from the request
/// context is appended to it after a blank line.
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = concat!(
    "You are a helpful AI assistant. ",
    "Think step by step before answering. ",
    "When you need external information, use the available tools."
);
/// How many back-to-back identical tool calls (same name and serialized arguments)
/// are executed. Any further repeat is answered with a synthetic error result.
const MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS: u32 = 3;
144
145fn resolve_system_instructions(req: &AgentRequest) -> String {
146 let Some(skills_prompt) = req.context.system_prompt.as_deref() else {
147 return DEFAULT_SYSTEM_INSTRUCTIONS.to_string();
148 };
149
150 if skills_prompt.trim().is_empty() {
151 DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
152 } else {
153 format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
154 }
155}
156
157fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
158 req.context.selected_skills.clone()
159}
160
/// Normalized per-request tool filter handed to the tool policy port when a tool
/// call is executed.
#[derive(Clone, Debug, Default)]
struct ToolCallPolicy {
    /// Normalized names of tools the request forbids (possibly empty).
    deny_tools: Vec<String>,
    /// Normalized allow-list, or `None` when the request supplied none.
    allow_tools: Option<Vec<String>>,
}
166
167fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
168 let deny_tools =
169 normalize_tool_list(req.context.tool_policy.deny_tools.iter().map(String::as_str));
170 let allow_tools = req
171 .context
172 .tool_policy
173 .allow_tools
174 .as_ref()
175 .map(|tools| normalize_tool_list(tools.iter().map(String::as_str)));
176 ToolCallPolicy { deny_tools, allow_tools }
177}
178
179fn prompt_options_for_mode(
180 dispatch_mode: crate::DispatchMode,
181 llm: &dyn LlmPort,
182 output_schema: Option<serde_json::Value>,
183) -> crate::prompt::PromptBuildOptions {
184 let mut opts = match dispatch_mode {
185 crate::DispatchMode::PromptGuided => crate::prompt::PromptBuildOptions::default(),
186 crate::DispatchMode::NativePreferred => {
187 if llm.capabilities().native_tool_calling {
188 crate::prompt::PromptBuildOptions {
189 include_action_schema: false,
190 include_tool_schema: false,
191 ..Default::default()
192 }
193 } else {
194 crate::prompt::PromptBuildOptions::default()
195 }
196 }
197 };
198 opts.structured_output = output_schema;
199 opts
200}
201
202fn parse_action_for_mode(
203 dispatch_mode: crate::DispatchMode,
204 llm: &dyn LlmPort,
205 response: &bob_core::types::LlmResponse,
206) -> Result<AgentAction, crate::action::ActionParseError> {
207 match dispatch_mode {
208 crate::DispatchMode::PromptGuided => crate::action::parse_action(&response.content),
209 crate::DispatchMode::NativePreferred => {
210 if llm.capabilities().native_tool_calling &&
211 let Some(tool_call) = response.tool_calls.first()
212 {
213 return Ok(AgentAction::ToolCall {
214 name: tool_call.name.clone(),
215 arguments: tool_call.arguments.clone(),
216 });
217 }
218 crate::action::parse_action(&response.content)
219 }
220 }
221}
222
223#[expect(
224 clippy::too_many_arguments,
225 reason = "tool execution needs explicit policy, approval, and timeout dependencies"
226)]
227async fn execute_tool_call(
228 tools: &dyn ToolPort,
229 guard: &mut LoopGuard,
230 tool_call: ToolCall,
231 policy: &ToolCallPolicy,
232 tool_policy_port: &dyn ToolPolicyPort,
233 approval_port: &dyn ApprovalPort,
234 approval_context: &ApprovalContext,
235 timeout_ms: u64,
236) -> ToolResult {
237 if !tool_policy_port.is_tool_allowed(
238 &tool_call.name,
239 &policy.deny_tools,
240 policy.allow_tools.as_deref(),
241 ) {
242 guard.record_error();
243 return ToolResult {
244 name: tool_call.name.clone(),
245 output: serde_json::json!({
246 "error": format!("tool '{}' denied by policy", tool_call.name)
247 }),
248 is_error: true,
249 };
250 }
251
252 match approval_port.approve_tool_call(&tool_call, approval_context).await {
253 Ok(ApprovalDecision::Approved) => {}
254 Ok(ApprovalDecision::Denied { reason }) => {
255 guard.record_error();
256 return ToolResult {
257 name: tool_call.name.clone(),
258 output: serde_json::json!({"error": reason}),
259 is_error: true,
260 };
261 }
262 Err(err) => {
263 guard.record_error();
264 return ToolResult {
265 name: tool_call.name.clone(),
266 output: serde_json::json!({"error": err.to_string()}),
267 is_error: true,
268 };
269 }
270 }
271
272 match tokio::time::timeout(
273 std::time::Duration::from_millis(timeout_ms),
274 tools.call_tool(tool_call.clone()),
275 )
276 .await
277 {
278 Ok(Ok(result)) => {
279 guard.reset_errors();
280 result
281 }
282 Ok(Err(err)) => {
283 guard.record_error();
284 ToolResult {
285 name: tool_call.name,
286 output: serde_json::json!({"error": err.to_string()}),
287 is_error: true,
288 }
289 }
290 Err(_) => {
291 guard.record_error();
292 ToolResult {
293 name: tool_call.name,
294 output: serde_json::json!({"error": "tool call timed out"}),
295 is_error: true,
296 }
297 }
298 }
299}
300
301pub async fn run_turn(
308 llm: &dyn LlmPort,
309 tools: &dyn ToolPort,
310 store: &dyn SessionStore,
311 events: &dyn EventSink,
312 req: AgentRequest,
313 policy: &TurnPolicy,
314 default_model: &str,
315) -> Result<AgentRunResult, AgentError> {
316 let tool_policy = crate::DefaultToolPolicyPort;
317 let approval = crate::AllowAllApprovalPort;
318 let checkpoint_store = crate::NoOpCheckpointStorePort;
319 let artifact_store = crate::NoOpArtifactStorePort;
320 let cost_meter = crate::NoOpCostMeterPort;
321 let compactor = crate::prompt::WindowContextCompactor::default();
322 let journal = crate::NoOpToolJournalPort;
323 run_turn_with_extensions(
324 llm,
325 tools,
326 store,
327 events,
328 req,
329 policy,
330 default_model,
331 &tool_policy,
332 &approval,
333 crate::DispatchMode::NativePreferred,
334 &checkpoint_store,
335 &artifact_store,
336 &cost_meter,
337 &compactor,
338 &journal,
339 )
340 .await
341}
342
343#[cfg_attr(
345 not(test),
346 expect(
347 dead_code,
348 reason = "reserved wrapper for partial control injection in external integrations"
349 )
350)]
351#[expect(
352 clippy::too_many_arguments,
353 reason = "wrapper exposes explicit dependency ports for compatibility and testability"
354)]
355pub(crate) async fn run_turn_with_controls(
356 llm: &dyn LlmPort,
357 tools: &dyn ToolPort,
358 store: &dyn SessionStore,
359 events: &dyn EventSink,
360 req: AgentRequest,
361 policy: &TurnPolicy,
362 default_model: &str,
363 tool_policy_port: &dyn ToolPolicyPort,
364 approval_port: &dyn ApprovalPort,
365) -> Result<AgentRunResult, AgentError> {
366 let checkpoint_store = crate::NoOpCheckpointStorePort;
367 let artifact_store = crate::NoOpArtifactStorePort;
368 let cost_meter = crate::NoOpCostMeterPort;
369 let compactor = crate::prompt::WindowContextCompactor::default();
370 let journal = crate::NoOpToolJournalPort;
371 run_turn_with_extensions(
372 llm,
373 tools,
374 store,
375 events,
376 req,
377 policy,
378 default_model,
379 tool_policy_port,
380 approval_port,
381 crate::DispatchMode::PromptGuided,
382 &checkpoint_store,
383 &artifact_store,
384 &cost_meter,
385 &compactor,
386 &journal,
387 )
388 .await
389}
390
/// Core agent loop: runs one turn for `req` against the injected ports and
/// returns the final response.
///
/// Each loop iteration does the following:
/// 1. Stops with `FinishReason::Cancelled` if the request's cancel token has
///    fired. Otherwise it calls the cost meter's `check_budget`, then stops with
///    `FinishReason::GuardExceeded` once the [`LoopGuard`] refuses to continue.
/// 2. Builds an LLM request from the session, using the system instructions
///    plus the progressive tool summary and only the activated tools. It then
///    calls the LLM, racing it against cancellation when a cancel token is
///    present.
/// 3. Records usage, appends the assistant message, saves a checkpoint
///    (best-effort, errors ignored) and parses the reply into an [`AgentAction`].
/// 4. Finishes on `Final` or `AskUser`. `Final` first goes through optional
///    output-schema validation with up to `req.max_output_retries` re-prompts.
///    A `ToolCall` action is executed (or replayed from the journal) and the
///    loop continues.
///
/// A reply that fails to parse triggers one corrective user message. A second
/// consecutive parse failure attempts to save the session and returns
/// `AgentError::Internal`.
///
/// # Errors
/// Propagates errors from session load and the final save, tool listing,
/// `cost_meter.check_budget`/`record_llm_usage` and the LLM call. Also returns
/// the parse-failure error described above.
#[expect(
    clippy::too_many_arguments,
    reason = "core entrypoint exposes all ports explicitly for adapter injection"
)]
pub(crate) async fn run_turn_with_extensions(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
    dispatch_mode: crate::DispatchMode,
    checkpoint_store: &dyn TurnCheckpointStorePort,
    artifact_store: &dyn ArtifactStorePort,
    cost_meter: &dyn CostMeterPort,
    context_compactor: &dyn ContextCompactorPort,
    journal: &dyn ToolJournalPort,
) -> Result<AgentRunResult, AgentError> {
    // Per-request settings, resolved once up front.
    let model = req.model.as_deref().unwrap_or(default_model);
    let cancel_token = req.cancel_token.clone();
    let system_instructions = resolve_system_instructions(&req);
    let selected_skills = resolve_selected_skills(&req);
    let tool_call_policy = resolve_tool_call_policy(&req);

    // Resume the stored session (or start from the default) and snapshot the tool catalog.
    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
    let tool_descriptors = tools.list_tools().await?;
    let mut guard = LoopGuard::new(policy.clone());

    // Progressive tool exposure: only `activated_tools()` are sent with each request,
    // and `summary_prompt()` is appended to the system instructions.
    let mut tool_view = crate::progressive_tools::ProgressiveToolView::new(tool_descriptors);

    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
    if !selected_skills.is_empty() {
        events.emit(AgentEvent::SkillsSelected {
            session_id: req.session_id.clone(),
            skill_names: selected_skills.clone(),
        });
    }

    session.messages.push(Message::text(Role::User, req.input.clone()));

    // State carried across loop iterations.
    let mut tool_transcript: Vec<ToolResult> = Vec::new();
    let mut total_usage = TokenUsage::default();
    let mut consecutive_parse_failures: u32 = 0;
    let mut consecutive_validation_failures: u32 = 0;
    let max_output_retries = req.max_output_retries;
    // Signature (`name:serialized-args`) of the previous tool call and how many times
    // in a row it has been requested; used to break identical-call loops.
    let mut last_tool_call_signature: Option<String> = None;
    let mut same_tool_call_streak: u32 = 0;

    loop {
        // Index of the LLM call about to be made (steps already recorded + 1); used in events.
        let current_step = guard.steps.saturating_add(1);

        // Cancellation requested before this step started.
        if let Some(ref token) = cancel_token &&
            token.is_cancelled()
        {
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: "Turn cancelled.",
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::Cancelled,
                },
            )
            .await;
        }

        // Any error from the budget check aborts the turn without saving the session
        // (presumably raised when the budget is exhausted — confirm with the CostMeterPort impl).
        cost_meter.check_budget(&req.session_id).await?;

        // Step/tool-call/error/time limits.
        if !guard.can_continue() {
            let reason = guard.reason();
            let msg = format!("Turn stopped: {reason:?}");
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: &msg,
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::GuardExceeded,
                },
            )
            .await;
        }

        // System instructions plus the (possibly empty) tool summary, separated by a blank line.
        let mut augmented_instructions = system_instructions.clone();
        let tool_summary = tool_view.summary_prompt();
        if !tool_summary.is_empty() {
            augmented_instructions.push('\n');
            augmented_instructions.push('\n');
            augmented_instructions.push_str(&tool_summary);
        }

        let active_tools = tool_view.activated_tools();
        let llm_request = crate::prompt::build_llm_request_with_options(
            model,
            &session,
            &active_tools,
            &augmented_instructions,
            prompt_options_for_mode(dispatch_mode, llm, req.output_schema.clone()),
            context_compactor,
        )
        .await;

        events.emit(AgentEvent::LlmCallStarted {
            session_id: req.session_id.clone(),
            step: current_step,
            model: model.to_string(),
        });

        // With a cancel token, the LLM call is raced against cancellation; if the token
        // wins, the turn finishes as Cancelled without recording this step.
        let llm_response = if let Some(ref token) = cancel_token {
            tokio::select! {
                result = llm.complete(llm_request.clone()) => result?,
                () = token.cancelled() => {
                    return finish_turn(
                        store, events, &req.session_id, &session,
                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
                    ).await;
                }
            }
        } else {
            llm.complete(llm_request).await?
        };

        // Account for the step: turn-local usage and the session's cumulative
        // (saturating) usage, then the cost meter.
        guard.record_step();
        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
        total_usage.completion_tokens += llm_response.usage.completion_tokens;
        session.total_usage.prompt_tokens =
            session.total_usage.prompt_tokens.saturating_add(llm_response.usage.prompt_tokens);
        session.total_usage.completion_tokens = session
            .total_usage
            .completion_tokens
            .saturating_add(llm_response.usage.completion_tokens);
        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;

        events.emit(AgentEvent::LlmCallCompleted {
            session_id: req.session_id.clone(),
            step: current_step,
            model: model.to_string(),
            usage: llm_response.usage.clone(),
        });
        // First provider-native tool call, if the LLM supports native tool calling;
        // used below to recover the provider's call id.
        let native_tool_call = if llm.capabilities().native_tool_calling {
            llm_response.tool_calls.first().cloned()
        } else {
            None
        };

        tool_view.activate_hints(&llm_response.content);

        let assistant_message = if llm_response.tool_calls.is_empty() {
            Message::text(Role::Assistant, llm_response.content.clone())
        } else {
            Message::assistant_tool_calls(
                llm_response.content.clone(),
                llm_response.tool_calls.clone(),
            )
        };
        session.messages.push(assistant_message);

        // Best-effort checkpoint; failures are ignored.
        let _ = checkpoint_store
            .save_checkpoint(&TurnCheckpoint {
                session_id: req.session_id.clone(),
                step: guard.steps,
                tool_calls: guard.tool_calls,
                usage: total_usage.clone(),
            })
            .await;

        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
            Ok(action) => {
                consecutive_parse_failures = 0;
                match action {
                    AgentAction::Final { content } => {
                        if let Some(ref schema) = req.output_schema {
                            // Re-prompt with the validation error while retries remain;
                            // after that, log and return the (invalid) content anyway.
                            match crate::output_validation::validate_output_str(&content, schema) {
                                Ok(_) => {}
                                Err(validation_err) => {
                                    consecutive_validation_failures += 1;
                                    if consecutive_validation_failures > max_output_retries {
                                        tracing::warn!(
                                            session_id = %req.session_id,
                                            "output schema validation failed after {} retries",
                                            max_output_retries,
                                        );
                                    } else {
                                        let prompt =
                                            crate::output_validation::validation_error_prompt(
                                                &content,
                                                &validation_err,
                                            );
                                        session.messages.push(Message::text(Role::User, prompt));
                                        continue;
                                    }
                                }
                            }
                        }
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &content,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::AskUser { question } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &question,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::ToolCall { name, arguments } => {
                        // Reuse the provider's call id only when the parsed action matches
                        // the first native tool call exactly; otherwise `None` is used.
                        let tool_call_id = native_tool_call
                            .as_ref()
                            .filter(|call| call.name == name && call.arguments == arguments)
                            .and_then(|call| call.call_id.clone());
                        tool_view.activate(&name);

                        // Update the identical-call streak.
                        let call_signature = format!(
                            "{}:{}",
                            name,
                            serde_json::to_string(&arguments).unwrap_or_default()
                        );
                        if last_tool_call_signature.as_ref() == Some(&call_signature) {
                            same_tool_call_streak = same_tool_call_streak.saturating_add(1);
                        } else {
                            same_tool_call_streak = 1;
                            last_tool_call_signature = Some(call_signature);
                        }

                        // Too many identical calls in a row: skip execution and record a
                        // synthetic error result (still counted as a tool call).
                        if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
                            events.emit(AgentEvent::ToolCallStarted {
                                session_id: req.session_id.clone(),
                                step: current_step,
                                name: name.clone(),
                            });
                            let dup_result = ToolResult {
                                name: name.clone(),
                                output: serde_json::json!({
                                    "error": format!(
                                        "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
                                    )
                                }),
                                is_error: true,
                            };
                            guard.record_tool_call();
                            let _ =
                                cost_meter.record_tool_result(&req.session_id, &dup_result).await;
                            let output_str =
                                serde_json::to_string(&dup_result.output).unwrap_or_default();
                            session.messages.push(Message::tool_result(
                                name.clone(),
                                tool_call_id.clone(),
                                output_str,
                            ));
                            events.emit(AgentEvent::ToolCallCompleted {
                                session_id: req.session_id.clone(),
                                step: current_step,
                                name: name.clone(),
                                is_error: true,
                            });
                            let _ = artifact_store
                                .put(ArtifactRecord {
                                    session_id: req.session_id.clone(),
                                    kind: "tool_result".to_string(),
                                    name: name.clone(),
                                    content: dup_result.output.clone(),
                                })
                                .await;
                            tool_transcript.push(dup_result);
                            continue;
                        }

                        events.emit(AgentEvent::ToolCallStarted {
                            session_id: req.session_id.clone(),
                            step: current_step,
                            name: name.clone(),
                        });
                        let approval_context = ApprovalContext {
                            session_id: req.session_id.clone(),
                            turn_step: guard.steps.max(1),
                            selected_skills: selected_skills.clone(),
                        };

                        // Replay a journaled result for the same (name, arguments) fingerprint
                        // if one exists. A lookup error counts as a miss. On a miss, execute
                        // the tool and journal the outcome (journal append errors are ignored).
                        // Replayed results bypass policy/approval and do not touch the guard's
                        // error counter.
                        let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
                        let tool_result = if let Ok(Some(cached)) =
                            journal.lookup(&req.session_id, &call_fingerprint).await
                        {
                            tracing::debug!(
                                session_id = %req.session_id,
                                tool = %name,
                                "replaying tool result from journal"
                            );
                            ToolResult {
                                name: cached.tool_name,
                                output: cached.result,
                                is_error: cached.is_error,
                            }
                        } else {
                            let result = execute_tool_call(
                                tools,
                                &mut guard,
                                ToolCall::new(name.clone(), arguments.clone()),
                                &tool_call_policy,
                                tool_policy_port,
                                approval_port,
                                &approval_context,
                                policy.tool_timeout_ms,
                            )
                            .await;
                            let _ = journal
                                .append(JournalEntry {
                                    session_id: req.session_id.clone(),
                                    call_fingerprint: call_fingerprint.clone(),
                                    tool_name: name.clone(),
                                    arguments: arguments.clone(),
                                    result: result.output.clone(),
                                    is_error: result.is_error,
                                    timestamp_ms: bob_core::tape::now_ms(),
                                })
                                .await;
                            result
                        };

                        guard.record_tool_call();
                        // Cost metering and artifact storage are best-effort here.
                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;

                        let is_error = tool_result.is_error;
                        events.emit(AgentEvent::ToolCallCompleted {
                            session_id: req.session_id.clone(),
                            step: current_step,
                            name: name.clone(),
                            is_error,
                        });

                        // Feed the JSON-serialized output back to the model as a tool message.
                        let output_str =
                            serde_json::to_string(&tool_result.output).unwrap_or_default();
                        session.messages.push(Message::tool_result(
                            name.clone(),
                            tool_call_id,
                            output_str,
                        ));

                        let _ = artifact_store
                            .put(ArtifactRecord {
                                session_id: req.session_id.clone(),
                                kind: "tool_result".to_string(),
                                name: name.clone(),
                                content: tool_result.output.clone(),
                            })
                            .await;

                        tool_transcript.push(tool_result);
                    }
                }
            }
            Err(_parse_err) => {
                // Unparseable reply: reset the duplicate-call tracking, re-prompt once,
                // and give up on the second consecutive failure (saving best-effort).
                consecutive_parse_failures += 1;
                last_tool_call_signature = None;
                same_tool_call_streak = 0;
                if consecutive_parse_failures >= 2 {
                    let _ = store.save(&req.session_id, &session).await;
                    return Err(AgentError::Internal(
                        "LLM produced invalid JSON after re-prompt".into(),
                    ));
                }
                session.messages.push(Message::text(
                    Role::User,
                    "Your response was not valid JSON. \
                     Please respond with exactly one JSON object \
                     matching the required schema.",
                ));
            }
        }
    }
}
798
799struct FinishResult<'a> {
801 content: &'a str,
802 tool_transcript: Vec<ToolResult>,
803 usage: TokenUsage,
804 finish_reason: FinishReason,
805}
806
807async fn finish_turn(
809 store: &dyn SessionStore,
810 events: &dyn EventSink,
811 session_id: &bob_core::types::SessionId,
812 session: &bob_core::types::SessionState,
813 result: FinishResult<'_>,
814) -> Result<AgentRunResult, AgentError> {
815 store.save(session_id, session).await?;
816 events.emit(AgentEvent::TurnCompleted {
817 session_id: session_id.clone(),
818 finish_reason: result.finish_reason,
819 usage: result.usage.clone(),
820 });
821 Ok(AgentRunResult::Finished(AgentResponse {
822 content: result.content.to_string(),
823 tool_transcript: result.tool_transcript,
824 usage: result.usage,
825 finish_reason: result.finish_reason,
826 }))
827}
828
829pub async fn run_turn_stream(
831 llm: Arc<dyn LlmPort>,
832 tools: Arc<dyn ToolPort>,
833 store: Arc<dyn SessionStore>,
834 events: Arc<dyn EventSink>,
835 req: AgentRequest,
836 policy: TurnPolicy,
837 default_model: String,
838) -> Result<AgentEventStream, AgentError> {
839 let tool_policy: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
840 let approval: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
841 let checkpoint_store: Arc<dyn TurnCheckpointStorePort> =
842 Arc::new(crate::NoOpCheckpointStorePort);
843 let artifact_store: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
844 let cost_meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
845 let context_compactor: Arc<dyn ContextCompactorPort> =
846 Arc::new(crate::prompt::WindowContextCompactor::default());
847 let journal: Arc<dyn ToolJournalPort> = Arc::new(crate::NoOpToolJournalPort);
848 run_turn_stream_with_controls(
849 llm,
850 tools,
851 store,
852 events,
853 req,
854 policy,
855 default_model,
856 tool_policy,
857 approval,
858 crate::DispatchMode::NativePreferred,
859 checkpoint_store,
860 artifact_store,
861 cost_meter,
862 context_compactor,
863 journal,
864 )
865 .await
866}
867
868#[expect(
870 clippy::too_many_arguments,
871 reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
872)]
873pub(crate) async fn run_turn_stream_with_controls(
874 llm: Arc<dyn LlmPort>,
875 tools: Arc<dyn ToolPort>,
876 store: Arc<dyn SessionStore>,
877 events: Arc<dyn EventSink>,
878 req: AgentRequest,
879 policy: TurnPolicy,
880 default_model: String,
881 tool_policy: Arc<dyn ToolPolicyPort>,
882 approval: Arc<dyn ApprovalPort>,
883 dispatch_mode: crate::DispatchMode,
884 checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
885 artifact_store: Arc<dyn ArtifactStorePort>,
886 cost_meter: Arc<dyn CostMeterPort>,
887 context_compactor: Arc<dyn ContextCompactorPort>,
888 journal: Arc<dyn ToolJournalPort>,
889) -> Result<AgentEventStream, AgentError> {
890 let (tx, rx) = flume::bounded::<AgentStreamEvent>(STREAM_CHANNEL_CAPACITY);
891 let config = StreamRunConfig {
892 policy,
893 default_model,
894 tool_policy,
895 approval,
896 dispatch_mode,
897 checkpoint_store,
898 artifact_store,
899 cost_meter,
900 context_compactor,
901 journal,
902 };
903
904 tokio::spawn(async move {
905 if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
906 {
907 let _ = tx.send_async(AgentStreamEvent::Error { error: err.to_string() }).await;
908 }
909 });
910
911 Ok(Box::pin(rx.into_stream()))
912}
913
/// Owned turn configuration that `run_turn_stream_with_controls` moves into
/// the spawned streaming task and lends to `run_turn_stream_inner`.
struct StreamRunConfig {
    /// Turn limits; `run_turn_stream_inner` builds its `LoopGuard` from these.
    policy: TurnPolicy,
    /// Model name used when the request does not specify one.
    default_model: String,
    /// Tool allow/deny policy port.
    tool_policy: Arc<dyn ToolPolicyPort>,
    /// Tool-call approval port.
    approval: Arc<dyn ApprovalPort>,
    /// Whether actions come from prompt-guided JSON or native tool calls.
    dispatch_mode: crate::DispatchMode,
    /// Destination for per-step checkpoints.
    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
    /// Destination for tool-result artifacts.
    artifact_store: Arc<dyn ArtifactStorePort>,
    /// Budget checks and usage accounting.
    cost_meter: Arc<dyn CostMeterPort>,
    /// Compactor passed to LLM request building.
    context_compactor: Arc<dyn ContextCompactorPort>,
    /// Tool-call journal port.
    journal: Arc<dyn ToolJournalPort>,
}
926
927async fn run_turn_stream_inner(
928 llm: Arc<dyn LlmPort>,
929 tools: Arc<dyn ToolPort>,
930 store: Arc<dyn SessionStore>,
931 events: Arc<dyn EventSink>,
932 req: AgentRequest,
933 config: &StreamRunConfig,
934 tx: &flume::Sender<AgentStreamEvent>,
935) -> Result<(), AgentError> {
936 let model = req.model.as_deref().unwrap_or(&config.default_model);
937 let cancel_token = req.cancel_token.clone();
938 let system_instructions = resolve_system_instructions(&req);
939 let selected_skills = resolve_selected_skills(&req);
940 let tool_call_policy = resolve_tool_call_policy(&req);
941
942 let mut session = store.load(&req.session_id).await?.unwrap_or_default();
943 let tool_descriptors = tools.list_tools().await?;
944 let mut guard = LoopGuard::new(config.policy.clone());
945 let mut total_usage = TokenUsage::default();
946 let mut consecutive_parse_failures: u32 = 0;
947 let mut next_call_id: u64 = 0;
948 let mut last_tool_call_signature: Option<String> = None;
949 let mut same_tool_call_streak: u32 = 0;
950
951 events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
952 if !selected_skills.is_empty() {
953 events.emit(AgentEvent::SkillsSelected {
954 session_id: req.session_id.clone(),
955 skill_names: selected_skills.clone(),
956 });
957 }
958 session.messages.push(Message::text(Role::User, req.input.clone()));
959
960 loop {
961 let current_step = guard.steps.saturating_add(1);
962
963 if let Some(ref token) = cancel_token &&
964 token.is_cancelled()
965 {
966 events.emit(AgentEvent::Error {
967 session_id: req.session_id.clone(),
968 step: Some(current_step),
969 error: "turn cancelled".to_string(),
970 });
971 events.emit(AgentEvent::TurnCompleted {
972 session_id: req.session_id.clone(),
973 finish_reason: FinishReason::Cancelled,
974 usage: total_usage.clone(),
975 });
976 store.save(&req.session_id, &session).await?;
977 let _ = tx
978 .send_async(AgentStreamEvent::Error { error: "turn cancelled".to_string() })
979 .await;
980 let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
981 return Ok(());
982 }
983
984 config.cost_meter.check_budget(&req.session_id).await?;
985
986 if !guard.can_continue() {
987 let reason = guard.reason();
988 let msg = format!("Turn stopped: {reason:?}");
989 events.emit(AgentEvent::Error {
990 session_id: req.session_id.clone(),
991 step: Some(current_step),
992 error: msg.clone(),
993 });
994 events.emit(AgentEvent::TurnCompleted {
995 session_id: req.session_id.clone(),
996 finish_reason: FinishReason::GuardExceeded,
997 usage: total_usage.clone(),
998 });
999 store.save(&req.session_id, &session).await?;
1000 let _ = tx.send_async(AgentStreamEvent::Error { error: msg }).await;
1001 let _ = tx.send_async(AgentStreamEvent::Finished { usage: total_usage.clone() }).await;
1002 return Ok(());
1003 }
1004
1005 let llm_request = crate::prompt::build_llm_request_with_options(
1006 model,
1007 &session,
1008 &tool_descriptors,
1009 &system_instructions,
1010 prompt_options_for_mode(config.dispatch_mode, llm.as_ref(), req.output_schema.clone()),
1011 config.context_compactor.as_ref(),
1012 );
1013 events.emit(AgentEvent::LlmCallStarted {
1014 session_id: req.session_id.clone(),
1015 step: current_step,
1016 model: model.to_string(),
1017 });
1018
1019 let mut assistant_content = String::new();
1020 let mut llm_usage = TokenUsage::default();
1021 let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
1022 let mut llm_finish_reason = FinishReason::Stop;
1023 let mut fallback_to_complete = false;
1024
1025 let llm_request = llm_request.await;
1026 if llm.capabilities().native_tool_calling {
1027 fallback_to_complete = true;
1028 } else {
1029 match llm.complete_stream(llm_request.clone()).await {
1030 Ok(mut stream) => {
1031 while let Some(item) = stream.next().await {
1032 match item {
1033 Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
1034 assistant_content.push_str(&delta);
1035 let _ = tx
1036 .send_async(AgentStreamEvent::TextDelta { content: delta })
1037 .await;
1038 }
1039 Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
1040 llm_usage = usage;
1041 }
1042 Err(err) => {
1043 events.emit(AgentEvent::Error {
1044 session_id: req.session_id.clone(),
1045 step: Some(current_step),
1046 error: err.to_string(),
1047 });
1048 return Err(AgentError::Llm(err));
1049 }
1050 }
1051 }
1052 }
1053 Err(err) => {
1054 fallback_to_complete = true;
1055 let _ = err;
1056 }
1057 }
1058 }
1059
1060 if fallback_to_complete {
1064 let llm_response = llm.complete(llm_request).await?;
1065 assistant_content = llm_response.content.clone();
1066 llm_usage = llm_response.usage;
1067 llm_finish_reason = llm_response.finish_reason;
1068 llm_tool_calls = llm_response.tool_calls;
1069 let _ =
1070 tx.send_async(AgentStreamEvent::TextDelta { content: llm_response.content }).await;
1071 }
1072
1073 guard.record_step();
1074 total_usage.prompt_tokens += llm_usage.prompt_tokens;
1075 total_usage.completion_tokens += llm_usage.completion_tokens;
1076 session.total_usage.prompt_tokens =
1077 session.total_usage.prompt_tokens.saturating_add(llm_usage.prompt_tokens);
1078 session.total_usage.completion_tokens =
1079 session.total_usage.completion_tokens.saturating_add(llm_usage.completion_tokens);
1080 config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
1081 events.emit(AgentEvent::LlmCallCompleted {
1082 session_id: req.session_id.clone(),
1083 step: current_step,
1084 model: model.to_string(),
1085 usage: llm_usage.clone(),
1086 });
1087 let native_tool_call = if llm.capabilities().native_tool_calling {
1088 llm_tool_calls.first().cloned()
1089 } else {
1090 None
1091 };
1092 let assistant_message = if llm_tool_calls.is_empty() {
1093 Message::text(Role::Assistant, assistant_content.clone())
1094 } else {
1095 Message::assistant_tool_calls(assistant_content.clone(), llm_tool_calls.clone())
1096 };
1097 session.messages.push(assistant_message);
1098
1099 let _ = config
1100 .checkpoint_store
1101 .save_checkpoint(&TurnCheckpoint {
1102 session_id: req.session_id.clone(),
1103 step: guard.steps,
1104 tool_calls: guard.tool_calls,
1105 usage: total_usage.clone(),
1106 })
1107 .await;
1108
1109 let response_for_dispatch = bob_core::types::LlmResponse {
1110 content: assistant_content.clone(),
1111 usage: llm_usage.clone(),
1112 finish_reason: llm_finish_reason,
1113 tool_calls: llm_tool_calls,
1114 };
1115
1116 if let Ok(action) =
1117 parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
1118 {
1119 consecutive_parse_failures = 0;
1120 match action {
1121 AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
1122 store.save(&req.session_id, &session).await?;
1123 events.emit(AgentEvent::TurnCompleted {
1124 session_id: req.session_id.clone(),
1125 finish_reason: FinishReason::Stop,
1126 usage: total_usage.clone(),
1127 });
1128 let _ = tx
1129 .send_async(AgentStreamEvent::Finished { usage: total_usage.clone() })
1130 .await;
1131 return Ok(());
1132 }
1133 AgentAction::ToolCall { name, arguments } => {
1134 let tool_call_id = native_tool_call
1135 .as_ref()
1136 .filter(|call| call.name == name && call.arguments == arguments)
1137 .and_then(|call| call.call_id.clone());
1138 let call_signature = format!(
1139 "{}:{}",
1140 name,
1141 serde_json::to_string(&arguments).unwrap_or_default()
1142 );
1143 if last_tool_call_signature.as_ref() == Some(&call_signature) {
1144 same_tool_call_streak = same_tool_call_streak.saturating_add(1);
1145 } else {
1146 same_tool_call_streak = 1;
1147 last_tool_call_signature = Some(call_signature);
1148 }
1149 if same_tool_call_streak > MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS {
1150 next_call_id += 1;
1151 let call_id = format!("call-{next_call_id}");
1152 events.emit(AgentEvent::ToolCallStarted {
1153 session_id: req.session_id.clone(),
1154 step: current_step,
1155 name: name.clone(),
1156 });
1157 let _ = tx.send(AgentStreamEvent::ToolCallStarted {
1158 name: name.clone(),
1159 call_id: call_id.clone(),
1160 });
1161 guard.record_tool_call();
1162 let duplicate_result = ToolResult {
1163 name: name.clone(),
1164 output: serde_json::json!({
1165 "error": format!(
1166 "consecutive duplicate tool call limit reached (>{MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS}); skipping to prevent loop"
1167 )
1168 }),
1169 is_error: true,
1170 };
1171 let _ = config
1172 .cost_meter
1173 .record_tool_result(&req.session_id, &duplicate_result)
1174 .await;
1175 events.emit(AgentEvent::ToolCallCompleted {
1176 session_id: req.session_id.clone(),
1177 step: current_step,
1178 name: name.clone(),
1179 is_error: true,
1180 });
1181 let output_str =
1182 serde_json::to_string(&duplicate_result.output).unwrap_or_default();
1183 session.messages.push(Message::tool_result(
1184 name.clone(),
1185 tool_call_id.clone(),
1186 output_str,
1187 ));
1188 let _ = config
1189 .artifact_store
1190 .put(ArtifactRecord {
1191 session_id: req.session_id.clone(),
1192 kind: "tool_result".to_string(),
1193 name: name.clone(),
1194 content: duplicate_result.output.clone(),
1195 })
1196 .await;
1197 let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
1198 call_id,
1199 result: duplicate_result,
1200 });
1201 continue;
1202 }
1203
1204 events.emit(AgentEvent::ToolCallStarted {
1205 session_id: req.session_id.clone(),
1206 step: current_step,
1207 name: name.clone(),
1208 });
1209 next_call_id += 1;
1210 let call_id = format!("call-{next_call_id}");
1211 let _ = tx.send(AgentStreamEvent::ToolCallStarted {
1212 name: name.clone(),
1213 call_id: call_id.clone(),
1214 });
1215 let approval_context = ApprovalContext {
1216 session_id: req.session_id.clone(),
1217 turn_step: guard.steps.max(1),
1218 selected_skills: selected_skills.clone(),
1219 };
1220
1221 let call_fingerprint = JournalEntry::fingerprint(&name, &arguments);
1223 let tool_result = if let Ok(Some(cached)) =
1224 config.journal.lookup(&req.session_id, &call_fingerprint).await
1225 {
1226 tracing::debug!(
1227 session_id = %req.session_id,
1228 tool = %name,
1229 "replaying tool result from journal"
1230 );
1231 ToolResult {
1232 name: cached.tool_name,
1233 output: cached.result,
1234 is_error: cached.is_error,
1235 }
1236 } else {
1237 let result = execute_tool_call(
1238 tools.as_ref(),
1239 &mut guard,
1240 ToolCall::new(name.clone(), arguments.clone()),
1241 &tool_call_policy,
1242 config.tool_policy.as_ref(),
1243 config.approval.as_ref(),
1244 &approval_context,
1245 config.policy.tool_timeout_ms,
1246 )
1247 .await;
1248 let _ = config
1250 .journal
1251 .append(JournalEntry {
1252 session_id: req.session_id.clone(),
1253 call_fingerprint: call_fingerprint.clone(),
1254 tool_name: name.clone(),
1255 arguments: arguments.clone(),
1256 result: result.output.clone(),
1257 is_error: result.is_error,
1258 timestamp_ms: bob_core::tape::now_ms(),
1259 })
1260 .await;
1261 result
1262 };
1263
1264 guard.record_tool_call();
1265 let _ =
1266 config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
1267 let is_error = tool_result.is_error;
1268 events.emit(AgentEvent::ToolCallCompleted {
1269 session_id: req.session_id.clone(),
1270 step: current_step,
1271 name: name.clone(),
1272 is_error,
1273 });
1274 let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
1275 call_id,
1276 result: tool_result.clone(),
1277 });
1278
1279 let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
1280 session.messages.push(Message::tool_result(
1281 name.clone(),
1282 tool_call_id,
1283 output_str,
1284 ));
1285 let _ = config
1286 .artifact_store
1287 .put(ArtifactRecord {
1288 session_id: req.session_id.clone(),
1289 kind: "tool_result".to_string(),
1290 name: name.clone(),
1291 content: tool_result.output.clone(),
1292 })
1293 .await;
1294 }
1295 }
1296 } else {
1297 consecutive_parse_failures += 1;
1298 last_tool_call_signature = None;
1299 same_tool_call_streak = 0;
1300 if consecutive_parse_failures >= 2 {
1301 store.save(&req.session_id, &session).await?;
1302 events.emit(AgentEvent::Error {
1303 session_id: req.session_id.clone(),
1304 step: Some(current_step),
1305 error: "LLM produced invalid JSON after re-prompt".to_string(),
1306 });
1307 return Err(AgentError::Internal(
1308 "LLM produced invalid JSON after re-prompt".into(),
1309 ));
1310 }
1311 session.messages.push(Message::text(
1312 Role::User,
1313 "Your response was not valid JSON. \
1314 Please respond with exactly one JSON object \
1315 matching the required schema.",
1316 ));
1317 }
1318 }
1319}
1320
1321#[cfg(test)]
1322mod tests {
1323 use super::*;
1324
1325 fn test_policy() -> TurnPolicy {
1327 TurnPolicy {
1328 max_steps: 3,
1329 max_tool_calls: 2,
1330 max_consecutive_errors: 2,
1331 turn_timeout_ms: 100,
1332 tool_timeout_ms: 50,
1333 }
1334 }
1335
/// The guard stays open while fewer than `max_steps` steps are recorded and
/// closes (reason `MaxSteps`) exactly when the limit is reached.
#[test]
fn trips_on_max_steps() {
    let policy = test_policy();
    let limit = policy.max_steps;
    let mut guard = LoopGuard::new(policy);
    assert!(guard.can_continue());

    // One step short of the limit must still be allowed; this pins the strict
    // `<` comparison so an off-by-one in `can_continue` is caught.
    for _ in 1..limit {
        guard.record_step();
    }
    assert!(guard.can_continue(), "guard should not trip before reaching max_steps");

    guard.record_step();

    assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
    assert_eq!(guard.reason(), GuardReason::MaxSteps);
}
1348
/// The guard stays open while fewer than `max_tool_calls` calls are recorded
/// and closes (reason `MaxToolCalls`) exactly when the limit is reached.
#[test]
fn trips_on_max_tool_calls() {
    let policy = test_policy();
    let limit = policy.max_tool_calls;
    let mut guard = LoopGuard::new(policy);
    assert!(guard.can_continue());

    // One call short of the limit must still be allowed (pins the `<` boundary).
    for _ in 1..limit {
        guard.record_tool_call();
    }
    assert!(guard.can_continue(), "guard should not trip before reaching max_tool_calls");

    guard.record_tool_call();

    assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
    assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
}
1361
/// The guard stays open while fewer than `max_consecutive_errors` errors are
/// recorded and closes (reason `MaxConsecutiveErrors`) exactly at the limit.
#[test]
fn trips_on_max_consecutive_errors() {
    let policy = test_policy();
    let limit = policy.max_consecutive_errors;
    let mut guard = LoopGuard::new(policy);
    assert!(guard.can_continue());

    // One error short of the limit must still be allowed (pins the `<` boundary).
    for _ in 1..limit {
        guard.record_error();
    }
    assert!(guard.can_continue(), "guard should not trip before reaching max_consecutive_errors");

    guard.record_error();

    assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
    assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
}
1374
1375 #[tokio::test]
1376 async fn trips_on_timeout() {
1377 let guard = LoopGuard::new(test_policy());
1378 assert!(guard.can_continue());
1379 assert!(!guard.timed_out());
1380
1381 tokio::time::sleep(std::time::Duration::from_millis(150)).await;
1383
1384 assert!(!guard.can_continue(), "guard should trip after timeout");
1385 assert!(guard.timed_out());
1386 assert_eq!(guard.reason(), GuardReason::TurnTimeout);
1387 }
1388
/// error → reset → error leaves the consecutive-error count at one. That is
/// below the policy limit of two, so the guard must stay open.
#[test]
fn reset_errors_clears_counter() {
    let mut guard = LoopGuard::new(test_policy());

    guard.record_error();
    guard.reset_errors();
    guard.record_error();

    let still_open = guard.can_continue();
    assert!(still_open, "single error after reset should not trip guard");
}
1400
1401 use std::{
1404 collections::{HashMap, VecDeque},
1405 sync::{Arc, Mutex},
1406 };
1407
1408 use bob_core::{
1409 error::{CostError, LlmError, StoreError, ToolError},
1410 ports::{
1411 ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
1412 ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
1413 },
1414 types::{
1415 AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
1416 ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
1417 LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
1418 TurnCheckpoint,
1419 },
1420 };
1421 use futures_util::StreamExt;
1422
1423 struct SequentialLlm {
1427 responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
1428 }
1429
1430 impl SequentialLlm {
1431 fn from_contents(contents: Vec<&str>) -> Self {
1432 let responses = contents
1433 .into_iter()
1434 .map(|c| {
1435 Ok(LlmResponse {
1436 content: c.to_string(),
1437 usage: TokenUsage::default(),
1438 finish_reason: FinishReason::Stop,
1439 tool_calls: Vec::new(),
1440 })
1441 })
1442 .collect();
1443 Self { responses: Mutex::new(responses) }
1444 }
1445
1446 fn from_responses(responses: Vec<LlmResponse>) -> Self {
1447 let queued = responses.into_iter().map(Ok).collect();
1448 Self { responses: Mutex::new(queued) }
1449 }
1450 }
1451
1452 #[async_trait::async_trait]
1453 impl LlmPort for SequentialLlm {
1454 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1455 let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1456 q.pop_front().unwrap_or_else(|| {
1457 Ok(LlmResponse {
1458 content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
1459 usage: TokenUsage::default(),
1460 finish_reason: FinishReason::Stop,
1461 tool_calls: Vec::new(),
1462 })
1463 })
1464 }
1465
1466 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1467 Err(LlmError::Provider("not implemented".into()))
1468 }
1469 }
1470
1471 struct MockToolPort {
1473 tools: Vec<ToolDescriptor>,
1474 call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
1475 }
1476
1477 impl MockToolPort {
1478 fn empty() -> Self {
1479 Self { tools: vec![], call_results: Mutex::new(VecDeque::new()) }
1480 }
1481
1482 fn with_tool_and_results(
1483 tool_name: &str,
1484 results: Vec<Result<ToolResult, ToolError>>,
1485 ) -> Self {
1486 Self {
1487 tools: vec![
1488 ToolDescriptor::new(tool_name, format!("{tool_name} tool"))
1489 .with_input_schema(serde_json::json!({"type": "object"})),
1490 ],
1491 call_results: Mutex::new(results.into()),
1492 }
1493 }
1494 }
1495
1496 #[async_trait::async_trait]
1497 impl ToolPort for MockToolPort {
1498 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1499 Ok(self.tools.clone())
1500 }
1501
1502 async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
1503 let mut q = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
1504 q.pop_front().unwrap_or_else(|| {
1505 Ok(ToolResult {
1506 name: call.name,
1507 output: serde_json::json!({"result": "default"}),
1508 is_error: false,
1509 })
1510 })
1511 }
1512 }
1513
1514 struct NoCallToolPort {
1515 tools: Vec<ToolDescriptor>,
1516 }
1517
1518 #[async_trait::async_trait]
1519 impl ToolPort for NoCallToolPort {
1520 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1521 Ok(self.tools.clone())
1522 }
1523
1524 async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
1525 Err(ToolError::Execution(
1526 "tool call should be blocked by policy before execution".to_string(),
1527 ))
1528 }
1529 }
1530
1531 struct AllowAllPolicyPort;
1532
1533 impl ToolPolicyPort for AllowAllPolicyPort {
1534 fn is_tool_allowed(
1535 &self,
1536 _tool: &str,
1537 _deny_tools: &[String],
1538 _allow_tools: Option<&[String]>,
1539 ) -> bool {
1540 true
1541 }
1542 }
1543
1544 struct DenySearchPolicyPort;
1545
1546 impl ToolPolicyPort for DenySearchPolicyPort {
1547 fn is_tool_allowed(
1548 &self,
1549 tool: &str,
1550 _deny_tools: &[String],
1551 _allow_tools: Option<&[String]>,
1552 ) -> bool {
1553 tool != "search"
1554 }
1555 }
1556
1557 struct AlwaysApprovePort;
1558
1559 #[async_trait::async_trait]
1560 impl ApprovalPort for AlwaysApprovePort {
1561 async fn approve_tool_call(
1562 &self,
1563 _call: &ToolCall,
1564 _context: &ApprovalContext,
1565 ) -> Result<ApprovalDecision, ToolError> {
1566 Ok(ApprovalDecision::Approved)
1567 }
1568 }
1569
1570 struct AlwaysDenyApprovalPort;
1571
1572 #[async_trait::async_trait]
1573 impl ApprovalPort for AlwaysDenyApprovalPort {
1574 async fn approve_tool_call(
1575 &self,
1576 _call: &ToolCall,
1577 _context: &ApprovalContext,
1578 ) -> Result<ApprovalDecision, ToolError> {
1579 Ok(ApprovalDecision::Denied {
1580 reason: "approval policy rejected tool call".to_string(),
1581 })
1582 }
1583 }
1584
1585 struct CountingCheckpointPort {
1586 saved: Mutex<Vec<TurnCheckpoint>>,
1587 }
1588
1589 impl CountingCheckpointPort {
1590 fn new() -> Self {
1591 Self { saved: Mutex::new(Vec::new()) }
1592 }
1593 }
1594
1595 #[async_trait::async_trait]
1596 impl TurnCheckpointStorePort for CountingCheckpointPort {
1597 async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
1598 self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(checkpoint.clone());
1599 Ok(())
1600 }
1601
1602 async fn load_latest(
1603 &self,
1604 _session_id: &SessionId,
1605 ) -> Result<Option<TurnCheckpoint>, StoreError> {
1606 Ok(None)
1607 }
1608 }
1609
1610 struct NoopArtifactStore;
1611
1612 #[async_trait::async_trait]
1613 impl ArtifactStorePort for NoopArtifactStore {
1614 async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
1615 Ok(())
1616 }
1617
1618 async fn list_by_session(
1619 &self,
1620 _session_id: &SessionId,
1621 ) -> Result<Vec<ArtifactRecord>, StoreError> {
1622 Ok(Vec::new())
1623 }
1624 }
1625
1626 struct CountingArtifactStore {
1627 saved: Mutex<Vec<ArtifactRecord>>,
1628 }
1629
1630 impl CountingArtifactStore {
1631 fn new() -> Self {
1632 Self { saved: Mutex::new(Vec::new()) }
1633 }
1634 }
1635
1636 #[async_trait::async_trait]
1637 impl ArtifactStorePort for CountingArtifactStore {
1638 async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError> {
1639 self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(artifact);
1640 Ok(())
1641 }
1642
1643 async fn list_by_session(
1644 &self,
1645 _session_id: &SessionId,
1646 ) -> Result<Vec<ArtifactRecord>, StoreError> {
1647 Ok(self.saved.lock().unwrap_or_else(|p| p.into_inner()).clone())
1648 }
1649 }
1650
1651 struct CountingCostMeter {
1652 llm_calls: Mutex<u32>,
1653 tool_results: Mutex<u32>,
1654 }
1655
1656 impl CountingCostMeter {
1657 fn new() -> Self {
1658 Self { llm_calls: Mutex::new(0), tool_results: Mutex::new(0) }
1659 }
1660 }
1661
1662 #[async_trait::async_trait]
1663 impl CostMeterPort for CountingCostMeter {
1664 async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
1665 Ok(())
1666 }
1667
1668 async fn record_llm_usage(
1669 &self,
1670 _session_id: &SessionId,
1671 _model: &str,
1672 _usage: &TokenUsage,
1673 ) -> Result<(), CostError> {
1674 let mut count = self.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
1675 *count += 1;
1676 Ok(())
1677 }
1678
1679 async fn record_tool_result(
1680 &self,
1681 _session_id: &SessionId,
1682 _tool_result: &ToolResult,
1683 ) -> Result<(), CostError> {
1684 let mut count = self.tool_results.lock().unwrap_or_else(|p| p.into_inner());
1685 *count += 1;
1686 Ok(())
1687 }
1688 }
1689
1690 struct MemoryStore {
1691 data: Mutex<HashMap<SessionId, SessionState>>,
1692 }
1693
1694 impl MemoryStore {
1695 fn new() -> Self {
1696 Self { data: Mutex::new(HashMap::new()) }
1697 }
1698 }
1699
1700 struct FailingSaveStore;
1701
1702 #[async_trait::async_trait]
1703 impl SessionStore for FailingSaveStore {
1704 async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1705 Ok(None)
1706 }
1707
1708 async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
1709 Err(StoreError::Backend("simulated save failure".into()))
1710 }
1711 }
1712
1713 #[async_trait::async_trait]
1714 impl SessionStore for MemoryStore {
1715 async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1716 let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1717 Ok(map.get(id).cloned())
1718 }
1719
1720 async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
1721 let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1722 map.insert(id.clone(), state.clone());
1723 Ok(())
1724 }
1725 }
1726
1727 struct CollectingSink {
1728 events: Mutex<Vec<AgentEvent>>,
1729 }
1730
1731 impl CollectingSink {
1732 fn new() -> Self {
1733 Self { events: Mutex::new(Vec::new()) }
1734 }
1735
1736 fn event_count(&self) -> usize {
1737 self.events.lock().unwrap_or_else(|p| p.into_inner()).len()
1738 }
1739
1740 fn all_events(&self) -> Vec<AgentEvent> {
1741 self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
1742 }
1743 }
1744
1745 impl EventSink for CollectingSink {
1746 fn emit(&self, event: AgentEvent) {
1747 self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
1748 }
1749 }
1750
1751 fn make_request(input: &str) -> AgentRequest {
1752 AgentRequest {
1753 input: input.into(),
1754 session_id: "test-session".into(),
1755 model: None,
1756 context: bob_core::types::RequestContext::default(),
1757 cancel_token: None,
1758 output_schema: None,
1759 max_output_retries: 0,
1760 }
1761 }
1762
1763 fn generous_policy() -> TurnPolicy {
1764 TurnPolicy {
1765 max_steps: 20,
1766 max_tool_calls: 10,
1767 max_consecutive_errors: 3,
1768 turn_timeout_ms: 30_000,
1769 tool_timeout_ms: 5_000,
1770 }
1771 }
1772
1773 struct StreamLlm {
1774 chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
1775 }
1776
1777 impl StreamLlm {
1778 fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
1779 Self { chunks: Mutex::new(chunks.into()) }
1780 }
1781 }
1782
1783 #[async_trait::async_trait]
1784 impl LlmPort for StreamLlm {
1785 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1786 Err(LlmError::Provider("complete() should not be called in stream test".into()))
1787 }
1788
1789 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1790 let mut chunks = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
1791 let items: Vec<Result<LlmStreamChunk, LlmError>> = chunks.drain(..).collect();
1792 Ok(Box::pin(futures_util::stream::iter(items)))
1793 }
1794 }
1795
1796 struct InspectingLlm {
1797 expected_substring: String,
1798 }
1799
1800 #[async_trait::async_trait]
1801 impl LlmPort for InspectingLlm {
1802 async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
1803 let system = req
1804 .messages
1805 .iter()
1806 .find(|m| m.role == Role::System)
1807 .map(|m| m.content.clone())
1808 .unwrap_or_default();
1809 if !system.contains(&self.expected_substring) {
1810 return Err(LlmError::Provider(format!(
1811 "expected system prompt to include '{}', got: {}",
1812 self.expected_substring, system
1813 )));
1814 }
1815 Ok(LlmResponse {
1816 content: r#"{"type": "final", "content": "ok"}"#.to_string(),
1817 usage: TokenUsage::default(),
1818 finish_reason: FinishReason::Stop,
1819 tool_calls: Vec::new(),
1820 })
1821 }
1822
1823 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1824 Err(LlmError::Provider("not used".into()))
1825 }
1826 }
1827
1828 #[tokio::test]
1831 async fn tc01_simple_final_response() {
1832 let llm =
1833 SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
1834 let tools = MockToolPort::empty();
1835 let store = MemoryStore::new();
1836 let sink = CollectingSink::new();
1837
1838 let result = run_turn(
1839 &llm,
1840 &tools,
1841 &store,
1842 &sink,
1843 make_request("Hi"),
1844 &generous_policy(),
1845 "test-model",
1846 )
1847 .await;
1848
1849 assert!(
1850 matches!(&result, Ok(AgentRunResult::Finished(_))),
1851 "expected Finished, got {result:?}"
1852 );
1853 let resp = match result {
1854 Ok(AgentRunResult::Finished(r)) => r,
1855 _ => return,
1856 };
1857
1858 assert_eq!(resp.content, "Hello there!");
1859 assert_eq!(resp.finish_reason, FinishReason::Stop);
1860 assert!(resp.tool_transcript.is_empty());
1861 assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
1862 }
1863
1864 #[tokio::test]
1867 async fn tc02_tool_call_then_final() {
1868 let llm = SequentialLlm::from_contents(vec![
1869 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1870 r#"{"type": "final", "content": "Found results."}"#,
1871 ]);
1872 let tools = MockToolPort::with_tool_and_results(
1873 "search",
1874 vec![Ok(ToolResult {
1875 name: "search".into(),
1876 output: serde_json::json!({"hits": 42}),
1877 is_error: false,
1878 })],
1879 );
1880 let store = MemoryStore::new();
1881 let sink = CollectingSink::new();
1882
1883 let result = run_turn(
1884 &llm,
1885 &tools,
1886 &store,
1887 &sink,
1888 make_request("Search for rust"),
1889 &generous_policy(),
1890 "test-model",
1891 )
1892 .await;
1893
1894 assert!(
1895 matches!(&result, Ok(AgentRunResult::Finished(_))),
1896 "expected Finished, got {result:?}"
1897 );
1898 let resp = match result {
1899 Ok(AgentRunResult::Finished(r)) => r,
1900 _ => return,
1901 };
1902
1903 assert_eq!(resp.content, "Found results.");
1904 assert_eq!(resp.finish_reason, FinishReason::Stop);
1905 assert_eq!(resp.tool_transcript.len(), 1);
1906 assert_eq!(resp.tool_transcript[0].name, "search");
1907 assert!(!resp.tool_transcript[0].is_error);
1908 }
1909
1910 #[tokio::test]
1913 async fn tc03_parse_error_reprompt_success() {
1914 let llm = SequentialLlm::from_contents(vec![
1915 "This is not JSON at all.",
1916 r#"{"type": "final", "content": "Recovered"}"#,
1917 ]);
1918 let tools = MockToolPort::empty();
1919 let store = MemoryStore::new();
1920 let sink = CollectingSink::new();
1921
1922 let result = run_turn(
1923 &llm,
1924 &tools,
1925 &store,
1926 &sink,
1927 make_request("Hi"),
1928 &generous_policy(),
1929 "test-model",
1930 )
1931 .await;
1932
1933 assert!(
1934 matches!(&result, Ok(AgentRunResult::Finished(_))),
1935 "expected Finished after re-prompt, got {result:?}"
1936 );
1937 let resp = match result {
1938 Ok(AgentRunResult::Finished(r)) => r,
1939 _ => return,
1940 };
1941
1942 assert_eq!(resp.content, "Recovered");
1943 assert_eq!(resp.finish_reason, FinishReason::Stop);
1944 }
1945
1946 #[tokio::test]
1949 async fn tc04_double_parse_error() {
1950 let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
1951 let tools = MockToolPort::empty();
1952 let store = MemoryStore::new();
1953 let sink = CollectingSink::new();
1954
1955 let result = run_turn(
1956 &llm,
1957 &tools,
1958 &store,
1959 &sink,
1960 make_request("Hi"),
1961 &generous_policy(),
1962 "test-model",
1963 )
1964 .await;
1965
1966 assert!(result.is_err(), "should return error after two parse failures");
1967 let msg = match result {
1968 Err(err) => err.to_string(),
1969 Ok(value) => format!("unexpected success: {value:?}"),
1970 };
1971 assert!(msg.contains("invalid JSON"), "error message = {msg}");
1972 }
1973
1974 #[tokio::test]
1977 async fn tc05_max_steps_exhaustion() {
1978 let llm = SequentialLlm::from_contents(vec![
1980 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1981 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1982 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1983 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1984 ]);
1985 let tools = MockToolPort::with_tool_and_results(
1986 "t1",
1987 vec![
1988 Ok(ToolResult {
1989 name: "t1".into(),
1990 output: serde_json::json!(null),
1991 is_error: false,
1992 }),
1993 Ok(ToolResult {
1994 name: "t1".into(),
1995 output: serde_json::json!(null),
1996 is_error: false,
1997 }),
1998 Ok(ToolResult {
1999 name: "t1".into(),
2000 output: serde_json::json!(null),
2001 is_error: false,
2002 }),
2003 ],
2004 );
2005 let store = MemoryStore::new();
2006 let sink = CollectingSink::new();
2007
2008 let policy = TurnPolicy {
2009 max_steps: 2,
2010 max_tool_calls: 10,
2011 max_consecutive_errors: 5,
2012 turn_timeout_ms: 30_000,
2013 tool_timeout_ms: 5_000,
2014 };
2015
2016 let result =
2017 run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
2018 .await;
2019
2020 assert!(
2021 matches!(&result, Ok(AgentRunResult::Finished(_))),
2022 "expected Finished with GuardExceeded, got {result:?}"
2023 );
2024 let resp = match result {
2025 Ok(AgentRunResult::Finished(r)) => r,
2026 _ => return,
2027 };
2028
2029 assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
2030 assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
2031 }
2032
2033 #[tokio::test]
2036 async fn tc06_cancellation() {
2037 let llm = SequentialLlm::from_contents(vec![
2038 r#"{"type": "final", "content": "should not reach"}"#,
2039 ]);
2040 let tools = MockToolPort::empty();
2041 let store = MemoryStore::new();
2042 let sink = CollectingSink::new();
2043
2044 let token = CancelToken::new();
2045 token.cancel();
2047
2048 let mut req = make_request("Hi");
2049 req.cancel_token = Some(token);
2050
2051 let result =
2052 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2053
2054 assert!(
2055 matches!(&result, Ok(AgentRunResult::Finished(_))),
2056 "expected Finished with Cancelled, got {result:?}"
2057 );
2058 let resp = match result {
2059 Ok(AgentRunResult::Finished(r)) => r,
2060 _ => return,
2061 };
2062
2063 assert_eq!(resp.finish_reason, FinishReason::Cancelled);
2064 }
2065
2066 #[tokio::test]
2069 async fn tc07_tool_error_then_final() {
2070 let llm = SequentialLlm::from_contents(vec![
2071 r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
2072 r#"{"type": "final", "content": "Recovered from tool error."}"#,
2073 ]);
2074 let tools = MockToolPort::with_tool_and_results(
2075 "flaky_tool",
2076 vec![Err(ToolError::Execution("connection refused".into()))],
2077 );
2078 let store = MemoryStore::new();
2079 let sink = CollectingSink::new();
2080
2081 let result = run_turn(
2082 &llm,
2083 &tools,
2084 &store,
2085 &sink,
2086 make_request("call flaky"),
2087 &generous_policy(),
2088 "test-model",
2089 )
2090 .await;
2091
2092 assert!(
2093 matches!(&result, Ok(AgentRunResult::Finished(_))),
2094 "expected Finished, got {result:?}"
2095 );
2096 let resp = match result {
2097 Ok(AgentRunResult::Finished(r)) => r,
2098 _ => return,
2099 };
2100
2101 assert_eq!(resp.content, "Recovered from tool error.");
2102 assert_eq!(resp.tool_transcript.len(), 1);
2103 assert!(resp.tool_transcript[0].is_error);
2104 }
2105
2106 #[tokio::test]
2107 async fn tc08_save_failure_is_propagated() {
2108 let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
2109 let tools = MockToolPort::empty();
2110 let store = FailingSaveStore;
2111 let sink = CollectingSink::new();
2112
2113 let result = run_turn(
2114 &llm,
2115 &tools,
2116 &store,
2117 &sink,
2118 make_request("hello"),
2119 &generous_policy(),
2120 "test-model",
2121 )
2122 .await;
2123
2124 assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
2125 }
2126
2127 #[tokio::test]
2128 async fn tc09_stream_turn_emits_text_and_finished() {
2129 let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
2130 Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
2131 Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
2132 Ok(LlmStreamChunk::Done {
2133 usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
2134 }),
2135 ]));
2136 let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2137 let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2138 let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());
2139
2140 let stream_result = run_turn_stream(
2141 llm,
2142 tools,
2143 store,
2144 sink,
2145 make_request("hello"),
2146 generous_policy(),
2147 "test-model".to_string(),
2148 )
2149 .await;
2150 assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
2151 let mut stream = match stream_result {
2152 Ok(stream) => stream,
2153 Err(_) => return,
2154 };
2155
2156 let mut saw_text = false;
2157 let mut saw_finished = false;
2158 while let Some(event) = stream.next().await {
2159 match event {
2160 AgentStreamEvent::TextDelta { content } => {
2161 saw_text = saw_text || !content.is_empty();
2162 }
2163 AgentStreamEvent::Finished { usage } => {
2164 saw_finished = true;
2165 assert_eq!(usage.prompt_tokens, 3);
2166 assert_eq!(usage.completion_tokens, 4);
2167 }
2168 AgentStreamEvent::ToolCallStarted { .. } |
2169 AgentStreamEvent::ToolCallCompleted { .. } |
2170 AgentStreamEvent::Error { .. } => {}
2171 }
2172 }
2173
2174 assert!(saw_text, "expected at least one text delta");
2175 assert!(saw_finished, "expected a finished event");
2176 }
2177
2178 #[tokio::test]
2179 async fn tc10_skills_prompt_context_is_injected() {
2180 let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
2181 let tools = MockToolPort::empty();
2182 let store = MemoryStore::new();
2183 let sink = CollectingSink::new();
2184
2185 let mut req = make_request("review this code");
2186 req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());
2187
2188 let result =
2189 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2190
2191 assert!(result.is_ok(), "run should succeed when skills prompt is injected");
2192 }
2193
2194 #[tokio::test]
2195 async fn tc11_selected_skills_context_emits_event() {
2196 let llm =
2197 SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
2198 let tools = MockToolPort::empty();
2199 let store = MemoryStore::new();
2200 let sink = CollectingSink::new();
2201
2202 let mut req = make_request("review code");
2203 req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];
2204
2205 let result =
2206 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2207 assert!(result.is_ok(), "run should succeed");
2208
2209 let events = sink.all_events();
2210 assert!(
2211 events.iter().any(|event| matches!(
2212 event,
2213 AgentEvent::SkillsSelected { skill_names, .. }
2214 if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
2215 )),
2216 "skills.selected event should be emitted with context skill names"
2217 );
2218 }
2219
2220 #[tokio::test]
2221 async fn tc12_policy_deny_tool_blocks_execution() {
2222 let llm = SequentialLlm::from_contents(vec![
2223 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2224 r#"{"type": "final", "content": "done"}"#,
2225 ]);
2226 let tools = NoCallToolPort {
2227 tools: vec![
2228 ToolDescriptor::new("search", "search tool")
2229 .with_input_schema(serde_json::json!({"type":"object"})),
2230 ],
2231 };
2232 let store = MemoryStore::new();
2233 let sink = CollectingSink::new();
2234
2235 let mut req = make_request("search rust");
2236 req.context.tool_policy.deny_tools =
2237 vec!["search".to_string(), "local/shell_exec".to_string()];
2238
2239 let result =
2240 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
2241 assert!(
2242 matches!(&result, Ok(AgentRunResult::Finished(_))),
2243 "expected finished response, got {result:?}"
2244 );
2245 let resp = match result {
2246 Ok(AgentRunResult::Finished(r)) => r,
2247 _ => return,
2248 };
2249
2250 assert_eq!(resp.finish_reason, FinishReason::Stop);
2251 assert_eq!(resp.tool_transcript.len(), 1);
2252 assert!(resp.tool_transcript[0].is_error);
2253 assert!(
2254 resp.tool_transcript[0].output.to_string().contains("denied"),
2255 "tool error should explain policy denial"
2256 );
2257 }
2258
2259 #[tokio::test]
2260 async fn tc13_approval_denied_blocks_execution() {
2261 let llm = SequentialLlm::from_contents(vec![
2262 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2263 r#"{"type": "final", "content": "done"}"#,
2264 ]);
2265 let tools = NoCallToolPort {
2266 tools: vec![
2267 ToolDescriptor::new("search", "search tool")
2268 .with_input_schema(serde_json::json!({"type":"object"})),
2269 ],
2270 };
2271 let store = MemoryStore::new();
2272 let sink = CollectingSink::new();
2273 let req = make_request("search rust");
2274 let tool_policy = AllowAllPolicyPort;
2275 let approval = AlwaysDenyApprovalPort;
2276
2277 let result = run_turn_with_controls(
2278 &llm,
2279 &tools,
2280 &store,
2281 &sink,
2282 req,
2283 &generous_policy(),
2284 "test-model",
2285 &tool_policy,
2286 &approval,
2287 )
2288 .await;
2289 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
2290 let resp = match result {
2291 Ok(AgentRunResult::Finished(r)) => r,
2292 _ => return,
2293 };
2294
2295 assert_eq!(resp.tool_transcript.len(), 1);
2296 assert!(resp.tool_transcript[0].is_error);
2297 assert!(
2298 resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
2299 "tool error should explain approval denial"
2300 );
2301 }
2302
2303 #[tokio::test]
2304 async fn tc14_custom_policy_port_blocks_execution() {
2305 let llm = SequentialLlm::from_contents(vec![
2306 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
2307 r#"{"type": "final", "content": "done"}"#,
2308 ]);
2309 let tools = NoCallToolPort {
2310 tools: vec![
2311 ToolDescriptor::new("search", "search tool")
2312 .with_input_schema(serde_json::json!({"type":"object"})),
2313 ],
2314 };
2315 let store = MemoryStore::new();
2316 let sink = CollectingSink::new();
2317 let req = make_request("search rust");
2318 let tool_policy = DenySearchPolicyPort;
2319 let approval = AlwaysApprovePort;
2320
2321 let result = run_turn_with_controls(
2322 &llm,
2323 &tools,
2324 &store,
2325 &sink,
2326 req,
2327 &generous_policy(),
2328 "test-model",
2329 &tool_policy,
2330 &approval,
2331 )
2332 .await;
2333 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
2334 let resp = match result {
2335 Ok(AgentRunResult::Finished(r)) => r,
2336 _ => return,
2337 };
2338
2339 assert_eq!(resp.tool_transcript.len(), 1);
2340 assert!(resp.tool_transcript[0].is_error);
2341 assert!(
2342 resp.tool_transcript[0].output.to_string().contains("denied"),
2343 "tool error should explain policy denial"
2344 );
2345 }
2346
2347 #[tokio::test]
2348 async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
2349 struct NativeToolLlm {
2350 responses: Mutex<VecDeque<LlmResponse>>,
2351 }
2352
2353 #[async_trait::async_trait]
2354 impl LlmPort for NativeToolLlm {
2355 fn capabilities(&self) -> bob_core::types::LlmCapabilities {
2356 bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
2357 }
2358
2359 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2360 let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
2361 Ok(q.pop_front().unwrap_or(LlmResponse {
2362 content: r#"{"type":"final","content":"fallback"}"#.to_string(),
2363 usage: TokenUsage::default(),
2364 finish_reason: FinishReason::Stop,
2365 tool_calls: Vec::new(),
2366 }))
2367 }
2368
2369 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2370 Err(LlmError::Provider("not used".into()))
2371 }
2372 }
2373
2374 let llm = NativeToolLlm {
2375 responses: Mutex::new(VecDeque::from(vec![
2376 LlmResponse {
2377 content: "ignored".to_string(),
2378 usage: TokenUsage::default(),
2379 finish_reason: FinishReason::Stop,
2380 tool_calls: vec![
2381 ToolCall::new("search", serde_json::json!({"q":"rust"}))
2382 .with_call_id("call-search-1"),
2383 ],
2384 },
2385 LlmResponse {
2386 content: r#"{"type":"final","content":"done"}"#.to_string(),
2387 usage: TokenUsage::default(),
2388 finish_reason: FinishReason::Stop,
2389 tool_calls: Vec::new(),
2390 },
2391 ])),
2392 };
2393 let tools = MockToolPort::with_tool_and_results(
2394 "search",
2395 vec![Ok(ToolResult {
2396 name: "search".to_string(),
2397 output: serde_json::json!({"hits": 2}),
2398 is_error: false,
2399 })],
2400 );
2401 let store = MemoryStore::new();
2402 let sink = CollectingSink::new();
2403 let checkpoint = CountingCheckpointPort::new();
2404 let artifacts = NoopArtifactStore;
2405 let cost = CountingCostMeter::new();
2406 let policy = AllowAllPolicyPort;
2407 let approval = AlwaysApprovePort;
2408
2409 let result = run_turn_with_extensions(
2410 &llm,
2411 &tools,
2412 &store,
2413 &sink,
2414 make_request("search rust"),
2415 &generous_policy(),
2416 "test-model",
2417 &policy,
2418 &approval,
2419 crate::DispatchMode::NativePreferred,
2420 &checkpoint,
2421 &artifacts,
2422 &cost,
2423 &crate::prompt::WindowContextCompactor::default(),
2424 &crate::NoOpToolJournalPort,
2425 )
2426 .await;
2427
2428 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2429 let resp = match result {
2430 Ok(AgentRunResult::Finished(r)) => r,
2431 _ => return,
2432 };
2433 assert_eq!(resp.tool_transcript.len(), 1);
2434 assert_eq!(resp.tool_transcript[0].name, "search");
2435
2436 let saved = store.load(&"test-session".to_string()).await;
2437 let saved = match saved {
2438 Ok(Some(state)) => state,
2439 other => panic!("expected saved state, got {other:?}"),
2440 };
2441 assert!(
2442 saved.messages.iter().any(|message| {
2443 message.role == Role::Assistant &&
2444 message.tool_calls.len() == 1 &&
2445 message.tool_calls[0].call_id.as_deref() == Some("call-search-1")
2446 }),
2447 "assistant tool call should be persisted structurally",
2448 );
2449 assert!(
2450 saved.messages.iter().any(|message| {
2451 message.role == Role::Tool &&
2452 message.tool_call_id.as_deref() == Some("call-search-1") &&
2453 message.tool_name.as_deref() == Some("search")
2454 }),
2455 "tool result should retain tool metadata",
2456 );
2457 }
2458
2459 #[tokio::test]
2460 async fn tc16_checkpoint_and_cost_ports_are_invoked() {
2461 let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
2462 let tools = MockToolPort::empty();
2463 let store = MemoryStore::new();
2464 let sink = CollectingSink::new();
2465 let checkpoint = CountingCheckpointPort::new();
2466 let artifacts = NoopArtifactStore;
2467 let cost = CountingCostMeter::new();
2468 let policy = AllowAllPolicyPort;
2469 let approval = AlwaysApprovePort;
2470
2471 let result = run_turn_with_extensions(
2472 &llm,
2473 &tools,
2474 &store,
2475 &sink,
2476 make_request("hello"),
2477 &generous_policy(),
2478 "test-model",
2479 &policy,
2480 &approval,
2481 crate::DispatchMode::PromptGuided,
2482 &checkpoint,
2483 &artifacts,
2484 &cost,
2485 &crate::prompt::WindowContextCompactor::default(),
2486 &crate::NoOpToolJournalPort,
2487 )
2488 .await;
2489 assert!(result.is_ok(), "turn should succeed");
2490 let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2491 let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
2492 assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
2493 assert!(llm_calls >= 1, "cost meter should record llm usage");
2494 }
2495
2496 #[tokio::test]
2497 async fn tc17_session_usage_accumulates_and_persists() {
2498 let llm_first = SequentialLlm::from_responses(vec![LlmResponse {
2499 content: r#"{"type":"final","content":"first"}"#.to_string(),
2500 usage: TokenUsage { prompt_tokens: 10, completion_tokens: 5 },
2501 finish_reason: FinishReason::Stop,
2502 tool_calls: Vec::new(),
2503 }]);
2504 let llm_second = SequentialLlm::from_responses(vec![LlmResponse {
2505 content: r#"{"type":"final","content":"second"}"#.to_string(),
2506 usage: TokenUsage { prompt_tokens: 3, completion_tokens: 2 },
2507 finish_reason: FinishReason::Stop,
2508 tool_calls: Vec::new(),
2509 }]);
2510 let tools = MockToolPort::empty();
2511 let store = MemoryStore::new();
2512 let sink = CollectingSink::new();
2513
2514 let first = run_turn(
2515 &llm_first,
2516 &tools,
2517 &store,
2518 &sink,
2519 make_request("hello"),
2520 &generous_policy(),
2521 "test-model",
2522 )
2523 .await;
2524 assert!(first.is_ok(), "first run should succeed");
2525
2526 let second = run_turn(
2527 &llm_second,
2528 &tools,
2529 &store,
2530 &sink,
2531 make_request("again"),
2532 &generous_policy(),
2533 "test-model",
2534 )
2535 .await;
2536 assert!(second.is_ok(), "second run should succeed");
2537
2538 let loaded = store.load(&"test-session".to_string()).await;
2539 assert!(loaded.is_ok(), "session should be persisted");
2540 let state = loaded.ok().flatten();
2541 assert!(state.is_some(), "session state should exist");
2542 let state = state.unwrap_or_default();
2543 assert_eq!(state.total_usage.prompt_tokens, 13);
2544 assert_eq!(state.total_usage.completion_tokens, 7);
2545 }
2546
2547 struct FallbackOnlyLlm {
2548 response: LlmResponse,
2549 }
2550
2551 #[async_trait::async_trait]
2552 impl LlmPort for FallbackOnlyLlm {
2553 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2554 Ok(self.response.clone())
2555 }
2556
2557 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2558 Err(LlmError::Provider("streaming not available".to_string()))
2559 }
2560 }
2561
2562 #[tokio::test]
2563 async fn tc18_stream_fallback_does_not_emit_error_event() {
2564 let llm: Arc<dyn LlmPort> = Arc::new(FallbackOnlyLlm {
2565 response: LlmResponse {
2566 content: r#"{"type":"final","content":"done"}"#.to_string(),
2567 usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
2568 finish_reason: FinishReason::Stop,
2569 tool_calls: Vec::new(),
2570 },
2571 });
2572 let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2573 let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2574 let sink = Arc::new(CollectingSink::new());
2575 let sink_dyn: Arc<dyn EventSink> = sink.clone();
2576
2577 let stream_result = run_turn_stream(
2578 llm,
2579 tools,
2580 store,
2581 sink_dyn,
2582 make_request("hello"),
2583 generous_policy(),
2584 "test-model".to_string(),
2585 )
2586 .await;
2587 assert!(stream_result.is_ok(), "stream run should succeed with fallback");
2588 let mut stream = match stream_result {
2589 Ok(stream) => stream,
2590 Err(_) => return,
2591 };
2592
2593 while let Some(_event) = stream.next().await {}
2594
2595 let events = sink.all_events();
2596 assert!(
2597 !events.iter().any(|event| matches!(event, AgentEvent::Error { .. })),
2598 "fallback should not emit AgentEvent::Error when complete() succeeds"
2599 );
2600 }
2601
2602 struct NativeStreamingBypassLlm {
2603 complete_calls: std::sync::atomic::AtomicUsize,
2604 complete_stream_calls: std::sync::atomic::AtomicUsize,
2605 response: LlmResponse,
2606 }
2607
2608 #[async_trait::async_trait]
2609 impl LlmPort for NativeStreamingBypassLlm {
2610 fn capabilities(&self) -> bob_core::types::LlmCapabilities {
2611 bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
2612 }
2613
2614 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
2615 self.complete_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2616 Ok(self.response.clone())
2617 }
2618
2619 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
2620 self.complete_stream_calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2621 Err(LlmError::Provider(
2622 "complete_stream() should be bypassed for native tool calling".to_string(),
2623 ))
2624 }
2625 }
2626
2627 #[tokio::test]
2628 async fn tc19_stream_native_tool_calling_bypasses_complete_stream() {
2629 let llm_impl = Arc::new(NativeStreamingBypassLlm {
2630 complete_calls: std::sync::atomic::AtomicUsize::new(0),
2631 complete_stream_calls: std::sync::atomic::AtomicUsize::new(0),
2632 response: LlmResponse {
2633 content: r#"{"type":"final","content":"done"}"#.to_string(),
2634 usage: TokenUsage { prompt_tokens: 2, completion_tokens: 1 },
2635 finish_reason: FinishReason::Stop,
2636 tool_calls: Vec::new(),
2637 },
2638 });
2639 let llm: Arc<dyn LlmPort> = llm_impl.clone();
2640 let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
2641 let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
2642 let sink = Arc::new(CollectingSink::new());
2643 let sink_dyn: Arc<dyn EventSink> = sink.clone();
2644
2645 let stream_result = run_turn_stream(
2646 llm,
2647 tools,
2648 store,
2649 sink_dyn,
2650 make_request("hello"),
2651 generous_policy(),
2652 "test-model".to_string(),
2653 )
2654 .await;
2655 assert!(
2656 stream_result.is_ok(),
2657 "stream run should succeed through complete() for native tool calling"
2658 );
2659 let mut stream = match stream_result {
2660 Ok(stream) => stream,
2661 Err(_) => return,
2662 };
2663
2664 while let Some(_event) = stream.next().await {}
2665
2666 assert_eq!(
2667 llm_impl.complete_calls.load(std::sync::atomic::Ordering::SeqCst),
2668 1,
2669 "native tool-calling stream path should use complete()"
2670 );
2671 assert_eq!(
2672 llm_impl.complete_stream_calls.load(std::sync::atomic::Ordering::SeqCst),
2673 0,
2674 "native tool-calling stream path should bypass complete_stream()"
2675 );
2676 assert!(
2677 !sink.all_events().iter().any(|event| matches!(event, AgentEvent::Error { .. })),
2678 "native-tool fallback should stay on the success path"
2679 );
2680 }
2681
2682 #[tokio::test]
2683 async fn tc20_non_consecutive_duplicate_tool_calls_are_allowed() {
2684 let llm = SequentialLlm::from_contents(vec![
2685 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2686 r#"{"type":"tool_call","name":"tool_b","arguments":{"q":"docs"}}"#,
2687 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2688 r#"{"type":"final","content":"done"}"#,
2689 ]);
2690 let tools = MockToolPort::with_tool_and_results(
2691 "tool_a",
2692 vec![
2693 Ok(ToolResult {
2694 name: "tool_a".to_string(),
2695 output: serde_json::json!({"ok": 1}),
2696 is_error: false,
2697 }),
2698 Ok(ToolResult {
2699 name: "tool_b".to_string(),
2700 output: serde_json::json!({"ok": 2}),
2701 is_error: false,
2702 }),
2703 Ok(ToolResult {
2704 name: "tool_a".to_string(),
2705 output: serde_json::json!({"ok": 3}),
2706 is_error: false,
2707 }),
2708 ],
2709 );
2710 let store = MemoryStore::new();
2711 let sink = CollectingSink::new();
2712
2713 let result = run_turn(
2714 &llm,
2715 &tools,
2716 &store,
2717 &sink,
2718 make_request("repeat searches"),
2719 &generous_policy(),
2720 "test-model",
2721 )
2722 .await;
2723 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2724 let resp = match result {
2725 Ok(AgentRunResult::Finished(resp)) => resp,
2726 _ => return,
2727 };
2728
2729 assert_eq!(resp.tool_transcript.len(), 3);
2730 assert!(resp.tool_transcript.iter().all(|entry| !entry.is_error));
2731 }
2732
2733 #[tokio::test]
2734 async fn tc21_excessive_consecutive_duplicate_tool_calls_are_blocked() {
2735 let llm = SequentialLlm::from_contents(vec![
2736 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2737 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2738 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2739 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2740 r#"{"type":"final","content":"done"}"#,
2741 ]);
2742 let tools = MockToolPort::with_tool_and_results(
2743 "tool_a",
2744 vec![
2745 Ok(ToolResult {
2746 name: "tool_a".to_string(),
2747 output: serde_json::json!({"ok": 1}),
2748 is_error: false,
2749 }),
2750 Ok(ToolResult {
2751 name: "tool_a".to_string(),
2752 output: serde_json::json!({"ok": 2}),
2753 is_error: false,
2754 }),
2755 Ok(ToolResult {
2756 name: "tool_a".to_string(),
2757 output: serde_json::json!({"ok": 3}),
2758 is_error: false,
2759 }),
2760 ],
2761 );
2762 let store = MemoryStore::new();
2763 let sink = CollectingSink::new();
2764
2765 let result = run_turn(
2766 &llm,
2767 &tools,
2768 &store,
2769 &sink,
2770 make_request("poll repeatedly"),
2771 &generous_policy(),
2772 "test-model",
2773 )
2774 .await;
2775 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2776 let resp = match result {
2777 Ok(AgentRunResult::Finished(resp)) => resp,
2778 _ => return,
2779 };
2780
2781 assert_eq!(resp.tool_transcript.len(), 4);
2782 assert!(!resp.tool_transcript[2].is_error);
2783 assert!(resp.tool_transcript[3].is_error);
2784 assert!(
2785 resp.tool_transcript[3]
2786 .output
2787 .to_string()
2788 .contains("consecutive duplicate tool call limit reached"),
2789 "expected duplicate-call protection error in transcript"
2790 );
2791 }
2792
2793 #[tokio::test]
2794 async fn tc22_duplicate_block_path_records_artifact_and_cost() {
2795 let llm = SequentialLlm::from_contents(vec![
2796 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2797 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2798 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2799 r#"{"type":"tool_call","name":"tool_a","arguments":{"q":"rust"}}"#,
2800 r#"{"type":"final","content":"done"}"#,
2801 ]);
2802 let tools = MockToolPort::with_tool_and_results(
2803 "tool_a",
2804 vec![
2805 Ok(ToolResult {
2806 name: "tool_a".to_string(),
2807 output: serde_json::json!({"ok": 1}),
2808 is_error: false,
2809 }),
2810 Ok(ToolResult {
2811 name: "tool_a".to_string(),
2812 output: serde_json::json!({"ok": 2}),
2813 is_error: false,
2814 }),
2815 Ok(ToolResult {
2816 name: "tool_a".to_string(),
2817 output: serde_json::json!({"ok": 3}),
2818 is_error: false,
2819 }),
2820 ],
2821 );
2822 let store = MemoryStore::new();
2823 let sink = CollectingSink::new();
2824 let checkpoint = CountingCheckpointPort::new();
2825 let artifacts = CountingArtifactStore::new();
2826 let cost = CountingCostMeter::new();
2827 let policy = AllowAllPolicyPort;
2828 let approval = AlwaysApprovePort;
2829
2830 let result = run_turn_with_extensions(
2831 &llm,
2832 &tools,
2833 &store,
2834 &sink,
2835 make_request("poll repeatedly"),
2836 &generous_policy(),
2837 "test-model",
2838 &policy,
2839 &approval,
2840 crate::DispatchMode::PromptGuided,
2841 &checkpoint,
2842 &artifacts,
2843 &cost,
2844 &crate::prompt::WindowContextCompactor::default(),
2845 &crate::NoOpToolJournalPort,
2846 )
2847 .await;
2848 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2849
2850 let tool_results = *cost.tool_results.lock().unwrap_or_else(|p| p.into_inner());
2851 assert_eq!(tool_results, 4, "cost meter should record all tool outcomes");
2852 let saved_artifacts = artifacts.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2853 assert_eq!(saved_artifacts, 4, "artifact store should record all tool outcomes");
2854 }
2855}