1use std::sync::Arc;
43
44use bob_core::{
45 error::AgentError,
46 normalize_tool_list,
47 ports::{
48 ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
49 ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
50 },
51 types::{
52 AgentAction, AgentEvent, AgentEventStream, AgentRequest, AgentResponse, AgentRunResult,
53 AgentStreamEvent, ApprovalContext, ApprovalDecision, ArtifactRecord, FinishReason,
54 GuardReason, Message, Role, TokenUsage, ToolCall, ToolResult, TurnCheckpoint, TurnPolicy,
55 },
56};
57use futures_util::StreamExt;
58use tokio::time::Instant;
59use tokio_stream::wrappers::UnboundedReceiverStream;
60
61#[derive(Debug)]
64pub struct LoopGuard {
65 policy: TurnPolicy,
66 steps: u32,
67 tool_calls: u32,
68 consecutive_errors: u32,
69 start: Instant,
70}
71
72impl LoopGuard {
73 #[must_use]
75 pub fn new(policy: TurnPolicy) -> Self {
76 Self { policy, steps: 0, tool_calls: 0, consecutive_errors: 0, start: Instant::now() }
77 }
78
79 #[must_use]
81 pub fn can_continue(&self) -> bool {
82 self.steps < self.policy.max_steps &&
83 self.tool_calls < self.policy.max_tool_calls &&
84 self.consecutive_errors < self.policy.max_consecutive_errors &&
85 !self.timed_out()
86 }
87
88 pub fn record_step(&mut self) {
90 self.steps += 1;
91 }
92
93 pub fn record_tool_call(&mut self) {
95 self.tool_calls += 1;
96 }
97
98 pub fn record_error(&mut self) {
100 self.consecutive_errors += 1;
101 }
102
103 pub fn reset_errors(&mut self) {
105 self.consecutive_errors = 0;
106 }
107
108 #[must_use]
112 pub fn reason(&self) -> GuardReason {
113 if self.steps >= self.policy.max_steps {
114 GuardReason::MaxSteps
115 } else if self.tool_calls >= self.policy.max_tool_calls {
116 GuardReason::MaxToolCalls
117 } else if self.consecutive_errors >= self.policy.max_consecutive_errors {
118 GuardReason::MaxConsecutiveErrors
119 } else if self.timed_out() {
120 GuardReason::TurnTimeout
121 } else {
122 GuardReason::Cancelled
124 }
125 }
126
127 #[must_use]
129 pub fn timed_out(&self) -> bool {
130 self.start.elapsed().as_millis() >= u128::from(self.policy.turn_timeout_ms)
131 }
132}
133
/// Baseline system instructions used for every turn; a non-blank system
/// prompt from the request is appended after them.
const DEFAULT_SYSTEM_INSTRUCTIONS: &str = concat!(
    "You are a helpful AI assistant. ",
    "Think step by step before answering. ",
    "When you need external information, use the available tools.",
);
140
141fn resolve_system_instructions(req: &AgentRequest) -> String {
142 let Some(skills_prompt) = req.context.system_prompt.as_deref() else {
143 return DEFAULT_SYSTEM_INSTRUCTIONS.to_string();
144 };
145
146 if skills_prompt.trim().is_empty() {
147 DEFAULT_SYSTEM_INSTRUCTIONS.to_string()
148 } else {
149 format!("{DEFAULT_SYSTEM_INSTRUCTIONS}\n\n{skills_prompt}")
150 }
151}
152
153fn resolve_selected_skills(req: &AgentRequest) -> Vec<String> {
154 req.context.selected_skills.clone()
155}
156
157#[derive(Debug, Clone, Default)]
158struct ToolCallPolicy {
159 deny_tools: Vec<String>,
160 allow_tools: Option<Vec<String>>,
161}
162
163fn resolve_tool_call_policy(req: &AgentRequest) -> ToolCallPolicy {
164 let deny_tools =
165 normalize_tool_list(req.context.tool_policy.deny_tools.iter().map(String::as_str));
166 let allow_tools = req
167 .context
168 .tool_policy
169 .allow_tools
170 .as_ref()
171 .map(|tools| normalize_tool_list(tools.iter().map(String::as_str)));
172 ToolCallPolicy { deny_tools, allow_tools }
173}
174
175fn prompt_options_for_mode(
176 dispatch_mode: crate::DispatchMode,
177 llm: &dyn LlmPort,
178) -> crate::prompt::PromptBuildOptions {
179 match dispatch_mode {
180 crate::DispatchMode::PromptGuided => crate::prompt::PromptBuildOptions::default(),
181 crate::DispatchMode::NativePreferred => {
182 if llm.capabilities().native_tool_calling {
183 crate::prompt::PromptBuildOptions {
184 include_action_schema: false,
185 include_tool_schema: false,
186 }
187 } else {
188 crate::prompt::PromptBuildOptions::default()
189 }
190 }
191 }
192}
193
194fn parse_action_for_mode(
195 dispatch_mode: crate::DispatchMode,
196 llm: &dyn LlmPort,
197 response: &bob_core::types::LlmResponse,
198) -> Result<AgentAction, crate::action::ActionParseError> {
199 match dispatch_mode {
200 crate::DispatchMode::PromptGuided => crate::action::parse_action(&response.content),
201 crate::DispatchMode::NativePreferred => {
202 if llm.capabilities().native_tool_calling &&
203 let Some(tool_call) = response.tool_calls.first()
204 {
205 return Ok(AgentAction::ToolCall {
206 name: tool_call.name.clone(),
207 arguments: tool_call.arguments.clone(),
208 });
209 }
210 crate::action::parse_action(&response.content)
211 }
212 }
213}
214
215#[expect(
216 clippy::too_many_arguments,
217 reason = "tool execution needs explicit policy, approval, and timeout dependencies"
218)]
219async fn execute_tool_call(
220 tools: &dyn ToolPort,
221 guard: &mut LoopGuard,
222 tool_call: ToolCall,
223 policy: &ToolCallPolicy,
224 tool_policy_port: &dyn ToolPolicyPort,
225 approval_port: &dyn ApprovalPort,
226 approval_context: &ApprovalContext,
227 timeout_ms: u64,
228) -> ToolResult {
229 if !tool_policy_port.is_tool_allowed(
230 &tool_call.name,
231 &policy.deny_tools,
232 policy.allow_tools.as_deref(),
233 ) {
234 guard.record_error();
235 return ToolResult {
236 name: tool_call.name.clone(),
237 output: serde_json::json!({
238 "error": format!("tool '{}' denied by policy", tool_call.name)
239 }),
240 is_error: true,
241 };
242 }
243
244 match approval_port.approve_tool_call(&tool_call, approval_context).await {
245 Ok(ApprovalDecision::Approved) => {}
246 Ok(ApprovalDecision::Denied { reason }) => {
247 guard.record_error();
248 return ToolResult {
249 name: tool_call.name.clone(),
250 output: serde_json::json!({"error": reason}),
251 is_error: true,
252 };
253 }
254 Err(err) => {
255 guard.record_error();
256 return ToolResult {
257 name: tool_call.name.clone(),
258 output: serde_json::json!({"error": err.to_string()}),
259 is_error: true,
260 };
261 }
262 }
263
264 match tokio::time::timeout(
265 std::time::Duration::from_millis(timeout_ms),
266 tools.call_tool(tool_call.clone()),
267 )
268 .await
269 {
270 Ok(Ok(result)) => {
271 guard.reset_errors();
272 result
273 }
274 Ok(Err(err)) => {
275 guard.record_error();
276 ToolResult {
277 name: tool_call.name,
278 output: serde_json::json!({"error": err.to_string()}),
279 is_error: true,
280 }
281 }
282 Err(_) => {
283 guard.record_error();
284 ToolResult {
285 name: tool_call.name,
286 output: serde_json::json!({"error": "tool call timed out"}),
287 is_error: true,
288 }
289 }
290 }
291}
292
293pub async fn run_turn(
300 llm: &dyn LlmPort,
301 tools: &dyn ToolPort,
302 store: &dyn SessionStore,
303 events: &dyn EventSink,
304 req: AgentRequest,
305 policy: &TurnPolicy,
306 default_model: &str,
307) -> Result<AgentRunResult, AgentError> {
308 let tool_policy = crate::DefaultToolPolicyPort;
309 let approval = crate::AllowAllApprovalPort;
310 let checkpoint_store = crate::NoOpCheckpointStorePort;
311 let artifact_store = crate::NoOpArtifactStorePort;
312 let cost_meter = crate::NoOpCostMeterPort;
313 run_turn_with_extensions(
314 llm,
315 tools,
316 store,
317 events,
318 req,
319 policy,
320 default_model,
321 &tool_policy,
322 &approval,
323 crate::DispatchMode::NativePreferred,
324 &checkpoint_store,
325 &artifact_store,
326 &cost_meter,
327 )
328 .await
329}
330
331#[cfg_attr(
333 not(test),
334 expect(
335 dead_code,
336 reason = "reserved wrapper for partial control injection in external integrations"
337 )
338)]
339#[expect(
340 clippy::too_many_arguments,
341 reason = "wrapper exposes explicit dependency ports for compatibility and testability"
342)]
343pub(crate) async fn run_turn_with_controls(
344 llm: &dyn LlmPort,
345 tools: &dyn ToolPort,
346 store: &dyn SessionStore,
347 events: &dyn EventSink,
348 req: AgentRequest,
349 policy: &TurnPolicy,
350 default_model: &str,
351 tool_policy_port: &dyn ToolPolicyPort,
352 approval_port: &dyn ApprovalPort,
353) -> Result<AgentRunResult, AgentError> {
354 let checkpoint_store = crate::NoOpCheckpointStorePort;
355 let artifact_store = crate::NoOpArtifactStorePort;
356 let cost_meter = crate::NoOpCostMeterPort;
357 run_turn_with_extensions(
358 llm,
359 tools,
360 store,
361 events,
362 req,
363 policy,
364 default_model,
365 tool_policy_port,
366 approval_port,
367 crate::DispatchMode::PromptGuided,
368 &checkpoint_store,
369 &artifact_store,
370 &cost_meter,
371 )
372 .await
373}
374
/// Core non-streaming agent loop, with every port injected explicitly.
///
/// Loads the session (a missing session starts empty) and appends the user
/// input. It then repeatedly asks the LLM for the next action. The loop ends
/// when the LLM returns a final answer or a question for the user, when the
/// turn is cancelled, or when the `LoopGuard` trips. Tool calls go through
/// `execute_tool_call`; each result's JSON output is appended to the session
/// as a `Role::Tool` message and collected in the returned transcript.
///
/// # Errors
/// Returns an error, via `?`, when any of these fails: loading the session,
/// listing tools, checking the budget, calling the LLM, recording LLM usage,
/// or saving the session on finish. Returns `AgentError::Internal` when two
/// consecutive LLM responses cannot be parsed as actions.
#[expect(
    clippy::too_many_arguments,
    reason = "core entrypoint exposes all ports explicitly for adapter injection"
)]
pub(crate) async fn run_turn_with_extensions(
    llm: &dyn LlmPort,
    tools: &dyn ToolPort,
    store: &dyn SessionStore,
    events: &dyn EventSink,
    req: AgentRequest,
    policy: &TurnPolicy,
    default_model: &str,
    tool_policy_port: &dyn ToolPolicyPort,
    approval_port: &dyn ApprovalPort,
    dispatch_mode: crate::DispatchMode,
    checkpoint_store: &dyn TurnCheckpointStorePort,
    artifact_store: &dyn ArtifactStorePort,
    cost_meter: &dyn CostMeterPort,
) -> Result<AgentRunResult, AgentError> {
    // Per-request settings, resolved once for the whole turn.
    let model = req.model.as_deref().unwrap_or(default_model);
    let cancel_token = req.cancel_token.clone();
    let system_instructions = resolve_system_instructions(&req);
    let selected_skills = resolve_selected_skills(&req);
    let tool_call_policy = resolve_tool_call_policy(&req);

    let mut session = store.load(&req.session_id).await?.unwrap_or_default();
    let tool_descriptors = tools.list_tools().await?;
    let mut guard = LoopGuard::new(policy.clone());

    events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
    if !selected_skills.is_empty() {
        events.emit(AgentEvent::SkillsSelected { skill_names: selected_skills.clone() });
    }

    session.messages.push(Message { role: Role::User, content: req.input.clone() });

    // Accumulated across iterations and handed to `finish_turn` on exit.
    let mut tool_transcript: Vec<ToolResult> = Vec::new();
    let mut total_usage = TokenUsage::default();
    // Reset after every successfully parsed action; two in a row abort the turn.
    let mut consecutive_parse_failures: u32 = 0;

    loop {
        // Cancellation requested before this step: save and finish as Cancelled.
        if let Some(ref token) = cancel_token &&
            token.is_cancelled()
        {
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: "Turn cancelled.",
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::Cancelled,
                },
            )
            .await;
        }

        // A budget error returns immediately via `?`, without saving the session.
        cost_meter.check_budget(&req.session_id).await?;

        // Step, tool-call, error, or time limit reached: finish as GuardExceeded.
        if !guard.can_continue() {
            let reason = guard.reason();
            let msg = format!("Turn stopped: {reason:?}");
            return finish_turn(
                store,
                events,
                &req.session_id,
                &session,
                FinishResult {
                    content: &msg,
                    tool_transcript,
                    usage: total_usage,
                    finish_reason: FinishReason::GuardExceeded,
                },
            )
            .await;
        }

        let llm_request = crate::prompt::build_llm_request_with_options(
            model,
            &session,
            &tool_descriptors,
            &system_instructions,
            prompt_options_for_mode(dispatch_mode, llm),
        );

        events.emit(AgentEvent::LlmCallStarted { model: model.to_string() });

        // When a cancel token is present, race the LLM call against it so a
        // cancellation does not have to wait for the provider to answer.
        let llm_response = if let Some(ref token) = cancel_token {
            tokio::select! {
                result = llm.complete(llm_request) => result?,
                () = token.cancelled() => {
                    return finish_turn(
                        store, events, &req.session_id, &session,
                        FinishResult { content: "Turn cancelled.", tool_transcript, usage: total_usage, finish_reason: FinishReason::Cancelled },
                    ).await;
                }
            }
        } else {
            llm.complete(llm_request).await?
        };

        // Count the step and accumulate usage before dispatching the action.
        guard.record_step();
        total_usage.prompt_tokens += llm_response.usage.prompt_tokens;
        total_usage.completion_tokens += llm_response.usage.completion_tokens;
        cost_meter.record_llm_usage(&req.session_id, model, &llm_response.usage).await?;

        events.emit(AgentEvent::LlmCallCompleted { usage: llm_response.usage.clone() });

        session
            .messages
            .push(Message { role: Role::Assistant, content: llm_response.content.clone() });

        // Best effort: checkpoint failures are ignored.
        let _ = checkpoint_store
            .save_checkpoint(&TurnCheckpoint {
                session_id: req.session_id.clone(),
                step: guard.steps,
                tool_calls: guard.tool_calls,
                usage: total_usage.clone(),
            })
            .await;

        match parse_action_for_mode(dispatch_mode, llm, &llm_response) {
            Ok(action) => {
                consecutive_parse_failures = 0;
                match action {
                    AgentAction::Final { content } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &content,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    // A question for the user also ends the turn, with the
                    // question as the response content.
                    AgentAction::AskUser { question } => {
                        return finish_turn(
                            store,
                            events,
                            &req.session_id,
                            &session,
                            FinishResult {
                                content: &question,
                                tool_transcript,
                                usage: total_usage,
                                finish_reason: FinishReason::Stop,
                            },
                        )
                        .await;
                    }
                    AgentAction::ToolCall { name, arguments } => {
                        events.emit(AgentEvent::ToolCallStarted { name: name.clone() });
                        // `record_step` already ran, so `guard.steps >= 1`;
                        // `max(1)` is purely defensive.
                        let approval_context = ApprovalContext {
                            session_id: req.session_id.clone(),
                            turn_step: guard.steps.max(1),
                            selected_skills: selected_skills.clone(),
                        };

                        let tool_result = execute_tool_call(
                            tools,
                            &mut guard,
                            ToolCall { name: name.clone(), arguments },
                            &tool_call_policy,
                            tool_policy_port,
                            approval_port,
                            &approval_context,
                            policy.tool_timeout_ms,
                        )
                        .await;

                        // Counted whether or not the call succeeded; cost
                        // recording is best effort.
                        guard.record_tool_call();
                        let _ = cost_meter.record_tool_result(&req.session_id, &tool_result).await;

                        let is_error = tool_result.is_error;
                        events.emit(AgentEvent::ToolCallCompleted { name: name.clone(), is_error });

                        // The tool output is fed back to the LLM as a JSON string.
                        let output_str =
                            serde_json::to_string(&tool_result.output).unwrap_or_default();
                        session.messages.push(Message { role: Role::Tool, content: output_str });

                        // Best effort: artifact write failures are ignored.
                        let _ = artifact_store
                            .put(ArtifactRecord {
                                session_id: req.session_id.clone(),
                                kind: "tool_result".to_string(),
                                name: name.clone(),
                                content: tool_result.output.clone(),
                            })
                            .await;

                        tool_transcript.push(tool_result);
                    }
                }
            }
            Err(_parse_err) => {
                consecutive_parse_failures += 1;
                if consecutive_parse_failures >= 2 {
                    // Save best effort; the parse failure is the error reported.
                    let _ = store.save(&req.session_id, &session).await;
                    return Err(AgentError::Internal(
                        "LLM produced invalid JSON after re-prompt".into(),
                    ));
                }
                // First failure: re-prompt once with a corrective user message.
                session.messages.push(Message {
                    role: Role::User,
                    content: "Your response was not valid JSON. \
                              Please respond with exactly one JSON object \
                              matching the required schema."
                        .into(),
                });
            }
        }
    }
}
595
596struct FinishResult<'a> {
598 content: &'a str,
599 tool_transcript: Vec<ToolResult>,
600 usage: TokenUsage,
601 finish_reason: FinishReason,
602}
603
604async fn finish_turn(
606 store: &dyn SessionStore,
607 events: &dyn EventSink,
608 session_id: &bob_core::types::SessionId,
609 session: &bob_core::types::SessionState,
610 result: FinishResult<'_>,
611) -> Result<AgentRunResult, AgentError> {
612 store.save(session_id, session).await?;
613 events.emit(AgentEvent::TurnCompleted { finish_reason: result.finish_reason });
614 Ok(AgentRunResult::Finished(AgentResponse {
615 content: result.content.to_string(),
616 tool_transcript: result.tool_transcript,
617 usage: result.usage,
618 finish_reason: result.finish_reason,
619 }))
620}
621
622pub async fn run_turn_stream(
624 llm: Arc<dyn LlmPort>,
625 tools: Arc<dyn ToolPort>,
626 store: Arc<dyn SessionStore>,
627 events: Arc<dyn EventSink>,
628 req: AgentRequest,
629 policy: TurnPolicy,
630 default_model: String,
631) -> Result<AgentEventStream, AgentError> {
632 let tool_policy: Arc<dyn ToolPolicyPort> = Arc::new(crate::DefaultToolPolicyPort);
633 let approval: Arc<dyn ApprovalPort> = Arc::new(crate::AllowAllApprovalPort);
634 let checkpoint_store: Arc<dyn TurnCheckpointStorePort> =
635 Arc::new(crate::NoOpCheckpointStorePort);
636 let artifact_store: Arc<dyn ArtifactStorePort> = Arc::new(crate::NoOpArtifactStorePort);
637 let cost_meter: Arc<dyn CostMeterPort> = Arc::new(crate::NoOpCostMeterPort);
638 run_turn_stream_with_controls(
639 llm,
640 tools,
641 store,
642 events,
643 req,
644 policy,
645 default_model,
646 tool_policy,
647 approval,
648 crate::DispatchMode::NativePreferred,
649 checkpoint_store,
650 artifact_store,
651 cost_meter,
652 )
653 .await
654}
655
656#[expect(
658 clippy::too_many_arguments,
659 reason = "streaming entrypoint exposes all ports and controls explicitly for composition roots"
660)]
661pub(crate) async fn run_turn_stream_with_controls(
662 llm: Arc<dyn LlmPort>,
663 tools: Arc<dyn ToolPort>,
664 store: Arc<dyn SessionStore>,
665 events: Arc<dyn EventSink>,
666 req: AgentRequest,
667 policy: TurnPolicy,
668 default_model: String,
669 tool_policy: Arc<dyn ToolPolicyPort>,
670 approval: Arc<dyn ApprovalPort>,
671 dispatch_mode: crate::DispatchMode,
672 checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
673 artifact_store: Arc<dyn ArtifactStorePort>,
674 cost_meter: Arc<dyn CostMeterPort>,
675) -> Result<AgentEventStream, AgentError> {
676 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<AgentStreamEvent>();
677 let config = StreamRunConfig {
678 policy,
679 default_model,
680 tool_policy,
681 approval,
682 dispatch_mode,
683 checkpoint_store,
684 artifact_store,
685 cost_meter,
686 };
687
688 tokio::spawn(async move {
689 if let Err(err) = run_turn_stream_inner(llm, tools, store, events, req, &config, &tx).await
690 {
691 let _ = tx.send(AgentStreamEvent::Error { error: err.to_string() });
692 }
693 });
694
695 Ok(Box::pin(UnboundedReceiverStream::new(rx)))
696}
697
698struct StreamRunConfig {
699 policy: TurnPolicy,
700 default_model: String,
701 tool_policy: Arc<dyn ToolPolicyPort>,
702 approval: Arc<dyn ApprovalPort>,
703 dispatch_mode: crate::DispatchMode,
704 checkpoint_store: Arc<dyn TurnCheckpointStorePort>,
705 artifact_store: Arc<dyn ArtifactStorePort>,
706 cost_meter: Arc<dyn CostMeterPort>,
707}
708
709async fn run_turn_stream_inner(
710 llm: Arc<dyn LlmPort>,
711 tools: Arc<dyn ToolPort>,
712 store: Arc<dyn SessionStore>,
713 events: Arc<dyn EventSink>,
714 req: AgentRequest,
715 config: &StreamRunConfig,
716 tx: &tokio::sync::mpsc::UnboundedSender<AgentStreamEvent>,
717) -> Result<(), AgentError> {
718 let model = req.model.as_deref().unwrap_or(&config.default_model);
719 let cancel_token = req.cancel_token.clone();
720 let system_instructions = resolve_system_instructions(&req);
721 let selected_skills = resolve_selected_skills(&req);
722 let tool_call_policy = resolve_tool_call_policy(&req);
723
724 let mut session = store.load(&req.session_id).await?.unwrap_or_default();
725 let tool_descriptors = tools.list_tools().await?;
726 let mut guard = LoopGuard::new(config.policy.clone());
727 let mut total_usage = TokenUsage::default();
728 let mut consecutive_parse_failures: u32 = 0;
729 let mut next_call_id: u64 = 0;
730
731 events.emit(AgentEvent::TurnStarted { session_id: req.session_id.clone() });
732 if !selected_skills.is_empty() {
733 events.emit(AgentEvent::SkillsSelected { skill_names: selected_skills.clone() });
734 }
735 session.messages.push(Message { role: Role::User, content: req.input.clone() });
736
737 loop {
738 if let Some(ref token) = cancel_token &&
739 token.is_cancelled()
740 {
741 events.emit(AgentEvent::Error { error: "turn cancelled".to_string() });
742 events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::Cancelled });
743 store.save(&req.session_id, &session).await?;
744 let _ = tx.send(AgentStreamEvent::Error { error: "turn cancelled".to_string() });
745 let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
746 return Ok(());
747 }
748
749 config.cost_meter.check_budget(&req.session_id).await?;
750
751 if !guard.can_continue() {
752 let reason = guard.reason();
753 let msg = format!("Turn stopped: {reason:?}");
754 events.emit(AgentEvent::Error { error: msg.clone() });
755 events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::GuardExceeded });
756 store.save(&req.session_id, &session).await?;
757 let _ = tx.send(AgentStreamEvent::Error { error: msg });
758 let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
759 return Ok(());
760 }
761
762 let llm_request = crate::prompt::build_llm_request_with_options(
763 model,
764 &session,
765 &tool_descriptors,
766 &system_instructions,
767 prompt_options_for_mode(config.dispatch_mode, llm.as_ref()),
768 );
769 events.emit(AgentEvent::LlmCallStarted { model: model.to_string() });
770
771 let mut assistant_content = String::new();
772 let mut llm_usage = TokenUsage::default();
773 let mut llm_tool_calls: Vec<ToolCall> = Vec::new();
774 let mut fallback_to_complete = false;
775
776 match llm.complete_stream(llm_request.clone()).await {
777 Ok(mut stream) => {
778 while let Some(item) = stream.next().await {
779 match item {
780 Ok(bob_core::types::LlmStreamChunk::TextDelta(delta)) => {
781 assistant_content.push_str(&delta);
782 let _ = tx.send(AgentStreamEvent::TextDelta { content: delta });
783 }
784 Ok(bob_core::types::LlmStreamChunk::Done { usage }) => {
785 llm_usage = usage;
786 }
787 Err(err) => {
788 events.emit(AgentEvent::Error { error: err.to_string() });
789 return Err(AgentError::Llm(err));
790 }
791 }
792 }
793 }
794 Err(err) => {
795 fallback_to_complete = true;
796 events.emit(AgentEvent::Error { error: err.to_string() });
797 }
798 }
799
800 if fallback_to_complete {
802 let llm_response = llm.complete(llm_request).await?;
803 assistant_content = llm_response.content.clone();
804 llm_usage = llm_response.usage;
805 llm_tool_calls = llm_response.tool_calls;
806 let _ = tx.send(AgentStreamEvent::TextDelta { content: llm_response.content });
807 }
808
809 guard.record_step();
810 total_usage.prompt_tokens += llm_usage.prompt_tokens;
811 total_usage.completion_tokens += llm_usage.completion_tokens;
812 config.cost_meter.record_llm_usage(&req.session_id, model, &llm_usage).await?;
813 events.emit(AgentEvent::LlmCallCompleted { usage: llm_usage.clone() });
814 session
815 .messages
816 .push(Message { role: Role::Assistant, content: assistant_content.clone() });
817
818 let _ = config
819 .checkpoint_store
820 .save_checkpoint(&TurnCheckpoint {
821 session_id: req.session_id.clone(),
822 step: guard.steps,
823 tool_calls: guard.tool_calls,
824 usage: total_usage.clone(),
825 })
826 .await;
827
828 let response_for_dispatch = bob_core::types::LlmResponse {
829 content: assistant_content.clone(),
830 usage: llm_usage.clone(),
831 finish_reason: FinishReason::Stop,
832 tool_calls: llm_tool_calls,
833 };
834
835 if let Ok(action) =
836 parse_action_for_mode(config.dispatch_mode, llm.as_ref(), &response_for_dispatch)
837 {
838 consecutive_parse_failures = 0;
839 match action {
840 AgentAction::Final { .. } | AgentAction::AskUser { .. } => {
841 store.save(&req.session_id, &session).await?;
842 events.emit(AgentEvent::TurnCompleted { finish_reason: FinishReason::Stop });
843 let _ = tx.send(AgentStreamEvent::Finished { usage: total_usage.clone() });
844 return Ok(());
845 }
846 AgentAction::ToolCall { name, arguments } => {
847 events.emit(AgentEvent::ToolCallStarted { name: name.clone() });
848 next_call_id += 1;
849 let call_id = format!("call-{next_call_id}");
850 let _ = tx.send(AgentStreamEvent::ToolCallStarted {
851 name: name.clone(),
852 call_id: call_id.clone(),
853 });
854 let approval_context = ApprovalContext {
855 session_id: req.session_id.clone(),
856 turn_step: guard.steps.max(1),
857 selected_skills: selected_skills.clone(),
858 };
859
860 let tool_result = execute_tool_call(
861 tools.as_ref(),
862 &mut guard,
863 ToolCall { name: name.clone(), arguments },
864 &tool_call_policy,
865 config.tool_policy.as_ref(),
866 config.approval.as_ref(),
867 &approval_context,
868 config.policy.tool_timeout_ms,
869 )
870 .await;
871
872 guard.record_tool_call();
873 let _ =
874 config.cost_meter.record_tool_result(&req.session_id, &tool_result).await;
875 let is_error = tool_result.is_error;
876 events.emit(AgentEvent::ToolCallCompleted { name: name.clone(), is_error });
877 let _ = tx.send(AgentStreamEvent::ToolCallCompleted {
878 call_id,
879 result: tool_result.clone(),
880 });
881
882 let output_str = serde_json::to_string(&tool_result.output).unwrap_or_default();
883 session.messages.push(Message { role: Role::Tool, content: output_str });
884 let _ = config
885 .artifact_store
886 .put(ArtifactRecord {
887 session_id: req.session_id.clone(),
888 kind: "tool_result".to_string(),
889 name: name.clone(),
890 content: tool_result.output.clone(),
891 })
892 .await;
893 }
894 }
895 } else {
896 consecutive_parse_failures += 1;
897 if consecutive_parse_failures >= 2 {
898 store.save(&req.session_id, &session).await?;
899 events.emit(AgentEvent::Error {
900 error: "LLM produced invalid JSON after re-prompt".to_string(),
901 });
902 return Err(AgentError::Internal(
903 "LLM produced invalid JSON after re-prompt".into(),
904 ));
905 }
906 session.messages.push(Message {
907 role: Role::User,
908 content: "Your response was not valid JSON. \
909 Please respond with exactly one JSON object \
910 matching the required schema."
911 .into(),
912 });
913 }
914 }
915}
916
917#[cfg(test)]
918mod tests {
919 use super::*;
920
921 fn test_policy() -> TurnPolicy {
923 TurnPolicy {
924 max_steps: 3,
925 max_tool_calls: 2,
926 max_consecutive_errors: 2,
927 turn_timeout_ms: 100,
928 tool_timeout_ms: 50,
929 }
930 }
931
#[test]
fn trips_on_max_steps() {
    let policy = test_policy();
    let step_limit = policy.max_steps;
    let mut guard = LoopGuard::new(policy);
    assert!(guard.can_continue());

    for _ in 0..step_limit {
        guard.record_step();
    }

    assert!(!guard.can_continue(), "guard should trip after reaching max_steps");
    assert_eq!(guard.reason(), GuardReason::MaxSteps);
}
944
#[test]
fn trips_on_max_tool_calls() {
    let policy = test_policy();
    let call_limit = policy.max_tool_calls;
    let mut guard = LoopGuard::new(policy);
    assert!(guard.can_continue());

    for _ in 0..call_limit {
        guard.record_tool_call();
    }

    assert!(!guard.can_continue(), "guard should trip after reaching max_tool_calls");
    assert_eq!(guard.reason(), GuardReason::MaxToolCalls);
}
957
#[test]
fn trips_on_max_consecutive_errors() {
    let policy = test_policy();
    let error_limit = policy.max_consecutive_errors;
    let mut guard = LoopGuard::new(policy);
    assert!(guard.can_continue());

    for _ in 0..error_limit {
        guard.record_error();
    }

    assert!(!guard.can_continue(), "guard should trip after reaching max_consecutive_errors");
    assert_eq!(guard.reason(), GuardReason::MaxConsecutiveErrors);
}
970
971 #[tokio::test]
972 async fn trips_on_timeout() {
973 let guard = LoopGuard::new(test_policy());
974 assert!(guard.can_continue());
975 assert!(!guard.timed_out());
976
977 tokio::time::sleep(std::time::Duration::from_millis(150)).await;
979
980 assert!(!guard.can_continue(), "guard should trip after timeout");
981 assert!(guard.timed_out());
982 assert_eq!(guard.reason(), GuardReason::TurnTimeout);
983 }
984
#[test]
fn reset_errors_clears_counter() {
    let mut guard = LoopGuard::new(test_policy());

    // One error, then a reset, then one more error: the streak restarts at
    // one and never reaches the limit of two.
    guard.record_error();
    guard.reset_errors();
    guard.record_error();

    assert!(guard.can_continue(), "single error after reset should not trip guard");
}
996
997 use std::{
1000 collections::{HashMap, VecDeque},
1001 sync::{Arc, Mutex},
1002 };
1003
1004 use bob_core::{
1005 error::{CostError, LlmError, StoreError, ToolError},
1006 ports::{
1007 ApprovalPort, ArtifactStorePort, CostMeterPort, EventSink, LlmPort, SessionStore,
1008 ToolPolicyPort, ToolPort, TurnCheckpointStorePort,
1009 },
1010 types::{
1011 AgentEvent, AgentRequest, AgentRunResult, AgentStreamEvent, ApprovalContext,
1012 ApprovalDecision, ArtifactRecord, CancelToken, LlmRequest, LlmResponse, LlmStream,
1013 LlmStreamChunk, SessionId, SessionState, ToolCall, ToolDescriptor, ToolResult,
1014 ToolSource, TurnCheckpoint,
1015 },
1016 };
1017 use futures_util::StreamExt;
1018
1019 struct SequentialLlm {
1023 responses: Mutex<VecDeque<Result<LlmResponse, LlmError>>>,
1024 }
1025
1026 impl SequentialLlm {
1027 fn from_contents(contents: Vec<&str>) -> Self {
1028 let responses = contents
1029 .into_iter()
1030 .map(|c| {
1031 Ok(LlmResponse {
1032 content: c.to_string(),
1033 usage: TokenUsage::default(),
1034 finish_reason: FinishReason::Stop,
1035 tool_calls: Vec::new(),
1036 })
1037 })
1038 .collect();
1039 Self { responses: Mutex::new(responses) }
1040 }
1041 }
1042
1043 #[async_trait::async_trait]
1044 impl LlmPort for SequentialLlm {
1045 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1046 let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1047 q.pop_front().unwrap_or_else(|| {
1048 Ok(LlmResponse {
1049 content: r#"{"type": "final", "content": "fallback"}"#.to_string(),
1050 usage: TokenUsage::default(),
1051 finish_reason: FinishReason::Stop,
1052 tool_calls: Vec::new(),
1053 })
1054 })
1055 }
1056
1057 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1058 Err(LlmError::Provider("not implemented".into()))
1059 }
1060 }
1061
1062 struct MockToolPort {
1064 tools: Vec<ToolDescriptor>,
1065 call_results: Mutex<VecDeque<Result<ToolResult, ToolError>>>,
1066 }
1067
1068 impl MockToolPort {
1069 fn empty() -> Self {
1070 Self { tools: vec![], call_results: Mutex::new(VecDeque::new()) }
1071 }
1072
1073 fn with_tool_and_results(
1074 tool_name: &str,
1075 results: Vec<Result<ToolResult, ToolError>>,
1076 ) -> Self {
1077 Self {
1078 tools: vec![ToolDescriptor {
1079 id: tool_name.to_string(),
1080 description: format!("{tool_name} tool"),
1081 input_schema: serde_json::json!({"type": "object"}),
1082 source: ToolSource::Local,
1083 }],
1084 call_results: Mutex::new(results.into()),
1085 }
1086 }
1087 }
1088
1089 #[async_trait::async_trait]
1090 impl ToolPort for MockToolPort {
1091 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1092 Ok(self.tools.clone())
1093 }
1094
1095 async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
1096 let mut q = self.call_results.lock().unwrap_or_else(|p| p.into_inner());
1097 q.pop_front().unwrap_or_else(|| {
1098 Ok(ToolResult {
1099 name: call.name,
1100 output: serde_json::json!({"result": "default"}),
1101 is_error: false,
1102 })
1103 })
1104 }
1105 }
1106
1107 struct NoCallToolPort {
1108 tools: Vec<ToolDescriptor>,
1109 }
1110
1111 #[async_trait::async_trait]
1112 impl ToolPort for NoCallToolPort {
1113 async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
1114 Ok(self.tools.clone())
1115 }
1116
1117 async fn call_tool(&self, _call: ToolCall) -> Result<ToolResult, ToolError> {
1118 Err(ToolError::Execution(
1119 "tool call should be blocked by policy before execution".to_string(),
1120 ))
1121 }
1122 }
1123
1124 struct AllowAllPolicyPort;
1125
1126 impl ToolPolicyPort for AllowAllPolicyPort {
1127 fn is_tool_allowed(
1128 &self,
1129 _tool: &str,
1130 _deny_tools: &[String],
1131 _allow_tools: Option<&[String]>,
1132 ) -> bool {
1133 true
1134 }
1135 }
1136
1137 struct DenySearchPolicyPort;
1138
1139 impl ToolPolicyPort for DenySearchPolicyPort {
1140 fn is_tool_allowed(
1141 &self,
1142 tool: &str,
1143 _deny_tools: &[String],
1144 _allow_tools: Option<&[String]>,
1145 ) -> bool {
1146 tool != "search"
1147 }
1148 }
1149
1150 struct AlwaysApprovePort;
1151
1152 #[async_trait::async_trait]
1153 impl ApprovalPort for AlwaysApprovePort {
1154 async fn approve_tool_call(
1155 &self,
1156 _call: &ToolCall,
1157 _context: &ApprovalContext,
1158 ) -> Result<ApprovalDecision, ToolError> {
1159 Ok(ApprovalDecision::Approved)
1160 }
1161 }
1162
1163 struct AlwaysDenyApprovalPort;
1164
1165 #[async_trait::async_trait]
1166 impl ApprovalPort for AlwaysDenyApprovalPort {
1167 async fn approve_tool_call(
1168 &self,
1169 _call: &ToolCall,
1170 _context: &ApprovalContext,
1171 ) -> Result<ApprovalDecision, ToolError> {
1172 Ok(ApprovalDecision::Denied {
1173 reason: "approval policy rejected tool call".to_string(),
1174 })
1175 }
1176 }
1177
1178 struct CountingCheckpointPort {
1179 saved: Mutex<Vec<TurnCheckpoint>>,
1180 }
1181
1182 impl CountingCheckpointPort {
1183 fn new() -> Self {
1184 Self { saved: Mutex::new(Vec::new()) }
1185 }
1186 }
1187
1188 #[async_trait::async_trait]
1189 impl TurnCheckpointStorePort for CountingCheckpointPort {
1190 async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError> {
1191 self.saved.lock().unwrap_or_else(|p| p.into_inner()).push(checkpoint.clone());
1192 Ok(())
1193 }
1194
1195 async fn load_latest(
1196 &self,
1197 _session_id: &SessionId,
1198 ) -> Result<Option<TurnCheckpoint>, StoreError> {
1199 Ok(None)
1200 }
1201 }
1202
1203 struct NoopArtifactStore;
1204
1205 #[async_trait::async_trait]
1206 impl ArtifactStorePort for NoopArtifactStore {
1207 async fn put(&self, _artifact: ArtifactRecord) -> Result<(), StoreError> {
1208 Ok(())
1209 }
1210
1211 async fn list_by_session(
1212 &self,
1213 _session_id: &SessionId,
1214 ) -> Result<Vec<ArtifactRecord>, StoreError> {
1215 Ok(Vec::new())
1216 }
1217 }
1218
1219 struct CountingCostMeter {
1220 llm_calls: Mutex<u32>,
1221 }
1222
1223 impl CountingCostMeter {
1224 fn new() -> Self {
1225 Self { llm_calls: Mutex::new(0) }
1226 }
1227 }
1228
1229 #[async_trait::async_trait]
1230 impl CostMeterPort for CountingCostMeter {
1231 async fn check_budget(&self, _session_id: &SessionId) -> Result<(), CostError> {
1232 Ok(())
1233 }
1234
1235 async fn record_llm_usage(
1236 &self,
1237 _session_id: &SessionId,
1238 _model: &str,
1239 _usage: &TokenUsage,
1240 ) -> Result<(), CostError> {
1241 let mut count = self.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
1242 *count += 1;
1243 Ok(())
1244 }
1245
1246 async fn record_tool_result(
1247 &self,
1248 _session_id: &SessionId,
1249 _tool_result: &ToolResult,
1250 ) -> Result<(), CostError> {
1251 Ok(())
1252 }
1253 }
1254
1255 struct MemoryStore {
1256 data: Mutex<HashMap<SessionId, SessionState>>,
1257 }
1258
1259 impl MemoryStore {
1260 fn new() -> Self {
1261 Self { data: Mutex::new(HashMap::new()) }
1262 }
1263 }
1264
1265 struct FailingSaveStore;
1266
1267 #[async_trait::async_trait]
1268 impl SessionStore for FailingSaveStore {
1269 async fn load(&self, _id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1270 Ok(None)
1271 }
1272
1273 async fn save(&self, _id: &SessionId, _state: &SessionState) -> Result<(), StoreError> {
1274 Err(StoreError::Backend("simulated save failure".into()))
1275 }
1276 }
1277
1278 #[async_trait::async_trait]
1279 impl SessionStore for MemoryStore {
1280 async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
1281 let map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1282 Ok(map.get(id).cloned())
1283 }
1284
1285 async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
1286 let mut map = self.data.lock().unwrap_or_else(|p| p.into_inner());
1287 map.insert(id.clone(), state.clone());
1288 Ok(())
1289 }
1290 }
1291
1292 struct CollectingSink {
1293 events: Mutex<Vec<AgentEvent>>,
1294 }
1295
1296 impl CollectingSink {
1297 fn new() -> Self {
1298 Self { events: Mutex::new(Vec::new()) }
1299 }
1300
1301 fn event_count(&self) -> usize {
1302 self.events.lock().unwrap_or_else(|p| p.into_inner()).len()
1303 }
1304
1305 fn all_events(&self) -> Vec<AgentEvent> {
1306 self.events.lock().unwrap_or_else(|p| p.into_inner()).clone()
1307 }
1308 }
1309
1310 impl EventSink for CollectingSink {
1311 fn emit(&self, event: AgentEvent) {
1312 self.events.lock().unwrap_or_else(|p| p.into_inner()).push(event);
1313 }
1314 }
1315
1316 fn make_request(input: &str) -> AgentRequest {
1317 AgentRequest {
1318 input: input.into(),
1319 session_id: "test-session".into(),
1320 model: None,
1321 context: bob_core::types::RequestContext::default(),
1322 cancel_token: None,
1323 }
1324 }
1325
1326 fn generous_policy() -> TurnPolicy {
1327 TurnPolicy {
1328 max_steps: 20,
1329 max_tool_calls: 10,
1330 max_consecutive_errors: 3,
1331 turn_timeout_ms: 30_000,
1332 tool_timeout_ms: 5_000,
1333 }
1334 }
1335
1336 struct StreamLlm {
1337 chunks: Mutex<VecDeque<Result<LlmStreamChunk, LlmError>>>,
1338 }
1339
1340 impl StreamLlm {
1341 fn new(chunks: Vec<Result<LlmStreamChunk, LlmError>>) -> Self {
1342 Self { chunks: Mutex::new(chunks.into()) }
1343 }
1344 }
1345
1346 #[async_trait::async_trait]
1347 impl LlmPort for StreamLlm {
1348 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1349 Err(LlmError::Provider("complete() should not be called in stream test".into()))
1350 }
1351
1352 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1353 let mut chunks = self.chunks.lock().unwrap_or_else(|p| p.into_inner());
1354 let items: Vec<Result<LlmStreamChunk, LlmError>> = chunks.drain(..).collect();
1355 Ok(Box::pin(futures_util::stream::iter(items)))
1356 }
1357 }
1358
1359 struct InspectingLlm {
1360 expected_substring: String,
1361 }
1362
1363 #[async_trait::async_trait]
1364 impl LlmPort for InspectingLlm {
1365 async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError> {
1366 let system = req
1367 .messages
1368 .iter()
1369 .find(|m| m.role == Role::System)
1370 .map(|m| m.content.clone())
1371 .unwrap_or_default();
1372 if !system.contains(&self.expected_substring) {
1373 return Err(LlmError::Provider(format!(
1374 "expected system prompt to include '{}', got: {}",
1375 self.expected_substring, system
1376 )));
1377 }
1378 Ok(LlmResponse {
1379 content: r#"{"type": "final", "content": "ok"}"#.to_string(),
1380 usage: TokenUsage::default(),
1381 finish_reason: FinishReason::Stop,
1382 tool_calls: Vec::new(),
1383 })
1384 }
1385
1386 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1387 Err(LlmError::Provider("not used".into()))
1388 }
1389 }
1390
1391 #[tokio::test]
1394 async fn tc01_simple_final_response() {
1395 let llm =
1396 SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "Hello there!"}"#]);
1397 let tools = MockToolPort::empty();
1398 let store = MemoryStore::new();
1399 let sink = CollectingSink::new();
1400
1401 let result = run_turn(
1402 &llm,
1403 &tools,
1404 &store,
1405 &sink,
1406 make_request("Hi"),
1407 &generous_policy(),
1408 "test-model",
1409 )
1410 .await;
1411
1412 assert!(
1413 matches!(&result, Ok(AgentRunResult::Finished(_))),
1414 "expected Finished, got {result:?}"
1415 );
1416 let resp = match result {
1417 Ok(AgentRunResult::Finished(r)) => r,
1418 _ => return,
1419 };
1420
1421 assert_eq!(resp.content, "Hello there!");
1422 assert_eq!(resp.finish_reason, FinishReason::Stop);
1423 assert!(resp.tool_transcript.is_empty());
1424 assert!(sink.event_count() >= 3, "should emit TurnStarted, LlmCall*, TurnCompleted");
1425 }
1426
1427 #[tokio::test]
1430 async fn tc02_tool_call_then_final() {
1431 let llm = SequentialLlm::from_contents(vec![
1432 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1433 r#"{"type": "final", "content": "Found results."}"#,
1434 ]);
1435 let tools = MockToolPort::with_tool_and_results(
1436 "search",
1437 vec![Ok(ToolResult {
1438 name: "search".into(),
1439 output: serde_json::json!({"hits": 42}),
1440 is_error: false,
1441 })],
1442 );
1443 let store = MemoryStore::new();
1444 let sink = CollectingSink::new();
1445
1446 let result = run_turn(
1447 &llm,
1448 &tools,
1449 &store,
1450 &sink,
1451 make_request("Search for rust"),
1452 &generous_policy(),
1453 "test-model",
1454 )
1455 .await;
1456
1457 assert!(
1458 matches!(&result, Ok(AgentRunResult::Finished(_))),
1459 "expected Finished, got {result:?}"
1460 );
1461 let resp = match result {
1462 Ok(AgentRunResult::Finished(r)) => r,
1463 _ => return,
1464 };
1465
1466 assert_eq!(resp.content, "Found results.");
1467 assert_eq!(resp.finish_reason, FinishReason::Stop);
1468 assert_eq!(resp.tool_transcript.len(), 1);
1469 assert_eq!(resp.tool_transcript[0].name, "search");
1470 assert!(!resp.tool_transcript[0].is_error);
1471 }
1472
1473 #[tokio::test]
1476 async fn tc03_parse_error_reprompt_success() {
1477 let llm = SequentialLlm::from_contents(vec![
1478 "This is not JSON at all.",
1479 r#"{"type": "final", "content": "Recovered"}"#,
1480 ]);
1481 let tools = MockToolPort::empty();
1482 let store = MemoryStore::new();
1483 let sink = CollectingSink::new();
1484
1485 let result = run_turn(
1486 &llm,
1487 &tools,
1488 &store,
1489 &sink,
1490 make_request("Hi"),
1491 &generous_policy(),
1492 "test-model",
1493 )
1494 .await;
1495
1496 assert!(
1497 matches!(&result, Ok(AgentRunResult::Finished(_))),
1498 "expected Finished after re-prompt, got {result:?}"
1499 );
1500 let resp = match result {
1501 Ok(AgentRunResult::Finished(r)) => r,
1502 _ => return,
1503 };
1504
1505 assert_eq!(resp.content, "Recovered");
1506 assert_eq!(resp.finish_reason, FinishReason::Stop);
1507 }
1508
1509 #[tokio::test]
1512 async fn tc04_double_parse_error() {
1513 let llm = SequentialLlm::from_contents(vec!["not json 1", "not json 2"]);
1514 let tools = MockToolPort::empty();
1515 let store = MemoryStore::new();
1516 let sink = CollectingSink::new();
1517
1518 let result = run_turn(
1519 &llm,
1520 &tools,
1521 &store,
1522 &sink,
1523 make_request("Hi"),
1524 &generous_policy(),
1525 "test-model",
1526 )
1527 .await;
1528
1529 assert!(result.is_err(), "should return error after two parse failures");
1530 let msg = match result {
1531 Err(err) => err.to_string(),
1532 Ok(value) => format!("unexpected success: {value:?}"),
1533 };
1534 assert!(msg.contains("invalid JSON"), "error message = {msg}");
1535 }
1536
1537 #[tokio::test]
1540 async fn tc05_max_steps_exhaustion() {
1541 let llm = SequentialLlm::from_contents(vec![
1543 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1544 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1545 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1546 r#"{"type": "tool_call", "name": "t1", "arguments": {}}"#,
1547 ]);
1548 let tools = MockToolPort::with_tool_and_results(
1549 "t1",
1550 vec![
1551 Ok(ToolResult {
1552 name: "t1".into(),
1553 output: serde_json::json!(null),
1554 is_error: false,
1555 }),
1556 Ok(ToolResult {
1557 name: "t1".into(),
1558 output: serde_json::json!(null),
1559 is_error: false,
1560 }),
1561 Ok(ToolResult {
1562 name: "t1".into(),
1563 output: serde_json::json!(null),
1564 is_error: false,
1565 }),
1566 ],
1567 );
1568 let store = MemoryStore::new();
1569 let sink = CollectingSink::new();
1570
1571 let policy = TurnPolicy {
1572 max_steps: 2,
1573 max_tool_calls: 10,
1574 max_consecutive_errors: 5,
1575 turn_timeout_ms: 30_000,
1576 tool_timeout_ms: 5_000,
1577 };
1578
1579 let result =
1580 run_turn(&llm, &tools, &store, &sink, make_request("do work"), &policy, "test-model")
1581 .await;
1582
1583 assert!(
1584 matches!(&result, Ok(AgentRunResult::Finished(_))),
1585 "expected Finished with GuardExceeded, got {result:?}"
1586 );
1587 let resp = match result {
1588 Ok(AgentRunResult::Finished(r)) => r,
1589 _ => return,
1590 };
1591
1592 assert_eq!(resp.finish_reason, FinishReason::GuardExceeded);
1593 assert!(resp.content.contains("MaxSteps"), "content = {}", resp.content);
1594 }
1595
1596 #[tokio::test]
1599 async fn tc06_cancellation() {
1600 let llm = SequentialLlm::from_contents(vec![
1601 r#"{"type": "final", "content": "should not reach"}"#,
1602 ]);
1603 let tools = MockToolPort::empty();
1604 let store = MemoryStore::new();
1605 let sink = CollectingSink::new();
1606
1607 let token = CancelToken::new();
1608 token.cancel();
1610
1611 let mut req = make_request("Hi");
1612 req.cancel_token = Some(token);
1613
1614 let result =
1615 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1616
1617 assert!(
1618 matches!(&result, Ok(AgentRunResult::Finished(_))),
1619 "expected Finished with Cancelled, got {result:?}"
1620 );
1621 let resp = match result {
1622 Ok(AgentRunResult::Finished(r)) => r,
1623 _ => return,
1624 };
1625
1626 assert_eq!(resp.finish_reason, FinishReason::Cancelled);
1627 }
1628
1629 #[tokio::test]
1632 async fn tc07_tool_error_then_final() {
1633 let llm = SequentialLlm::from_contents(vec![
1634 r#"{"type": "tool_call", "name": "flaky_tool", "arguments": {}}"#,
1635 r#"{"type": "final", "content": "Recovered from tool error."}"#,
1636 ]);
1637 let tools = MockToolPort::with_tool_and_results(
1638 "flaky_tool",
1639 vec![Err(ToolError::Execution("connection refused".into()))],
1640 );
1641 let store = MemoryStore::new();
1642 let sink = CollectingSink::new();
1643
1644 let result = run_turn(
1645 &llm,
1646 &tools,
1647 &store,
1648 &sink,
1649 make_request("call flaky"),
1650 &generous_policy(),
1651 "test-model",
1652 )
1653 .await;
1654
1655 assert!(
1656 matches!(&result, Ok(AgentRunResult::Finished(_))),
1657 "expected Finished, got {result:?}"
1658 );
1659 let resp = match result {
1660 Ok(AgentRunResult::Finished(r)) => r,
1661 _ => return,
1662 };
1663
1664 assert_eq!(resp.content, "Recovered from tool error.");
1665 assert_eq!(resp.tool_transcript.len(), 1);
1666 assert!(resp.tool_transcript[0].is_error);
1667 }
1668
1669 #[tokio::test]
1670 async fn tc08_save_failure_is_propagated() {
1671 let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "done"}"#]);
1672 let tools = MockToolPort::empty();
1673 let store = FailingSaveStore;
1674 let sink = CollectingSink::new();
1675
1676 let result = run_turn(
1677 &llm,
1678 &tools,
1679 &store,
1680 &sink,
1681 make_request("hello"),
1682 &generous_policy(),
1683 "test-model",
1684 )
1685 .await;
1686
1687 assert!(matches!(result, Err(AgentError::Store(_))), "expected Store error to be returned");
1688 }
1689
1690 #[tokio::test]
1691 async fn tc09_stream_turn_emits_text_and_finished() {
1692 let llm: Arc<dyn LlmPort> = Arc::new(StreamLlm::new(vec![
1693 Ok(LlmStreamChunk::TextDelta("{\"type\":\"final\",\"content\":\"he".into())),
1694 Ok(LlmStreamChunk::TextDelta("llo\"}".into())),
1695 Ok(LlmStreamChunk::Done {
1696 usage: TokenUsage { prompt_tokens: 3, completion_tokens: 4 },
1697 }),
1698 ]));
1699 let tools: Arc<dyn ToolPort> = Arc::new(MockToolPort::empty());
1700 let store: Arc<dyn SessionStore> = Arc::new(MemoryStore::new());
1701 let sink: Arc<dyn EventSink> = Arc::new(CollectingSink::new());
1702
1703 let stream_result = run_turn_stream(
1704 llm,
1705 tools,
1706 store,
1707 sink,
1708 make_request("hello"),
1709 generous_policy(),
1710 "test-model".to_string(),
1711 )
1712 .await;
1713 assert!(stream_result.is_ok(), "run_turn_stream should produce a stream");
1714 let mut stream = match stream_result {
1715 Ok(stream) => stream,
1716 Err(_) => return,
1717 };
1718
1719 let mut saw_text = false;
1720 let mut saw_finished = false;
1721 while let Some(event) = stream.next().await {
1722 match event {
1723 AgentStreamEvent::TextDelta { content } => {
1724 saw_text = saw_text || !content.is_empty();
1725 }
1726 AgentStreamEvent::Finished { usage } => {
1727 saw_finished = true;
1728 assert_eq!(usage.prompt_tokens, 3);
1729 assert_eq!(usage.completion_tokens, 4);
1730 }
1731 AgentStreamEvent::ToolCallStarted { .. } |
1732 AgentStreamEvent::ToolCallCompleted { .. } |
1733 AgentStreamEvent::Error { .. } => {}
1734 }
1735 }
1736
1737 assert!(saw_text, "expected at least one text delta");
1738 assert!(saw_finished, "expected a finished event");
1739 }
1740
1741 #[tokio::test]
1742 async fn tc10_skills_prompt_context_is_injected() {
1743 let llm = InspectingLlm { expected_substring: "Skill: rust-review".to_string() };
1744 let tools = MockToolPort::empty();
1745 let store = MemoryStore::new();
1746 let sink = CollectingSink::new();
1747
1748 let mut req = make_request("review this code");
1749 req.context.system_prompt = Some("Skill: rust-review\nUse strict checks.".to_string());
1750
1751 let result =
1752 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1753
1754 assert!(result.is_ok(), "run should succeed when skills prompt is injected");
1755 }
1756
1757 #[tokio::test]
1758 async fn tc11_selected_skills_context_emits_event() {
1759 let llm =
1760 SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "looks good"}"#]);
1761 let tools = MockToolPort::empty();
1762 let store = MemoryStore::new();
1763 let sink = CollectingSink::new();
1764
1765 let mut req = make_request("review code");
1766 req.context.selected_skills = vec!["rust-review".to_string(), "security-audit".to_string()];
1767
1768 let result =
1769 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1770 assert!(result.is_ok(), "run should succeed");
1771
1772 let events = sink.all_events();
1773 assert!(
1774 events.iter().any(|event| matches!(
1775 event,
1776 AgentEvent::SkillsSelected { skill_names }
1777 if skill_names == &vec!["rust-review".to_string(), "security-audit".to_string()]
1778 )),
1779 "skills.selected event should be emitted with context skill names"
1780 );
1781 }
1782
1783 #[tokio::test]
1784 async fn tc12_policy_deny_tool_blocks_execution() {
1785 let llm = SequentialLlm::from_contents(vec![
1786 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1787 r#"{"type": "final", "content": "done"}"#,
1788 ]);
1789 let tools = NoCallToolPort {
1790 tools: vec![ToolDescriptor {
1791 id: "search".to_string(),
1792 description: "search tool".to_string(),
1793 input_schema: serde_json::json!({"type":"object"}),
1794 source: ToolSource::Local,
1795 }],
1796 };
1797 let store = MemoryStore::new();
1798 let sink = CollectingSink::new();
1799
1800 let mut req = make_request("search rust");
1801 req.context.tool_policy.deny_tools =
1802 vec!["search".to_string(), "local/shell_exec".to_string()];
1803
1804 let result =
1805 run_turn(&llm, &tools, &store, &sink, req, &generous_policy(), "test-model").await;
1806 assert!(
1807 matches!(&result, Ok(AgentRunResult::Finished(_))),
1808 "expected finished response, got {result:?}"
1809 );
1810 let resp = match result {
1811 Ok(AgentRunResult::Finished(r)) => r,
1812 _ => return,
1813 };
1814
1815 assert_eq!(resp.finish_reason, FinishReason::Stop);
1816 assert_eq!(resp.tool_transcript.len(), 1);
1817 assert!(resp.tool_transcript[0].is_error);
1818 assert!(
1819 resp.tool_transcript[0].output.to_string().contains("denied"),
1820 "tool error should explain policy denial"
1821 );
1822 }
1823
1824 #[tokio::test]
1825 async fn tc13_approval_denied_blocks_execution() {
1826 let llm = SequentialLlm::from_contents(vec![
1827 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1828 r#"{"type": "final", "content": "done"}"#,
1829 ]);
1830 let tools = NoCallToolPort {
1831 tools: vec![ToolDescriptor {
1832 id: "search".to_string(),
1833 description: "search tool".to_string(),
1834 input_schema: serde_json::json!({"type":"object"}),
1835 source: ToolSource::Local,
1836 }],
1837 };
1838 let store = MemoryStore::new();
1839 let sink = CollectingSink::new();
1840 let req = make_request("search rust");
1841 let tool_policy = AllowAllPolicyPort;
1842 let approval = AlwaysDenyApprovalPort;
1843
1844 let result = run_turn_with_controls(
1845 &llm,
1846 &tools,
1847 &store,
1848 &sink,
1849 req,
1850 &generous_policy(),
1851 "test-model",
1852 &tool_policy,
1853 &approval,
1854 )
1855 .await;
1856 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
1857 let resp = match result {
1858 Ok(AgentRunResult::Finished(r)) => r,
1859 _ => return,
1860 };
1861
1862 assert_eq!(resp.tool_transcript.len(), 1);
1863 assert!(resp.tool_transcript[0].is_error);
1864 assert!(
1865 resp.tool_transcript[0].output.to_string().contains("approval policy rejected"),
1866 "tool error should explain approval denial"
1867 );
1868 }
1869
1870 #[tokio::test]
1871 async fn tc14_custom_policy_port_blocks_execution() {
1872 let llm = SequentialLlm::from_contents(vec![
1873 r#"{"type": "tool_call", "name": "search", "arguments": {"q": "rust"}}"#,
1874 r#"{"type": "final", "content": "done"}"#,
1875 ]);
1876 let tools = NoCallToolPort {
1877 tools: vec![ToolDescriptor {
1878 id: "search".to_string(),
1879 description: "search tool".to_string(),
1880 input_schema: serde_json::json!({"type":"object"}),
1881 source: ToolSource::Local,
1882 }],
1883 };
1884 let store = MemoryStore::new();
1885 let sink = CollectingSink::new();
1886 let req = make_request("search rust");
1887 let tool_policy = DenySearchPolicyPort;
1888 let approval = AlwaysApprovePort;
1889
1890 let result = run_turn_with_controls(
1891 &llm,
1892 &tools,
1893 &store,
1894 &sink,
1895 req,
1896 &generous_policy(),
1897 "test-model",
1898 &tool_policy,
1899 &approval,
1900 )
1901 .await;
1902 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected result {result:?}");
1903 let resp = match result {
1904 Ok(AgentRunResult::Finished(r)) => r,
1905 _ => return,
1906 };
1907
1908 assert_eq!(resp.tool_transcript.len(), 1);
1909 assert!(resp.tool_transcript[0].is_error);
1910 assert!(
1911 resp.tool_transcript[0].output.to_string().contains("denied"),
1912 "tool error should explain policy denial"
1913 );
1914 }
1915
1916 #[tokio::test]
1917 async fn tc15_native_dispatch_mode_uses_llm_tool_calls() {
1918 struct NativeToolLlm {
1919 responses: Mutex<VecDeque<LlmResponse>>,
1920 }
1921
1922 #[async_trait::async_trait]
1923 impl LlmPort for NativeToolLlm {
1924 fn capabilities(&self) -> bob_core::types::LlmCapabilities {
1925 bob_core::types::LlmCapabilities { native_tool_calling: true, streaming: true }
1926 }
1927
1928 async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
1929 let mut q = self.responses.lock().unwrap_or_else(|p| p.into_inner());
1930 Ok(q.pop_front().unwrap_or(LlmResponse {
1931 content: r#"{"type":"final","content":"fallback"}"#.to_string(),
1932 usage: TokenUsage::default(),
1933 finish_reason: FinishReason::Stop,
1934 tool_calls: Vec::new(),
1935 }))
1936 }
1937
1938 async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
1939 Err(LlmError::Provider("not used".into()))
1940 }
1941 }
1942
1943 let llm = NativeToolLlm {
1944 responses: Mutex::new(VecDeque::from(vec![
1945 LlmResponse {
1946 content: "ignored".to_string(),
1947 usage: TokenUsage::default(),
1948 finish_reason: FinishReason::Stop,
1949 tool_calls: vec![ToolCall {
1950 name: "search".to_string(),
1951 arguments: serde_json::json!({"q":"rust"}),
1952 }],
1953 },
1954 LlmResponse {
1955 content: r#"{"type":"final","content":"done"}"#.to_string(),
1956 usage: TokenUsage::default(),
1957 finish_reason: FinishReason::Stop,
1958 tool_calls: Vec::new(),
1959 },
1960 ])),
1961 };
1962 let tools = MockToolPort::with_tool_and_results(
1963 "search",
1964 vec![Ok(ToolResult {
1965 name: "search".to_string(),
1966 output: serde_json::json!({"hits": 2}),
1967 is_error: false,
1968 })],
1969 );
1970 let store = MemoryStore::new();
1971 let sink = CollectingSink::new();
1972 let checkpoint = CountingCheckpointPort::new();
1973 let artifacts = NoopArtifactStore;
1974 let cost = CountingCostMeter::new();
1975 let policy = AllowAllPolicyPort;
1976 let approval = AlwaysApprovePort;
1977
1978 let result = run_turn_with_extensions(
1979 &llm,
1980 &tools,
1981 &store,
1982 &sink,
1983 make_request("search rust"),
1984 &generous_policy(),
1985 "test-model",
1986 &policy,
1987 &approval,
1988 crate::DispatchMode::NativePreferred,
1989 &checkpoint,
1990 &artifacts,
1991 &cost,
1992 )
1993 .await;
1994
1995 assert!(matches!(&result, Ok(AgentRunResult::Finished(_))), "unexpected {result:?}");
1996 let resp = match result {
1997 Ok(AgentRunResult::Finished(r)) => r,
1998 _ => return,
1999 };
2000 assert_eq!(resp.tool_transcript.len(), 1);
2001 assert_eq!(resp.tool_transcript[0].name, "search");
2002 }
2003
2004 #[tokio::test]
2005 async fn tc16_checkpoint_and_cost_ports_are_invoked() {
2006 let llm = SequentialLlm::from_contents(vec![r#"{"type": "final", "content": "ok"}"#]);
2007 let tools = MockToolPort::empty();
2008 let store = MemoryStore::new();
2009 let sink = CollectingSink::new();
2010 let checkpoint = CountingCheckpointPort::new();
2011 let artifacts = NoopArtifactStore;
2012 let cost = CountingCostMeter::new();
2013 let policy = AllowAllPolicyPort;
2014 let approval = AlwaysApprovePort;
2015
2016 let result = run_turn_with_extensions(
2017 &llm,
2018 &tools,
2019 &store,
2020 &sink,
2021 make_request("hello"),
2022 &generous_policy(),
2023 "test-model",
2024 &policy,
2025 &approval,
2026 crate::DispatchMode::PromptGuided,
2027 &checkpoint,
2028 &artifacts,
2029 &cost,
2030 )
2031 .await;
2032 assert!(result.is_ok(), "turn should succeed");
2033 let checkpoints = checkpoint.saved.lock().unwrap_or_else(|p| p.into_inner()).len();
2034 let llm_calls = *cost.llm_calls.lock().unwrap_or_else(|p| p.into_inner());
2035 assert!(checkpoints >= 1, "checkpoint port should be invoked at least once");
2036 assert!(llm_calls >= 1, "cost meter should record llm usage");
2037 }
2038}