1use std::sync::Arc;
43
44use bob_core::{
45 error::AgentError,
46 normalize_tool_list,
47 ports::{
48 ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
49 ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
50 },
51 types::{
52 AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
53 AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
54 GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
55 },
56};
57use futures_util::StreamExt;
58use tokio::time::Instant;
59use tokio_stream::wrappers::UnboundedReceiverStream;
60
/// Per-turn counters and start time, checked against a [`TurnPolicy`].
///
/// The counters only change through the `record_*` / `reset_errors` methods;
/// [`LoopGuard::can_continue`] compares them with the policy limits.
#[derive(Debug)]
pub struct LoopGuard {
    /// Limits this guard compares its counters against.
    policy: TurnPolicy,
    /// Number of steps recorded so far.
    steps: u32,
    /// Number of tool calls recorded so far.
    tool_calls: u32,
    /// Errors recorded since construction or the last `reset_errors` call.
    consecutive_errors: u32,
    /// Instant captured when the guard was constructed.
    start: Instant,
}
71
72impl LoopGuard {
73 #[must_use]
75 pub fn new(policy: TurnPolicy) -> Self {
76 Self { policy, steps: 0, tool_calls: 0, consecutive_errors: 0, start: Instant::now() }
77 }
78
79 #[must_use]
81 pub fn can_continue(&self) -> bool {
82 self.steps < self.policy.max_steps &&
83 self.tool_calls < self.policy.max_tool_calls &&
84 self.consecutive_errors < self.policy.max_consecutive_errors &&
85 !self.timed_out()
86 }
87
88 pub fn record_step(&mut self) {
90 self.steps += 1;
91 }
92
93 pub fn record_tool_call(&mut self) {
95 self.tool_calls += 1;
96 }
97
98 pub fn record_error(&mut self) {
100 self.consecutive_errors += 1;
101 }
102
103 pub fn reset_errors(&mut self) {
105 self.consecutive_errors = 0;
106 }
107
108 #[must_use]
112 pub fn reason(&self) -> GuardReason {
113 if self.steps >= self.policy.max_steps {
114 GuardReason::MaxSteps
115 } else if self.tool_calls >= self.policy.max_tool_calls {
116 GuardReason::MaxToolCalls
117 } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
118 GuardReason::MaxConsecutiveErrors
119 } else if self.timed_out() {
120 GuardReason::TurnTimeout
121 } else {
122 GuardReason::Cancelled
124 }
125 }
126
127 #[must_use]
129 pub fn timed_out(&self) -> bool {
130 self.start.elapsed().as_millis() >= u128::from(self.policy.turn_timeout_ms)
131 }
132}
133
/// Base system prompt. `resolve_system_instructions` returns it as-is, or
/// with a non-blank request-supplied prompt appended after a blank line.
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = "\
You are a helpful AI assistant. \
Think step by step before answering. \
When you need external information, use the available tools.";
140
141fn resolve_system_instructions(req: &AgentRequest) -> String {
142 let Some(skills_prompt) = req.context.system_prompt.as_deref() else {
143 return DEFAULT_SYSTEM_INSTRUCTIONS.to_string();
144 };
145
146 if skills_prompt.trim().is_empty() {
147 DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
148 } else {
149 format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
150 }
151}
152
153fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
154 req.context.selected_skills.clone()
155}
156
/// Deny/allow tool lists for one request, as produced by
/// `resolve_tool_call_policy` (both lists pass through `normalize_tool_list`).
#[derive(Debug, Clone, Default)]
struct ToolCallPolicy {
    /// Normalized names from the request's deny list.
    deny_tools: Vec<String>,
    /// Normalized names from the request's allow list; `None` when the request
    /// carries no allow list. How `None` is interpreted is decided by the
    /// `ToolPolicyPort` implementation, which is not visible here.
    allow_tools: Option<Vec<String>>,
}
162
163fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
164 let deny_tools =
165 normalize_tool_list(req.context.tool_policy.deny_tools.iter().map(String::as_str));
166 let allow_tools = req
167 .context
168 .tool_policy
169 .allow_tools
170 .as_ref()
171 .map(|tools| normalize_tool_list(tools.iter().map(String::as_str)));
172 ToolCallPolicy { deny_tools, allow_tools }
173}
174
175fn prompt_options_for_mode(
176 dispatch_mode: crate::DispatchMode,
177 llm: &dyn LlmPort,
178) -> crate::prompt::PromptBuildOptions {
179 match dispatch_mode {
180 crate::DispatchMode::PromptGuided => crate::prompt::PromptBuildOptions::default(),
181 crate::DispatchMode::NativePreferred => {
182 if llm.capabilities().native_tool_calling {
183 crate::prompt::PromptBuildOptions {
184 include_action_schema: false,
185 include_tool_schema: false,
186 }
187 } else {
188 crate::prompt::PromptBuildOptions::default()
189 }
190 }
191 }
192}
193
194fn parse_action_for_mode(
195 dispatch_mode: crate::DispatchMode,
196 llm: &dyn LlmPort,
197 response: &bob_core::types::LlmResponse,
198) -> Result<AgentAction, crate::action::ActionParseError> {
199 match dispatch_mode {
200 crate::DispatchMode::PromptGuided => crate::action::parse_action(&response.content),
201 crate::DispatchMode::NativePreferred => {
202 if llm.capabilities().native_tool_calling &&
203 let Some(tool_call) = response.tool_calls.first()
204 {
205 return Ok(AgentAction::ToolCall {
206 name: tool_call.name.clone(),
207 arguments: tool_call.arguments.clone(),
208 });
209 }
210 crate::action::parse_action(&response.content)
211 }
212 }
213}
214
215#[expect(
216 clippy::too_many_arguments,
217 reason = "tool execution needs explicit policy, approval, and timeout dependencies"
218)]
219async fn execute_tool_call(
220 tools: &dyn ToolPort,
221 guard: &mut LoopGuard,
222 tool_call: ToolCall,
223 policy: &ToolCallPolicy,
224 tool_policy_port: &dyn ToolPolicyPort,
225 approval_port: &dyn ApprovalPort,
226 approval_context: &ApprovalContext,
227 timeout_ms: u64,
228) -> ToolResult {
229 if !tool_policy_port.is_tool_allowed(
230 &tool_call.name,
231 &policy.deny_tools,
232 policy.allow_tools.as_deref(),
233 ) {
234 guard.record_error();
235 return ToolResult {
236 name: tool_call.name.clone(),
237 output: serde_json::json!({
238 "error": format!("tool '{}' denied by policy", tool_call.name)
239 }),
240 is_error: true,
241 };
242 }
243
244 match approval_port.approve_tool_call(&tool_call, approval_context).await {
245 Ok(ApprovalDecision::Approved) => {}
246 Ok(ApprovalDecision::Denied { reason }) => {
247 guard.record_error();
248 return ToolResult {
249 name: tool_call.name.clone(),
250 output: serde_json::json!({"error": reason}),
251 is_error: true,
252 };
253 }
254 Err(err) => {
255 guard.record_error();
256 return ToolResult {
257 name: tool_call.name.clone(),
258 output: serde_json::json!({"error": err.to_string()}),
259 is_error: true,
260 };
261 }
262 }
263
264 match tokio::time::timeout(
265 std::time::Duration::from_millis(timeout_ms),
266 tools.call_tool(tool_call.clone()),
267 )
268 .await
269 {
270 Ok(Ok(result)) => {
271 guard.reset_errors();
272 result
273 }
274 Ok(Err(err)) => {
275 guard.record_error();
276 ToolResult {
277 name: tool_call.name,
278 output: serde_json::json!({"error": err.to_string()}),
279 is_error: true,
280 }
281 }
282 Err(_) => {
283 guard.record_error();
284 ToolResult {
285 name: tool_call.name,
286 output: serde_json::json!({"error": "tool call timed out"}),
287 is_error: true,
288 }
289 }
290 }
291}
292
293pub async fn run_turn(
300 llm: &dyn LlmPort,
301 tools: &dyn ToolPort,
302 store: &dyn SessionStore,
303 events: &dyn EventSink,
304 req: AgentRequest,
305 policy: &TurnPolicy,
306 default_model: &str,
307) -> Result<AgentRunResult, AgentError> {
308 let tool_policy = crate::DefaultToolPolicyPort;
309 let approval = crate::AllowAllApprovalPort;
310 let checkpoint_store = crate::NoOpCheckpointStorePort;
311 let artifact_store = crate::NoOpArtifactStorePort;
312 let cost_meter = crate::NoOpCostMeterPort;
313 run_turn_with_extensions(
314 llm,
315 tools,
316 store,
317 events,
318 req,
319 policy,
320 default_model,
321 &tool_policy,
322 &approval,
323 crate::DispatchMode::NativePreferred,
324 &checkpoint_store,
325 &artifact_store,
326 &cost_meter,
327 )
328 .await
329}
330
331#[cfg_attr(
333 not(test),
334 expect(
335 dead_code,
336 reason = "reserved wrapper for partial control injection in external integrations"
337 )
338)]
339#[expect(
340 clippy::too_many_arguments,
341 reason = "wrapper exposes explicit dependency ports for compatibility and testability"
342)]
343pub(crate) async fn run_turn_with_controls(
344 llm: &dyn LlmPort,
345 tools: &dyn ToolPort,
346 store: &dyn SessionStore,
347 events: &dyn EventSink,
348 req: AgentRequest,
349 policy: &TurnPolicy,
350 default_model: &str,
351 tool_policy_port: &dyn ToolPolicyPort,
352 approval_port: &dyn ApprovalPort,
353) -> Result<AgentRunResult, AgentError> {
354 let checkpoint_store = crate::NoOpCheckpointStorePort;
355 let artifact_store = crate::NoOpArtifactStorePort;
356 let cost_meter = crate::NoOpCostMeterPort;
357 run_turn_with_extensions(
358 llm,
359 tools,
360 store,
361 events,
362 req,
363 policy,
364 default_model,
365 tool_policy_port,
366 approval_port,
367 crate::DispatchMode::PromptGuided,
368 &checkpoint_store,
369 &artifact_store,
370 &cost_meter,
371 )
372 .await
373}
374
/// Drives one agent turn to completion against explicitly supplied ports.
///
/// Outline, as implemented below:
///
/// 1. Resolve the model (`req.model`, else `default_model`), the system
///    instructions, the selected skills, and the tool-call policy from `req`.
/// 2. Load the session from `store` (a default one when nothing is stored),
///    list the tools, emit `TurnStarted` (plus `SkillsSelected` when any skills
///    are selected), and append the user input to the session.
/// 3. Loop: stop on cancellation, on a budget error, or when the [`LoopGuard`]
///    trips; otherwise call the LLM, record usage, save a checkpoint, parse the
///    response into an [`AgentAction`], and act on it.
///
/// Final answers, user questions, cancellation, and guard stops all return
/// `Ok(AgentRunResult::Finished(..))` through `finish_turn`, which also saves
/// the session and emits `TurnCompleted`.
///
/// # Errors
///
/// Propagates errors from `store.load`, `tools.list_tools`,
/// `cost_meter.check_budget`, `llm.complete`, `cost_meter.record_llm_usage`,
/// and from `finish_turn`. Returns `AgentError::Internal` when two consecutive
/// LLM responses cannot be parsed into an action.
#[expect(
    clippy::too_many_arguments,
    reason = "core entrypoint exposes all ports explicitly for adapter injection"
)]
pub(crate) async fn run_turn_with_extensions(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
    dispatch_mode: crate::DispatchMode,
    checkpoint_store: &dyn TurnCheckpointStorePort,
    artifact_store: &dyn ArtifactStorePort,
    cost_meter: &dyn CostMeterPort,
) -> Result<AgentRunResult, AgentError> {
    // Per-request settings, resolved once before the loop.
    let model = req.model.as_deref().unwrap_or(default_model);
    let cancel_token = req.cancel_token.clone();
    let system_instructions = resolve_system_instructions(&req);
    let selected_skills = resolve_selected_skills(&req);
    let tool_call_policy = resolve_tool_call_policy(&req);

    // A session that is not stored yet starts from its `Default` value.
    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
    let tool_descriptors = tools.list_tools().await?;
    let mut guard = LoopGuard::new(policy.clone());

    // Supplies the tool summary text and the "activated" tool subset used to
    // build each LLM request below.
    let mut tool_view = crate::progressive_tools::ProgressiveToolView::new(tool_descriptors);

    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
    if !selected_skills.is_empty() {
        events.emit(AgentEvent::SkillsSelected { skill_names: selected_skills.clone() });
    }

    session.messages.push(Message { role: Role::User, content: req.input.clone() });

    let mut tool_transcript: Vec<ToolResult> = Vec::new();
    let mut total_usage = TokenUsage::default();
    let mut consecutive_parse_failures: u32 = 0;
    // Signatures ("name:serialized-arguments") of tool calls already seen in this turn.
    let mut seen_tool_calls: std::collections::HashSet<String> = std::collections::HashSet::new();

    loop {
        // Cancellation requested before this iteration started.
        if let Some(ref token) = cancel_token &&
            token.is_cancelled()
        {
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: "Turn cancelled.",
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::Cancelled,
                },
            )
            .await;
        }

        // A budget error aborts the turn via `?` without going through `finish_turn`.
        cost_meter.check_budget(&req.session_id).await?;

        if !guard.can_continue() {
            let reason = guard.reason();
            let msg = format!("Turn stopped: {reason:?}");
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: &msg,
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::GuardExceeded,
                },
            )
            .await;
        }

        // System instructions for this step: the resolved instructions plus,
        // when non-empty, the tool summary after a blank line.
        let mut augmented_instructions = system_instructions.clone();
        let tool_summary = tool_view.summary_prompt();
        if !tool_summary.is_empty() {
            augmented_instructions.push('\n');
            augmented_instructions.push('\n');
            augmented_instructions.push_str(&tool_summary);
        }

        let active_tools = tool_view.activated_tools();
        let llm_request = crate::prompt::build_llm_request_with_options(
            model,
            &session,
            &active_tools,
            &augmented_instructions,
            prompt_options_for_mode(dispatch_mode, llm),
        );

        events.emit(AgentEvent::LlmCallStarted { model: model.to_string() });

        // With a cancel token the LLM call is raced against cancellation;
        // without one it is simply awaited.
        let llm_response = if let Some(ref token) = cancel_token {
            tokio::select! {
                result = llm.complete(llm_request) => result?,
                () = token.cancelled() => {
                    return finish_turn(
                        store, events, &req.session_id, &session,
                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
                    ).await;
                }
            }
        } else {
            llm.complete(llm_request).await?
        };

        guard.record_step();
        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
        total_usage.completion_tokens += llm_response.usage.completion_tokens;
        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;

        events.emit(AgentEvent::LlmCallCompleted { usage: llm_response.usage.clone() });

        // Let the tool view react to the response text before the next request is built.
        tool_view.activate_hints(&llm_response.content);

        session
            .messages
            .push(Message { role: Role::Assistant, content: llm_response.content.clone() });

        // Best effort: a failed checkpoint save does not stop the turn.
        let _ = checkpoint_store
            .save_checkpoint(&TurnCheckpoint {
                session_id: req.session_id.clone(),
                step: guard.steps,
                tool_calls: guard.tool_calls,
                usage: total_usage.clone(),
            })
            .await;

        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
            Ok(action) => {
                consecutive_parse_failures = 0;
                match action {
                    AgentAction::Final { content } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &content,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    // A question for the user ends the turn the same way a final answer does.
                    AgentAction::AskUser { question } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &question,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::ToolCall { name, arguments } => {
                        tool_view.activate(&name);

                        // `insert` returns false when the same name + arguments
                        // were already requested earlier in this turn.
                        let call_signature = format!(
                            "{}:{}",
                            name,
                            serde_json::to_string(&arguments).unwrap_or_default()
                        );
                        if !seen_tool_calls.insert(call_signature) {
                            // The duplicate is not executed, but it still counts
                            // against the tool-call limit and is reported to the
                            // model as an error result.
                            // NOTE(review): `ToolCallCompleted` is emitted here
                            // without a preceding `ToolCallStarted`, and the cost
                            // meter / artifact store are not informed — confirm
                            // that this is intended.
                            let dup_result = ToolResult {
                                name: name.clone(),
                                output: serde_json::json!({
                                    "error": "duplicate tool call detected (same name + arguments) — skipping to prevent loop"
                                }),
                                is_error: true,
                            };
                            guard.record_tool_call();
                            let output_str =
                                serde_json::to_string(&dup_result.output).unwrap_or_default();
                            session
                                .messages
                                .push(Message { role: Role::Tool, content: output_str });
                            events.emit(AgentEvent::ToolCallCompleted { name, is_error: true });
                            tool_transcript.push(dup_result);
                            continue;
                        }

                        events.emit(AgentEvent::ToolCallStarted { name: name.clone() });
                        let approval_context = ApprovalContext {
                            session_id: req.session_id.clone(),
                            turn_step: guard.steps.max(1),
                            selected_skills: selected_skills.clone(),
                        };

                        // Policy check, approval, and timed execution; failures
                        // come back as an error-flagged `ToolResult`.
                        let tool_result = execute_tool_call(
                            tools,
                            &mut guard,
                            ToolCall { name: name.clone(), arguments },
                            &tool_call_policy,
                            tool_policy_port,
                            approval_port,
                            &approval_context,
                            policy.tool_timeout_ms,
                        )
                        .await;

                        guard.record_tool_call();
                        // Best effort: cost recording failures are ignored here.
                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;

                        let is_error = tool_result.is_error;
                        events.emit(AgentEvent::ToolCallCompleted { name: name.clone(), is_error });

                        // The serialized tool output is appended to the session as a tool message.
                        let output_str =
                            serde_json::to_string(&tool_result.output).unwrap_or_default();
                        session.messages.push(Message { role: Role::Tool, content: output_str });

                        // Best effort: artifact storage failures are ignored.
                        let _ = artifact_store
                            .put(ArtifactRecord {
                                session_id: req.session_id.clone(),
                                kind: "tool_result".to_string(),
                                name: name.clone(),
                                content: tool_result.output.clone(),
                            })
                            .await;

                        tool_transcript.push(tool_result);
                    }
                }
            }
            Err(_parse_err) => {
                // First failure: ask the model to retry. Second failure in a
                // row: give up with an internal error.
                consecutive_parse_failures += 1;
                if consecutive_parse_failures >= 2 {
                    // NOTE(review): the save result is discarded here, while
                    // `run_turn_stream_inner` propagates it with `?` on the
                    // same path — confirm which behavior is wanted.
                    let _ = store.save(&req.session_id, &session).await;
                    return Err(AgentError::Internal(
                        "LLM produced invalid JSON after re-prompt".into(),
                    ));
                }
                session.messages.push(Message {
                    role: Role::User,
                    content: "Your response was not valid JSON. \
                              Please respond with exactly one JSON object \
                              matching the required schema."
                        .into(),
                });
            }
        }
    }
}
641
/// Payload handed to `finish_turn` when a non-streaming turn ends.
struct FinishResult<'a> {
    /// Text copied into the response's `content`.
    content: &'a str,
    /// Tool results collected during the turn.
    tool_transcript: Vec<ToolResult>,
    /// Token usage accumulated over the turn's LLM calls.
    usage: TokenUsage,
    /// Reason reported both in the `TurnCompleted` event and in the response.
    finish_reason: FinishReason,
}
649
650async fn finish_turn(
652 store: &dyn SessionStore,
653 events: &dyn EventSink,
654 session_id: &bob_core::types::SessionId,
655 session: &bob_core::types::SessionState,
656 result: FinishResult<'_>,
657) -> Result<AgentRunResult, AgentError> {
658 store.save(session_id, session).await?;
659 events.emit(AgentEvent::TurnCompleted { finish_reason: result.finish_reason });
660 Ok(AgentRunResult::Finished(AgentResponse {
661 content: result.content.to_string(),
662 tool_transcript: result.tool_transcript,
663 usage: result.usage,
664 finish_reason: result.finish_reason,
665 }))
666}
667
668pub async fn run_turn_stream(
670 llm: Arc<dyn LlmPort>,
671 tools: Arc<dyn ToolPort>,
672 store: Arc<dyn SessionStore>,
673 events: Arc<dyn EventSink>,
674 req: AgentRequest,
675 policy: TurnPolicy,
676 default_model: String,
677) -> Result<AgentEventStream, AgentError> {
678 let tool_policy: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
679 let approval: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
680 let checkpoint_store: Arc<dyn TurnCheckpointStorePort> =
681 Arc::new(crate::NoOpCheckpointStorePort);
682 let artifact_store: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
683 let cost_meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
684 run_turn_stream_with_controls(
685 llm,
686 tools,
687 store,
688 events,
689 req,
690 policy,
691 default_model,
692 tool_policy,
693 approval,
694 crate::DispatchMode::NativePreferred,
695 checkpoint_store,
696 artifact_store,
697 cost_meter,
698 )
699 .await
700}
701
702#[expect(
704 clippy::too_many_arguments,
705 reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
706)]
707pub(crate) async fn run_turn_stream_with_controls(
708 llm: Arc<dyn LlmPort>,
709 tools: Arc<dyn ToolPort>,
710 store: Arc<dyn SessionStore>,
711 events: Arc<dyn EventSink>,
712 req: AgentRequest,
713 policy: TurnPolicy,
714 default_model: String,
715 tool_policy: Arc<dyn ToolPolicyPort>,
716 approval: Arc<dyn ApprovalPort>,
717 dispatch_mode: crate::DispatchMode,
718 checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
719 artifact_store: Arc<dyn ArtifactStorePort>,
720 cost_meter: Arc<dyn CostMeterPort>,
721) -> Result<AgentEventStream, AgentError> {
722 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<AgentStreamEvent>();
723 let config = StreamRunConfig {
724 policy,
725 default_model,
726 tool_policy,
727 approval,
728 dispatch_mode,
729 checkpoint_store,
730 artifact_store,
731 cost_meter,
732 };
733
734 tokio::spawn(async move {
735 if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
736 {
737 let _ = tx.send(AgentStreamEvent::Error { error: err.to_string() });
738 }
739 });
740
741 Ok(Box::pin(UnboundedReceiverStream::new(rx)))
742}
743
/// Owned settings and ports for one streaming turn.
///
/// Built in `run_turn_stream_with_controls`, moved into the spawned task, and
/// passed by reference to `run_turn_stream_inner`.
struct StreamRunConfig {
    /// Turn limits and the per-tool timeout.
    policy: TurnPolicy,
    /// Model used when the request does not name one.
    default_model: String,
    /// Policy port consulted before each tool call.
    tool_policy: Arc<dyn ToolPolicyPort>,
    /// Approval port consulted before each tool call.
    approval: Arc<dyn ApprovalPort>,
    /// Controls prompt options and how LLM responses are turned into actions.
    dispatch_mode: crate::DispatchMode,
    /// Receives a checkpoint after every LLM step.
    checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
    /// Receives each tool result as an artifact record.
    artifact_store: Arc<dyn ArtifactStorePort>,
    /// Budget check and usage recording.
    cost_meter: Arc<dyn CostMeterPort>,
}
754
/// Runs one streaming agent turn, reporting progress on `tx` and `events`.
///
/// Per iteration: check cancellation, budget, and the [`LoopGuard`]; request a
/// streamed completion (falling back to a non-streamed `complete` call when the
/// stream cannot be opened); forward text to `tx`; record usage and a
/// checkpoint; then parse the accumulated text into an [`AgentAction`] and
/// either finish the turn or execute the requested tool.
///
/// Send failures on `tx` (receiver gone) are ignored throughout.
///
/// # Errors
///
/// Propagates errors from `store.load`, `tools.list_tools`,
/// `cost_meter.check_budget`, `store.save`, the fallback `llm.complete`, and
/// `cost_meter.record_llm_usage`. An error item inside the LLM stream is
/// returned as `AgentError::Llm`. Two consecutive unparseable responses yield
/// `AgentError::Internal`.
async fn run_turn_stream_inner(
    llm: Arc<dyn LlmPort>,
    tools: Arc<dyn ToolPort>,
    store: Arc<dyn SessionStore>,
    events: Arc<dyn EventSink>,
    req: AgentRequest,
    config: &StreamRunConfig,
    tx: &tokio::sync::mpsc::UnboundedSender<AgentStreamEvent>,
) -> Result<(), AgentError> {
    // Per-request settings, resolved once before the loop.
    let model = req.model.as_deref().unwrap_or(&config.default_model);
    let cancel_token = req.cancel_token.clone();
    let system_instructions = resolve_system_instructions(&req);
    let selected_skills = resolve_selected_skills(&req);
    let tool_call_policy = resolve_tool_call_policy(&req);

    // A session that is not stored yet starts from its `Default` value.
    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
    let tool_descriptors = tools.list_tools().await?;
    let mut guard = LoopGuard::new(config.policy.clone());
    let mut total_usage = TokenUsage::default();
    let mut consecutive_parse_failures: u32 = 0;
    // Counter behind the "call-N" ids attached to streamed tool-call events.
    let mut next_call_id: u64 = 0;

    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
    if !selected_skills.is_empty() {
        events.emit(AgentEvent::SkillsSelected { skill_names: selected_skills.clone() });
    }
    session.messages.push(Message { role: Role::User, content: req.input.clone() });

    loop {
        // In this function the cancel token is consulted only here, at the top
        // of each iteration.
        // NOTE(review): on this path and the guard path below, `TurnCompleted`
        // is emitted before `store.save`, whereas the normal completion path
        // saves first — confirm the intended order.
        if let Some(ref token) = cancel_token &&
            token.is_cancelled()
        {
            events.emit(AgentEvent::Error { error: "turn cancelled".to_string() });
            events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::Cancelled });
            store.save(&req.session_id, &session).await?;
            let _ = tx.send(AgentStreamEvent::Error { error: "turn cancelled".to_string() });
            let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
            return Ok(());
        }

        config.cost_meter.check_budget(&req.session_id).await?;

        // A tripped guard ends the turn with an error event followed by `Finished`.
        if !guard.can_continue() {
            let reason = guard.reason();
            let msg = format!("Turn stopped: {reason:?}");
            events.emit(AgentEvent::Error { error: msg.clone() });
            events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::GuardExceeded });
            store.save(&req.session_id, &session).await?;
            let _ = tx.send(AgentStreamEvent::Error { error: msg });
            let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
            return Ok(());
        }

        // Every request carries the full tool list and the unmodified system instructions.
        let llm_request = crate::prompt::build_llm_request_with_options(
            model,
            &session,
            &tool_descriptors,
            &system_instructions,
            prompt_options_for_mode(config.dispatch_mode, llm.as_ref()),
        );
        events.emit(AgentEvent::LlmCallStarted { model: model.to_string() });

        let mut assistant_content = String::new();
        let mut llm_usage = TokenUsage::default();
        // Only filled by the non-streaming fallback; the stream chunks handled
        // below carry text and usage only.
        let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
        let mut fallback_to_complete = false;

        match llm.complete_stream(llm_request.clone()).await {
            Ok(mut stream) => {
                while let Some(item) = stream.next().await {
                    match item {
                        // Accumulate the text and forward each delta as it arrives.
                        Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
                            assistant_content.push_str(&delta);
                            let _ = tx.send(AgentStreamEvent::TextDelta { content: delta });
                        }
                        Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
                            llm_usage = usage;
                        }
                        // A failure in the middle of the stream aborts the turn.
                        Err(err) => {
                            events.emit(AgentEvent::Error { error: err.to_string() });
                            return Err(AgentError::Llm(err));
                        }
                    }
                }
            }
            // The stream could not be opened: report it and retry without streaming.
            Err(err) => {
                fallback_to_complete = true;
                events.emit(AgentEvent::Error { error: err.to_string() });
            }
        }

        if fallback_to_complete {
            // The whole response text is forwarded as a single delta.
            let llm_response = llm.complete(llm_request).await?;
            assistant_content = llm_response.content.clone();
            llm_usage = llm_response.usage;
            llm_tool_calls = llm_response.tool_calls;
            let _ = tx.send(AgentStreamEvent::TextDelta { content: llm_response.content });
        }

        guard.record_step();
        total_usage.prompt_tokens += llm_usage.prompt_tokens;
        total_usage.completion_tokens += llm_usage.completion_tokens;
        config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
        events.emit(AgentEvent::LlmCallCompleted { usage: llm_usage.clone() });
        session
            .messages
            .push(Message { role: Role::Assistant, content: assistant_content.clone() });

        // Best effort: a failed checkpoint save does not stop the turn.
        let _ = config
            .checkpoint_store
            .save_checkpoint(&TurnCheckpoint {
                session_id: req.session_id.clone(),
                step: guard.steps,
                tool_calls: guard.tool_calls,
                usage: total_usage.clone(),
            })
            .await;

        // Rebuild a response value so streamed and fallback results share the
        // same action-parsing path.
        let response_for_dispatch = bob_core::types::LlmResponse {
            content: assistant_content.clone(),
            usage: llm_usage.clone(),
            finish_reason: FinishReason::Stop,
            tool_calls: llm_tool_calls,
        };

        if let Ok(action) =
            parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
        {
            consecutive_parse_failures = 0;
            match action {
                // The action payload is not sent separately; the text already
                // went out as deltas, so only `Finished` follows.
                AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
                    store.save(&req.session_id, &session).await?;
                    events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::Stop });
                    let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
                    return Ok(());
                }
                AgentAction::ToolCall { name, arguments } => {
                    events.emit(AgentEvent::ToolCallStarted { name: name.clone() });
                    // The call id pairs the started and completed stream events.
                    next_call_id += 1;
                    let call_id = format!("call-{next_call_id}");
                    let _ = tx.send(AgentStreamEvent::ToolCallStarted {
                        name: name.clone(),
                        call_id: call_id.clone(),
                    });
                    let approval_context = ApprovalContext {
                        session_id: req.session_id.clone(),
                        turn_step: guard.steps.max(1),
                        selected_skills: selected_skills.clone(),
                    };

                    // Policy check, approval, and timed execution; failures
                    // come back as an error-flagged `ToolResult`.
                    let tool_result = execute_tool_call(
                        tools.as_ref(),
                        &mut guard,
                        ToolCall { name: name.clone(), arguments },
                        &tool_call_policy,
                        config.tool_policy.as_ref(),
                        config.approval.as_ref(),
                        &approval_context,
                        config.policy.tool_timeout_ms,
                    )
                    .await;

                    guard.record_tool_call();
                    // Best effort: cost recording failures are ignored here.
                    let _ =
                        config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
                    let is_error = tool_result.is_error;
                    events.emit(AgentEvent::ToolCallCompleted { name: name.clone(), is_error });
                    let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
                        call_id,
                        result: tool_result.clone(),
                    });

                    // The serialized tool output is appended to the session as a tool message.
                    let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
                    session.messages.push(Message { role: Role::Tool, content: output_str });
                    // Best effort: artifact storage failures are ignored.
                    let _ = config
                        .artifact_store
                        .put(ArtifactRecord {
                            session_id: req.session_id.clone(),
                            kind: "tool_result".to_string(),
                            name: name.clone(),
                            content: tool_result.output.clone(),
                        })
                        .await;
                }
            }
        } else {
            // First failure: ask the model to retry. Second failure in a row:
            // save the session and give up with an internal error.
            consecutive_parse_failures += 1;
            if consecutive_parse_failures >= 2 {
                store.save(&req.session_id, &session).await?;
                events.emit(AgentEvent::Error {
                    error: "LLM produced invalid JSON after re-prompt".to_string(),
                });
                return Err(AgentError::Internal(
                    "LLM produced invalid JSON after re-prompt".into(),
                ));
            }
            session.messages.push(Message {
                role: Role::User,
                content: "Your response was not valid JSON. \
                          Please respond with exactly one JSON object \
                          matching the required schema."
                    .into(),
            });
        }
    }
}
962
963#[cfg(test)]
964mod tests {
965 use super::*;
966
    /// Policy with small limits so each guard condition trips after a few calls.
    fn test_policy() -> TurnPolicy {
        TurnPolicy {
            max_steps: 3,
            max_tool_calls: 2,
            max_consecutive_errors: 2,
            turn_timeout_ms: 100,
            tool_timeout_ms: 50,
        }
    }
977
978 #[test]
979 fn trips_on_max_steps() {
980 let mut guard = LoopGuard::new(test_policy());
981 assert!(guard.can_continue());
982
983 for _ in 0..3 {
984 guard.record_step();
985 }
986
987 assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
988 assert_eq!(guard.reason(), GuardReason::MaxSteps);
989 }
990
991 #[test]
992 fn trips_on_max_tool_calls() {
993 let mut guard = LoopGuard::new(test_policy());
994 assert!(guard.can_continue());
995
996 for _ in 0..2 {
997 guard.record_tool_call();
998 }
999
1000 assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
1001 assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
1002 }
1003
1004 #[test]
1005 fn trips_on_max_consecutive_errors() {
1006 let mut guard = LoopGuard::new(test_policy());
1007 assert!(guard.can_continue());
1008
1009 for _ in 0..2 {
1010 guard.record_error();
1011 }
1012
1013 assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
1014 assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
1015 }
1016
1017 #[tokio::test]
1018 async fn trips_on_timeout() {
1019 let guard = LoopGuard::new(test_policy());
1020 assert!(guard.can_continue());
1021 assert!(!guard.timed_out());
1022
1023 tokio::time::sleep(std::time::Duration::from_millis(150)).await;
1025
1026 assert!(!guard.can_continue(), "guard should trip after timeout");
1027 assert!(guard.timed_out());
1028 assert_eq!(guard.reason(), GuardReason::TurnTimeout);
1029 }
1030
1031 #[test]
1032 fn reset_errors_clears_counter() {
1033 let mut guard = LoopGuard::new(test_policy());
1034
1035 guard.record_error();
1036 guard.reset_errors();
1037
1038 guard.record_error();
1040 assert!(guard.can_continue(), "single error after reset should not trip guard");
1041 }
1042
1043 use std::{
1046 collections::{HashMap, VecDeque},
1047 sync::{Arc, Mutex},
1048 };
1049
1050 use bob_core::{
1051 error::{CostError, LlmError, StoreError, ToolError},
1052 ports::{
1053 ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
1054 ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
1055 },
1056 types::{
1057 AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
1058 ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
1059 LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
1060 ToolSource, TurnCheckpoint,
1061 },
1062 };
1063 use futures_util::StreamExt;
1064
1065 struct SequentialLlm {
1069 responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
1070 }
1071
1072 impl SequentialLlm {
1073 fn from_contents(contents: Vec<&str>) -> Self {
1074 let responses = contents
1075 .into_iter()
1076 .map(|c| {
1077 Ok(LlmResponse {
1078 content: c.to_string(),
1079 usage: TokenUsage::default(),
1080 finish_reason: FinishReason::Stop,
1081 tool_calls: Vec::new(),
1082 })
1083 })
1084 .collect();
1085 Self { responses: Mutex::new(responses) }
1086 }
1087 }
1088
1089 #[async_trait::async_trait]
1090 impl LlmPort for SequentialLlm {
1091 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1092 let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1093 q.pop_front().unwrap_or_else(|| {
1094 Ok(LlmResponse {
1095 content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
1096 usage: TokenUsage::default(),
1097 finish_reason: FinishReason::Stop,
1098 tool_calls: Vec::new(),
1099 })
1100 })
1101 }
1102
1103 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1104 Err(LlmError::Provider("not implemented".into()))
1105 }
1106 }
1107
1108 struct MockToolPort {
1110 tools: Vec<ToolDescriptor>,
1111 call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
1112 }
1113
1114 impl MockToolPort {
1115 fn empty() -> Self {
1116 Self { tools: vec![], call_results: Mutex::new(VecDeque::new()) }
1117 }
1118
1119 fn with_tool_and_results(
1120 tool_name: &str,
1121 results: Vec<Result<ToolResult, ToolError>>,
1122 ) -> Self {
1123 Self {
1124 tools: vec![ToolDescriptor {
1125 id: tool_name.to_string(),
1126 description: format!("{tool_name} tool"),
1127 input_schema: serde_json::json!({"type": "object"}),
1128 source: ToolSource::Local,
1129 }],
1130 call_results: Mutex::new(results.into()),
1131 }
1132 }
1133 }
1134
1135 #[async_trait::async_trait]
1136 impl ToolPort for MockToolPort {
1137 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1138 Ok(self.tools.clone())
1139 }
1140
1141 async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
1142 let mut q = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
1143 q.pop_front().unwrap_or_else(|| {
1144 Ok(ToolResult {
1145 name: call.name,
1146 output: serde_json::json!({"result": "default"}),
1147 is_error: false,
1148 })
1149 })
1150 }
1151 }
1152
1153 struct NoCallToolPort {
1154 tools: Vec<ToolDescriptor>,
1155 }
1156
1157 #[async_trait::async_trait]
1158 impl ToolPort for NoCallToolPort {
1159 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1160 Ok(self.tools.clone())
1161 }
1162
1163 async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
1164 Err(ToolError::Execution(
1165 "tool call should be blocked by policy before execution".to_string(),
1166 ))
1167 }
1168 }
1169
1170 struct AllowAllPolicyPort;
1171
1172 impl ToolPolicyPort for AllowAllPolicyPort {
1173 fn is_tool_allowed(
1174 &self,
1175 _tool: &str,
1176 _deny_tools: &[String],
1177 _allow_tools: Option<&[String]>,
1178 ) -> bool {
1179 true
1180 }
1181 }
1182
1183 struct DenySearchPolicyPort;
1184
1185 impl ToolPolicyPort for DenySearchPolicyPort {
1186 fn is_tool_allowed(
1187 &self,
1188 tool: &str,
1189 _deny_tools: &[String],
1190 _allow_tools: Option<&[String]>,
1191 ) -> bool {
1192 tool != "search"
1193 }
1194 }
1195
1196 struct AlwaysApprovePort;
1197
1198 #[async_trait::async_trait]
1199 impl ApprovalPort for AlwaysApprovePort {
1200 async fn approve_tool_call(
1201 &self,
1202 _call: &ToolCall,
1203 _context: &ApprovalContext,
1204 ) -> Result<ApprovalDecision, ToolError> {
1205 Ok(ApprovalDecision::Approved)
1206 }
1207 }
1208
1209 struct AlwaysDenyApprovalPort;
1210
1211 #[async_trait::async_trait]
1212 impl ApprovalPort for AlwaysDenyApprovalPort {
1213 async fn approve_tool_call(
1214 &self,
1215 _call: &ToolCall,
1216 _context: &ApprovalContext,
1217 ) -> Result<ApprovalDecision, ToolError> {
1218 Ok(ApprovalDecision::Denied {
1219 reason: "approval policy rejected tool call".to_string(),
1220 })
1221 }
1222 }
1223
1224 struct CountingCheckpointPort {
1225 saved: Mutex<Vec<TurnCheckpoint>>,
1226 }
1227
1228 impl CountingCheckpointPort {
1229 fn new() -> Self {
1230 Self { saved: Mutex::new(Vec::new()) }
1231 }
1232 }
1233
1234 #[async_trait::async_trait]
1235 impl TurnCheckpointStorePort for CountingCheckpointPort {
1236 async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
1237 self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(checkpoint.clone());
1238 Ok(())
1239 }
1240
1241 async fn load_latest(
1242 &self,
1243 _session_id: &SessionId,
1244 ) -> Result<Option<TurnCheckpoint>, StoreError> {
1245 Ok(None)
1246 }
1247 }
1248
1249 struct NoopArtifactStore;
1250
1251 #[async_trait::async_trait]
1252 impl ArtifactStorePort for NoopArtifactStore {
1253 async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
1254 Ok(())
1255 }
1256
1257 async fn list_by_session(
1258 &self,
1259 _session_id: &SessionId,
1260 ) -> Result<Vec<ArtifactRecord>, StoreError> {
1261 Ok(Vec::new())
1262 }
1263 }
1264
1265 struct CountingCostMeter {
1266 llm_calls: Mutex<u32>,
1267 }
1268
1269 impl CountingCostMeter {
1270 fn new() -> Self {
1271 Self { llm_calls: Mutex::new(0) }
1272 }
1273 }
1274
1275 #[async_trait::async_trait]
1276 impl CostMeterPort for CountingCostMeter {
1277 async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
1278 Ok(())
1279 }
1280
1281 async fn record_llm_usage(
1282 &self,
1283 _session_id: &SessionId,
1284 _model: &str,
1285 _usage: &TokenUsage,
1286 ) -> Result<(), CostError> {
1287 let mut count = self.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
1288 *count += 1;
1289 Ok(())
1290 }
1291
1292 async fn record_tool_result(
1293 &self,
1294 _session_id: &SessionId,
1295 _tool_result: &ToolResult,
1296 ) -> Result<(), CostError> {
1297 Ok(())
1298 }
1299 }
1300
1301 struct MemoryStore {
1302 data: Mutex<HashMap<SessionId, SessionState>>,
1303 }
1304
1305 impl MemoryStore {
1306 fn new() -> Self {
1307 Self { data: Mutex::new(HashMap::new()) }
1308 }
1309 }
1310
1311 struct FailingSaveStore;
1312
1313 #[async_trait::async_trait]
1314 impl SessionStore for FailingSaveStore {
1315 async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1316 Ok(None)
1317 }
1318
1319 async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
1320 Err(StoreError::Backend("simulated save failure".into()))
1321 }
1322 }
1323
1324 #[async_trait::async_trait]
1325 impl SessionStore for MemoryStore {
1326 async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1327 let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1328 Ok(map.get(id).cloned())
1329 }
1330
1331 async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
1332 let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1333 map.insert(id.clone(), state.clone());
1334 Ok(())
1335 }
1336 }
1337
1338 struct CollectingSink {
1339 events: Mutex<Vec<AgentEvent>>,
1340 }
1341
1342 impl CollectingSink {
1343 fn new() -> Self {
1344 Self { events: Mutex::new(Vec::new()) }
1345 }
1346
1347 fn event_count(&self) -> usize {
1348 self.events.lock().unwrap_or_else(|p| p.into_inner()).len()
1349 }
1350
1351 fn all_events(&self) -> Vec<AgentEvent> {
1352 self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
1353 }
1354 }
1355
1356 impl EventSink for CollectingSink {
1357 fn emit(&self, event: AgentEvent) {
1358 self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
1359 }
1360 }
1361
1362 fn make_request(input: &str) -> AgentRequest {
1363 AgentRequest {
1364 input: input.into(),
1365 session_id: "test-session".into(),
1366 model: None,
1367 context: bob_core::types::RequestContext::default(),
1368 cancel_token: None,
1369 }
1370 }
1371
1372 fn generous_policy() -> TurnPolicy {
1373 TurnPolicy {
1374 max_steps: 20,
1375 max_tool_calls: 10,
1376 max_consecutive_errors: 3,
1377 turn_timeout_ms: 30_000,
1378 tool_timeout_ms: 5_000,
1379 }
1380 }
1381
1382 struct StreamLlm {
1383 chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
1384 }
1385
1386 impl StreamLlm {
1387 fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
1388 Self { chunks: Mutex::new(chunks.into()) }
1389 }
1390 }
1391
1392 #[async_trait::async_trait]
1393 impl LlmPort for StreamLlm {
1394 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1395 Err(LlmError::Provider("complete() should not be called in stream test".into()))
1396 }
1397
1398 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1399 let mut chunks = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
1400 let items: Vec<Result<LlmStreamChunk, LlmError>> = chunks.drain(..).collect();
1401 Ok(Box::pin(futures_util::stream::iter(items)))
1402 }
1403 }
1404
1405 struct InspectingLlm {
1406 expected_substring: String,
1407 }
1408
1409 #[async_trait::async_trait]
1410 impl LlmPort for InspectingLlm {
1411 async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
1412 let system = req
1413 .messages
1414 .iter()
1415 .find(|m| m.role == Role::System)
1416 .map(|m| m.content.clone())
1417 .unwrap_or_default();
1418 if !system.contains(&self.expected_substring) {
1419 return Err(LlmError::Provider(format!(
1420 "expected system prompt to include '{}', got: {}",
1421 self.expected_substring, system
1422 )));
1423 }
1424 Ok(LlmResponse {
1425 content: r#"{"type": "final", "content": "ok"}"#.to_string(),
1426 usage: TokenUsage::default(),
1427 finish_reason: FinishReason::Stop,
1428 tool_calls: Vec::new(),
1429 })
1430 }
1431
1432 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1433 Err(LlmError::Provider("not used".into()))
1434 }
1435 }
1436
1437 #[tokio::test]
1440 async fn tc01_simple_final_response() {
1441 let llm =
1442 SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
1443 let tools = MockToolPort::empty();
1444 let store = MemoryStore::new();
1445 let sink = CollectingSink::new();
1446
1447 let result = run_turn(
1448 &llm,
1449 &tools,
1450 &store,
1451 &sink,
1452 make_request("Hi"),
1453 &generous_policy(),
1454 "test-model",
1455 )
1456 .await;
1457
1458 assert!(
1459 matches!(&result, Ok(AgentRunResult::Finished(_))),
1460 "expected Finished, got {result:?}"
1461 );
1462 let resp = match result {
1463 Ok(AgentRunResult::Finished(r)) => r,
1464 _ => return,
1465 };
1466
1467 assert_eq!(resp.content, "Hello there!");
1468 assert_eq!(resp.finish_reason, FinishReason::Stop);
1469 assert!(resp.tool_transcript.is_empty());
1470 assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
1471 }
1472
1473 #[tokio::test]
1476 async fn tc02_tool_call_then_final() {
1477 let llm = SequentialLlm::from_contents(vec![
1478 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1479 r#"{"type": "final", "content": "Found results."}"#,
1480 ]);
1481 let tools = MockToolPort::with_tool_and_results(
1482 "search",
1483 vec![Ok(ToolResult {
1484 name: "search".into(),
1485 output: serde_json::json!({"hits": 42}),
1486 is_error: false,
1487 })],
1488 );
1489 let store = MemoryStore::new();
1490 let sink = CollectingSink::new();
1491
1492 let result = run_turn(
1493 &llm,
1494 &tools,
1495 &store,
1496 &sink,
1497 make_request("Search for rust"),
1498 &generous_policy(),
1499 "test-model",
1500 )
1501 .await;
1502
1503 assert!(
1504 matches!(&result, Ok(AgentRunResult::Finished(_))),
1505 "expected Finished, got {result:?}"
1506 );
1507 let resp = match result {
1508 Ok(AgentRunResult::Finished(r)) => r,
1509 _ => return,
1510 };
1511
1512 assert_eq!(resp.content, "Found results.");
1513 assert_eq!(resp.finish_reason, FinishReason::Stop);
1514 assert_eq!(resp.tool_transcript.len(), 1);
1515 assert_eq!(resp.tool_transcript[0].name, "search");
1516 assert!(!resp.tool_transcript[0].is_error);
1517 }
1518
1519 #[tokio::test]
1522 async fn tc03_parse_error_reprompt_success() {
1523 let llm = SequentialLlm::from_contents(vec![
1524 "This is not JSON at all.",
1525 r#"{"type": "final", "content": "Recovered"}"#,
1526 ]);
1527 let tools = MockToolPort::empty();
1528 let store = MemoryStore::new();
1529 let sink = CollectingSink::new();
1530
1531 let result = run_turn(
1532 &llm,
1533 &tools,
1534 &store,
1535 &sink,
1536 make_request("Hi"),
1537 &generous_policy(),
1538 "test-model",
1539 )
1540 .await;
1541
1542 assert!(
1543 matches!(&result, Ok(AgentRunResult::Finished(_))),
1544 "expected Finished after re-prompt, got {result:?}"
1545 );
1546 let resp = match result {
1547 Ok(AgentRunResult::Finished(r)) => r,
1548 _ => return,
1549 };
1550
1551 assert_eq!(resp.content, "Recovered");
1552 assert_eq!(resp.finish_reason, FinishReason::Stop);
1553 }
1554
1555 #[tokio::test]
1558 async fn tc04_double_parse_error() {
1559 let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
1560 let tools = MockToolPort::empty();
1561 let store = MemoryStore::new();
1562 let sink = CollectingSink::new();
1563
1564 let result = run_turn(
1565 &llm,
1566 &tools,
1567 &store,
1568 &sink,
1569 make_request("Hi"),
1570 &generous_policy(),
1571 "test-model",
1572 )
1573 .await;
1574
1575 assert!(result.is_err(), "should return error after two parse failures");
1576 let msg = match result {
1577 Err(err) => err.to_string(),
1578 Ok(value) => format!("unexpected success: {value:?}"),
1579 };
1580 assert!(msg.contains("invalid JSON"), "error message = {msg}");
1581 }
1582
1583 #[tokio::test]
1586 async fn tc05_max_steps_exhaustion() {
1587 let llm = SequentialLlm::from_contents(vec![
1589 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1590 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1591 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1592 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1593 ]);
1594 let tools = MockToolPort::with_tool_and_results(
1595 "t1",
1596 vec![
1597 Ok(ToolResult {
1598 name: "t1".into(),
1599 output: serde_json::json!(null),
1600 is_error: false,
1601 }),
1602 Ok(ToolResult {
1603 name: "t1".into(),
1604 output: serde_json::json!(null),
1605 is_error: false,
1606 }),
1607 Ok(ToolResult {
1608 name: "t1".into(),
1609 output: serde_json::json!(null),
1610 is_error: false,
1611 }),
1612 ],
1613 );
1614 let store = MemoryStore::new();
1615 let sink = CollectingSink::new();
1616
1617 let policy = TurnPolicy {
1618 max_steps: 2,
1619 max_tool_calls: 10,
1620 max_consecutive_errors: 5,
1621 turn_timeout_ms: 30_000,
1622 tool_timeout_ms: 5_000,
1623 };
1624
1625 let result =
1626 run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
1627 .await;
1628
1629 assert!(
1630 matches!(&result, Ok(AgentRunResult::Finished(_))),
1631 "expected Finished with GuardExceeded, got {result:?}"
1632 );
1633 let resp = match result {
1634 Ok(AgentRunResult::Finished(r)) => r,
1635 _ => return,
1636 };
1637
1638 assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
1639 assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
1640 }
1641
1642 #[tokio::test]
1645 async fn tc06_cancellation() {
1646 let llm = SequentialLlm::from_contents(vec![
1647 r#"{"type": "final", "content": "should not reach"}"#,
1648 ]);
1649 let tools = MockToolPort::empty();
1650 let store = MemoryStore::new();
1651 let sink = CollectingSink::new();
1652
1653 let token = CancelToken::new();
1654 token.cancel();
1656
1657 let mut req = make_request("Hi");
1658 req.cancel_token = Some(token);
1659
1660 let result =
1661 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1662
1663 assert!(
1664 matches!(&result, Ok(AgentRunResult::Finished(_))),
1665 "expected Finished with Cancelled, got {result:?}"
1666 );
1667 let resp = match result {
1668 Ok(AgentRunResult::Finished(r)) => r,
1669 _ => return,
1670 };
1671
1672 assert_eq!(resp.finish_reason, FinishReason::Cancelled);
1673 }
1674
1675 #[tokio::test]
1678 async fn tc07_tool_error_then_final() {
1679 let llm = SequentialLlm::from_contents(vec![
1680 r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
1681 r#"{"type": "final", "content": "Recovered from tool error."}"#,
1682 ]);
1683 let tools = MockToolPort::with_tool_and_results(
1684 "flaky_tool",
1685 vec![Err(ToolError::Execution("connection refused".into()))],
1686 );
1687 let store = MemoryStore::new();
1688 let sink = CollectingSink::new();
1689
1690 let result = run_turn(
1691 &llm,
1692 &tools,
1693 &store,
1694 &sink,
1695 make_request("call flaky"),
1696 &generous_policy(),
1697 "test-model",
1698 )
1699 .await;
1700
1701 assert!(
1702 matches!(&result, Ok(AgentRunResult::Finished(_))),
1703 "expected Finished, got {result:?}"
1704 );
1705 let resp = match result {
1706 Ok(AgentRunResult::Finished(r)) => r,
1707 _ => return,
1708 };
1709
1710 assert_eq!(resp.content, "Recovered from tool error.");
1711 assert_eq!(resp.tool_transcript.len(), 1);
1712 assert!(resp.tool_transcript[0].is_error);
1713 }
1714
1715 #[tokio::test]
1716 async fn tc08_save_failure_is_propagated() {
1717 let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
1718 let tools = MockToolPort::empty();
1719 let store = FailingSaveStore;
1720 let sink = CollectingSink::new();
1721
1722 let result = run_turn(
1723 &llm,
1724 &tools,
1725 &store,
1726 &sink,
1727 make_request("hello"),
1728 &generous_policy(),
1729 "test-model",
1730 )
1731 .await;
1732
1733 assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
1734 }
1735
1736 #[tokio::test]
1737 async fn tc09_stream_turn_emits_text_and_finished() {
1738 let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
1739 Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
1740 Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
1741 Ok(LlmStreamChunk::Done {
1742 usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
1743 }),
1744 ]));
1745 let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
1746 let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
1747 let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());
1748
1749 let stream_result = run_turn_stream(
1750 llm,
1751 tools,
1752 store,
1753 sink,
1754 make_request("hello"),
1755 generous_policy(),
1756 "test-model".to_string(),
1757 )
1758 .await;
1759 assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
1760 let mut stream = match stream_result {
1761 Ok(stream) => stream,
1762 Err(_) => return,
1763 };
1764
1765 let mut saw_text = false;
1766 let mut saw_finished = false;
1767 while let Some(event) = stream.next().await {
1768 match event {
1769 AgentStreamEvent::TextDelta { content } => {
1770 saw_text = saw_text || !content.is_empty();
1771 }
1772 AgentStreamEvent::Finished { usage } => {
1773 saw_finished = true;
1774 assert_eq!(usage.prompt_tokens, 3);
1775 assert_eq!(usage.completion_tokens, 4);
1776 }
1777 AgentStreamEvent::ToolCallStarted { .. } |
1778 AgentStreamEvent::ToolCallCompleted { .. } |
1779 AgentStreamEvent::Error { .. } => {}
1780 }
1781 }
1782
1783 assert!(saw_text, "expected at least one text delta");
1784 assert!(saw_finished, "expected a finished event");
1785 }
1786
1787 #[tokio::test]
1788 async fn tc10_skills_prompt_context_is_injected() {
1789 let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
1790 let tools = MockToolPort::empty();
1791 let store = MemoryStore::new();
1792 let sink = CollectingSink::new();
1793
1794 let mut req = make_request("review this code");
1795 req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());
1796
1797 let result =
1798 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1799
1800 assert!(result.is_ok(), "run should succeed when skills prompt is injected");
1801 }
1802
1803 #[tokio::test]
1804 async fn tc11_selected_skills_context_emits_event() {
1805 let llm =
1806 SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
1807 let tools = MockToolPort::empty();
1808 let store = MemoryStore::new();
1809 let sink = CollectingSink::new();
1810
1811 let mut req = make_request("review code");
1812 req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];
1813
1814 let result =
1815 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1816 assert!(result.is_ok(), "run should succeed");
1817
1818 let events = sink.all_events();
1819 assert!(
1820 events.iter().any(|event| matches!(
1821 event,
1822 AgentEvent::SkillsSelected { skill_names }
1823 if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
1824 )),
1825 "skills.selected event should be emitted with context skill names"
1826 );
1827 }
1828
1829 #[tokio::test]
1830 async fn tc12_policy_deny_tool_blocks_execution() {
1831 let llm = SequentialLlm::from_contents(vec![
1832 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1833 r#"{"type": "final", "content": "done"}"#,
1834 ]);
1835 let tools = NoCallToolPort {
1836 tools: vec![ToolDescriptor {
1837 id: "search".to_string(),
1838 description: "search tool".to_string(),
1839 input_schema: serde_json::json!({"type":"object"}),
1840 source: ToolSource::Local,
1841 }],
1842 };
1843 let store = MemoryStore::new();
1844 let sink = CollectingSink::new();
1845
1846 let mut req = make_request("search rust");
1847 req.context.tool_policy.deny_tools =
1848 vec!["search".to_string(), "local/shell_exec".to_string()];
1849
1850 let result =
1851 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1852 assert!(
1853 matches!(&result, Ok(AgentRunResult::Finished(_))),
1854 "expected finished response, got {result:?}"
1855 );
1856 let resp = match result {
1857 Ok(AgentRunResult::Finished(r)) => r,
1858 _ => return,
1859 };
1860
1861 assert_eq!(resp.finish_reason, FinishReason::Stop);
1862 assert_eq!(resp.tool_transcript.len(), 1);
1863 assert!(resp.tool_transcript[0].is_error);
1864 assert!(
1865 resp.tool_transcript[0].output.to_string().contains("denied"),
1866 "tool error should explain policy denial"
1867 );
1868 }
1869
1870 #[tokio::test]
1871 async fn tc13_approval_denied_blocks_execution() {
1872 let llm = SequentialLlm::from_contents(vec![
1873 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1874 r#"{"type": "final", "content": "done"}"#,
1875 ]);
1876 let tools = NoCallToolPort {
1877 tools: vec![ToolDescriptor {
1878 id: "search".to_string(),
1879 description: "search tool".to_string(),
1880 input_schema: serde_json::json!({"type":"object"}),
1881 source: ToolSource::Local,
1882 }],
1883 };
1884 let store = MemoryStore::new();
1885 let sink = CollectingSink::new();
1886 let req = make_request("search rust");
1887 let tool_policy = AllowAllPolicyPort;
1888 let approval = AlwaysDenyApprovalPort;
1889
1890 let result = run_turn_with_controls(
1891 &llm,
1892 &tools,
1893 &store,
1894 &sink,
1895 req,
1896 &generous_policy(),
1897 "test-model",
1898 &tool_policy,
1899 &approval,
1900 )
1901 .await;
1902 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
1903 let resp = match result {
1904 Ok(AgentRunResult::Finished(r)) => r,
1905 _ => return,
1906 };
1907
1908 assert_eq!(resp.tool_transcript.len(), 1);
1909 assert!(resp.tool_transcript[0].is_error);
1910 assert!(
1911 resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
1912 "tool error should explain approval denial"
1913 );
1914 }
1915
1916 #[tokio::test]
1917 async fn tc14_custom_policy_port_blocks_execution() {
1918 let llm = SequentialLlm::from_contents(vec![
1919 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1920 r#"{"type": "final", "content": "done"}"#,
1921 ]);
1922 let tools = NoCallToolPort {
1923 tools: vec![ToolDescriptor {
1924 id: "search".to_string(),
1925 description: "search tool".to_string(),
1926 input_schema: serde_json::json!({"type":"object"}),
1927 source: ToolSource::Local,
1928 }],
1929 };
1930 let store = MemoryStore::new();
1931 let sink = CollectingSink::new();
1932 let req = make_request("search rust");
1933 let tool_policy = DenySearchPolicyPort;
1934 let approval = AlwaysApprovePort;
1935
1936 let result = run_turn_with_controls(
1937 &llm,
1938 &tools,
1939 &store,
1940 &sink,
1941 req,
1942 &generous_policy(),
1943 "test-model",
1944 &tool_policy,
1945 &approval,
1946 )
1947 .await;
1948 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
1949 let resp = match result {
1950 Ok(AgentRunResult::Finished(r)) => r,
1951 _ => return,
1952 };
1953
1954 assert_eq!(resp.tool_transcript.len(), 1);
1955 assert!(resp.tool_transcript[0].is_error);
1956 assert!(
1957 resp.tool_transcript[0].output.to_string().contains("denied"),
1958 "tool error should explain policy denial"
1959 );
1960 }
1961
1962 #[tokio::test]
1963 async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
1964 struct NativeToolLlm {
1965 responses: Mutex<VecDeque<LlmResponse>>,
1966 }
1967
1968 #[async_trait::async_trait]
1969 impl LlmPort for NativeToolLlm {
1970 fn capabilities(&self) -> bob_core::types::LlmCapabilities {
1971 bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
1972 }
1973
1974 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1975 let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1976 Ok(q.pop_front().unwrap_or(LlmResponse {
1977 content: r#"{"type":"final","content":"fallback"}"#.to_string(),
1978 usage: TokenUsage::default(),
1979 finish_reason: FinishReason::Stop,
1980 tool_calls: Vec::new(),
1981 }))
1982 }
1983
1984 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1985 Err(LlmError::Provider("not used".into()))
1986 }
1987 }
1988
1989 let llm = NativeToolLlm {
1990 responses: Mutex::new(VecDeque::from(vec![
1991 LlmResponse {
1992 content: "ignored".to_string(),
1993 usage: TokenUsage::default(),
1994 finish_reason: FinishReason::Stop,
1995 tool_calls: vec![ToolCall {
1996 name: "search".to_string(),
1997 arguments: serde_json::json!({"q":"rust"}),
1998 }],
1999 },
2000 LlmResponse {
2001 content: r#"{"type":"final","content":"done"}"#.to_string(),
2002 usage: TokenUsage::default(),
2003 finish_reason: FinishReason::Stop,
2004 tool_calls: Vec::new(),
2005 },
2006 ])),
2007 };
2008 let tools = MockToolPort::with_tool_and_results(
2009 "search",
2010 vec![Ok(ToolResult {
2011 name: "search".to_string(),
2012 output: serde_json::json!({"hits": 2}),
2013 is_error: false,
2014 })],
2015 );
2016 let store = MemoryStore::new();
2017 let sink = CollectingSink::new();
2018 let checkpoint = CountingCheckpointPort::new();
2019 let artifacts = NoopArtifactStore;
2020 let cost = CountingCostMeter::new();
2021 let policy = AllowAllPolicyPort;
2022 let approval = AlwaysApprovePort;
2023
2024 let result = run_turn_with_extensions(
2025 &llm,
2026 &tools,
2027 &store,
2028 &sink,
2029 make_request("search rust"),
2030 &generous_policy(),
2031 "test-model",
2032 &policy,
2033 &approval,
2034 crate::DispatchMode::NativePreferred,
2035 &checkpoint,
2036 &artifacts,
2037 &cost,
2038 )
2039 .await;
2040
2041 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
2042 let resp = match result {
2043 Ok(AgentRunResult::Finished(r)) => r,
2044 _ => return,
2045 };
2046 assert_eq!(resp.tool_transcript.len(), 1);
2047 assert_eq!(resp.tool_transcript[0].name, "search");
2048 }
2049
2050 #[tokio::test]
2051 async fn tc16_checkpoint_and_cost_ports_are_invoked() {
2052 let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
2053 let tools = MockToolPort::empty();
2054 let store = MemoryStore::new();
2055 let sink = CollectingSink::new();
2056 let checkpoint = CountingCheckpointPort::new();
2057 let artifacts = NoopArtifactStore;
2058 let cost = CountingCostMeter::new();
2059 let policy = AllowAllPolicyPort;
2060 let approval = AlwaysApprovePort;
2061
2062 let result = run_turn_with_extensions(
2063 &llm,
2064 &tools,
2065 &store,
2066 &sink,
2067 make_request("hello"),
2068 &generous_policy(),
2069 "test-model",
2070 &policy,
2071 &approval,
2072 crate::DispatchMode::PromptGuided,
2073 &checkpoint,
2074 &artifacts,
2075 &cost,
2076 )
2077 .await;
2078 assert!(result.is_ok(), "turn should succeed");
2079 let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2080 let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
2081 assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
2082 assert!(llm_calls >= 1, "cost meter should record llm usage");
2083 }
2084}