1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::time::Instant;
4
5use futures::{future::join_all, StreamExt};
6use imp_llm::{
7 AssistantMessage, ContentBlock, Context, Cost, Message, Model, RequestOptions, StopReason,
8 StreamEvent, ThinkingLevel, Usage,
9};
10use tokio::sync::mpsc;
11
12use imp_llm::provider::RetryPolicy;
13
14use crate::config::{AgentMode, Config, ContextConfig, ContinuePolicy};
15use crate::error::Result;
16use crate::guardrails::{self, GuardrailConfig, GuardrailLevel, GuardrailProfile};
17use crate::hooks::{HookBackgroundEvent, HookEvent, HookRunner};
18use crate::mana_review::{ManaReviewState, TurnManaReview, TurnManaReviewAccumulator};
19use crate::roles::Role;
20use crate::tools::{LuaToolLoader, ToolRegistry};
21use crate::ui::NotifyLevel;
22
/// Milestones within one turn's LLM request that are reported as
/// `AgentEvent::Timing` samples.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimingStage {
    /// The provider request is about to be issued.
    LlmRequestStart,
    /// The first event of any kind arrived on the provider stream.
    FirstStreamEvent,
    /// The first text delta arrived.
    FirstTextDelta,
    /// The first tool call arrived.
    FirstToolCall,
    /// The provider signalled the end of the assistant message.
    MessageEnd,
}

impl TimingStage {
    /// Stable snake_case label for this stage (for logs and metrics).
    pub fn as_str(self) -> &'static str {
        use TimingStage as Stage;
        match self {
            Stage::LlmRequestStart => "llm_request_start",
            Stage::FirstStreamEvent => "first_stream_event",
            Stage::FirstTextDelta => "first_text_delta",
            Stage::FirstToolCall => "first_tool_call",
            Stage::MessageEnd => "message_end",
        }
    }
}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub struct TimingEvent {
46 pub turn: u32,
47 pub stage: TimingStage,
48 pub since_turn_start_ms: u64,
49 pub since_llm_request_start_ms: u64,
50}
51
52#[derive(Debug, Clone)]
54pub enum AgentEvent {
55 AgentStart {
56 model: String,
57 timestamp: u64,
58 },
59 AgentEnd {
60 usage: Usage,
61 cost: Cost,
62 },
63 TurnStart {
64 index: u32,
65 },
66 TurnAssessment {
67 index: u32,
68 assessment: NextActionAssessment,
69 },
70 TurnEnd {
71 index: u32,
72 message: AssistantMessage,
73 mana_review: TurnManaReview,
74 },
75 MessageStart {
76 message: Message,
77 },
78 MessageDelta {
79 delta: StreamEvent,
80 },
81 MessageEnd {
82 message: Message,
83 },
84 ToolExecutionStart {
85 tool_call_id: String,
86 tool_name: String,
87 args: serde_json::Value,
88 },
89 ToolOutputDelta {
90 tool_call_id: String,
91 text: String,
92 },
93 ToolExecutionEnd {
94 tool_call_id: String,
95 result: imp_llm::ToolResultMessage,
96 },
97 Warning {
98 message: String,
99 },
100 Timing {
101 timing: TimingEvent,
102 },
103 Error {
104 error: String,
105 },
106}
107
/// Commands sent to a running agent through `AgentHandle::command_tx`.
///
/// The agent polls for them between turns and between provider stream events.
#[derive(Clone, Debug)]
pub enum AgentCommand {
    /// Abort the run; `Agent::run` then returns `Error::Cancelled`.
    Cancel,
    /// Inject a user message into the conversation.
    Steer(String),
    /// Queue a user message to be sent after a turn completes.
    FollowUp(String),
}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117enum NextAction {
118 Continue {
119 prompt: String,
120 reason: ContinueReason,
121 },
122 Stop {
123 reason: NextActionStopReason,
124 },
125}
126
/// Why the agent recommends continuing automatically.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ContinueReason {
    /// The mana externalization follow-up should be sent.
    ExternalizationNeeded,
    /// The continue policy sees a high-confidence next step.
    HighConfidenceVisibleNextStep,
    /// Tool results indicate outstanding execution debt.
    ExecutionDebt,
}

impl ContinueReason {
    /// Snake_case label used in assessment debug output.
    fn as_str(self) -> &'static str {
        use ContinueReason as Reason;
        match self {
            Reason::ExternalizationNeeded => "externalization_needed",
            Reason::HighConfidenceVisibleNextStep => "high_confidence_visible_next_step",
            Reason::ExecutionDebt => "execution_debt",
        }
    }
}
143
/// Why no automatic follow-up is queued after a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum NextActionStopReason {
    /// Nothing recommended a follow-up.
    NoAutomaticFollowUp,
    /// Only planning-type progress was observed.
    NoProgress,
    /// The tool results indicate a repeated action.
    RepeatedAction,
    /// A blocker that needs the user.
    UserBlocker,
    /// Execution is blocked.
    ExecutionBlocked,
    /// A decomposition finished.
    DecompositionCompleted,
    /// The work is reported complete.
    WorkCompleted,
}

impl NextActionStopReason {
    /// Snake_case label used in assessment debug output.
    fn as_str(self) -> &'static str {
        use NextActionStopReason as Reason;
        match self {
            Reason::NoAutomaticFollowUp => "no_automatic_follow_up",
            Reason::NoProgress => "no_progress",
            Reason::RepeatedAction => "repeated_action",
            Reason::UserBlocker => "user_blocker",
            Reason::ExecutionBlocked => "execution_blocked",
            Reason::DecompositionCompleted => "decomposition_completed",
            Reason::WorkCompleted => "work_completed",
        }
    }
}
168
169#[derive(Debug, Clone, PartialEq, Eq)]
170struct RuntimeEvidence {
171 repeated_action: bool,
172 execution_stop_reason: Option<NextActionStopReason>,
173 work_completed: bool,
174 execution_debt: bool,
175 execution_evidence: bool,
176 planning_only_progress: bool,
177}
178
179#[derive(Debug, Clone, PartialEq, Eq)]
180struct ManaEvidence {
181 stop_reason: Option<NextActionStopReason>,
182}
183
184#[derive(Debug, Clone, PartialEq, Eq)]
185struct TextFallbackEvidence {
186 planner_stop_reason: Option<NextActionStopReason>,
187 execution_stop_reason: Option<NextActionStopReason>,
188}
189
190#[derive(Debug, Clone, PartialEq, Eq)]
191struct ContinueRecommendation {
192 prompt: String,
193 reason: ContinueReason,
194}
195
196#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct NextActionAssessment {
198 pub runtime: NextActionRuntimeEvidence,
199 pub mana: NextActionManaEvidence,
200 pub text_fallback: NextActionTextFallbackEvidence,
201 pub continue_recommendation: Option<NextActionContinueRecommendation>,
202 pub chosen_action: NextActionDebugView,
203}
204
/// Public mirror of the runtime (tool-result) evidence, with stop reasons as labels.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NextActionRuntimeEvidence {
    pub repeated_action: bool,
    /// Snake_case stop-reason label, if any.
    pub execution_stop_reason: Option<String>,
    pub work_completed: bool,
    pub execution_debt: bool,
    pub execution_evidence: bool,
    pub planning_only_progress: bool,
}
214
/// Public mirror of the mana-review evidence.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NextActionManaEvidence {
    /// Snake_case stop-reason label, if any.
    pub stop_reason: Option<String>,
}
219
/// Public mirror of the text-fallback evidence.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NextActionTextFallbackEvidence {
    /// Planner-heuristic stop-reason label, if any.
    pub planner_stop_reason: Option<String>,
    /// Execution-heuristic stop-reason label, if any.
    pub execution_stop_reason: Option<String>,
}
225
/// Public mirror of a continue recommendation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NextActionContinueRecommendation {
    /// Prompt that would be queued.
    pub prompt: String,
    /// Snake_case continue-reason label.
    pub reason: String,
}
231
/// Public, string-labelled form of the chosen next action.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum NextActionDebugView {
    /// A follow-up `prompt` is queued for `reason`.
    Continue { prompt: String, reason: String },
    /// No automatic follow-up, for `reason`.
    Stop { reason: String },
}
237
238#[derive(Debug, Clone, PartialEq, Eq)]
239struct PostTurnAssessment {
240 runtime: RuntimeEvidence,
241 mana: ManaEvidence,
242 text_fallback: TextFallbackEvidence,
243 continue_recommendation: Option<ContinueRecommendation>,
244}
245
246impl PostTurnAssessment {
247 fn into_next_action(self) -> NextAction {
248 if self.runtime.repeated_action {
249 return NextAction::Stop {
250 reason: NextActionStopReason::RepeatedAction,
251 };
252 }
253
254 if let Some(reason) = self.runtime.execution_stop_reason {
255 return NextAction::Stop { reason };
256 }
257
258 if self.runtime.work_completed {
259 return NextAction::Stop {
260 reason: NextActionStopReason::WorkCompleted,
261 };
262 }
263
264 if let Some(reason) = self.mana.stop_reason {
265 return NextAction::Stop { reason };
266 }
267
268 if let Some(reason) = self.text_fallback.planner_stop_reason {
269 return NextAction::Stop { reason };
270 }
271
272 if let Some(reason) = self.text_fallback.execution_stop_reason {
273 return NextAction::Stop { reason };
274 }
275
276 if let Some(continue_recommendation) = self.continue_recommendation {
277 return NextAction::Continue {
278 prompt: continue_recommendation.prompt,
279 reason: continue_recommendation.reason,
280 };
281 }
282
283 if self.runtime.planning_only_progress {
284 return NextAction::Stop {
285 reason: NextActionStopReason::NoProgress,
286 };
287 }
288
289 NextAction::Stop {
290 reason: NextActionStopReason::NoAutomaticFollowUp,
291 }
292 }
293
294 fn debug_view(&self) -> NextActionAssessment {
295 let chosen_action = match self.clone().into_next_action() {
296 NextAction::Continue { prompt, reason } => NextActionDebugView::Continue {
297 prompt,
298 reason: reason.as_str().to_string(),
299 },
300 NextAction::Stop { reason } => NextActionDebugView::Stop {
301 reason: reason.as_str().to_string(),
302 },
303 };
304
305 NextActionAssessment {
306 runtime: NextActionRuntimeEvidence {
307 repeated_action: self.runtime.repeated_action,
308 execution_stop_reason: self
309 .runtime
310 .execution_stop_reason
311 .map(|reason| reason.as_str().to_string()),
312 work_completed: self.runtime.work_completed,
313 execution_debt: self.runtime.execution_debt,
314 execution_evidence: self.runtime.execution_evidence,
315 planning_only_progress: self.runtime.planning_only_progress,
316 },
317 mana: NextActionManaEvidence {
318 stop_reason: self
319 .mana
320 .stop_reason
321 .map(|reason| reason.as_str().to_string()),
322 },
323 text_fallback: NextActionTextFallbackEvidence {
324 planner_stop_reason: self
325 .text_fallback
326 .planner_stop_reason
327 .map(|reason| reason.as_str().to_string()),
328 execution_stop_reason: self
329 .text_fallback
330 .execution_stop_reason
331 .map(|reason| reason.as_str().to_string()),
332 },
333 continue_recommendation: self.continue_recommendation.clone().map(|recommendation| {
334 NextActionContinueRecommendation {
335 prompt: recommendation.prompt,
336 reason: recommendation.reason.as_str().to_string(),
337 }
338 }),
339 chosen_action,
340 }
341 }
342}
343
344pub struct Agent {
346 pub model: Model,
347 pub thinking_level: ThinkingLevel,
348 pub tools: ToolRegistry,
349 pub messages: Vec<Message>,
350 pub system_prompt: String,
351 pub cwd: PathBuf,
352 pub max_turns: u32,
353 pub max_tokens: Option<u32>,
354 pub role: Option<Role>,
355 pub hooks: HookRunner,
356 pub api_key: String,
357 pub auth_store: Option<std::sync::Arc<tokio::sync::Mutex<imp_llm::auth::AuthStore>>>,
360 pub ui: Arc<dyn crate::ui::UserInterface>,
361 pub context_config: ContextConfig,
363 pub retry_policy: RetryPolicy,
365 pub mode: AgentMode,
367 pub has_mana_skill: bool,
369 pub has_mana_basics_skill: bool,
371 pub has_mana_delegation_skill: bool,
373 pub guardrail_config: GuardrailConfig,
375 pub guardrail_profile: Option<GuardrailProfile>,
377 pub lua_tool_loader: Option<LuaToolLoader>,
379 pub file_cache: Arc<crate::tools::FileCache>,
381 pub checkpoint_state: Arc<crate::tools::CheckpointState>,
383 pub file_tracker: Arc<std::sync::Mutex<crate::tools::FileTracker>>,
385 pub anchor_store: Arc<crate::tools::AnchorStore>,
387 pub read_max_lines: usize,
389 pub cache_options: imp_llm::CacheOptions,
391 last_tool_call: std::sync::Arc<std::sync::Mutex<Option<RepeatedToolCallState>>>,
393 queued_mana_externalization_nudge: bool,
395 pub continue_policy: ContinuePolicy,
397 queued_confidence_continue_nudge: bool,
399 queued_execution_debt_follow_up: bool,
401 turn_mana_review: Arc<std::sync::Mutex<TurnManaReviewAccumulator>>,
403 pub config: Arc<Config>,
405
406 event_tx: mpsc::Sender<AgentEvent>,
407 command_tx: mpsc::Sender<AgentCommand>,
408 command_rx: mpsc::Receiver<AgentCommand>,
409 cancel_token: Arc<std::sync::atomic::AtomicBool>,
410}
411
412pub struct AgentHandle {
414 pub event_rx: mpsc::Receiver<AgentEvent>,
415 pub command_tx: mpsc::Sender<AgentCommand>,
416 pub cancel_token: Arc<std::sync::atomic::AtomicBool>,
417}
418
/// The most recent tool call and how many times in a row it has been issued.
#[derive(Clone, Debug)]
struct RepeatedToolCallState {
    /// Name of the tool that was called.
    tool_name: String,
    /// Arguments serialized with `serde_json::to_string`, used for equality.
    args_json: String,
    /// Number of identical calls in a row, starting at 1.
    consecutive: usize,
}
425
426#[derive(Debug, Clone)]
427enum RepeatedToolCallCheck {
428 Ok,
429 Warn(String),
430 Block(imp_llm::ToolResultMessage),
431}
432
433impl Agent {
434 pub fn new(model: Model, cwd: PathBuf) -> (Self, AgentHandle) {
435 let (event_tx, event_rx) = mpsc::channel(256);
436 let (command_tx, command_rx) = mpsc::channel(32);
437 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
438 let mut hooks = HookRunner::new();
439 let background_event_tx = event_tx.clone();
440 hooks.set_background_reporter(Arc::new(move |event: HookBackgroundEvent| {
441 let background_event_tx = background_event_tx.clone();
442 tokio::spawn(async move {
443 let _ = background_event_tx
444 .send(AgentEvent::Warning {
445 message: event.to_string(),
446 })
447 .await;
448 });
449 }));
450
451 let agent = Self {
452 model,
453 thinking_level: ThinkingLevel::Medium,
454 tools: ToolRegistry::new(),
455 messages: Vec::new(),
456 system_prompt: String::new(),
457 cwd,
458 max_turns: 50,
459 max_tokens: None,
460 role: None,
461 hooks,
462 api_key: String::new(),
463 ui: Arc::new(crate::ui::NullInterface),
464 context_config: ContextConfig::default(),
465 retry_policy: RetryPolicy::default(),
466 mode: AgentMode::Full,
467 has_mana_skill: false,
468 has_mana_basics_skill: false,
469 has_mana_delegation_skill: false,
470 guardrail_config: GuardrailConfig::default(),
471 guardrail_profile: None,
472 file_cache: Arc::new(crate::tools::FileCache::new()),
473 checkpoint_state: Arc::new(crate::tools::CheckpointState::new()),
474 file_tracker: Arc::new(std::sync::Mutex::new(crate::tools::FileTracker::new())),
475 anchor_store: Arc::new(crate::tools::AnchorStore::new()),
476 read_max_lines: 500,
477 auth_store: None,
478 cache_options: imp_llm::CacheOptions {
479 cache_system_prompt: true,
480 cache_tools: true,
481 cache_recent_turns: 2,
482 extended_ttl: false,
483 global_scope: false,
484 },
485 last_tool_call: Arc::new(std::sync::Mutex::new(None)),
486 queued_mana_externalization_nudge: false,
487 continue_policy: ContinuePolicy::Disabled,
488 queued_confidence_continue_nudge: false,
489 queued_execution_debt_follow_up: false,
490 turn_mana_review: Arc::new(std::sync::Mutex::new(TurnManaReviewAccumulator::default())),
491 config: Arc::new(Config::default()),
492 lua_tool_loader: None,
493
494 event_tx,
495 command_tx: command_tx.clone(),
496 command_rx,
497 cancel_token: Arc::clone(&cancel_token),
498 };
499
500 let handle = AgentHandle {
501 event_rx,
502 command_tx,
503 cancel_token,
504 };
505
506 (agent, handle)
507 }
508
509 pub async fn run(&mut self, prompt: String) -> Result<()> {
511 self.emit(AgentEvent::AgentStart {
512 model: self.model.meta.id.clone(),
513 timestamp: imp_llm::now(),
514 })
515 .await;
516 self.hooks
517 .fire(&HookEvent::OnAgentStart { prompt: &prompt })
518 .await;
519
520 self.messages.push(Message::user(&prompt));
521
522 self.cancel_token
523 .store(false, std::sync::atomic::Ordering::Relaxed);
524 let mut turn: u32 = 0;
525 let mut total_usage = Usage::default();
526 let mut cancelled = false;
527 let mut queued_follow_ups: std::collections::VecDeque<String> =
528 std::collections::VecDeque::new();
529 let mut queued_pre_turn_follow_ups: std::collections::VecDeque<String> =
530 std::collections::VecDeque::new();
531
532 if let Some(nudge) = mana_skill_follow_up_hint(
533 &prompt,
534 self.mode,
535 !self.tools.is_empty(),
536 self.has_mana_skill,
537 self.has_mana_basics_skill,
538 self.has_mana_delegation_skill,
539 ) {
540 queued_pre_turn_follow_ups.push_back(nudge.to_string());
541 }
542
543 loop {
544 if turn >= self.max_turns {
545 self.emit(AgentEvent::Error {
546 error: format!("Max turns exceeded ({})", self.max_turns),
547 })
548 .await;
549 let cost = total_usage.cost(&self.model.meta.pricing);
550 self.emit(AgentEvent::AgentEnd {
551 usage: total_usage,
552 cost,
553 })
554 .await;
555 return Err(crate::error::Error::MaxTurns(self.max_turns));
556 }
557
558 if turn > 0 {
559 if let Some(follow_up) = queued_pre_turn_follow_ups.pop_front() {
560 self.messages.push(Message::user(&follow_up));
561 }
562 }
563
564 while let Ok(cmd) = self.command_rx.try_recv() {
566 match cmd {
567 AgentCommand::Cancel => {
568 self.cancel_token
569 .store(true, std::sync::atomic::Ordering::Relaxed);
570 cancelled = true;
571 break;
572 }
573 AgentCommand::Steer(msg) => {
574 self.messages.push(Message::user(&msg));
575 }
576 AgentCommand::FollowUp(msg) => queued_follow_ups.push_back(msg),
577 }
578 }
579
580 if cancelled {
581 break;
582 }
583
584 self.emit(AgentEvent::TurnStart { index: turn }).await;
585 if let Ok(mut review) = self.turn_mana_review.lock() {
586 review.begin_turn(turn);
587 }
588 let turn_started_at = Instant::now();
589
590 let mut usage = crate::context::context_usage(&self.messages, &self.model);
591 if usage.ratio >= self.context_config.observation_mask_threshold {
592 crate::context::mask_observations(
593 &mut self.messages,
594 self.context_config.mask_window,
595 );
596 self.hooks
597 .fire(&HookEvent::OnContextThreshold { ratio: usage.ratio })
598 .await;
599 usage = crate::context::context_usage(&self.messages, &self.model);
602 }
603
604 let context = Context {
610 messages: self.messages.clone(),
611 };
612
613 let options = RequestOptions {
614 thinking_level: self.thinking_level,
615 max_tokens: self.max_tokens,
618 temperature: None,
619 system_prompt: self.system_prompt.clone(),
620 tools: self.tools.definitions(),
621 cache_options: self.cache_options.clone(),
622 effort: None,
623 };
624
625 self.hooks.fire(&HookEvent::BeforeLlmCall).await;
626
627 if let Some(ref auth_store) = self.auth_store {
631 let mut store = auth_store.lock().await;
632 if store.is_oauth_expired("anthropic") {
633 match store.resolve_with_refresh("anthropic").await {
634 Ok(new_key) => {
635 self.api_key = new_key;
636 }
637 Err(e) => {
638 let message = format!(
639 "OAuth token refresh failed before request: {e}. Continuing with existing credentials."
640 );
641 let _ = self.ui.notify(&message, NotifyLevel::Warning).await;
642 }
643 }
644 }
645 }
646
647 let llm_request_started_at = Instant::now();
649 self.emit_timing(
650 turn,
651 TimingStage::LlmRequestStart,
652 turn_started_at,
653 llm_request_started_at,
654 )
655 .await;
656 let model = clone_model(&self.model);
657 let context = context.clone();
658 let options = options.clone();
659 let api_key = self.api_key.clone();
660 let mut stream = crate::retry::stream_with_retry(
661 move || {
662 model
663 .provider
664 .stream(&model, context.clone(), options.clone(), &api_key)
665 },
666 self.retry_policy.clone(),
667 );
668
669 let mut ordered_content: Vec<ContentBlock> = Vec::new();
670 let mut tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new();
671 let mut assistant_msg: Option<AssistantMessage> = None;
672 let mut saw_first_stream_event = false;
673 let mut saw_first_text_delta = false;
674 let mut saw_first_tool_call = false;
675 let mut saw_provider_message_end = false;
676 let cancel_token = Arc::clone(&self.cancel_token);
677 cancel_token.store(false, std::sync::atomic::Ordering::Relaxed);
678
679 while let Some(event_result) = stream.next().await {
680 while let Ok(cmd) = self.command_rx.try_recv() {
682 match cmd {
683 AgentCommand::Cancel => {
684 cancel_token.store(true, std::sync::atomic::Ordering::Relaxed);
685 cancelled = true;
686 break;
687 }
688 AgentCommand::Steer(msg) => {
689 self.messages.push(Message::user(&msg));
690 }
691 AgentCommand::FollowUp(msg) => queued_follow_ups.push_back(msg),
692 }
693 }
694
695 if cancelled {
696 break;
697 }
698
699 match event_result {
700 Ok(event) => {
701 if !saw_first_stream_event {
702 saw_first_stream_event = true;
703 self.emit_timing(
704 turn,
705 TimingStage::FirstStreamEvent,
706 turn_started_at,
707 llm_request_started_at,
708 )
709 .await;
710 }
711 self.emit(AgentEvent::MessageDelta {
713 delta: event.clone(),
714 })
715 .await;
716
717 match event {
718 StreamEvent::TextDelta { text } => {
719 if !saw_first_text_delta {
720 saw_first_text_delta = true;
721 self.emit_timing(
722 turn,
723 TimingStage::FirstTextDelta,
724 turn_started_at,
725 llm_request_started_at,
726 )
727 .await;
728 }
729 push_stream_text_block(&mut ordered_content, text);
730 }
731 StreamEvent::ThinkingDelta { text } => {
732 push_stream_thinking_block(&mut ordered_content, text);
733 }
734 StreamEvent::ToolCall {
735 id,
736 name,
737 arguments,
738 } => {
739 if !saw_first_tool_call {
740 saw_first_tool_call = true;
741 self.emit_timing(
742 turn,
743 TimingStage::FirstToolCall,
744 turn_started_at,
745 llm_request_started_at,
746 )
747 .await;
748 }
749 ordered_content.push(ContentBlock::ToolCall {
750 id: id.clone(),
751 name: name.clone(),
752 arguments: arguments.clone(),
753 });
754 tool_calls.push((id, name, arguments));
755 }
756 StreamEvent::MessageEnd { message } => {
757 saw_provider_message_end = true;
758 self.emit_timing(
759 turn,
760 TimingStage::MessageEnd,
761 turn_started_at,
762 llm_request_started_at,
763 )
764 .await;
765 if let Some(ref usage) = message.usage {
766 total_usage.add(usage);
767 }
768 assistant_msg = Some(message);
769 }
770 StreamEvent::MessageStart { .. } => {}
771 StreamEvent::Error { error } => {
772 self.emit(AgentEvent::Error {
773 error: format!(
774 "Provider stream failed after partial output: {error}"
775 ),
776 })
777 .await;
778 let err_msg = AssistantMessage {
780 content: vec![ContentBlock::Text { text: error }],
781 usage: None,
782 stop_reason: StopReason::Error("Stream error".to_string()),
783 timestamp: imp_llm::now(),
784 };
785 self.messages.push(Message::Assistant(err_msg.clone()));
786 let mana_review = self.finish_turn_mana_review(turn);
787 self.emit(AgentEvent::TurnEnd {
788 index: turn,
789 message: err_msg,
790 mana_review,
791 })
792 .await;
793 let cost = total_usage.cost(&self.model.meta.pricing);
794 self.emit(AgentEvent::AgentEnd {
795 usage: total_usage,
796 cost,
797 })
798 .await;
799 return Err(crate::error::Error::Llm(imp_llm::Error::Provider(
800 "Stream error".to_string(),
801 )));
802 }
803 }
804 }
805 Err(e) => {
806 let error = match &e {
807 imp_llm::Error::Stream(message) => {
808 format!("Provider stream failed after partial output: {message}")
809 }
810 _ => e.to_string(),
811 };
812 self.emit(AgentEvent::Error {
813 error: error.clone(),
814 })
815 .await;
816 let cost = total_usage.cost(&self.model.meta.pricing);
817 self.emit(AgentEvent::AgentEnd {
818 usage: total_usage,
819 cost,
820 })
821 .await;
822 return Err(e.into());
823 }
824 }
825 }
826
827 if cancelled {
828 let partial = assistant_msg.unwrap_or_else(|| {
830 build_assistant_message(&ordered_content, &tool_calls, None)
831 });
832 self.messages.push(Message::Assistant(partial.clone()));
833 let mana_review = self.finish_turn_mana_review(turn);
834 self.emit(AgentEvent::TurnEnd {
835 index: turn,
836 message: partial,
837 mana_review,
838 })
839 .await;
840 break;
841 }
842
843 let msg = match assistant_msg {
847 Some(message) => message,
848 None if !saw_provider_message_end => {
849 let error = format!(
850 "Provider stream ended unexpectedly before completing the message (missing terminal completion event after {} content block(s) and {} tool call(s))",
851 ordered_content.len(),
852 tool_calls.len()
853 );
854 self.emit(AgentEvent::Error {
855 error: error.clone(),
856 })
857 .await;
858 let cost = total_usage.cost(&self.model.meta.pricing);
859 self.emit(AgentEvent::AgentEnd {
860 usage: total_usage,
861 cost,
862 })
863 .await;
864 return Err(crate::error::Error::Llm(imp_llm::Error::Stream(error)));
865 }
866 None => build_assistant_message(&ordered_content, &tool_calls, None),
867 };
868
869 self.messages.push(Message::Assistant(msg.clone()));
870
871 if tool_calls.is_empty() {
872 let mana_review = self.finish_turn_mana_review(turn);
874 self.emit(AgentEvent::TurnEnd {
875 index: turn,
876 message: msg.clone(),
877 mana_review: mana_review.clone(),
878 })
879 .await;
880
881 let assessment = self.assess_post_turn(&msg, &[], false, &mana_review);
882 self.emit(AgentEvent::TurnAssessment {
883 index: turn,
884 assessment: assessment.debug_view(),
885 })
886 .await;
887 let next_action = assessment.into_next_action();
888 self.enqueue_next_action(&mut queued_follow_ups, next_action);
889
890 if let Some(follow_up) = queued_follow_ups.pop_front() {
891 self.messages.push(Message::user(&follow_up));
892 turn += 1;
893 continue;
894 }
895 break;
896 }
897
898 let results = self.execute_tools(tool_calls, cancel_token).await;
900
901 for result in &results {
902 self.messages.push(Message::ToolResult(result.clone()));
903 }
904
905 let mana_review = self.finish_turn_mana_review(turn);
906 self.emit(AgentEvent::TurnEnd {
907 index: turn,
908 message: msg.clone(),
909 mana_review: mana_review.clone(),
910 })
911 .await;
912
913 let assessment = self.assess_post_turn(&msg, &results, true, &mana_review);
914 self.emit(AgentEvent::TurnAssessment {
915 index: turn,
916 assessment: assessment.debug_view(),
917 })
918 .await;
919 let next_action = assessment.into_next_action();
920 let should_stop_after_tool_turn = matches!(
921 next_action,
922 NextAction::Stop {
923 reason: NextActionStopReason::RepeatedAction,
924 }
925 );
926 self.enqueue_next_action(&mut queued_follow_ups, next_action);
927
928 if let Some(follow_up) = queued_follow_ups.pop_front() {
929 self.messages.push(Message::user(&follow_up));
930 }
931
932 if should_stop_after_tool_turn {
933 break;
934 }
935
936 turn += 1;
937 }
938
939 let cost = total_usage.cost(&self.model.meta.pricing);
940 self.emit(AgentEvent::AgentEnd {
941 usage: total_usage,
942 cost,
943 })
944 .await;
945
946 if cancelled {
947 return Err(crate::error::Error::Cancelled);
948 }
949
950 Ok(())
951 }
952
953 fn assess_post_turn(
954 &self,
955 message: &AssistantMessage,
956 tool_results: &[imp_llm::ToolResultMessage],
957 _used_tools: bool,
958 mana_review: &TurnManaReview,
959 ) -> PostTurnAssessment {
960 let repeated_action = tool_results_indicate_repeated_action(tool_results);
961 let runtime_execution_stop_reason =
962 tool_results_indicate_execution_blocker(tool_results, self.mode);
963 let work_completed = tool_results_indicate_work_completed(tool_results, self.mode);
964 let execution_debt = tool_results_indicate_execution_debt(tool_results, self.mode);
965 let execution_evidence = tool_results_indicate_execution_evidence(tool_results, self.mode);
966 let planning_only_progress = execution_debt && !execution_evidence;
967 let mana_stop_reason = mana_review_stop_reason(mana_review, self.mode);
968 let planner_text_stop_reason = planner_stop_reason(message, self.mode);
969 let execution_text_stop_reason = execution_stop_reason(message, self.mode);
970
971 let continue_recommendation = if should_queue_mana_externalization_follow_up(
972 message,
973 self.mode,
974 self.has_mana_skill,
975 self.queued_mana_externalization_nudge,
976 ) {
977 Some(ContinueRecommendation {
978 prompt: mana_externalization_follow_up_text().to_string(),
979 reason: ContinueReason::ExternalizationNeeded,
980 })
981 } else if !matches!(self.mode, AgentMode::Planner)
982 && should_queue_execution_debt_follow_up(
983 execution_debt,
984 execution_evidence,
985 self.queued_execution_debt_follow_up,
986 !assistant_message_text(message).trim().is_empty(),
987 )
988 {
989 Some(ContinueRecommendation {
990 prompt: execution_debt_follow_up_text().to_string(),
991 reason: ContinueReason::ExecutionDebt,
992 })
993 } else if should_queue_confidence_continue_follow_up(
994 message,
995 self.mode,
996 self.continue_policy,
997 self.queued_confidence_continue_nudge,
998 ) {
999 Some(ContinueRecommendation {
1000 prompt: confidence_continue_follow_up_text().to_string(),
1001 reason: ContinueReason::HighConfidenceVisibleNextStep,
1002 })
1003 } else {
1004 None
1005 };
1006
1007 PostTurnAssessment {
1008 runtime: RuntimeEvidence {
1009 repeated_action,
1010 execution_stop_reason: runtime_execution_stop_reason,
1011 work_completed,
1012 execution_debt,
1013 execution_evidence,
1014 planning_only_progress,
1015 },
1016 mana: ManaEvidence {
1017 stop_reason: mana_stop_reason,
1018 },
1019 text_fallback: TextFallbackEvidence {
1020 planner_stop_reason: planner_text_stop_reason,
1021 execution_stop_reason: execution_text_stop_reason,
1022 },
1023 continue_recommendation,
1024 }
1025 }
1026
1027 fn enqueue_next_action(
1028 &mut self,
1029 queued_follow_ups: &mut std::collections::VecDeque<String>,
1030 next_action: NextAction,
1031 ) {
1032 match next_action {
1033 NextAction::Continue { prompt, reason } => {
1034 match reason {
1035 ContinueReason::ExternalizationNeeded => {
1036 self.queued_mana_externalization_nudge = true;
1037 }
1038 ContinueReason::HighConfidenceVisibleNextStep => {
1039 self.queued_confidence_continue_nudge = true;
1040 }
1041 ContinueReason::ExecutionDebt => {
1042 self.queued_execution_debt_follow_up = true;
1043 }
1044 }
1045 queued_follow_ups.push_back(prompt);
1046 }
1047 NextAction::Stop { .. } => {}
1048 }
1049 }
1050
1051 async fn emit(&self, event: AgentEvent) {
1052 match &event {
1054 AgentEvent::AgentEnd { .. } => {
1055 self.hooks
1056 .fire(&HookEvent::OnAgentEnd {
1057 messages: &self.messages,
1058 })
1059 .await;
1060 }
1061 AgentEvent::TurnEnd { index, message, .. } => {
1062 self.hooks
1063 .fire(&HookEvent::OnTurnEnd {
1064 index: *index,
1065 message,
1066 })
1067 .await;
1068 }
1069 _ => {}
1070 }
1071 let _ = self.event_tx.send(event).await;
1072 }
1073
1074 async fn emit_timing(
1075 &self,
1076 turn: u32,
1077 stage: TimingStage,
1078 turn_started_at: Instant,
1079 llm_request_started_at: Instant,
1080 ) {
1081 let now = Instant::now();
1082 let timing = TimingEvent {
1083 turn,
1084 stage,
1085 since_turn_start_ms: now.duration_since(turn_started_at).as_millis() as u64,
1086 since_llm_request_start_ms: now.duration_since(llm_request_started_at).as_millis()
1087 as u64,
1088 };
1089 let _ = self.event_tx.send(AgentEvent::Timing { timing }).await;
1090 }
1091
1092 async fn execute_tools(
1094 &self,
1095 calls: Vec<(String, String, serde_json::Value)>,
1096 cancel_token: Arc<std::sync::atomic::AtomicBool>,
1097 ) -> Vec<imp_llm::ToolResultMessage> {
1098 let mut readonly = Vec::new();
1099 let mut mutable = Vec::new();
1100
1101 for (index, (id, name, args)) in calls.into_iter().enumerate() {
1102 if self.tools.get(&name).is_some_and(|tool| tool.is_readonly()) {
1103 readonly.push((index, id, name, args));
1104 } else {
1105 mutable.push((index, id, name, args));
1106 }
1107 }
1108
1109 let mut results = join_all(readonly.into_iter().map(|(index, id, name, args)| {
1110 let cancel_token = Arc::clone(&cancel_token);
1111 async move {
1112 let result = self.execute_one_tool(&id, &name, args, cancel_token).await;
1113 (index, result)
1114 }
1115 }))
1116 .await;
1117
1118 for (index, id, name, args) in mutable {
1119 let result = self
1120 .execute_one_tool(&id, &name, args, Arc::clone(&cancel_token))
1121 .await;
1122 results.push((index, result));
1123 }
1124
1125 results.sort_by_key(|(index, _)| *index);
1126 results.into_iter().map(|(_, result)| result).collect()
1127 }
1128
1129 fn repeated_tool_call_check(
1130 &self,
1131 call_id: &str,
1132 tool_name: &str,
1133 args: &serde_json::Value,
1134 ) -> RepeatedToolCallCheck {
1135 let args_json = serde_json::to_string(args).unwrap_or_else(|_| "<unserializable>".into());
1136 let mut state = match self.last_tool_call.lock() {
1137 Ok(s) => s,
1138 Err(_) => return RepeatedToolCallCheck::Ok,
1139 };
1140
1141 let consecutive = match state.as_mut() {
1142 Some(prev) if prev.tool_name == tool_name && prev.args_json == args_json => {
1143 prev.consecutive += 1;
1144 prev.consecutive
1145 }
1146 _ => {
1147 *state = Some(RepeatedToolCallState {
1148 tool_name: tool_name.to_string(),
1149 args_json,
1150 consecutive: 1,
1151 });
1152 1
1153 }
1154 };
1155
1156 if consecutive == 3 {
1157 return RepeatedToolCallCheck::Warn(format!(
1158 "Warning: identical tool call repeated 3 times in a row for '{tool_name}'. The result may not have changed. Consider using the information you already have or trying a different action."
1159 ));
1160 }
1161
1162 if consecutive >= 4 {
1163 return RepeatedToolCallCheck::Block(
1164 crate::tools::ToolOutput::error(format!(
1165 "Blocked: identical tool call repeated {consecutive} times in a row for '{tool_name}'. The result likely has not changed. Use the information you already have or try a different action."
1166 ))
1167 .into_tool_result(call_id, tool_name),
1168 );
1169 }
1170
1171 RepeatedToolCallCheck::Ok
1172 }
1173
    /// Runs a single tool call end-to-end and returns the result message handed back to the model.
    ///
    /// Stages, in order:
    /// 1. Repeated-call check (`repeated_tool_call_check`). A `Block` verdict emits
    ///    `ToolExecutionStart`/`ToolExecutionEnd` and returns the blocking error result.
    /// 2. Emits `ToolExecutionStart` and fires `BeforeToolCall` hooks.
    /// 3. Emits `ToolExecutionEnd` and returns an error result if any of these hold:
    ///    - the current mode does not allow the tool;
    ///    - a before-hook set `block`;
    ///    - a `bash` command has a native mana equivalent (`mana_bash_equivalent_hint`);
    ///    - the arguments fail the registered tool's schema (`validate_tool_args`).
    /// 4. Executes the tool, relaying text blocks from progress updates as `ToolOutputDelta`
    ///    events. Tool errors become error results. An unregistered tool yields an
    ///    "Unknown tool" error result.
    /// 5. Fires `AfterToolCall` hooks. The last hook that returns `modified_content` replaces
    ///    the result content.
    /// 6. For a successful `write`/`edit`/`multi_edit` with a resolvable `path`:
    ///    - fires `AfterFileWrite`;
    ///    - if a guardrail profile is set and `should_check_path` accepts the path, appends
    ///      the formatted guardrail report;
    ///    - under `GuardrailLevel::Enforce`, any failed check marks the result as an error.
    /// 7. Emits `ToolExecutionEnd`, then appends any repeated-call warning to the returned
    ///    result only. The emitted event does not carry the warning.
    async fn execute_one_tool(
        &self,
        call_id: &str,
        tool_name: &str,
        args: serde_json::Value,
        cancel_token: Arc<std::sync::atomic::AtomicBool>,
    ) -> imp_llm::ToolResultMessage {
        // Update the repeated-call tracker first. A `Block` verdict short-circuits before any
        // hooks run, but still emits start/end events so the call stays visible.
        let repeat_check = self.repeated_tool_call_check(call_id, tool_name, &args);
        if let RepeatedToolCallCheck::Block(loop_result) = repeat_check {
            self.emit(AgentEvent::ToolExecutionStart {
                tool_call_id: call_id.to_string(),
                tool_name: tool_name.to_string(),
                args: args.clone(),
            })
            .await;
            self.emit(AgentEvent::ToolExecutionEnd {
                tool_call_id: call_id.to_string(),
                result: loop_result.clone(),
            })
            .await;
            return loop_result;
        }

        self.emit(AgentEvent::ToolExecutionStart {
            tool_call_id: call_id.to_string(),
            tool_name: tool_name.to_string(),
            args: args.clone(),
        })
        .await;

        // Before-hooks fire ahead of the mode check below, so they also observe calls that
        // the current mode then rejects.
        let before_results = self
            .hooks
            .fire(&HookEvent::BeforeToolCall {
                tool_name,
                args: &args,
            })
            .await;

        // Reject tools the current agent mode does not allow.
        if !self.mode.allows_tool(tool_name) {
            let reason = format!(
                "Tool '{tool_name}' is not available in {} mode",
                format!("{:?}", self.mode).to_lowercase()
            );
            let result =
                crate::tools::ToolOutput::error(reason).into_tool_result(call_id, tool_name);
            self.emit(AgentEvent::ToolExecutionEnd {
                tool_call_id: call_id.to_string(),
                result: result.clone(),
            })
            .await;
            return result;
        }

        // The first before-hook that set `block` wins. Its reason, or a default message,
        // becomes the error text.
        if let Some(blocking_result) = before_results.into_iter().find(|result| result.block) {
            let reason = blocking_result
                .reason
                .unwrap_or_else(|| format!("Tool call blocked by hook: {tool_name}"));
            let result =
                crate::tools::ToolOutput::error(reason).into_tool_result(call_id, tool_name);
            self.emit(AgentEvent::ToolExecutionEnd {
                tool_call_id: call_id.to_string(),
                result: result.clone(),
            })
            .await;
            return result;
        }

        // Redirect `mana <subcommand>` shell invocations to the native mana tool instead of
        // running them.
        if tool_name == "bash" {
            if let Some(command) = args.get("command").and_then(|v| v.as_str()) {
                if let Some(hint) = mana_bash_equivalent_hint(command) {
                    let result =
                        crate::tools::ToolOutput::error(hint).into_tool_result(call_id, tool_name);
                    self.emit(AgentEvent::ToolExecutionEnd {
                        tool_call_id: call_id.to_string(),
                        result: result.clone(),
                    })
                    .await;
                    return result;
                }
            }
        }

        // Validate arguments against the registered tool's parameter schema. Unregistered
        // tools skip this step and hit the "Unknown tool" error below.
        if let Some(tool) = self.tools.get(tool_name) {
            let schema = tool.parameters();
            if let Err(e) = crate::tools::validate_tool_args(&schema, &args) {
                let result = crate::tools::ToolOutput::error(e.to_string())
                    .into_tool_result(call_id, tool_name);
                self.emit(AgentEvent::ToolExecutionEnd {
                    tool_call_id: call_id.to_string(),
                    result: result.clone(),
                })
                .await;
                return result;
            }
        }

        let mut result = match self.tools.get(tool_name) {
            Some(tool) => {
                // Progress channel: the tool receives `update_tx` through its context.
                let (update_tx, mut update_rx) = mpsc::channel(64);
                let ctx = crate::tools::ToolContext {
                    cwd: self.cwd.clone(),
                    cancelled: Arc::clone(&cancel_token),
                    update_tx,
                    command_tx: self.command_tx.clone(),
                    ui: self.ui.clone(),
                    file_cache: self.file_cache.clone(),
                    checkpoint_state: self.checkpoint_state.clone(),
                    file_tracker: self.file_tracker.clone(),
                    anchor_store: self.anchor_store.clone(),
                    lua_tool_loader: self.lua_tool_loader.clone(),
                    mode: self.mode,
                    read_max_lines: self.read_max_lines,
                    turn_mana_review: self.turn_mana_review.clone(),
                    config: self.config.clone(),
                };

                // Relay the text blocks of each progress update as `ToolOutputDelta` events.
                // Failures to send an event are ignored.
                let event_tx = self.event_tx.clone();
                let delta_call_id = call_id.to_string();
                let forwarder = tokio::spawn(async move {
                    while let Some(update) = update_rx.recv().await {
                        for block in &update.content {
                            if let imp_llm::ContentBlock::Text { text } = block {
                                let _ = event_tx
                                    .send(AgentEvent::ToolOutputDelta {
                                        tool_call_id: delta_call_id.clone(),
                                        text: text.clone(),
                                    })
                                    .await;
                            }
                        }
                    }
                });

                // Execution errors are turned into an error result rather than propagated.
                let exec_result = match tool.execute(call_id, args.clone(), ctx).await {
                    Ok(output) => output.into_tool_result(call_id, tool_name),
                    Err(e) => crate::tools::ToolOutput::error(e.to_string())
                        .into_tool_result(call_id, tool_name),
                };
                // NOTE(review): `update_rx.recv()` only returns `None` once every sender has
                // been dropped. This await therefore assumes the tool does not keep a clone
                // of `update_tx` alive beyond `execute` (e.g. in a spawned task). Confirm
                // this holds for long-running tools.
                forwarder.await.ok();
                exec_result
            }
            None => crate::tools::ToolOutput::error(format!("Unknown tool: {tool_name}"))
                .into_tool_result(call_id, tool_name),
        };

        // After-hooks may rewrite the content. If several supply `modified_content`, the
        // last one wins.
        let after_results = self
            .hooks
            .fire(&HookEvent::AfterToolCall {
                tool_name,
                result: &result,
            })
            .await;

        if let Some(modified_content) = after_results
            .into_iter()
            .filter_map(|hook_result| hook_result.modified_content)
            .next_back()
        {
            result.content = modified_content;
        }

        // Post-write processing applies only to successful file mutations with a `path`.
        if !result.is_error && matches!(tool_name, "write" | "edit" | "multi_edit") {
            if let Some(path) = extract_file_path(self.cwd.as_path(), &args) {
                self.hooks
                    .fire(&HookEvent::AfterFileWrite {
                        file: path.as_path(),
                    })
                    .await;

                // Guardrail checks run only when a profile is configured and the guardrail
                // config accepts this path.
                if let Some(profile) = self.guardrail_profile {
                    if self.guardrail_config.should_check_path(&path) {
                        let check_results = guardrails::run_after_write_checks(
                            &self.guardrail_config,
                            profile,
                            &self.cwd,
                        )
                        .await;

                        if !check_results.is_empty() {
                            let level = self.guardrail_config.effective_level();
                            let msg = guardrails::format_check_results(&check_results, level);
                            // Skip the all-clear message and surface only real findings.
                            if !msg.is_empty() && msg != "Guardrail checks passed." {
                                result.content.push(imp_llm::ContentBlock::Text {
                                    text: format!("\n\n{msg}"),
                                });
                                // Under `Enforce`, any failed check turns the result into an error.
                                if level == GuardrailLevel::Enforce
                                    && check_results.iter().any(|r| !r.success)
                                {
                                    result.is_error = true;
                                }
                            }
                        }
                    }
                }
            }
        }

        self.emit(AgentEvent::ToolExecutionEnd {
            tool_call_id: call_id.to_string(),
            result: result.clone(),
        })
        .await;

        // The repeated-call warning is appended after `ToolExecutionEnd` is emitted, so only
        // the result returned to the model carries it.
        if let RepeatedToolCallCheck::Warn(warning) = repeat_check {
            result.content.push(imp_llm::ContentBlock::Text {
                text: format!("\n\n{warning}"),
            });
        }

        result
    }
1391
1392 fn finish_turn_mana_review(&self, turn: u32) -> TurnManaReview {
1393 match self.turn_mana_review.lock() {
1394 Ok(review) => {
1395 let review = review.finalize();
1396 if review.turn_index == turn {
1397 review
1398 } else {
1399 TurnManaReview::no_change(turn)
1400 }
1401 }
1402 Err(_) => TurnManaReview::no_change(turn),
1403 }
1404 }
1405}
1406fn push_stream_text_block(content: &mut Vec<ContentBlock>, text: String) {
1407 if text.is_empty() {
1408 return;
1409 }
1410
1411 if let Some(ContentBlock::Text { text: existing }) = content.last_mut() {
1412 existing.push_str(&text);
1413 } else {
1414 content.push(ContentBlock::Text { text });
1415 }
1416}
1417
1418fn push_stream_thinking_block(content: &mut Vec<ContentBlock>, text: String) {
1419 if text.is_empty() {
1420 return;
1421 }
1422
1423 if let Some(ContentBlock::Thinking { text: existing }) = content.last_mut() {
1424 existing.push_str(&text);
1425 } else {
1426 content.push(ContentBlock::Thinking { text });
1427 }
1428}
1429
1430fn assistant_message_text(message: &AssistantMessage) -> String {
1431 message
1432 .content
1433 .iter()
1434 .filter_map(|block| match block {
1435 ContentBlock::Text { text } => Some(text.as_str()),
1436 _ => None,
1437 })
1438 .collect::<Vec<_>>()
1439 .join("\n")
1440}
1441
1442fn assistant_message_contains_mana_tool_call(message: &AssistantMessage) -> bool {
1443 message.content.iter().any(|block| match block {
1444 ContentBlock::ToolCall { name, .. } => name == "mana",
1445 _ => false,
1446 })
1447}
1448
/// Decides whether to queue the "keep working" follow-up.
///
/// Requires all of the following:
/// - the assistant has finalized its message;
/// - no such follow-up is already queued;
/// - work was recorded (`execution_debt`);
/// - there is no evidence of actual execution.
fn should_queue_execution_debt_follow_up(
    execution_debt: bool,
    execution_evidence: bool,
    already_queued: bool,
    assistant_finalized: bool,
) -> bool {
    if already_queued || !assistant_finalized {
        return false;
    }
    execution_debt && !execution_evidence
}
1457
1458fn should_queue_mana_externalization_follow_up(
1459 message: &AssistantMessage,
1460 mode: AgentMode,
1461 has_mana_skill: bool,
1462 already_queued: bool,
1463) -> bool {
1464 if already_queued || !has_mana_skill {
1465 return false;
1466 }
1467
1468 if !matches!(
1469 mode,
1470 AgentMode::Full | AgentMode::Planner | AgentMode::Orchestrator
1471 ) {
1472 return false;
1473 }
1474
1475 if assistant_message_contains_mana_tool_call(message) {
1476 return false;
1477 }
1478
1479 let text = assistant_message_text(message);
1480 if text.trim().is_empty() {
1481 return false;
1482 }
1483
1484 let lower = text.to_ascii_lowercase();
1485 let planning_signal = [
1486 "plan",
1487 "phase",
1488 "rollout",
1489 "decompose",
1490 "break",
1491 "split",
1492 "architecture",
1493 "migration",
1494 "follow-up",
1495 "next step",
1496 "next steps",
1497 "dependency",
1498 "dependencies",
1499 "verify",
1500 "acceptance",
1501 ]
1502 .iter()
1503 .any(|needle| lower.contains(needle));
1504
1505 planning_signal
1506}
1507
/// Prompt injected after a planning-style reply, asking the model to record the plan in mana.
fn mana_externalization_follow_up_text() -> &'static str {
    const PROMPT: &str = "Before you continue: externalize the durable plan or decomposition you just described into mana now. Create or update the relevant unit(s) with native mana actions, prefer root scope for cross-project work, and avoid extra chat restatement when the mana tool/UI already makes the delta obvious.";
    PROMPT
}
1511
1512fn should_queue_confidence_continue_follow_up(
1513 message: &AssistantMessage,
1514 mode: AgentMode,
1515 continue_policy: ContinuePolicy,
1516 already_queued: bool,
1517) -> bool {
1518 if already_queued || matches!(continue_policy, ContinuePolicy::Disabled) {
1519 return false;
1520 }
1521
1522 if !matches!(
1523 mode,
1524 AgentMode::Full | AgentMode::Planner | AgentMode::Orchestrator
1525 ) {
1526 return false;
1527 }
1528
1529 if !assistant_message_contains_mana_tool_call(message) {
1530 return false;
1531 }
1532
1533 let text = assistant_message_text(message);
1534 if text.trim().is_empty() {
1535 return false;
1536 }
1537
1538 let lower = text.to_ascii_lowercase();
1539 let positive_signal = [
1540 "done",
1541 "completed",
1542 "finished",
1543 "updated",
1544 "created",
1545 "next",
1546 "continue",
1547 "proceed",
1548 "follow-up",
1549 "follow up",
1550 ]
1551 .iter()
1552 .filter(|needle| lower.contains(**needle))
1553 .count();
1554
1555 let blocker_signal = [
1556 "blocked",
1557 "unclear",
1558 "need your input",
1559 "which should",
1560 "approval",
1561 ]
1562 .iter()
1563 .any(|needle| lower.contains(needle));
1564
1565 if blocker_signal {
1566 return false;
1567 }
1568
1569 let threshold = match continue_policy {
1570 ContinuePolicy::Disabled => return false,
1571 ContinuePolicy::Conservative => 3,
1572 ContinuePolicy::Balanced => 2,
1573 ContinuePolicy::Aggressive => 1,
1574 };
1575
1576 positive_signal >= threshold
1577}
1578
/// Prompt injected to push the model on to its next small step after a confident mana update.
fn confidence_continue_follow_up_text() -> &'static str {
    const PROMPT: &str = "Confidence is high and the mana delta is already visible. Continue to the next small, well-bounded step now using native mana-backed workflow, unless a consequential decision or blocker appears. Do not re-summarize the same visible mana change in chat unless new context needs to be called out.";
    PROMPT
}
1582
/// Prompt injected when work was recorded or planned but not yet carried out.
fn execution_debt_follow_up_text() -> &'static str {
    const PROMPT: &str = "You have recorded or planned work, but the requested outcome is not satisfied yet. Continue working until the user's requested outcome is satisfied, or until concrete evidence shows it cannot be completed. Do not stop merely because you recorded a plan, updated mana, or completed one intermediate step.";
    PROMPT
}
1586
1587fn tool_results_indicate_repeated_action(tool_results: &[imp_llm::ToolResultMessage]) -> bool {
1588 tool_results.iter().any(|result| {
1589 result.is_error
1590 && result.content.iter().any(|block| match block {
1591 ContentBlock::Text { text } => {
1592 text.contains("Blocked: identical tool call repeated")
1593 }
1594 _ => false,
1595 })
1596 })
1597}
1598
1599fn tool_results_indicate_execution_blocker(
1600 tool_results: &[imp_llm::ToolResultMessage],
1601 mode: AgentMode,
1602) -> Option<NextActionStopReason> {
1603 if !matches!(
1604 mode,
1605 AgentMode::Full | AgentMode::Orchestrator | AgentMode::Worker
1606 ) {
1607 return None;
1608 }
1609
1610 let saw_edit_like_success = tool_results.iter().any(|result| {
1611 !result.is_error && matches!(result.tool_name.as_str(), "write" | "edit" | "multi_edit")
1612 });
1613
1614 for result in tool_results {
1615 let action = result.details.get("action").and_then(|v| v.as_str());
1616
1617 if action == Some("verify")
1618 && result.details.get("passed").and_then(|v| v.as_bool()) == Some(false)
1619 {
1620 return Some(NextActionStopReason::ExecutionBlocked);
1621 }
1622
1623 if result.tool_name == "ask" && !result.is_error {
1624 return Some(NextActionStopReason::UserBlocker);
1625 }
1626
1627 if result.tool_name == "bash" || result.tool_name == "shell" {
1628 let exit_code = result.details.get("exit_code").and_then(|v| v.as_i64());
1629 let timed_out = result.details.get("timed_out").and_then(|v| v.as_bool()) == Some(true);
1630 let cancelled = result.details.get("cancelled").and_then(|v| v.as_bool()) == Some(true);
1631 let command = result
1632 .details
1633 .get("command")
1634 .and_then(|v| v.as_str())
1635 .unwrap_or("")
1636 .to_ascii_lowercase();
1637 let looks_like_check = command.contains("check")
1638 || command.contains("test")
1639 || command.contains("verify")
1640 || command.contains("pytest")
1641 || command.contains("cargo test")
1642 || command.contains("cargo check");
1643
1644 if looks_like_check
1645 && (timed_out || cancelled || exit_code.is_some_and(|code| code != 0))
1646 {
1647 return Some(NextActionStopReason::ExecutionBlocked);
1648 }
1649
1650 if saw_edit_like_success
1651 && (timed_out || cancelled || exit_code.is_some_and(|code| code != 0))
1652 {
1653 return Some(NextActionStopReason::ExecutionBlocked);
1654 }
1655 }
1656 }
1657
1658 None
1659}
1660
1661fn tool_results_indicate_execution_debt(
1662 tool_results: &[imp_llm::ToolResultMessage],
1663 mode: AgentMode,
1664) -> bool {
1665 if !matches!(
1666 mode,
1667 AgentMode::Full | AgentMode::Orchestrator | AgentMode::Worker
1668 ) {
1669 return false;
1670 }
1671
1672 tool_results.iter().any(|result| {
1673 !result.is_error
1674 && result.tool_name == "mana"
1675 && matches!(
1676 result.details.get("action").and_then(|v| v.as_str()),
1677 Some("create" | "update" | "notes_append" | "decision_add" | "dep_add" | "claim")
1678 )
1679 })
1680}
1681
1682fn tool_results_indicate_execution_evidence(
1683 tool_results: &[imp_llm::ToolResultMessage],
1684 mode: AgentMode,
1685) -> bool {
1686 if !matches!(
1687 mode,
1688 AgentMode::Full | AgentMode::Orchestrator | AgentMode::Worker
1689 ) {
1690 return false;
1691 }
1692
1693 tool_results.iter().any(|result| {
1694 if result.is_error {
1695 return false;
1696 }
1697
1698 match result.tool_name.as_str() {
1699 "write" | "edit" | "multi_edit" | "openrouter_secret_run" => true,
1700 "bash" | "shell" => true,
1701 "mana" => matches!(
1702 result.details.get("action").and_then(|v| v.as_str()),
1703 Some("run" | "verify" | "close" | "fail")
1704 ),
1705 _ => false,
1706 }
1707 })
1708}
1709
1710fn tool_results_indicate_work_completed(
1711 tool_results: &[imp_llm::ToolResultMessage],
1712 mode: AgentMode,
1713) -> bool {
1714 if !matches!(
1715 mode,
1716 AgentMode::Full | AgentMode::Orchestrator | AgentMode::Worker
1717 ) {
1718 return false;
1719 }
1720
1721 let mut saw_edit_like_success = false;
1722 let mut saw_successful_check = false;
1723
1724 for result in tool_results {
1725 if result.is_error {
1726 continue;
1727 }
1728
1729 if matches!(result.tool_name.as_str(), "write" | "edit" | "multi_edit") {
1730 saw_edit_like_success = true;
1731 }
1732
1733 let action = result.details.get("action").and_then(|v| v.as_str());
1734 let has_closed_unit = result
1735 .details
1736 .get("unit")
1737 .and_then(|unit| unit.get("status"))
1738 .and_then(|v| v.as_str())
1739 == Some("closed");
1740
1741 if let Some(command) = result.details.get("command").and_then(|v| v.as_str()) {
1742 let exit_code_ok = result.details.get("exit_code").and_then(|v| v.as_i64()) == Some(0);
1743 let command_lower = command.to_ascii_lowercase();
1744 let looks_like_check = command_lower.contains("check")
1745 || command_lower.contains("test")
1746 || command_lower.contains("verify")
1747 || command_lower.contains("pytest")
1748 || command_lower.contains("cargo test")
1749 || command_lower.contains("cargo check");
1750 if exit_code_ok && looks_like_check {
1751 saw_successful_check = true;
1752 }
1753 }
1754
1755 match action {
1756 Some("close") => return true,
1757 Some("verify")
1758 if result.details.get("passed").and_then(|v| v.as_bool()) == Some(true) =>
1759 {
1760 return true;
1761 }
1762 _ if has_closed_unit => return true,
1763 _ => {}
1764 }
1765 }
1766
1767 saw_edit_like_success && saw_successful_check
1768}
1769
1770fn mana_review_stop_reason(
1771 mana_review: &TurnManaReview,
1772 mode: AgentMode,
1773) -> Option<NextActionStopReason> {
1774 match mana_review.state {
1775 ManaReviewState::NeedsDecision => Some(NextActionStopReason::UserBlocker),
1776 ManaReviewState::Changed if matches!(mode, AgentMode::Planner) => {
1777 if !mana_review.proposed_children.is_empty()
1778 || !mana_review.touched_units.is_empty()
1779 || !mana_review.material_field_changes.is_empty()
1780 || !mana_review.notes_appended.is_empty()
1781 || !mana_review.decision_events.is_empty()
1782 {
1783 Some(NextActionStopReason::DecompositionCompleted)
1784 } else {
1785 None
1786 }
1787 }
1788 _ => None,
1789 }
1790}
1791
1792fn planner_stop_reason(
1793 message: &AssistantMessage,
1794 mode: AgentMode,
1795) -> Option<NextActionStopReason> {
1796 if !matches!(mode, AgentMode::Planner) {
1797 return None;
1798 }
1799
1800 classify_stop_reason_from_text(message, true)
1801}
1802
1803fn execution_stop_reason(
1804 message: &AssistantMessage,
1805 mode: AgentMode,
1806) -> Option<NextActionStopReason> {
1807 if !matches!(
1808 mode,
1809 AgentMode::Full | AgentMode::Orchestrator | AgentMode::Worker
1810 ) {
1811 return None;
1812 }
1813
1814 match classify_stop_reason_from_text(message, false) {
1815 Some(
1816 reason @ (NextActionStopReason::UserBlocker | NextActionStopReason::WorkCompleted),
1817 ) => Some(reason),
1818 _ => None,
1819 }
1820}
1821
1822fn classify_stop_reason_from_text(
1823 message: &AssistantMessage,
1824 planner_mode: bool,
1825) -> Option<NextActionStopReason> {
1826 let text = assistant_message_text(message);
1827 if text.trim().is_empty() {
1828 return None;
1829 }
1830
1831 let lower = text.to_ascii_lowercase();
1832
1833 let blocker_signal = [
1834 "blocked",
1835 "need your input",
1836 "which should",
1837 "waiting on you",
1838 "approval",
1839 "before i continue",
1840 "before continuing",
1841 ]
1842 .iter()
1843 .any(|needle| lower.contains(needle));
1844 if blocker_signal {
1845 return Some(NextActionStopReason::UserBlocker);
1846 }
1847
1848 if planner_mode {
1849 let decomposition_complete_signal = [
1850 "externalized into mana",
1851 "created the units",
1852 "created child units",
1853 "decomposition is complete",
1854 "plan is complete",
1855 "ready for handoff",
1856 ]
1857 .iter()
1858 .any(|needle| lower.contains(needle));
1859 if decomposition_complete_signal {
1860 return Some(NextActionStopReason::DecompositionCompleted);
1861 }
1862 } else {
1863 let work_complete_signal = [
1864 "all done",
1865 "done",
1866 "completed",
1867 "finished",
1868 "implemented",
1869 "fixed",
1870 "handled",
1871 ]
1872 .iter()
1873 .any(|needle| lower.contains(needle));
1874 if work_complete_signal {
1875 return Some(NextActionStopReason::WorkCompleted);
1876 }
1877 }
1878
1879 None
1880}
1881
1882fn build_assistant_message(
1885 content: &[ContentBlock],
1886 tool_calls: &[(String, String, serde_json::Value)],
1887 usage: Option<Usage>,
1888) -> AssistantMessage {
1889 let stop_reason = if tool_calls.is_empty() {
1890 StopReason::EndTurn
1891 } else {
1892 StopReason::ToolUse
1893 };
1894
1895 AssistantMessage {
1896 content: content.to_vec(),
1897 usage,
1898 stop_reason,
1899 timestamp: imp_llm::now(),
1900 }
1901}
1902
1903fn clone_model(model: &Model) -> Model {
1904 Model {
1905 meta: model.meta.clone(),
1906 provider: Arc::clone(&model.provider),
1907 }
1908}
1909
1910fn extract_file_path(cwd: &Path, args: &serde_json::Value) -> Option<PathBuf> {
1911 let raw_path = args.get("path")?.as_str()?;
1912 if raw_path.is_empty() {
1913 return None;
1914 }
1915
1916 let path = PathBuf::from(raw_path);
1917 if path.is_absolute() {
1918 Some(path)
1919 } else {
1920 Some(cwd.join(path))
1921 }
1922}
1923
/// Returns a redirect hint when a shell command is a `mana` subcommand the native tool covers.
///
/// The command's first whitespace-separated word must be exactly `mana`, and the second must
/// be one of the supported subcommands. Anything else, including a bare `mana`, yields `None`.
fn mana_bash_equivalent_hint(command: &str) -> Option<&'static str> {
    // Subcommands the native mana tool already provides.
    const NATIVE_SUBCOMMANDS: &[&str] = &[
        "status",
        "list",
        "ls",
        "show",
        "read",
        "create",
        "close",
        "update",
        "run",
        "run_state",
        "evaluate",
        "agents",
        "logs",
        "next",
        "claim",
        "release",
        "tree",
    ];
    const HINT: &str = "Use the native mana tool instead of `bash` for this mana command. For orchestration, the native tool supports canonical target selection (`id`, `targets`, or all ready work) plus background run tracking.";

    let mut words = command.split_whitespace();
    if words.next() != Some("mana") {
        return None;
    }

    let subcommand = words.next()?;
    NATIVE_SUBCOMMANDS.contains(&subcommand).then_some(HINT)
}
1940
1941fn mana_skill_follow_up_hint(
1942 prompt: &str,
1943 mode: AgentMode,
1944 tools_available: bool,
1945 has_mana_skill: bool,
1946 has_mana_basics_skill: bool,
1947 _has_mana_delegation_skill: bool,
1948) -> Option<&'static str> {
1949 if !tools_available {
1950 return None;
1951 }
1952
1953 let lower = prompt.to_ascii_lowercase();
1954
1955 let orchestration_signal = [
1956 "spawn",
1957 "delegate",
1958 "decompose",
1959 "decomposition",
1960 "split this",
1961 "break this up",
1962 "break it up",
1963 "parallel",
1964 "spawn workers",
1965 "spawn worker",
1966 "worker spawn",
1967 "orchestrate",
1968 "orchestration",
1969 "create a unit",
1970 "create units",
1971 "mana run",
1972 ]
1973 .iter()
1974 .any(|needle| lower.contains(needle));
1975
1976 let mana_signal = [
1977 " mana ",
1978 "mana status",
1979 "mana list",
1980 "mana show",
1981 "mana update",
1982 "mana create",
1983 "mana run",
1984 "unit",
1985 "units",
1986 ]
1987 .iter()
1988 .any(|needle| lower.contains(needle));
1989
1990 match mode {
1991 AgentMode::Full | AgentMode::Orchestrator | AgentMode::Planner
1992 if orchestration_signal || mana_signal =>
1993 {
1994 if has_mana_skill {
1995 Some("Before you continue: load `mana` with `read` and follow it for unit design, decomposition, retries, and worker handoff.")
1996 } else {
1997 None
1998 }
1999 }
2000 AgentMode::Worker | AgentMode::Auditor if mana_signal => {
2001 if has_mana_basics_skill {
2002 Some("Before you continue: load `mana-basics` with `read` and follow the allowed native mana workflow for this mode.")
2003 } else if has_mana_skill {
2004 Some("Before you continue: load `mana` with `read` and follow the allowed native mana workflow for this mode.")
2005 } else {
2006 None
2007 }
2008 }
2009 _ => None,
2010 }
2011}
2012
2013#[cfg(test)]
2014mod tests {
2015 use super::*;
2016 use std::pin::Pin;
2017 use std::sync::{
2018 atomic::{AtomicUsize, Ordering},
2019 Arc, Mutex as StdMutex,
2020 };
2021 use std::time::Duration;
2022
2023 use async_trait::async_trait;
2024 use futures_core::Stream;
2025 use imp_llm::auth::{ApiKey, AuthStore};
2026 use imp_llm::model::{Capabilities, ModelMeta, ModelPricing};
2027 use imp_llm::provider::Provider;
2028 use tokio::sync::{Mutex, Notify};
2029
2030 struct MockProvider {
2033 responses: Mutex<Vec<Vec<imp_llm::Result<StreamEvent>>>>,
2034 }
2035
2036 impl MockProvider {
2037 fn new(responses: Vec<Vec<StreamEvent>>) -> Self {
2038 Self {
2039 responses: Mutex::new(
2040 responses
2041 .into_iter()
2042 .map(|events| events.into_iter().map(Ok).collect())
2043 .collect(),
2044 ),
2045 }
2046 }
2047
2048 fn new_results(responses: Vec<Vec<imp_llm::Result<StreamEvent>>>) -> Self {
2049 Self {
2050 responses: Mutex::new(responses),
2051 }
2052 }
2053 }
2054
2055 #[async_trait]
2056 impl Provider for MockProvider {
2057 fn stream(
2058 &self,
2059 _model: &Model,
2060 _context: Context,
2061 _options: RequestOptions,
2062 _api_key: &str,
2063 ) -> Pin<Box<dyn Stream<Item = imp_llm::Result<StreamEvent>> + Send>> {
2064 let mut responses = self.responses.try_lock().expect("MockProvider lock");
2067 let events = if responses.is_empty() {
2068 vec![Ok(StreamEvent::Error {
2069 error: "No more mock responses".to_string(),
2070 })]
2071 } else {
2072 responses.remove(0)
2073 };
2074 let stream = futures::stream::iter(events);
2075 Box::pin(stream)
2076 }
2077
2078 async fn resolve_auth(&self, _auth: &AuthStore) -> imp_llm::Result<ApiKey> {
2079 Ok("mock-key".to_string())
2080 }
2081
2082 fn id(&self) -> &str {
2083 "mock"
2084 }
2085
2086 fn models(&self) -> &[ModelMeta] {
2087 &[]
2088 }
2089 }
2090
2091 fn test_model(provider: Arc<dyn Provider>) -> Model {
2092 test_model_with_context_window(provider, 200_000)
2093 }
2094
2095 fn test_model_with_context_window(provider: Arc<dyn Provider>, context_window: u32) -> Model {
2096 Model {
2097 meta: ModelMeta {
2098 id: "test-model".to_string(),
2099 provider: "mock".to_string(),
2100 name: "Test Model".to_string(),
2101 context_window,
2102 max_output_tokens: 16_384,
2103 pricing: ModelPricing {
2104 input_per_mtok: 3.0,
2105 output_per_mtok: 15.0,
2106 cache_read_per_mtok: 0.3,
2107 cache_write_per_mtok: 3.75,
2108 },
2109 capabilities: Capabilities {
2110 reasoning: true,
2111 images: false,
2112 tool_use: true,
2113 },
2114 },
2115 provider,
2116 }
2117 }
2118
2119 fn text_response(text: &str, input_tokens: u32, output_tokens: u32) -> Vec<StreamEvent> {
2120 vec![
2121 StreamEvent::MessageStart {
2122 model: "test-model".to_string(),
2123 },
2124 StreamEvent::TextDelta {
2125 text: text.to_string(),
2126 },
2127 StreamEvent::MessageEnd {
2128 message: AssistantMessage {
2129 content: vec![ContentBlock::Text {
2130 text: text.to_string(),
2131 }],
2132 usage: Some(Usage {
2133 input_tokens,
2134 output_tokens,
2135 cache_read_tokens: 0,
2136 cache_write_tokens: 0,
2137 }),
2138 stop_reason: StopReason::EndTurn,
2139 timestamp: 1000,
2140 },
2141 },
2142 ]
2143 }
2144
2145 fn tool_call_response(
2146 call_id: &str,
2147 tool_name: &str,
2148 args: serde_json::Value,
2149 input_tokens: u32,
2150 output_tokens: u32,
2151 ) -> Vec<StreamEvent> {
2152 vec![
2153 StreamEvent::MessageStart {
2154 model: "test-model".to_string(),
2155 },
2156 StreamEvent::ToolCall {
2157 id: call_id.to_string(),
2158 name: tool_name.to_string(),
2159 arguments: args.clone(),
2160 },
2161 StreamEvent::MessageEnd {
2162 message: AssistantMessage {
2163 content: vec![ContentBlock::ToolCall {
2164 id: call_id.to_string(),
2165 name: tool_name.to_string(),
2166 arguments: args,
2167 }],
2168 usage: Some(Usage {
2169 input_tokens,
2170 output_tokens,
2171 cache_read_tokens: 0,
2172 cache_write_tokens: 0,
2173 }),
2174 stop_reason: StopReason::ToolUse,
2175 timestamp: 1000,
2176 },
2177 },
2178 ]
2179 }
2180
2181 fn multi_tool_call_response(
2182 calls: &[(&str, &str, serde_json::Value)],
2183 input_tokens: u32,
2184 output_tokens: u32,
2185 ) -> Vec<StreamEvent> {
2186 let mut events = vec![StreamEvent::MessageStart {
2187 model: "test-model".to_string(),
2188 }];
2189
2190 let mut content = Vec::new();
2191 for (id, name, args) in calls {
2192 events.push(StreamEvent::ToolCall {
2193 id: id.to_string(),
2194 name: name.to_string(),
2195 arguments: args.clone(),
2196 });
2197 content.push(ContentBlock::ToolCall {
2198 id: id.to_string(),
2199 name: name.to_string(),
2200 arguments: args.clone(),
2201 });
2202 }
2203
2204 events.push(StreamEvent::MessageEnd {
2205 message: AssistantMessage {
2206 content,
2207 usage: Some(Usage {
2208 input_tokens,
2209 output_tokens,
2210 cache_read_tokens: 0,
2211 cache_write_tokens: 0,
2212 }),
2213 stop_reason: StopReason::ToolUse,
2214 timestamp: 1000,
2215 },
2216 });
2217
2218 events
2219 }
2220
2221 fn make_assistant_tool_call(
2222 call_id: &str,
2223 tool_name: &str,
2224 args: serde_json::Value,
2225 ) -> Message {
2226 Message::Assistant(AssistantMessage {
2227 content: vec![ContentBlock::ToolCall {
2228 id: call_id.to_string(),
2229 name: tool_name.to_string(),
2230 arguments: args,
2231 }],
2232 usage: None,
2233 stop_reason: StopReason::ToolUse,
2234 timestamp: imp_llm::now(),
2235 })
2236 }
2237
2238 fn make_tool_result(call_id: &str, tool_name: &str, output: &str) -> Message {
2239 Message::ToolResult(imp_llm::ToolResultMessage {
2240 tool_call_id: call_id.to_string(),
2241 tool_name: tool_name.to_string(),
2242 content: vec![ContentBlock::Text {
2243 text: output.to_string(),
2244 }],
2245 is_error: false,
2246 details: serde_json::Value::Null,
2247 timestamp: imp_llm::now(),
2248 })
2249 }
2250
2251 fn tool_result_text(message: &Message) -> Option<&str> {
2252 match message {
2253 Message::ToolResult(result) => result.content.iter().find_map(|block| match block {
2254 ContentBlock::Text { text } => Some(text.as_str()),
2255 _ => None,
2256 }),
2257 _ => None,
2258 }
2259 }
2260
2261 struct EchoTool;
2263
2264 #[async_trait]
2265 impl crate::tools::Tool for EchoTool {
2266 fn name(&self) -> &str {
2267 "echo"
2268 }
2269 fn label(&self) -> &str {
2270 "Echo"
2271 }
2272 fn description(&self) -> &str {
2273 "Echoes back the input"
2274 }
2275 fn parameters(&self) -> serde_json::Value {
2276 serde_json::json!({
2277 "type": "object",
2278 "properties": {
2279 "text": { "type": "string" }
2280 },
2281 "required": ["text"]
2282 })
2283 }
2284 fn is_readonly(&self) -> bool {
2285 true
2286 }
2287 async fn execute(
2288 &self,
2289 _call_id: &str,
2290 params: serde_json::Value,
2291 _ctx: crate::tools::ToolContext,
2292 ) -> crate::error::Result<crate::tools::ToolOutput> {
2293 let text = params["text"].as_str().unwrap_or("no text");
2294 Ok(crate::tools::ToolOutput::text(format!("echo: {text}")))
2295 }
2296 }
2297
2298 #[allow(dead_code)]
2300 struct WriteTool;
2301
2302 #[async_trait]
2303 impl crate::tools::Tool for WriteTool {
2304 fn name(&self) -> &str {
2305 "write"
2306 }
2307 fn label(&self) -> &str {
2308 "Write"
2309 }
2310 fn description(&self) -> &str {
2311 "Writes data"
2312 }
2313 fn parameters(&self) -> serde_json::Value {
2314 serde_json::json!({
2315 "type": "object",
2316 "properties": {
2317 "data": { "type": "string" }
2318 },
2319 "required": ["data"]
2320 })
2321 }
2322 fn is_readonly(&self) -> bool {
2323 false
2324 }
2325 async fn execute(
2326 &self,
2327 _call_id: &str,
2328 params: serde_json::Value,
2329 _ctx: crate::tools::ToolContext,
2330 ) -> crate::error::Result<crate::tools::ToolOutput> {
2331 let data = params["data"].as_str().unwrap_or("no data");
2332 Ok(crate::tools::ToolOutput::text(format!("wrote: {data}")))
2333 }
2334 }
2335
2336 struct ConcurrentReadonlyState {
2337 readonly_expected: usize,
2338 readonly_started: AtomicUsize,
2339 readonly_finished: AtomicUsize,
2340 mutable_observed_finished: AtomicUsize,
2341 log: StdMutex<Vec<String>>,
2342 notify: Notify,
2343 }
2344
2345 impl ConcurrentReadonlyState {
2346 fn new(readonly_expected: usize) -> Self {
2347 Self {
2348 readonly_expected,
2349 readonly_started: AtomicUsize::new(0),
2350 readonly_finished: AtomicUsize::new(0),
2351 mutable_observed_finished: AtomicUsize::new(0),
2352 log: StdMutex::new(Vec::new()),
2353 notify: Notify::new(),
2354 }
2355 }
2356
2357 fn record(&self, entry: impl Into<String>) {
2358 self.log
2359 .lock()
2360 .expect("concurrent log lock")
2361 .push(entry.into());
2362 }
2363
2364 async fn wait_for_all_readonly_to_start(&self) {
2365 while self.readonly_started.load(Ordering::SeqCst) < self.readonly_expected {
2366 self.notify.notified().await;
2367 }
2368 }
2369 }
2370
2371 struct CoordinatedReadonlyTool {
2372 name: &'static str,
2373 shared: Arc<ConcurrentReadonlyState>,
2374 }
2375
2376 #[async_trait]
2377 impl crate::tools::Tool for CoordinatedReadonlyTool {
2378 fn name(&self) -> &str {
2379 self.name
2380 }
2381 fn label(&self) -> &str {
2382 self.name
2383 }
2384 fn description(&self) -> &str {
2385 "Read-only tool used to verify concurrent execution"
2386 }
2387 fn parameters(&self) -> serde_json::Value {
2388 serde_json::json!({
2389 "type": "object",
2390 "properties": {
2391 "text": { "type": "string" }
2392 },
2393 "required": ["text"]
2394 })
2395 }
2396 fn is_readonly(&self) -> bool {
2397 true
2398 }
2399 async fn execute(
2400 &self,
2401 _call_id: &str,
2402 params: serde_json::Value,
2403 _ctx: crate::tools::ToolContext,
2404 ) -> crate::error::Result<crate::tools::ToolOutput> {
2405 self.shared.record(format!("{}:start", self.name));
2406 self.shared.readonly_started.fetch_add(1, Ordering::SeqCst);
2407 self.shared.notify.notify_waiters();
2408 self.shared.wait_for_all_readonly_to_start().await;
2409 self.shared.record(format!("{}:end", self.name));
2410 self.shared.readonly_finished.fetch_add(1, Ordering::SeqCst);
2411
2412 let text = params["text"].as_str().unwrap_or(self.name);
2413 Ok(crate::tools::ToolOutput::text(format!(
2414 "{}: {text}",
2415 self.name
2416 )))
2417 }
2418 }
2419
    /// Mutable test tool that snapshots how many read-only calls had finished
    /// when it started, letting a test check ordering against read-only tools.
    struct CoordinatedMutableTool {
        // State shared with the coordinated read-only tools.
        shared: Arc<ConcurrentReadonlyState>,
    }
2423
2424 #[async_trait]
2425 impl crate::tools::Tool for CoordinatedMutableTool {
2426 fn name(&self) -> &str {
2427 "write_after_reads"
2428 }
2429 fn label(&self) -> &str {
2430 "Write After Reads"
2431 }
2432 fn description(&self) -> &str {
2433 "Mutable tool used to verify read-only tools finish first"
2434 }
2435 fn parameters(&self) -> serde_json::Value {
2436 serde_json::json!({
2437 "type": "object",
2438 "properties": {
2439 "data": { "type": "string" }
2440 },
2441 "required": ["data"]
2442 })
2443 }
2444 fn is_readonly(&self) -> bool {
2445 false
2446 }
2447 async fn execute(
2448 &self,
2449 _call_id: &str,
2450 params: serde_json::Value,
2451 _ctx: crate::tools::ToolContext,
2452 ) -> crate::error::Result<crate::tools::ToolOutput> {
2453 let finished = self.shared.readonly_finished.load(Ordering::SeqCst);
2454 self.shared
2455 .mutable_observed_finished
2456 .store(finished, Ordering::SeqCst);
2457 self.shared.record("write_after_reads:start");
2458
2459 let data = params["data"].as_str().unwrap_or("no data");
2460 Ok(crate::tools::ToolOutput::text(format!(
2461 "wrote after reads: {data}"
2462 )))
2463 }
2464 }
2465
2466 async fn collect_events(mut handle: AgentHandle) -> Vec<AgentEvent> {
2468 let mut events = Vec::new();
2469 while let Some(event) = handle.event_rx.recv().await {
2470 events.push(event);
2471 }
2472 events
2473 }
2474
2475 #[test]
2476 fn agent_queues_mana_hint_for_planner_requests() {
2477 let provider = Arc::new(MockProvider::new(vec![
2478 text_response("Loaded mana skill", 100, 20),
2479 text_response("Done", 120, 25),
2480 ]));
2481
2482 let model = test_model(provider);
2483 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
2484 agent.has_mana_skill = true;
2485 agent.mode = AgentMode::Planner;
2486
2487 let rt = tokio::runtime::Runtime::new().unwrap();
2488 rt.block_on(async {
2489 agent
2490 .run("Please split this into units for workers".to_string())
2491 .await
2492 .unwrap();
2493 });
2494
2495 let user_texts: Vec<String> = agent
2496 .messages
2497 .iter()
2498 .filter_map(|message| match message {
2499 Message::User(user) => user.content.iter().find_map(|block| match block {
2500 ContentBlock::Text { text } => Some(text.clone()),
2501 _ => None,
2502 }),
2503 _ => None,
2504 })
2505 .collect();
2506
2507 assert_eq!(user_texts.len(), 1);
2508 assert_eq!(user_texts[0], "Please split this into units for workers");
2509 }
2510
2511 #[tokio::test]
2512 async fn agent_queues_mana_externalization_follow_up_after_planning_turn() {
2513 let provider = Arc::new(MockProvider::new(vec![
2514 text_response("Here is the plan: split this into phases, add dependencies, and define verify steps.", 100, 20),
2515 text_response("Externalized into mana.", 120, 25),
2516 ]));
2517
2518 let model = test_model(provider);
2519 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
2520 agent.has_mana_skill = true;
2521 agent.mode = AgentMode::Planner;
2522
2523 agent.run("Plan the rollout".to_string()).await.unwrap();
2524
2525 let user_texts: Vec<String> = agent
2526 .messages
2527 .iter()
2528 .filter_map(|message| match message {
2529 Message::User(user) => user.content.iter().find_map(|block| match block {
2530 ContentBlock::Text { text } => Some(text.clone()),
2531 _ => None,
2532 }),
2533 _ => None,
2534 })
2535 .collect();
2536
2537 assert_eq!(user_texts.len(), 2);
2538 assert_eq!(user_texts[0], "Plan the rollout");
2539 assert!(user_texts[1].contains("externalize the durable plan"));
2540 }
2541
2542 #[tokio::test]
2543 async fn turn_assessment_debug_view_reports_execution_blocker() {
2544 let (agent, _handle) = Agent::new(
2545 test_model(Arc::new(MockProvider::new(vec![]))),
2546 PathBuf::from("/tmp"),
2547 );
2548 let assessment = agent.assess_post_turn(
2549 &AssistantMessage {
2550 content: vec![ContentBlock::Text {
2551 text: "Verify failed.".to_string(),
2552 }],
2553 usage: None,
2554 stop_reason: StopReason::EndTurn,
2555 timestamp: 0,
2556 },
2557 &[imp_llm::ToolResultMessage {
2558 tool_call_id: "call_verify".to_string(),
2559 tool_name: "mana".to_string(),
2560 content: vec![ContentBlock::Text {
2561 text: "Verify failed".to_string(),
2562 }],
2563 is_error: true,
2564 details: serde_json::json!({
2565 "action": "verify",
2566 "passed": false,
2567 "exit_code": 1
2568 }),
2569 timestamp: 0,
2570 }],
2571 true,
2572 &TurnManaReview::no_change(0),
2573 );
2574
2575 let debug = assessment.debug_view();
2576 assert_eq!(
2577 debug.runtime.execution_stop_reason.as_deref(),
2578 Some("execution_blocked")
2579 );
2580 assert_eq!(
2581 debug.chosen_action,
2582 NextActionDebugView::Stop {
2583 reason: "execution_blocked".to_string(),
2584 }
2585 );
2586 }
2587
2588 #[test]
2589 fn turn_assessment_debug_view_reports_continue_recommendation() {
2590 let assessment = PostTurnAssessment {
2591 runtime: RuntimeEvidence {
2592 repeated_action: false,
2593 execution_stop_reason: None,
2594 work_completed: false,
2595 execution_debt: false,
2596 execution_evidence: false,
2597 planning_only_progress: false,
2598 },
2599 mana: ManaEvidence { stop_reason: None },
2600 text_fallback: TextFallbackEvidence {
2601 planner_stop_reason: None,
2602 execution_stop_reason: None,
2603 },
2604 continue_recommendation: Some(ContinueRecommendation {
2605 prompt: "continue".to_string(),
2606 reason: ContinueReason::HighConfidenceVisibleNextStep,
2607 }),
2608 };
2609
2610 let debug = assessment.debug_view();
2611 let recommendation = debug
2612 .continue_recommendation
2613 .expect("continue recommendation present");
2614 assert_eq!(recommendation.reason, "high_confidence_visible_next_step");
2615 assert!(matches!(
2616 debug.chosen_action,
2617 NextActionDebugView::Continue { .. }
2618 ));
2619 }
2620
2621 #[tokio::test]
2622 async fn emits_turn_assessment_event_for_execution_blocker() {
2623 let provider = Arc::new(MockProvider::new(vec![
2624 tool_call_response(
2625 "call_check",
2626 "bash",
2627 serde_json::json!({"command": "cargo check -p definitely_missing_crate", "timeout": 1}),
2628 100,
2629 20,
2630 ),
2631 text_response("The check failed.", 120, 20),
2632 ]));
2633
2634 let model = test_model(provider);
2635 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
2636 agent.mode = AgentMode::Full;
2637 agent.tools.register(Arc::new(crate::tools::bash::BashTool));
2638 agent.tools.register_alias("bash", "shell");
2639
2640 let events_task = tokio::spawn(collect_events(handle));
2641 agent.run("Run the check".to_string()).await.unwrap();
2642 drop(agent);
2643 let events = events_task.await.unwrap();
2644
2645 let assessment = events.iter().find_map(|event| match event {
2646 AgentEvent::TurnAssessment { assessment, .. } => Some(assessment),
2647 _ => None,
2648 });
2649
2650 let assessment = assessment.expect("turn assessment emitted");
2651 assert_eq!(
2652 assessment.runtime.execution_stop_reason.as_deref(),
2653 Some("execution_blocked")
2654 );
2655 assert_eq!(
2656 assessment.chosen_action,
2657 NextActionDebugView::Stop {
2658 reason: "execution_blocked".to_string(),
2659 }
2660 );
2661 }
2662
2663 #[tokio::test]
2664 async fn emits_turn_assessment_event_for_continue_recommendation() {
2665 let provider = Arc::new(MockProvider::new(vec![
2666 vec![
2667 StreamEvent::MessageStart {
2668 model: "test-model".to_string(),
2669 },
2670 StreamEvent::ToolCall {
2671 id: "call_1".to_string(),
2672 name: "mana".to_string(),
2673 arguments: serde_json::json!({"action": "update", "id": "1", "notes": "done"}),
2674 },
2675 StreamEvent::TextDelta {
2676 text: "Done. Updated mana and next step is ready to continue.".to_string(),
2677 },
2678 StreamEvent::MessageEnd {
2679 message: AssistantMessage {
2680 content: vec![
2681 ContentBlock::ToolCall {
2682 id: "call_1".to_string(),
2683 name: "mana".to_string(),
2684 arguments: serde_json::json!({"action": "update", "id": "1", "notes": "done"}),
2685 },
2686 ContentBlock::Text {
2687 text: "Done. Updated mana and next step is ready to continue."
2688 .to_string(),
2689 },
2690 ],
2691 usage: Some(Usage {
2692 input_tokens: 100,
2693 output_tokens: 20,
2694 cache_read_tokens: 0,
2695 cache_write_tokens: 0,
2696 }),
2697 stop_reason: StopReason::ToolUse,
2698 timestamp: 1000,
2699 },
2700 },
2701 ],
2702 text_response("Stopped after visible mana turn.", 120, 25),
2703 ]));
2704
2705 let model = test_model(provider);
2706 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
2707 agent.mode = AgentMode::Planner;
2708 agent.continue_policy = ContinuePolicy::Balanced;
2709 agent
2710 .tools
2711 .register(Arc::new(crate::tools::mana::ManaTool::default()));
2712
2713 let events_task = tokio::spawn(collect_events(handle));
2714 agent.run("Do the next thing".to_string()).await.unwrap();
2715 drop(agent);
2716 let events = events_task.await.unwrap();
2717
2718 let assessment = events.iter().find_map(|event| match event {
2719 AgentEvent::TurnAssessment { assessment, .. } => Some(assessment),
2720 _ => None,
2721 });
2722
2723 let assessment = assessment.expect("turn assessment emitted");
2724 let recommendation = assessment
2725 .continue_recommendation
2726 .as_ref()
2727 .expect("continue recommendation present");
2728 assert_eq!(recommendation.reason, "high_confidence_visible_next_step");
2729 assert!(matches!(
2730 assessment.chosen_action,
2731 NextActionDebugView::Continue { .. }
2732 ));
2733 }
2734
2735 #[test]
2736 fn post_turn_assessment_prefers_execution_blocker_over_completion() {
2737 let assessment = PostTurnAssessment {
2738 runtime: RuntimeEvidence {
2739 repeated_action: false,
2740 execution_stop_reason: Some(NextActionStopReason::ExecutionBlocked),
2741 work_completed: true,
2742 execution_debt: false,
2743 execution_evidence: false,
2744 planning_only_progress: false,
2745 },
2746 mana: ManaEvidence {
2747 stop_reason: Some(NextActionStopReason::DecompositionCompleted),
2748 },
2749 text_fallback: TextFallbackEvidence {
2750 planner_stop_reason: Some(NextActionStopReason::DecompositionCompleted),
2751 execution_stop_reason: Some(NextActionStopReason::WorkCompleted),
2752 },
2753 continue_recommendation: Some(ContinueRecommendation {
2754 prompt: "continue".to_string(),
2755 reason: ContinueReason::HighConfidenceVisibleNextStep,
2756 }),
2757 };
2758
2759 assert_eq!(
2760 assessment.into_next_action(),
2761 NextAction::Stop {
2762 reason: NextActionStopReason::ExecutionBlocked,
2763 }
2764 );
2765 }
2766
2767 #[test]
2768 fn post_turn_assessment_emits_continue_when_no_stop_reason_exists() {
2769 let assessment = PostTurnAssessment {
2770 runtime: RuntimeEvidence {
2771 repeated_action: false,
2772 execution_stop_reason: None,
2773 work_completed: false,
2774 execution_debt: false,
2775 execution_evidence: false,
2776 planning_only_progress: false,
2777 },
2778 mana: ManaEvidence { stop_reason: None },
2779 text_fallback: TextFallbackEvidence {
2780 planner_stop_reason: None,
2781 execution_stop_reason: None,
2782 },
2783 continue_recommendation: Some(ContinueRecommendation {
2784 prompt: "continue".to_string(),
2785 reason: ContinueReason::HighConfidenceVisibleNextStep,
2786 }),
2787 };
2788
2789 assert_eq!(
2790 assessment.into_next_action(),
2791 NextAction::Continue {
2792 prompt: "continue".to_string(),
2793 reason: ContinueReason::HighConfidenceVisibleNextStep,
2794 }
2795 );
2796 }
2797
2798 #[test]
2799 fn execution_debt_follow_up_is_preferred_before_stopping_for_planning_only_progress() {
2800 let assessment = PostTurnAssessment {
2801 runtime: RuntimeEvidence {
2802 repeated_action: false,
2803 execution_stop_reason: None,
2804 work_completed: false,
2805 execution_debt: true,
2806 execution_evidence: false,
2807 planning_only_progress: false,
2808 },
2809 mana: ManaEvidence { stop_reason: None },
2810 text_fallback: TextFallbackEvidence {
2811 planner_stop_reason: None,
2812 execution_stop_reason: None,
2813 },
2814 continue_recommendation: Some(ContinueRecommendation {
2815 prompt: execution_debt_follow_up_text().to_string(),
2816 reason: ContinueReason::ExecutionDebt,
2817 }),
2818 };
2819
2820 assert_eq!(
2821 assessment.into_next_action(),
2822 NextAction::Continue {
2823 prompt: execution_debt_follow_up_text().to_string(),
2824 reason: ContinueReason::ExecutionDebt,
2825 }
2826 );
2827 }
2828
2829 #[test]
2830 fn mana_planning_without_execution_creates_execution_debt_follow_up() {
2831 let result = imp_llm::ToolResultMessage {
2832 tool_call_id: "call_mana".to_string(),
2833 tool_name: "mana".to_string(),
2834 content: vec![ContentBlock::Text {
2835 text: "Created task".to_string(),
2836 }],
2837 is_error: false,
2838 details: serde_json::json!({ "action": "create" }),
2839 timestamp: 0,
2840 };
2841
2842 assert!(tool_results_indicate_execution_debt(
2843 std::slice::from_ref(&result),
2844 AgentMode::Full
2845 ));
2846 assert!(!tool_results_indicate_execution_evidence(
2847 std::slice::from_ref(&result),
2848 AgentMode::Full
2849 ));
2850 assert!(should_queue_execution_debt_follow_up(
2851 true, false, false, true
2852 ));
2853 }
2854
2855 #[test]
2856 fn mutating_tool_call_satisfies_execution_evidence() {
2857 let result = imp_llm::ToolResultMessage {
2858 tool_call_id: "call_edit".to_string(),
2859 tool_name: "edit".to_string(),
2860 content: vec![ContentBlock::Text {
2861 text: "diff".to_string(),
2862 }],
2863 is_error: false,
2864 details: serde_json::json!({ "path": "src/lib.rs" }),
2865 timestamp: 0,
2866 };
2867
2868 assert!(tool_results_indicate_execution_evidence(
2869 &[result],
2870 AgentMode::Full
2871 ));
2872 assert!(!should_queue_execution_debt_follow_up(
2873 true, true, false, true
2874 ));
2875 }
2876
2877 #[test]
2878 fn tool_results_indicate_execution_blocker_detects_failed_verify() {
2879 let result = imp_llm::ToolResultMessage {
2880 tool_call_id: "call_verify".to_string(),
2881 tool_name: "mana".to_string(),
2882 content: vec![ContentBlock::Text {
2883 text: "Verify failed".to_string(),
2884 }],
2885 is_error: true,
2886 details: serde_json::json!({
2887 "action": "verify",
2888 "passed": false,
2889 "exit_code": 1
2890 }),
2891 timestamp: 0,
2892 };
2893
2894 assert_eq!(
2895 tool_results_indicate_execution_blocker(&[result], AgentMode::Full),
2896 Some(NextActionStopReason::ExecutionBlocked)
2897 );
2898 }
2899
2900 #[test]
2901 fn tool_results_indicate_execution_blocker_detects_ask_tool_as_user_blocker() {
2902 let result = imp_llm::ToolResultMessage {
2903 tool_call_id: "call_ask".to_string(),
2904 tool_name: "ask".to_string(),
2905 content: vec![ContentBlock::Text {
2906 text: "blue".to_string(),
2907 }],
2908 is_error: false,
2909 details: serde_json::Value::Null,
2910 timestamp: 0,
2911 };
2912
2913 assert_eq!(
2914 tool_results_indicate_execution_blocker(&[result], AgentMode::Full),
2915 Some(NextActionStopReason::UserBlocker)
2916 );
2917 }
2918
2919 #[test]
2920 fn tool_results_indicate_work_completed_detects_edit_plus_successful_check() {
2921 let edit_result = imp_llm::ToolResultMessage {
2922 tool_call_id: "call_edit".to_string(),
2923 tool_name: "edit".to_string(),
2924 content: vec![ContentBlock::Text {
2925 text: "diff output".to_string(),
2926 }],
2927 is_error: false,
2928 details: serde_json::json!({
2929 "path": "/tmp/file.rs"
2930 }),
2931 timestamp: 0,
2932 };
2933 let check_result = imp_llm::ToolResultMessage {
2934 tool_call_id: "call_check".to_string(),
2935 tool_name: "bash".to_string(),
2936 content: vec![ContentBlock::Text {
2937 text: "ok".to_string(),
2938 }],
2939 is_error: false,
2940 details: serde_json::json!({
2941 "exit_code": 0,
2942 "command": "cargo check -p imp-core"
2943 }),
2944 timestamp: 0,
2945 };
2946
2947 assert!(tool_results_indicate_work_completed(
2948 &[edit_result, check_result],
2949 AgentMode::Full
2950 ));
2951 }
2952
2953 #[test]
2954 fn tool_results_indicate_work_completed_detects_closed_unit_details() {
2955 let result = imp_llm::ToolResultMessage {
2956 tool_call_id: "call_close".to_string(),
2957 tool_name: "mana".to_string(),
2958 content: vec![ContentBlock::Text {
2959 text: "Closed unit 1".to_string(),
2960 }],
2961 is_error: false,
2962 details: serde_json::json!({
2963 "action": "close",
2964 "unit": {
2965 "id": "1",
2966 "title": "Test unit",
2967 "status": "closed"
2968 }
2969 }),
2970 timestamp: 0,
2971 };
2972
2973 assert!(tool_results_indicate_work_completed(
2974 &[result],
2975 AgentMode::Full
2976 ));
2977 }
2978
2979 #[test]
2980 fn mana_review_needs_decision_maps_to_user_blocker() {
2981 let review = TurnManaReview {
2982 turn_index: 0,
2983 state: ManaReviewState::NeedsDecision,
2984 scope: crate::mana_review::ManaReviewScope::default(),
2985 anchor_unit: None,
2986 touched_units: Vec::new(),
2987 proposed_children: Vec::new(),
2988 material_field_changes: Vec::new(),
2989 notes_appended: Vec::new(),
2990 decision_events: Vec::new(),
2991 unresolved_consequential_choices: Vec::new(),
2992 next_question: Some("Which path should we take?".to_string()),
2993 };
2994
2995 assert_eq!(
2996 mana_review_stop_reason(&review, AgentMode::Planner),
2997 Some(NextActionStopReason::UserBlocker)
2998 );
2999 }
3000
3001 #[test]
3002 fn mana_review_changed_with_planner_children_maps_to_decomposition_completed() {
3003 let review = TurnManaReview {
3004 turn_index: 0,
3005 state: ManaReviewState::Changed,
3006 scope: crate::mana_review::ManaReviewScope::default(),
3007 anchor_unit: None,
3008 touched_units: Vec::new(),
3009 proposed_children: vec![crate::mana_review::TurnManaProposedChild {
3010 unit: crate::mana_review::ManaUnitRef::new(
3011 "28.6.1",
3012 "child",
3013 Some("job".to_string()),
3014 ),
3015 parent: crate::mana_review::ManaUnitRef::new(
3016 "28.6",
3017 "parent",
3018 Some("epic".to_string()),
3019 ),
3020 child_kind: crate::mana_review::ManaReviewUnitKind::Job,
3021 child_origin: crate::mana_review::ManaUnitOrigin::CreatedInTurn,
3022 }],
3023 material_field_changes: Vec::new(),
3024 notes_appended: Vec::new(),
3025 decision_events: Vec::new(),
3026 unresolved_consequential_choices: Vec::new(),
3027 next_question: None,
3028 };
3029
3030 assert_eq!(
3031 mana_review_stop_reason(&review, AgentMode::Planner),
3032 Some(NextActionStopReason::DecompositionCompleted)
3033 );
3034 }
3035
3036 #[tokio::test]
3037 async fn planner_stops_after_decomposition_is_externalized() {
3038 let provider = Arc::new(MockProvider::new(vec![text_response(
3039 "Externalized into mana. Plan is complete and ready for handoff.",
3040 100,
3041 20,
3042 )]));
3043
3044 let model = test_model(provider);
3045 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3046 agent.mode = AgentMode::Planner;
3047 agent.has_mana_skill = true;
3048
3049 agent.run("Plan the rollout".to_string()).await.unwrap();
3050
3051 let user_texts: Vec<String> = agent
3052 .messages
3053 .iter()
3054 .filter_map(|message| match message {
3055 Message::User(user) => user.content.iter().find_map(|block| match block {
3056 ContentBlock::Text { text } => Some(text.clone()),
3057 _ => None,
3058 }),
3059 _ => None,
3060 })
3061 .collect();
3062
3063 assert_eq!(user_texts, vec!["Plan the rollout".to_string()]);
3064 }
3065
3066 #[tokio::test]
3067 async fn planner_stops_for_user_blocker_instead_of_auto_follow_up() {
3068 let provider = Arc::new(MockProvider::new(vec![text_response(
3069 "Blocked: I need your input on which auth direction we should choose before continuing.",
3070 100,
3071 20,
3072 )]));
3073
3074 let model = test_model(provider);
3075 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3076 agent.mode = AgentMode::Planner;
3077 agent.has_mana_skill = true;
3078
3079 agent.run("Plan the rollout".to_string()).await.unwrap();
3080
3081 let user_texts: Vec<String> = agent
3082 .messages
3083 .iter()
3084 .filter_map(|message| match message {
3085 Message::User(user) => user.content.iter().find_map(|block| match block {
3086 ContentBlock::Text { text } => Some(text.clone()),
3087 _ => None,
3088 }),
3089 _ => None,
3090 })
3091 .collect();
3092
3093 assert_eq!(user_texts, vec!["Plan the rollout".to_string()]);
3094 }
3095
3096 #[tokio::test]
3097 async fn agent_queues_confidence_continue_follow_up_after_visible_mana_turn() {
3098 let provider = Arc::new(MockProvider::new(vec![
3099 vec![
3100 StreamEvent::MessageStart {
3101 model: "test-model".to_string(),
3102 },
3103 StreamEvent::ToolCall {
3104 id: "call_1".to_string(),
3105 name: "mana".to_string(),
3106 arguments: serde_json::json!({"action": "update", "id": "1", "notes": "done"}),
3107 },
3108 StreamEvent::TextDelta {
3109 text: "Done. Updated mana and next step is ready to continue.".to_string(),
3110 },
3111 StreamEvent::MessageEnd {
3112 message: AssistantMessage {
3113 content: vec![
3114 ContentBlock::ToolCall {
3115 id: "call_1".to_string(),
3116 name: "mana".to_string(),
3117 arguments: serde_json::json!({"action": "update", "id": "1", "notes": "done"}),
3118 },
3119 ContentBlock::Text {
3120 text: "Done. Updated mana and next step is ready to continue."
3121 .to_string(),
3122 },
3123 ],
3124 usage: Some(Usage {
3125 input_tokens: 100,
3126 output_tokens: 20,
3127 cache_read_tokens: 0,
3128 cache_write_tokens: 0,
3129 }),
3130 stop_reason: StopReason::ToolUse,
3131 timestamp: 1000,
3132 },
3133 },
3134 ],
3135 text_response("Continuing.", 120, 25),
3136 ]));
3137
3138 let model = test_model(provider);
3139 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3140 agent.mode = AgentMode::Planner;
3141 agent.continue_policy = ContinuePolicy::Balanced;
3142 agent
3143 .tools
3144 .register(Arc::new(crate::tools::mana::ManaTool::default()));
3145
3146 agent.run("Do the next thing".to_string()).await.unwrap();
3147
3148 let user_texts: Vec<String> = agent
3149 .messages
3150 .iter()
3151 .filter_map(|message| match message {
3152 Message::User(user) => user.content.iter().find_map(|block| match block {
3153 ContentBlock::Text { text } => Some(text.clone()),
3154 _ => None,
3155 }),
3156 _ => None,
3157 })
3158 .collect();
3159
3160 assert_eq!(user_texts.len(), 2);
3161 assert!(user_texts[1].contains("Confidence is high"));
3162 }
3163
3164 #[tokio::test]
3165 async fn agent_does_not_queue_confidence_continue_when_policy_disabled() {
3166 let provider = Arc::new(MockProvider::new(vec![
3167 vec![
3168 StreamEvent::MessageStart {
3169 model: "test-model".to_string(),
3170 },
3171 StreamEvent::ToolCall {
3172 id: "call_1".to_string(),
3173 name: "mana".to_string(),
3174 arguments: serde_json::json!({"action": "update", "id": "1", "notes": "done"}),
3175 },
3176 StreamEvent::TextDelta {
3177 text: "Done. Updated mana and next step is ready to continue.".to_string(),
3178 },
3179 StreamEvent::MessageEnd {
3180 message: AssistantMessage {
3181 content: vec![
3182 ContentBlock::ToolCall {
3183 id: "call_1".to_string(),
3184 name: "mana".to_string(),
3185 arguments: serde_json::json!({"action": "update", "id": "1", "notes": "done"}),
3186 },
3187 ContentBlock::Text {
3188 text: "Done. Updated mana and next step is ready to continue."
3189 .to_string(),
3190 },
3191 ],
3192 usage: Some(Usage {
3193 input_tokens: 100,
3194 output_tokens: 20,
3195 cache_read_tokens: 0,
3196 cache_write_tokens: 0,
3197 }),
3198 stop_reason: StopReason::ToolUse,
3199 timestamp: 1000,
3200 },
3201 },
3202 ],
3203 text_response("Stopped after visible mana turn.", 120, 25),
3204 ]));
3205
3206 let model = test_model(provider);
3207 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3208 agent.mode = AgentMode::Planner;
3209 agent.continue_policy = ContinuePolicy::Disabled;
3210 agent
3211 .tools
3212 .register(Arc::new(crate::tools::mana::ManaTool::default()));
3213
3214 agent.run("Do the next thing".to_string()).await.unwrap();
3215
3216 let user_texts: Vec<String> = agent
3217 .messages
3218 .iter()
3219 .filter_map(|message| match message {
3220 Message::User(user) => user.content.iter().find_map(|block| match block {
3221 ContentBlock::Text { text } => Some(text.clone()),
3222 _ => None,
3223 }),
3224 _ => None,
3225 })
3226 .collect();
3227
3228 assert_eq!(user_texts, vec!["Do the next thing".to_string()]);
3229 }
3230
3231 #[tokio::test]
3232 async fn agent_does_not_queue_externalization_follow_up_after_mana_tool_turn() {
3233 let provider = Arc::new(MockProvider::new(vec![
3234 tool_call_response(
3235 "call_1",
3236 "mana",
3237 serde_json::json!({"action": "status"}),
3238 100,
3239 20,
3240 ),
3241 text_response("Done after mana", 120, 25),
3242 ]));
3243
3244 let model = test_model(provider);
3245 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3246 agent.has_mana_skill = true;
3247 agent.mode = AgentMode::Planner;
3248 agent
3249 .tools
3250 .register(Arc::new(crate::tools::mana::ManaTool::default()));
3251
3252 agent.run("Plan the rollout".to_string()).await.unwrap();
3253
3254 let user_texts: Vec<String> = agent
3255 .messages
3256 .iter()
3257 .filter_map(|message| match message {
3258 Message::User(user) => user.content.iter().find_map(|block| match block {
3259 ContentBlock::Text { text } => Some(text.clone()),
3260 _ => None,
3261 }),
3262 _ => None,
3263 })
3264 .collect();
3265
3266 assert_eq!(user_texts, vec!["Plan the rollout".to_string()]);
3267 }
3268
3269 #[tokio::test]
3270 async fn agent_queues_mana_basics_hint_for_worker_mana_requests() {
3271 let provider = Arc::new(MockProvider::new(vec![
3272 text_response("Loaded basics skill", 100, 20),
3273 text_response("Done", 120, 25),
3274 ]));
3275
3276 let model = test_model(provider);
3277 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3278 agent.has_mana_basics_skill = true;
3279 agent.mode = AgentMode::Worker;
3280
3281 agent
3282 .run("Check mana status and logs for my unit".to_string())
3283 .await
3284 .unwrap();
3285
3286 let user_texts: Vec<String> = agent
3287 .messages
3288 .iter()
3289 .filter_map(|message| match message {
3290 Message::User(user) => user.content.iter().find_map(|block| match block {
3291 ContentBlock::Text { text } => Some(text.clone()),
3292 _ => None,
3293 }),
3294 _ => None,
3295 })
3296 .collect();
3297
3298 assert_eq!(user_texts.len(), 1);
3299 assert_eq!(user_texts[0], "Check mana status and logs for my unit");
3300 }
3301
3302 #[tokio::test]
3303 async fn agent_does_not_queue_mana_hint_without_matching_signal() {
3304 let provider = Arc::new(MockProvider::new(vec![text_response("No nudge", 100, 20)]));
3305
3306 let model = test_model(provider);
3307 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3308 agent.has_mana_skill = true;
3309 agent.mode = AgentMode::Planner;
3310
3311 agent
3312 .run("Explain how this parser works".to_string())
3313 .await
3314 .unwrap();
3315
3316 let user_texts: Vec<String> = agent
3317 .messages
3318 .iter()
3319 .filter_map(|message| match message {
3320 Message::User(user) => user.content.iter().find_map(|block| match block {
3321 ContentBlock::Text { text } => Some(text.clone()),
3322 _ => None,
3323 }),
3324 _ => None,
3325 })
3326 .collect();
3327
3328 assert_eq!(
3329 user_texts,
3330 vec!["Explain how this parser works".to_string()]
3331 );
3332 }
3333
3334 #[tokio::test]
3335 async fn agent_does_not_queue_mana_basics_hint_when_no_tools_available() {
3336 let provider = Arc::new(MockProvider::new(vec![text_response(
3337 "Loaded basics skill",
3338 100,
3339 20,
3340 )]));
3341
3342 let model = test_model(provider);
3343 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3344 agent.has_mana_basics_skill = true;
3345 agent.mode = AgentMode::Worker;
3346 agent.tools.retain(|_| false);
3347
3348 agent
3349 .run("Check mana status and logs for my unit".to_string())
3350 .await
3351 .unwrap();
3352
3353 let user_texts: Vec<String> = agent
3354 .messages
3355 .iter()
3356 .filter_map(|message| match message {
3357 Message::User(user) => user.content.iter().find_map(|block| match block {
3358 ContentBlock::Text { text } => Some(text.clone()),
3359 _ => None,
3360 }),
3361 _ => None,
3362 })
3363 .collect();
3364
3365 assert_eq!(
3366 user_texts,
3367 vec!["Check mana status and logs for my unit".to_string()]
3368 );
3369 }
3370
3371 #[tokio::test]
3372 async fn single_text_turn_with_max_turns_one_and_no_tools_exits_cleanly() {
3373 let provider = Arc::new(MockProvider::new(vec![text_response("SMOKE_OK", 50, 10)]));
3374 let model = test_model(provider);
3375 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3376 agent.max_turns = 1;
3377 agent.mode = AgentMode::Worker;
3378 agent.has_mana_basics_skill = true;
3379 agent.tools.retain(|_| false);
3380
3381 let events_task = tokio::spawn(collect_events(handle));
3382 let result = agent.run("Check mana status and finish".to_string()).await;
3383 drop(agent);
3384
3385 assert!(result.is_ok());
3386
3387 let events = events_task.await.unwrap();
3388 assert!(events
3389 .iter()
3390 .any(|e| matches!(e, AgentEvent::AgentEnd { .. })));
3391 assert!(!events.iter().any(|e| matches!(
3392 e,
3393 AgentEvent::Error { error } if error.contains("Max turns exceeded")
3394 )));
3395 }
3396
3397 #[tokio::test]
3398 async fn agent_emits_timing_events_in_order() {
3399 let provider = Arc::new(MockProvider::new(vec![text_response("timed", 10, 5)]));
3400 let model = test_model(provider);
3401 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3402
3403 let events_task = tokio::spawn(collect_events(handle));
3404 agent.run("time this".to_string()).await.unwrap();
3405 drop(agent);
3406
3407 let events = events_task.await.unwrap();
3408 let timings: Vec<_> = events
3409 .iter()
3410 .filter_map(|event| match event {
3411 AgentEvent::Timing { timing } => Some(*timing),
3412 _ => None,
3413 })
3414 .collect();
3415
3416 assert!(timings.len() >= 4);
3417 assert_eq!(timings[0].stage, TimingStage::LlmRequestStart);
3418 assert_eq!(timings[1].stage, TimingStage::FirstStreamEvent);
3419 assert_eq!(timings[2].stage, TimingStage::FirstTextDelta);
3420 assert!(timings
3421 .iter()
3422 .any(|timing| timing.stage == TimingStage::MessageEnd));
3423
3424 for timing in timings {
3425 assert_eq!(timing.turn, 0);
3426 assert!(timing.since_turn_start_ms >= timing.since_llm_request_start_ms);
3427 }
3428 }
3429
3430 #[tokio::test]
3431 async fn agent_streams_message_delta_before_message_end() {
3432 let provider = Arc::new(MockProvider::new_results(vec![vec![
3433 Ok(StreamEvent::MessageStart {
3434 model: "test-model".to_string(),
3435 }),
3436 Ok(StreamEvent::TextDelta {
3437 text: "streaming".to_string(),
3438 }),
3439 Ok(StreamEvent::MessageEnd {
3440 message: AssistantMessage {
3441 content: vec![ContentBlock::Text {
3442 text: "streaming".to_string(),
3443 }],
3444 usage: Some(Usage {
3445 input_tokens: 10,
3446 output_tokens: 5,
3447 cache_read_tokens: 0,
3448 cache_write_tokens: 0,
3449 }),
3450 stop_reason: StopReason::EndTurn,
3451 timestamp: 1000,
3452 },
3453 }),
3454 ]]));
3455
3456 let model = test_model(provider);
3457 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3458
3459 let events_task = tokio::spawn(collect_events(handle));
3460 agent.run("Say hi".to_string()).await.unwrap();
3461 drop(agent);
3462
3463 let events = events_task.await.unwrap();
3464 let text_delta_idx = events.iter().position(|event| {
3465 matches!(
3466 event,
3467 AgentEvent::MessageDelta {
3468 delta: StreamEvent::TextDelta { text }
3469 } if text == "streaming"
3470 )
3471 });
3472 let turn_end_idx = events
3473 .iter()
3474 .position(|event| matches!(event, AgentEvent::TurnEnd { .. }));
3475
3476 assert!(text_delta_idx.is_some());
3477 assert!(turn_end_idx.is_some());
3478 assert!(text_delta_idx.unwrap() < turn_end_idx.unwrap());
3479 }
3480
3481 #[tokio::test]
3482 async fn agent_retries_before_first_meaningful_event_but_not_after() {
3483 let provider = Arc::new(MockProvider::new_results(vec![
3484 vec![
3485 Ok(StreamEvent::MessageStart {
3486 model: "test-model".to_string(),
3487 }),
3488 Err(imp_llm::Error::Stream("startup failure".into())),
3489 ],
3490 vec![
3491 Ok(StreamEvent::MessageStart {
3492 model: "test-model".to_string(),
3493 }),
3494 Ok(StreamEvent::TextDelta {
3495 text: "recovered".to_string(),
3496 }),
3497 Ok(StreamEvent::MessageEnd {
3498 message: AssistantMessage {
3499 content: vec![ContentBlock::Text {
3500 text: "recovered".to_string(),
3501 }],
3502 usage: Some(Usage {
3503 input_tokens: 10,
3504 output_tokens: 5,
3505 cache_read_tokens: 0,
3506 cache_write_tokens: 0,
3507 }),
3508 stop_reason: StopReason::EndTurn,
3509 timestamp: 1000,
3510 },
3511 }),
3512 ],
3513 ]));
3514
3515 let model = test_model(provider);
3516 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3517
3518 let events_task = tokio::spawn(collect_events(handle));
3519 agent.run("Recover".to_string()).await.unwrap();
3520 drop(agent);
3521
3522 let events = events_task.await.unwrap();
3523 let text_delta = events.iter().position(|e| {
3524 matches!(
3525 e,
3526 AgentEvent::MessageDelta {
3527 delta: StreamEvent::TextDelta { text }
3528 } if text == "recovered"
3529 )
3530 });
3531 let turn_end = events
3532 .iter()
3533 .position(|e| matches!(e, AgentEvent::TurnEnd { .. }));
3534
3535 assert!(text_delta.is_some());
3536 assert!(turn_end.is_some());
3537 assert!(text_delta.unwrap() < turn_end.unwrap());
3538 }
3539
3540 #[tokio::test]
3541 async fn agent_surfaces_error_after_partial_stream_without_retrying() {
3542 let provider = Arc::new(MockProvider::new_results(vec![vec![
3543 Ok(StreamEvent::TextDelta {
3544 text: "partial".to_string(),
3545 }),
3546 Err(imp_llm::Error::Stream("mid-stream failure".into())),
3547 ]]));
3548
3549 let model = test_model(provider);
3550 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3551
3552 let events_task = tokio::spawn(collect_events(handle));
3553 let result = agent.run("Fail midway".to_string()).await;
3554 drop(agent);
3555
3556 assert!(result.is_err());
3557
3558 let events = events_task.await.unwrap();
3559 let text_delta = events.iter().position(|e| {
3560 matches!(
3561 e,
3562 AgentEvent::MessageDelta {
3563 delta: StreamEvent::TextDelta { text }
3564 } if text == "partial"
3565 )
3566 });
3567 let error_idx = events.iter().position(|e| {
3568 matches!(
3569 e,
3570 AgentEvent::Error { error }
3571 if error.contains("Provider stream failed after partial output")
3572 && error.contains("mid-stream failure")
3573 )
3574 });
3575
3576 assert!(text_delta.is_some());
3577 assert!(error_idx.is_some());
3578 assert!(text_delta.unwrap() < error_idx.unwrap());
3579 }
3580
3581 #[tokio::test]
3582 async fn agent_treats_silent_eof_without_message_end_as_error() {
3583 let provider = Arc::new(MockProvider::new_results(vec![vec![Ok(
3584 StreamEvent::TextDelta {
3585 text: "partial".to_string(),
3586 },
3587 )]]));
3588
3589 let model = test_model(provider);
3590 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3591
3592 let events_task = tokio::spawn(collect_events(handle));
3593 let result = agent.run("Fail with silent eof".to_string()).await;
3594 drop(agent);
3595
3596 assert!(result.is_err());
3597
3598 let events = events_task.await.unwrap();
3599 let text_delta = events.iter().position(|e| {
3600 matches!(
3601 e,
3602 AgentEvent::MessageDelta {
3603 delta: StreamEvent::TextDelta { text }
3604 } if text == "partial"
3605 )
3606 });
3607 let error_idx = events.iter().position(|e| {
3608 matches!(
3609 e,
3610 AgentEvent::Error { error }
3611 if error.contains("missing terminal completion event")
3612 )
3613 });
3614 let turn_end_idx = events
3615 .iter()
3616 .position(|e| matches!(e, AgentEvent::TurnEnd { .. }));
3617
3618 assert!(text_delta.is_some());
3619 assert!(error_idx.is_some());
3620 assert!(turn_end_idx.is_none());
3621 assert!(text_delta.unwrap() < error_idx.unwrap());
3622 }
3623
3624 #[tokio::test]
3627 async fn agent_simple_text_response() {
3628 let provider = Arc::new(MockProvider::new(vec![text_response(
3629 "Hello, world!",
3630 100,
3631 20,
3632 )]));
3633
3634 let model = test_model(provider);
3635 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3636
3637 let events_task = tokio::spawn(collect_events(handle));
3638 agent.run("Say hello".to_string()).await.unwrap();
3639 drop(agent); let events = events_task.await.unwrap();
3642
3643 assert!(matches!(events[0], AgentEvent::AgentStart { .. }));
3645
3646 let turn_start = events
3647 .iter()
3648 .position(|e| matches!(e, AgentEvent::TurnStart { index: 0 }));
3649 assert!(turn_start.is_some());
3650
3651 let turn_end = events
3652 .iter()
3653 .position(|e| matches!(e, AgentEvent::TurnEnd { index: 0, .. }));
3654 assert!(turn_end.is_some());
3655 assert!(turn_end.unwrap() > turn_start.unwrap());
3656
3657 let agent_end = events
3658 .iter()
3659 .position(|e| matches!(e, AgentEvent::AgentEnd { .. }));
3660 assert!(agent_end.is_some());
3661 assert!(agent_end.unwrap() > turn_end.unwrap());
3662
3663 if let AgentEvent::AgentEnd { usage, cost, .. } = &events[agent_end.unwrap()] {
3665 assert_eq!(usage.input_tokens, 100);
3666 assert_eq!(usage.output_tokens, 20);
3667 assert!(cost.total > 0.0);
3668 } else {
3669 panic!("Expected AgentEnd");
3670 }
3671
3672 let turn_starts: Vec<_> = events
3674 .iter()
3675 .filter(|e| matches!(e, AgentEvent::TurnStart { .. }))
3676 .collect();
3677 assert_eq!(turn_starts.len(), 1);
3678 }
3679
3680 #[tokio::test]
3683 async fn agent_single_tool_call() {
3684 let provider = Arc::new(MockProvider::new(vec![
3685 tool_call_response(
3687 "call_1",
3688 "echo",
3689 serde_json::json!({"text": "hello"}),
3690 100,
3691 30,
3692 ),
3693 text_response("The echo said: hello", 200, 25),
3695 ]));
3696
3697 let model = test_model(provider);
3698 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3699 agent.tools.register(Arc::new(EchoTool));
3700
3701 let events_task = tokio::spawn(collect_events(handle));
3702 agent.run("Echo hello".to_string()).await.unwrap();
3703 drop(agent);
3704
3705 let events = events_task.await.unwrap();
3706
3707 let turn_starts: Vec<_> = events
3709 .iter()
3710 .filter(|e| matches!(e, AgentEvent::TurnStart { .. }))
3711 .collect();
3712 assert_eq!(turn_starts.len(), 2);
3713
3714 let tool_starts: Vec<_> = events
3716 .iter()
3717 .filter(|e| matches!(e, AgentEvent::ToolExecutionStart { .. }))
3718 .collect();
3719 assert_eq!(tool_starts.len(), 1);
3720
3721 let tool_ends: Vec<_> = events
3722 .iter()
3723 .filter(|e| matches!(e, AgentEvent::ToolExecutionEnd { .. }))
3724 .collect();
3725 assert_eq!(tool_ends.len(), 1);
3726
3727 if let Some(AgentEvent::AgentEnd { usage, .. }) = events
3729 .iter()
3730 .find(|e| matches!(e, AgentEvent::AgentEnd { .. }))
3731 {
3732 assert_eq!(usage.input_tokens, 300);
3733 assert_eq!(usage.output_tokens, 55);
3734 } else {
3735 panic!("Expected AgentEnd");
3736 }
3737 }
3738
3739 #[tokio::test]
3742 async fn agent_multiple_tool_calls() {
3743 let provider = Arc::new(MockProvider::new(vec![
3744 multi_tool_call_response(
3746 &[
3747 ("call_1", "echo", serde_json::json!({"text": "first"})),
3748 ("call_2", "echo", serde_json::json!({"text": "second"})),
3749 ],
3750 100,
3751 40,
3752 ),
3753 tool_call_response(
3755 "call_3",
3756 "echo",
3757 serde_json::json!({"text": "third"}),
3758 200,
3759 20,
3760 ),
3761 text_response("All done!", 300, 10),
3763 ]));
3764
3765 let model = test_model(provider);
3766 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3767 agent.tools.register(Arc::new(EchoTool));
3768
3769 let events_task = tokio::spawn(collect_events(handle));
3770 agent.run("Echo three things".to_string()).await.unwrap();
3771 drop(agent);
3772
3773 let events = events_task.await.unwrap();
3774
3775 let turn_starts: Vec<_> = events
3777 .iter()
3778 .filter(|e| matches!(e, AgentEvent::TurnStart { .. }))
3779 .collect();
3780 assert_eq!(turn_starts.len(), 3);
3781
3782 let tool_starts: Vec<_> = events
3784 .iter()
3785 .filter(|e| matches!(e, AgentEvent::ToolExecutionStart { .. }))
3786 .collect();
3787 assert_eq!(tool_starts.len(), 3);
3788
3789 if let Some(AgentEvent::AgentEnd { usage, .. }) = events
3791 .iter()
3792 .find(|e| matches!(e, AgentEvent::AgentEnd { .. }))
3793 {
3794 assert_eq!(usage.input_tokens, 600);
3795 assert_eq!(usage.output_tokens, 70);
3796 } else {
3797 panic!("Expected AgentEnd");
3798 }
3799 }
3800
3801 #[tokio::test]
3804 async fn execution_stops_after_failed_verify_tool_result_without_blocked_text() {
3805 let provider = Arc::new(MockProvider::new(vec![
3806 tool_call_response(
3807 "call_verify",
3808 "mana",
3809 serde_json::json!({"action": "verify", "id": "1"}),
3810 100,
3811 20,
3812 ),
3813 text_response("Verify failed.", 120, 20),
3814 ]));
3815
3816 let model = test_model(provider);
3817 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3818 agent.mode = AgentMode::Full;
3819 agent
3820 .tools
3821 .register(Arc::new(crate::tools::mana::ManaTool::default()));
3822
3823 agent.run("Verify the unit".to_string()).await.unwrap();
3824
3825 let user_texts: Vec<String> = agent
3826 .messages
3827 .iter()
3828 .filter_map(|message| match message {
3829 Message::User(user) => user.content.iter().find_map(|block| match block {
3830 ContentBlock::Text { text } => Some(text.clone()),
3831 _ => None,
3832 }),
3833 _ => None,
3834 })
3835 .collect();
3836
3837 assert_eq!(user_texts, vec!["Verify the unit".to_string()]);
3838 }
3839
3840 #[tokio::test]
3841 async fn execution_stops_after_mana_close_tool_result_without_done_text() {
3842 let provider = Arc::new(MockProvider::new(vec![
3843 tool_call_response(
3844 "call_close",
3845 "mana",
3846 serde_json::json!({"action": "close", "id": "1"}),
3847 100,
3848 20,
3849 ),
3850 text_response("Unit closed.", 120, 20),
3851 ]));
3852
3853 let model = test_model(provider);
3854 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3855 agent.mode = AgentMode::Full;
3856 agent
3857 .tools
3858 .register(Arc::new(crate::tools::mana::ManaTool::default()));
3859
3860 agent.run("Close the unit".to_string()).await.unwrap();
3861
3862 let user_texts: Vec<String> = agent
3863 .messages
3864 .iter()
3865 .filter_map(|message| match message {
3866 Message::User(user) => user.content.iter().find_map(|block| match block {
3867 ContentBlock::Text { text } => Some(text.clone()),
3868 _ => None,
3869 }),
3870 _ => None,
3871 })
3872 .collect();
3873
3874 assert_eq!(user_texts, vec!["Close the unit".to_string()]);
3875 }
3876
3877 #[tokio::test]
3878 async fn execution_stops_after_work_completed_text() {
3879 let provider = Arc::new(MockProvider::new(vec![text_response(
3880 "All done! Implemented the change and finished the task.",
3881 100,
3882 20,
3883 )]));
3884
3885 let model = test_model(provider);
3886 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3887 agent.mode = AgentMode::Full;
3888
3889 agent.run("Implement the change".to_string()).await.unwrap();
3890
3891 let user_texts: Vec<String> = agent
3892 .messages
3893 .iter()
3894 .filter_map(|message| match message {
3895 Message::User(user) => user.content.iter().find_map(|block| match block {
3896 ContentBlock::Text { text } => Some(text.clone()),
3897 _ => None,
3898 }),
3899 _ => None,
3900 })
3901 .collect();
3902
3903 assert_eq!(user_texts, vec!["Implement the change".to_string()]);
3904 }
3905
3906 #[tokio::test]
3907 async fn execution_stops_for_user_blocker_text() {
3908 let provider = Arc::new(MockProvider::new(vec![text_response(
3909 "Blocked: I need your input on which path to take before continuing.",
3910 100,
3911 20,
3912 )]));
3913
3914 let model = test_model(provider);
3915 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
3916 agent.mode = AgentMode::Full;
3917
3918 agent.run("Implement the change".to_string()).await.unwrap();
3919
3920 let user_texts: Vec<String> = agent
3921 .messages
3922 .iter()
3923 .filter_map(|message| match message {
3924 Message::User(user) => user.content.iter().find_map(|block| match block {
3925 ContentBlock::Text { text } => Some(text.clone()),
3926 _ => None,
3927 }),
3928 _ => None,
3929 })
3930 .collect();
3931
3932 assert_eq!(user_texts, vec!["Implement the change".to_string()]);
3933 }
3934
3935 #[tokio::test]
3936 async fn agent_follow_up_runs_after_current_work_finishes() {
3937 let provider = Arc::new(MockProvider::new(vec![
3938 tool_call_response(
3939 "call_1",
3940 "echo",
3941 serde_json::json!({"text": "hello"}),
3942 100,
3943 20,
3944 ),
3945 text_response("Handled follow-up", 120, 25),
3946 ]));
3947
3948 let model = test_model(provider);
3949 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3950 agent.tools.register(Arc::new(EchoTool));
3951
3952 handle
3953 .command_tx
3954 .send(AgentCommand::FollowUp("What next?".into()))
3955 .await
3956 .unwrap();
3957
3958 let events_task = tokio::spawn(collect_events(handle));
3959 agent.run("Do the first thing".to_string()).await.unwrap();
3960 drop(agent);
3961
3962 let events = events_task.await.unwrap();
3963 let turn_starts: Vec<_> = events
3964 .iter()
3965 .filter(|e| matches!(e, AgentEvent::TurnStart { .. }))
3966 .collect();
3967 assert_eq!(turn_starts.len(), 2);
3968 }
3969
3970 #[tokio::test]
3971 async fn agent_follow_up_preserves_order_with_multiple_messages() {
3972 let provider = Arc::new(MockProvider::new(vec![
3973 tool_call_response(
3974 "call_1",
3975 "echo",
3976 serde_json::json!({"text": "hello"}),
3977 100,
3978 20,
3979 ),
3980 text_response("First follow-up handled", 120, 25),
3981 text_response("Second follow-up handled", 130, 30),
3982 ]));
3983
3984 let model = test_model(provider);
3985 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
3986 agent.tools.register(Arc::new(EchoTool));
3987
3988 handle
3989 .command_tx
3990 .send(AgentCommand::FollowUp("follow up one".into()))
3991 .await
3992 .unwrap();
3993 handle
3994 .command_tx
3995 .send(AgentCommand::FollowUp("follow up two".into()))
3996 .await
3997 .unwrap();
3998
3999 agent.run("Do the first thing".to_string()).await.unwrap();
4000
4001 let user_texts: Vec<String> = agent
4002 .messages
4003 .iter()
4004 .filter_map(|message| match message {
4005 Message::User(user) => user.content.iter().find_map(|block| match block {
4006 ContentBlock::Text { text } => Some(text.clone()),
4007 _ => None,
4008 }),
4009 _ => None,
4010 })
4011 .collect();
4012
4013 assert_eq!(
4014 user_texts,
4015 vec![
4016 "Do the first thing".to_string(),
4017 "follow up one".to_string(),
4018 "follow up two".to_string()
4019 ]
4020 );
4021 }
4022
4023 #[tokio::test]
4024 async fn agent_cancel_still_wins_over_follow_up_queue() {
4025 let provider = Arc::new(MockProvider::new(vec![tool_call_response(
4026 "call_1",
4027 "echo",
4028 serde_json::json!({"text": "hello"}),
4029 100,
4030 20,
4031 )]));
4032
4033 let model = test_model(provider);
4034 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4035 agent.tools.register(Arc::new(EchoTool));
4036
4037 handle
4038 .command_tx
4039 .send(AgentCommand::FollowUp("queued later".into()))
4040 .await
4041 .unwrap();
4042 handle.command_tx.send(AgentCommand::Cancel).await.unwrap();
4043
4044 let result = agent.run("Do something".to_string()).await;
4045 assert!(matches!(result, Err(crate::error::Error::Cancelled)));
4046 }
4047
/// `mana_bash_equivalent_hint` returns a hint for both `mana release <id>`
/// and `mana tree`.
#[test]
fn mana_bash_equivalent_hint_handles_release_and_tree() {
    for command in ["mana release 1", "mana tree"] {
        assert!(mana_bash_equivalent_hint(command).is_some());
    }
}
4053
/// No hint is produced for a command that only shares the `mana` letters
/// (`manatee`) or for a path-qualified `./mana` invocation.
#[test]
fn mana_bash_equivalent_hint_ignores_non_mana_prefixes() {
    let unrelated = ["manatee status", "./mana status"];
    assert!(unrelated
        .iter()
        .all(|cmd| mana_bash_equivalent_hint(cmd).is_none()));
}
4059
4060 #[tokio::test]
4061 async fn agent_blocks_bash_mana_when_native_action_exists() {
4062 let provider = Arc::new(MockProvider::new(vec![
4063 tool_call_response(
4064 "call_1",
4065 "bash",
4066 serde_json::json!({"command": "mana status", "timeout": 5}),
4067 100,
4068 20,
4069 ),
4070 text_response("Recovered after native-mana hint", 120, 25),
4071 ]));
4072
4073 let model = test_model(provider);
4074 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4075 agent.tools.register(Arc::new(crate::tools::bash::BashTool));
4076
4077 let events_task = tokio::spawn(collect_events(handle));
4078 agent.run("Check mana state".to_string()).await.unwrap();
4079 drop(agent);
4080
4081 let events = events_task.await.unwrap();
4082 let tool_end = events.iter().find_map(|e| match e {
4083 AgentEvent::ToolExecutionEnd { result, .. } => Some(result),
4084 _ => None,
4085 });
4086 let tool_end = tool_end.expect("expected ToolExecutionEnd");
4087 assert!(tool_end.is_error);
4088 let text = tool_end
4089 .content
4090 .iter()
4091 .find_map(|b| match b {
4092 ContentBlock::Text { text } => Some(text.as_str()),
4093 _ => None,
4094 })
4095 .unwrap_or("");
4096 assert!(text.contains("Use the native mana tool"));
4097 }
4098
4099 #[tokio::test]
4100 async fn agent_allows_non_mana_bash_commands() {
4101 let provider = Arc::new(MockProvider::new(vec![
4102 tool_call_response(
4103 "call_1",
4104 "bash",
4105 serde_json::json!({"command": "printf 'ok'", "timeout": 5}),
4106 100,
4107 20,
4108 ),
4109 text_response("done", 120, 25),
4110 ]));
4111
4112 let model = test_model(provider);
4113 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
4114 agent.tools.register(Arc::new(crate::tools::bash::BashTool));
4115 agent.tools.register_alias("bash", "shell");
4116
4117 agent.run("Run a shell command".to_string()).await.unwrap();
4118
4119 let tool_result = agent
4120 .messages
4121 .iter()
4122 .find_map(|message| match message {
4123 Message::ToolResult(result) => Some(result),
4124 _ => None,
4125 })
4126 .expect("expected tool result");
4127 assert!(!tool_result.is_error);
4128 }
4129
4130 #[tokio::test]
4131 async fn agent_cancel_mid_run() {
4132 let provider = Arc::new(MockProvider::new(vec![
4133 tool_call_response(
4135 "call_1",
4136 "echo",
4137 serde_json::json!({"text": "hello"}),
4138 100,
4139 20,
4140 ),
4141 text_response("Should not see this", 100, 20),
4143 ]));
4144
4145 let model = test_model(provider);
4146 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4147 agent.tools.register(Arc::new(EchoTool));
4148
4149 handle.command_tx.send(AgentCommand::Cancel).await.unwrap();
4151
4152 let events_task = tokio::spawn(collect_events(handle));
4153 let result = agent.run("Do something".to_string()).await;
4154 drop(agent);
4155
4156 assert!(matches!(result, Err(crate::error::Error::Cancelled)));
4158
4159 let events = events_task.await.unwrap();
4160
4161 assert!(events
4163 .iter()
4164 .any(|e| matches!(e, AgentEvent::AgentEnd { .. })));
4165
4166 let turn_starts: Vec<_> = events
4168 .iter()
4169 .filter(|e| matches!(e, AgentEvent::TurnStart { .. }))
4170 .collect();
4171 assert!(turn_starts.len() <= 1);
4172 }
4173
4174 #[tokio::test]
4175 async fn single_text_turn_with_max_turns_one_exits_cleanly() {
4176 let provider = Arc::new(MockProvider::new(vec![text_response("SMOKE_OK", 50, 10)]));
4177 let model = test_model(provider);
4178 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4179 agent.max_turns = 1;
4180
4181 let events_task = tokio::spawn(collect_events(handle));
4182 let result = agent.run("Reply once and stop".to_string()).await;
4183 drop(agent);
4184
4185 assert!(result.is_ok());
4186
4187 let events = events_task.await.unwrap();
4188 assert!(events
4189 .iter()
4190 .any(|e| matches!(e, AgentEvent::AgentEnd { .. })));
4191 assert!(!events.iter().any(|e| matches!(
4192 e,
4193 AgentEvent::Error { error } if error.contains("Max turns exceeded")
4194 )));
4195 }
4196
4197 #[tokio::test]
4200 async fn agent_max_turns_exceeded() {
4201 let responses: Vec<Vec<StreamEvent>> = (0..5)
4203 .map(|i| {
4204 tool_call_response(
4205 &format!("call_{i}"),
4206 "echo",
4207 serde_json::json!({"text": format!("turn {i}")}),
4208 50,
4209 10,
4210 )
4211 })
4212 .collect();
4213
4214 let provider = Arc::new(MockProvider::new(responses));
4215 let model = test_model(provider);
4216 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4217 agent.tools.register(Arc::new(EchoTool));
4218 agent.max_turns = 3; let events_task = tokio::spawn(collect_events(handle));
4221 let result = agent.run("Loop forever".to_string()).await;
4222 drop(agent);
4223
4224 assert!(matches!(result, Err(crate::error::Error::MaxTurns(3))));
4225
4226 let events = events_task.await.unwrap();
4227
4228 let has_error = events
4230 .iter()
4231 .any(|e| matches!(e, AgentEvent::Error { error } if error.contains("Max turns")));
4232 assert!(has_error);
4233
4234 assert!(events
4236 .iter()
4237 .any(|e| matches!(e, AgentEvent::AgentEnd { .. })));
4238
4239 if let Some(AgentEvent::AgentEnd { usage, .. }) = events
4241 .iter()
4242 .find(|e| matches!(e, AgentEvent::AgentEnd { .. }))
4243 {
4244 assert_eq!(usage.input_tokens, 150); assert_eq!(usage.output_tokens, 30); }
4247 }
4248
4249 #[tokio::test]
4252 async fn agent_unknown_tool_self_corrects() {
4253 let provider = Arc::new(MockProvider::new(vec![
4254 tool_call_response(
4256 "call_1",
4257 "nonexistent",
4258 serde_json::json!({"foo": "bar"}),
4259 100,
4260 20,
4261 ),
4262 text_response("Sorry, I used the wrong tool. Here's the answer.", 200, 30),
4264 ]));
4265
4266 let model = test_model(provider);
4267 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4268 let events_task = tokio::spawn(collect_events(handle));
4271 agent.run("Do something".to_string()).await.unwrap();
4272 drop(agent);
4273
4274 let events = events_task.await.unwrap();
4275
4276 let tool_end = events
4278 .iter()
4279 .find(|e| matches!(e, AgentEvent::ToolExecutionEnd { .. }));
4280 assert!(tool_end.is_some());
4281 if let Some(AgentEvent::ToolExecutionEnd { result, .. }) = tool_end {
4282 assert!(result.is_error);
4283 let text = result.content.iter().find_map(|c| {
4284 if let ContentBlock::Text { text } = c {
4285 Some(text.as_str())
4286 } else {
4287 None
4288 }
4289 });
4290 assert!(text.unwrap().contains("Unknown tool"));
4291 }
4292
4293 let turn_starts: Vec<_> = events
4295 .iter()
4296 .filter(|e| matches!(e, AgentEvent::TurnStart { .. }))
4297 .collect();
4298 assert_eq!(turn_starts.len(), 2);
4299
4300 assert!(events
4302 .iter()
4303 .any(|e| matches!(e, AgentEvent::AgentEnd { .. })));
4304 }
4305
4306 #[tokio::test]
4307 async fn agent_concurrent_readonly() {
4308 let shared = Arc::new(ConcurrentReadonlyState::new(3));
4309 let provider = Arc::new(MockProvider::new(vec![
4310 multi_tool_call_response(
4311 &[
4312 ("call_ro_1", "echo_a", serde_json::json!({"text": "first"})),
4313 (
4314 "call_write",
4315 "write_after_reads",
4316 serde_json::json!({"data": "mutate"}),
4317 ),
4318 ("call_ro_2", "echo_b", serde_json::json!({"text": "second"})),
4319 ("call_ro_3", "echo_c", serde_json::json!({"text": "third"})),
4320 ],
4321 100,
4322 40,
4323 ),
4324 text_response("All tools finished", 150, 20),
4325 ]));
4326
4327 let model = test_model(provider);
4328 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4329 drop(handle);
4330
4331 agent.tools.register(Arc::new(CoordinatedReadonlyTool {
4332 name: "echo_a",
4333 shared: shared.clone(),
4334 }));
4335 agent.tools.register(Arc::new(CoordinatedReadonlyTool {
4336 name: "echo_b",
4337 shared: shared.clone(),
4338 }));
4339 agent.tools.register(Arc::new(CoordinatedReadonlyTool {
4340 name: "echo_c",
4341 shared: shared.clone(),
4342 }));
4343 agent.tools.register(Arc::new(CoordinatedMutableTool {
4344 shared: shared.clone(),
4345 }));
4346
4347 tokio::time::timeout(
4348 Duration::from_millis(250),
4349 agent.run("Run all tools".to_string()),
4350 )
4351 .await
4352 .expect("read-only tools should not block each other")
4353 .expect("agent should complete successfully");
4354
4355 let tool_result_ids: Vec<_> = agent
4356 .messages
4357 .iter()
4358 .filter_map(|message| match message {
4359 Message::ToolResult(result) => Some(result.tool_call_id.as_str()),
4360 _ => None,
4361 })
4362 .collect();
4363 assert_eq!(
4364 tool_result_ids,
4365 vec!["call_ro_1", "call_write", "call_ro_2", "call_ro_3"]
4366 );
4367
4368 assert_eq!(shared.readonly_started.load(Ordering::SeqCst), 3);
4369 assert_eq!(shared.readonly_finished.load(Ordering::SeqCst), 3);
4370 assert_eq!(shared.mutable_observed_finished.load(Ordering::SeqCst), 3);
4371
4372 let log = shared.log.lock().expect("concurrent log lock").clone();
4373 assert_eq!(
4374 log.last().map(String::as_str),
4375 Some("write_after_reads:start")
4376 );
4377 }
4378
4379 #[tokio::test]
4382 async fn agent_event_ordering() {
4383 let provider = Arc::new(MockProvider::new(vec![
4384 tool_call_response(
4385 "call_1",
4386 "echo",
4387 serde_json::json!({"text": "hello"}),
4388 50,
4389 10,
4390 ),
4391 text_response("Done", 50, 10),
4392 ]));
4393
4394 let model = test_model(provider);
4395 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4396 agent.tools.register(Arc::new(EchoTool));
4397
4398 let events_task = tokio::spawn(collect_events(handle));
4399 agent.run("test".to_string()).await.unwrap();
4400 drop(agent);
4401
4402 let events = events_task.await.unwrap();
4403
4404 let types: Vec<&str> = events
4406 .iter()
4407 .map(|e| match e {
4408 AgentEvent::AgentStart { .. } => "AgentStart",
4409 AgentEvent::AgentEnd { .. } => "AgentEnd",
4410 AgentEvent::TurnStart { .. } => "TurnStart",
4411 AgentEvent::TurnEnd { .. } => "TurnEnd",
4412 AgentEvent::MessageDelta { .. } => "MessageDelta",
4413 AgentEvent::ToolExecutionStart { .. } => "ToolExecStart",
4414 AgentEvent::ToolExecutionEnd { .. } => "ToolExecEnd",
4415 AgentEvent::Warning { .. } => "Warning",
4416 AgentEvent::Error { .. } => "Error",
4417 _ => "Other",
4418 })
4419 .collect();
4420
4421 assert_eq!(types[0], "AgentStart");
4423
4424 assert_eq!(types[types.len() - 1], "AgentEnd");
4426
4427 let mut turn_start_indices: Vec<usize> = Vec::new();
4429 let mut turn_end_indices: Vec<usize> = Vec::new();
4430 for (i, t) in types.iter().enumerate() {
4431 if *t == "TurnStart" {
4432 turn_start_indices.push(i);
4433 }
4434 if *t == "TurnEnd" {
4435 turn_end_indices.push(i);
4436 }
4437 }
4438 assert_eq!(turn_start_indices.len(), 2);
4439 assert_eq!(turn_end_indices.len(), 2);
4440 for i in 0..turn_start_indices.len() {
4441 assert!(turn_start_indices[i] < turn_end_indices[i]);
4442 }
4443
4444 let tool_start = types.iter().position(|t| *t == "ToolExecStart");
4446 let tool_end = types.iter().position(|t| *t == "ToolExecEnd");
4447 assert!(tool_start.is_some());
4448 assert!(tool_end.is_some());
4449 assert!(tool_start.unwrap() < tool_end.unwrap());
4450 }
4451
4452 #[tokio::test]
4453 async fn agent_fires_hooks() {
4454 let provider = Arc::new(MockProvider::new(vec![text_response("hooked", 100, 20)]));
4455 let model = test_model(provider);
4456 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4457 drop(handle);
4458
4459 let hook_calls = Arc::new(AtomicUsize::new(0));
4460 let hook_calls_for_callback = hook_calls.clone();
4461 agent.hooks.register(crate::hooks::HookDefinition {
4462 event: "before_llm_call".to_string(),
4463 match_pattern: None,
4464 action: crate::hooks::HookAction::Callback(Arc::new(move |_event| {
4465 hook_calls_for_callback.fetch_add(1, Ordering::SeqCst);
4466 crate::hooks::HookResult::default()
4467 })),
4468 blocking: true,
4469 threshold: None,
4470 });
4471
4472 agent.run("Run once".to_string()).await.unwrap();
4473
4474 assert_eq!(hook_calls.load(Ordering::SeqCst), 1);
4475 }
4476
4477 #[tokio::test]
4478 async fn agent_context_masking() {
4479 let provider = Arc::new(MockProvider::new(vec![text_response("done", 100, 20)]));
4480
4481 let mut seeded_messages = Vec::new();
4482 for index in 0..12 {
4483 let call_id = format!("call_{index}");
4484 seeded_messages.push(make_assistant_tool_call(
4485 &call_id,
4486 "read",
4487 serde_json::json!({"path": format!("src/file_{index}.rs")}),
4488 ));
4489 seeded_messages.push(make_tool_result(&call_id, "read", &"x".repeat(400)));
4490 }
4491
4492 let mut usage_messages = seeded_messages.clone();
4493 usage_messages.push(Message::user("trigger masking"));
4494 let provisional_model = test_model(provider.clone());
4495 let usage = crate::context::context_usage(&usage_messages, &provisional_model);
4496 let context_window = ((usage.used as f64) / 0.7).ceil() as u32;
4497
4498 let model = test_model_with_context_window(provider, context_window.max(1));
4499 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4500 drop(handle);
4501 agent.messages = seeded_messages;
4502
4503 agent.run("trigger masking".to_string()).await.unwrap();
4504
4505 let masked = tool_result_text(&agent.messages[1]).expect("first tool result text");
4506 assert!(masked.starts_with("[Output omitted"));
4507
4508 let recent_index = (10 * 2) + 1;
4509 let recent =
4510 tool_result_text(&agent.messages[recent_index]).expect("recent tool result text");
4511 let expected_recent = "x".repeat(400);
4512 assert_eq!(recent, expected_recent.as_str());
4513 }
4514
4515 #[tokio::test]
4516 async fn agent_masks_observations_when_context_is_tight() {
4517 let provider = Arc::new(MockProvider::new(vec![text_response("done", 100, 20)]));
4518
4519 let mut seeded_messages = Vec::new();
4520 for index in 0..12 {
4521 let call_id = format!("call_{index}");
4522 seeded_messages.push(make_assistant_tool_call(
4523 &call_id,
4524 "read",
4525 serde_json::json!({"path": format!("src/file_{index}.rs")}),
4526 ));
4527 seeded_messages.push(make_tool_result(&call_id, "read", &"x".repeat(400)));
4528 }
4529
4530 let mut usage_messages = seeded_messages.clone();
4531 usage_messages.push(Message::user("trigger masking"));
4532 let provisional_model = test_model(provider.clone());
4533 let usage_before = crate::context::context_usage(&usage_messages, &provisional_model);
4534
4535 let mut masked_messages = usage_messages.clone();
4536 crate::context::mask_observations(&mut masked_messages, 10);
4537 let usage_after = crate::context::context_usage(&masked_messages, &provisional_model);
4538
4539 assert!(usage_before.used > usage_after.used);
4540
4541 let context_window = ((usage_before.used as f64) / 0.7).ceil() as u32;
4543
4544 let model = test_model_with_context_window(provider, context_window.max(1));
4545 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4546 let events_task = tokio::spawn(collect_events(handle));
4547 agent.messages = seeded_messages;
4548
4549 agent.run("trigger masking".to_string()).await.unwrap();
4550 drop(agent);
4551
4552 let events = events_task.await.unwrap();
4553
4554 assert!(
4555 events
4556 .iter()
4557 .any(|e| matches!(e, AgentEvent::TurnStart { index: 0 })),
4558 "agent should still run normally"
4559 );
4560 }
4561
4562 #[tokio::test]
4565 async fn agent_usage_cost_accumulation() {
4566 let provider = Arc::new(MockProvider::new(vec![
4567 tool_call_response(
4568 "call_1",
4569 "echo",
4570 serde_json::json!({"text": "a"}),
4571 1_000_000, 500_000, ),
4574 text_response("done", 1_000_000, 500_000),
4575 ]));
4576
4577 let model = test_model(provider);
4578 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4579 agent.tools.register(Arc::new(EchoTool));
4580
4581 let events_task = tokio::spawn(collect_events(handle));
4582 agent.run("test".to_string()).await.unwrap();
4583 drop(agent);
4584
4585 let events = events_task.await.unwrap();
4586
4587 if let Some(AgentEvent::AgentEnd { usage, cost }) = events
4588 .iter()
4589 .find(|e| matches!(e, AgentEvent::AgentEnd { .. }))
4590 {
4591 assert_eq!(usage.input_tokens, 2_000_000);
4593 assert_eq!(usage.output_tokens, 1_000_000);
4594
4595 assert!((cost.input - 6.0).abs() < 1e-10);
4597 assert!((cost.output - 15.0).abs() < 1e-10);
4598 assert!((cost.total - 21.0).abs() < 1e-10);
4599 } else {
4600 panic!("Expected AgentEnd");
4601 }
4602 }
4603
4604 struct RetryMockProvider {
4610 calls: Mutex<Vec<std::result::Result<Vec<StreamEvent>, imp_llm::Error>>>,
4611 }
4612
4613 impl RetryMockProvider {
4614 fn new(calls: Vec<std::result::Result<Vec<StreamEvent>, imp_llm::Error>>) -> Self {
4615 Self {
4616 calls: Mutex::new(calls),
4617 }
4618 }
4619 }
4620
4621 #[async_trait]
4622 impl Provider for RetryMockProvider {
4623 fn stream(
4624 &self,
4625 _model: &Model,
4626 _context: Context,
4627 _options: RequestOptions,
4628 _api_key: &str,
4629 ) -> Pin<Box<dyn Stream<Item = imp_llm::Result<StreamEvent>> + Send>> {
4630 let mut calls = self.calls.try_lock().expect("RetryMockProvider lock");
4631 let outcome = if calls.is_empty() {
4632 Ok(vec![StreamEvent::Error {
4633 error: "No more mock responses".to_string(),
4634 }])
4635 } else {
4636 calls.remove(0)
4637 };
4638 match outcome {
4639 Ok(events) => Box::pin(futures::stream::iter(
4640 events.into_iter().map(imp_llm::Result::Ok),
4641 )),
4642 Err(e) => Box::pin(futures::stream::once(async move {
4643 imp_llm::Result::<StreamEvent>::Err(e)
4644 })),
4645 }
4646 }
4647
4648 async fn resolve_auth(&self, _auth: &AuthStore) -> imp_llm::Result<ApiKey> {
4649 Ok("mock-key".to_string())
4650 }
4651
4652 fn id(&self) -> &str {
4653 "retry-mock"
4654 }
4655
4656 fn models(&self) -> &[ModelMeta] {
4657 &[]
4658 }
4659 }
4660
4661 #[tokio::test]
4663 async fn retry_succeeds_after_transient_failures() {
4664 use imp_llm::provider::RetryPolicy;
4665
4666 let provider = Arc::new(RetryMockProvider::new(vec![
4667 Err(imp_llm::Error::RateLimited {
4669 retry_after_secs: Some(0),
4670 }),
4671 Err(imp_llm::Error::RateLimited {
4672 retry_after_secs: Some(0),
4673 }),
4674 Ok(text_response("Hello after retries", 100, 20)),
4676 ]));
4677
4678 let model = test_model(provider);
4679 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4680 agent.retry_policy = RetryPolicy {
4682 max_retries: 3,
4683 base_delay: std::time::Duration::from_millis(0),
4684 max_delay: std::time::Duration::from_secs(30),
4685 retry_on: vec![],
4686 };
4687
4688 let events_task = tokio::spawn(collect_events(handle));
4689 agent.run("Say hello".to_string()).await.unwrap();
4690 drop(agent);
4691
4692 let events = events_task.await.unwrap();
4693
4694 assert!(events
4696 .iter()
4697 .any(|e| matches!(e, AgentEvent::AgentEnd { .. })));
4698
4699 let turn_end = events.iter().find_map(|e| match e {
4701 AgentEvent::TurnEnd { message, .. } => Some(message),
4702 _ => None,
4703 });
4704 assert!(turn_end.is_some());
4705 let content_text = turn_end
4706 .unwrap()
4707 .content
4708 .iter()
4709 .find_map(|b| match b {
4710 ContentBlock::Text { text } => Some(text.as_str()),
4711 _ => None,
4712 })
4713 .unwrap_or("");
4714 assert!(
4715 content_text.contains("Hello after retries"),
4716 "expected final text, got: {content_text}"
4717 );
4718 }
4719
4720 #[tokio::test]
4722 async fn retry_fails_when_max_retries_exhausted() {
4723 use imp_llm::provider::RetryPolicy;
4724
4725 let provider = Arc::new(RetryMockProvider::new(vec![
4726 Err(imp_llm::Error::RateLimited {
4727 retry_after_secs: Some(0),
4728 }),
4729 Err(imp_llm::Error::RateLimited {
4730 retry_after_secs: Some(0),
4731 }),
4732 Err(imp_llm::Error::RateLimited {
4733 retry_after_secs: Some(0),
4734 }),
4735 Err(imp_llm::Error::RateLimited {
4736 retry_after_secs: Some(0),
4737 }),
4738 ]));
4739
4740 let model = test_model(provider);
4741 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4742 agent.retry_policy = RetryPolicy {
4743 max_retries: 2, base_delay: std::time::Duration::from_millis(0),
4745 max_delay: std::time::Duration::from_secs(30),
4746 retry_on: vec![],
4747 };
4748 drop(handle);
4749
4750 let result = agent.run("Fail".to_string()).await;
4751 assert!(
4752 result.is_err(),
4753 "should have failed after exhausting retries"
4754 );
4755 }
4756
4757 #[tokio::test]
4759 async fn retry_does_not_retry_auth_errors() {
4760 use imp_llm::provider::RetryPolicy;
4761 use std::sync::atomic::{AtomicUsize, Ordering};
4762
4763 let call_count = Arc::new(AtomicUsize::new(0));
4764 let call_count_clone = call_count.clone();
4765
4766 struct CountingAuthFailProvider {
4767 calls: AtomicUsize,
4768 success_after: usize,
4769 }
4770
4771 #[async_trait]
4772 impl Provider for CountingAuthFailProvider {
4773 fn stream(
4774 &self,
4775 _model: &Model,
4776 _context: Context,
4777 _options: RequestOptions,
4778 _api_key: &str,
4779 ) -> Pin<Box<dyn Stream<Item = imp_llm::Result<StreamEvent>> + Send>> {
4780 let n = self.calls.fetch_add(1, Ordering::SeqCst);
4781 if n < self.success_after {
4782 Box::pin(futures::stream::once(async {
4783 Err(imp_llm::Error::Auth("Invalid API key".to_string()))
4784 }))
4785 } else {
4786 Box::pin(futures::stream::iter(
4787 text_response("ok", 10, 5).into_iter().map(Ok),
4788 ))
4789 }
4790 }
4791
4792 async fn resolve_auth(&self, _auth: &AuthStore) -> imp_llm::Result<ApiKey> {
4793 Ok("mock-key".to_string())
4794 }
4795
4796 fn id(&self) -> &str {
4797 "auth-fail-mock"
4798 }
4799
4800 fn models(&self) -> &[ModelMeta] {
4801 &[]
4802 }
4803 }
4804
4805 let _ = call_count_clone; let provider = Arc::new(CountingAuthFailProvider {
4808 calls: AtomicUsize::new(0),
4809 success_after: 999, });
4811 let call_ref = &provider.calls;
4812
4813 let model = test_model(provider.clone());
4814 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
4815 agent.retry_policy = RetryPolicy {
4816 max_retries: 5, base_delay: std::time::Duration::from_millis(0),
4818 max_delay: std::time::Duration::from_secs(30),
4819 retry_on: vec![],
4820 };
4821 drop(handle);
4822
4823 let result = agent.run("Auth test".to_string()).await;
4824 assert!(result.is_err(), "should fail on auth error");
4825
4826 assert_eq!(
4828 call_ref.load(std::sync::atomic::Ordering::SeqCst),
4829 1,
4830 "auth errors should not be retried"
4831 );
4832 }
4833}
4834
4835#[cfg(test)]
4838mod integration {
4839 use super::*;
4840 use std::path::PathBuf;
4841 use std::pin::Pin;
4842 use std::sync::Arc;
4843
4844 use async_trait::async_trait;
4845 use futures_core::Stream;
4846 use imp_llm::auth::{ApiKey, AuthStore};
4847 use imp_llm::model::{Capabilities, ModelMeta, ModelPricing};
4848 use imp_llm::provider::Provider;
4849 use tokio::sync::Mutex;
4850
4851 use crate::tools::{bash::BashTool, edit::EditTool, read::ReadTool, write::WriteTool};
4852
4853 struct MockProvider {
4856 responses: Mutex<Vec<Vec<StreamEvent>>>,
4857 }
4858
4859 impl MockProvider {
4860 fn new(responses: Vec<Vec<StreamEvent>>) -> Self {
4861 Self {
4862 responses: Mutex::new(responses),
4863 }
4864 }
4865 }
4866
4867 #[async_trait]
4868 impl Provider for MockProvider {
4869 fn stream(
4870 &self,
4871 _model: &Model,
4872 _context: Context,
4873 _options: RequestOptions,
4874 _api_key: &str,
4875 ) -> Pin<Box<dyn Stream<Item = imp_llm::Result<StreamEvent>> + Send>> {
4876 let mut responses = self.responses.try_lock().expect("MockProvider lock");
4877 let events = if responses.is_empty() {
4878 vec![StreamEvent::Error {
4879 error: "No more mock responses".to_string(),
4880 }]
4881 } else {
4882 responses.remove(0)
4883 };
4884 Box::pin(futures::stream::iter(events.into_iter().map(Ok)))
4885 }
4886
4887 async fn resolve_auth(&self, _auth: &AuthStore) -> imp_llm::Result<ApiKey> {
4888 Ok("mock-key".to_string())
4889 }
4890
4891 fn id(&self) -> &str {
4892 "mock"
4893 }
4894
4895 fn models(&self) -> &[ModelMeta] {
4896 &[]
4897 }
4898 }
4899
4900 fn test_model(provider: Arc<dyn Provider>) -> Model {
4901 Model {
4902 meta: ModelMeta {
4903 id: "test-model".to_string(),
4904 provider: "mock".to_string(),
4905 name: "Test Model".to_string(),
4906 context_window: 200_000,
4907 max_output_tokens: 16_384,
4908 pricing: ModelPricing {
4909 input_per_mtok: 3.0,
4910 output_per_mtok: 15.0,
4911 cache_read_per_mtok: 0.3,
4912 cache_write_per_mtok: 3.75,
4913 },
4914 capabilities: Capabilities {
4915 reasoning: true,
4916 images: false,
4917 tool_use: true,
4918 },
4919 },
4920 provider,
4921 }
4922 }
4923
4924 fn text_response(text: &str, input_tokens: u32, output_tokens: u32) -> Vec<StreamEvent> {
4925 vec![
4926 StreamEvent::MessageStart {
4927 model: "test-model".to_string(),
4928 },
4929 StreamEvent::TextDelta {
4930 text: text.to_string(),
4931 },
4932 StreamEvent::MessageEnd {
4933 message: AssistantMessage {
4934 content: vec![ContentBlock::Text {
4935 text: text.to_string(),
4936 }],
4937 usage: Some(Usage {
4938 input_tokens,
4939 output_tokens,
4940 cache_read_tokens: 0,
4941 cache_write_tokens: 0,
4942 }),
4943 stop_reason: StopReason::EndTurn,
4944 timestamp: 1000,
4945 },
4946 },
4947 ]
4948 }
4949
4950 fn tool_call_response(
4951 call_id: &str,
4952 tool_name: &str,
4953 args: serde_json::Value,
4954 input_tokens: u32,
4955 output_tokens: u32,
4956 ) -> Vec<StreamEvent> {
4957 vec![
4958 StreamEvent::MessageStart {
4959 model: "test-model".to_string(),
4960 },
4961 StreamEvent::ToolCall {
4962 id: call_id.to_string(),
4963 name: tool_name.to_string(),
4964 arguments: args.clone(),
4965 },
4966 StreamEvent::MessageEnd {
4967 message: AssistantMessage {
4968 content: vec![ContentBlock::ToolCall {
4969 id: call_id.to_string(),
4970 name: tool_name.to_string(),
4971 arguments: args,
4972 }],
4973 usage: Some(Usage {
4974 input_tokens,
4975 output_tokens,
4976 cache_read_tokens: 0,
4977 cache_write_tokens: 0,
4978 }),
4979 stop_reason: StopReason::ToolUse,
4980 timestamp: 1000,
4981 },
4982 },
4983 ]
4984 }
4985
4986 fn create_agent_with_tools(provider: Arc<dyn Provider>, cwd: PathBuf) -> (Agent, AgentHandle) {
4988 let model = test_model(provider);
4989 let (mut agent, handle) = Agent::new(model, cwd);
4990 agent.tools.register(Arc::new(WriteTool));
4991 agent.tools.register(Arc::new(ReadTool));
4992 agent.tools.register(Arc::new(EditTool));
4993 agent.tools.register(Arc::new(BashTool));
4994 agent.tools.register_alias("bash", "shell");
4995 (agent, handle)
4996 }
4997
4998 fn create_agent_with_reduced_tools(
5000 provider: Arc<dyn Provider>,
5001 cwd: PathBuf,
5002 ) -> (Agent, AgentHandle) {
5003 let model = test_model(provider);
5004 let (mut agent, handle) = Agent::new(model, cwd);
5005 agent.tools.register(Arc::new(WriteTool));
5006 agent.tools.register(Arc::new(ReadTool));
5007 agent.tools.register(Arc::new(EditTool));
5008 agent.tools.register(Arc::new(BashTool));
5009 (agent, handle)
5010 }
5011
5012 #[tokio::test]
5015 async fn agent_reads_and_writes_file() {
5016 let tmp = tempfile::tempdir().unwrap();
5017 let provider = Arc::new(MockProvider::new(vec![
5018 tool_call_response(
5019 "call_write",
5020 "write",
5021 serde_json::json!({"path": "test.txt", "content": "hello world"}),
5022 100,
5023 20,
5024 ),
5025 tool_call_response(
5026 "call_read",
5027 "read",
5028 serde_json::json!({"path": "test.txt"}),
5029 100,
5030 20,
5031 ),
5032 text_response("The file contains: hello world", 100, 20),
5033 ]));
5034
5035 let (mut agent, handle) = create_agent_with_tools(provider, tmp.path().to_path_buf());
5036 drop(handle);
5037
5038 agent
5039 .run("Write and read a file".to_string())
5040 .await
5041 .unwrap();
5042
5043 let on_disk = std::fs::read_to_string(tmp.path().join("test.txt")).unwrap();
5045 assert_eq!(on_disk, "hello world");
5046
5047 let read_result = agent
5049 .messages
5050 .iter()
5051 .find_map(|m| match m {
5052 Message::ToolResult(r) if r.tool_call_id == "call_read" => Some(r),
5053 _ => None,
5054 })
5055 .expect("should have a read tool result");
5056 let read_text = read_result
5057 .content
5058 .iter()
5059 .find_map(|b| match b {
5060 ContentBlock::Text { text } => Some(text.as_str()),
5061 _ => None,
5062 })
5063 .unwrap();
5064 assert!(
5065 read_text.contains("hello world"),
5066 "read result should contain file content, got: {read_text}"
5067 );
5068
5069 let assistant_count = agent
5071 .messages
5072 .iter()
5073 .filter(|m| matches!(m, Message::Assistant(_)))
5074 .count();
5075 assert_eq!(assistant_count, 3);
5076 }
5077
5078 #[tokio::test]
5081 async fn agent_edit_tool_modifies_file() {
5082 let tmp = tempfile::tempdir().unwrap();
5083 let provider = Arc::new(MockProvider::new(vec![
5084 tool_call_response(
5085 "call_write",
5086 "write",
5087 serde_json::json!({
5088 "path": "src/main.rs",
5089 "content": "fn main() {\n println!(\"old\");\n}"
5090 }),
5091 100,
5092 20,
5093 ),
5094 tool_call_response(
5095 "call_edit",
5096 "edit",
5097 serde_json::json!({
5098 "path": "src/main.rs",
5099 "oldText": "old",
5100 "newText": "new"
5101 }),
5102 100,
5103 20,
5104 ),
5105 tool_call_response(
5106 "call_read",
5107 "read",
5108 serde_json::json!({"path": "src/main.rs"}),
5109 100,
5110 20,
5111 ),
5112 text_response("Done", 100, 20),
5113 ]));
5114
5115 let (mut agent, handle) = create_agent_with_tools(provider, tmp.path().to_path_buf());
5116 drop(handle);
5117
5118 agent.run("Edit a file".to_string()).await.unwrap();
5119
5120 let on_disk = std::fs::read_to_string(tmp.path().join("src/main.rs")).unwrap();
5122 assert!(on_disk.contains("new"), "file should contain 'new'");
5123 assert!(!on_disk.contains("old"), "file should not contain 'old'");
5124
5125 let edit_result = agent
5127 .messages
5128 .iter()
5129 .find_map(|m| match m {
5130 Message::ToolResult(r) if r.tool_call_id == "call_edit" => Some(r),
5131 _ => None,
5132 })
5133 .expect("should have an edit tool result");
5134 let edit_text = edit_result
5135 .content
5136 .iter()
5137 .find_map(|b| match b {
5138 ContentBlock::Text { text } => Some(text.as_str()),
5139 _ => None,
5140 })
5141 .unwrap();
5142 assert!(
5143 edit_text.contains("---") || edit_text.contains("+++"),
5144 "edit result should include a diff, got: {edit_text}"
5145 );
5146 }
5147
5148 #[tokio::test]
5151 async fn agent_bash_search_finds_pattern() {
5152 let tmp = tempfile::tempdir().unwrap();
5153 std::fs::write(
5154 tmp.path().join("search_me.txt"),
5155 "line one\nunique_pattern_xyz here\nline three\n",
5156 )
5157 .unwrap();
5158 let provider = Arc::new(MockProvider::new(vec![
5159 tool_call_response(
5160 "call_bash",
5161 "bash",
5162 serde_json::json!({"command": "grep --no-color -rn 'unique_pattern_xyz' ."}),
5163 100,
5164 20,
5165 ),
5166 text_response("Found it!", 100, 20),
5167 ]));
5168
5169 let (mut agent, handle) =
5170 create_agent_with_reduced_tools(provider, tmp.path().to_path_buf());
5171 drop(handle);
5172
5173 agent.run("Search for a pattern".to_string()).await.unwrap();
5174
5175 let bash_result = agent
5176 .messages
5177 .iter()
5178 .find_map(|m| match m {
5179 Message::ToolResult(r) if r.tool_call_id == "call_bash" => Some(r),
5180 _ => None,
5181 })
5182 .expect("should have a bash tool result");
5183 let bash_text = bash_result
5184 .content
5185 .iter()
5186 .find_map(|b| match b {
5187 ContentBlock::Text { text } => Some(text.as_str()),
5188 _ => None,
5189 })
5190 .unwrap();
5191 assert!(
5192 !bash_text.trim().is_empty(),
5193 "bash grep output should not be empty"
5194 );
5195 }
5196
5197 #[tokio::test]
5200 async fn agent_repeated_tool_calls_warn_then_block() {
5201 let tmp = tempfile::tempdir().unwrap();
5202 std::fs::write(tmp.path().join("repeat.txt"), "same content\n").unwrap();
5203
5204 let provider = Arc::new(MockProvider::new(vec![
5205 tool_call_response(
5206 "call_1",
5207 "read",
5208 serde_json::json!({"path": "repeat.txt"}),
5209 100,
5210 20,
5211 ),
5212 tool_call_response(
5213 "call_2",
5214 "read",
5215 serde_json::json!({"path": "repeat.txt"}),
5216 100,
5217 20,
5218 ),
5219 tool_call_response(
5220 "call_3",
5221 "read",
5222 serde_json::json!({"path": "repeat.txt"}),
5223 100,
5224 20,
5225 ),
5226 tool_call_response(
5227 "call_4",
5228 "read",
5229 serde_json::json!({"path": "repeat.txt"}),
5230 100,
5231 20,
5232 ),
5233 text_response("Done", 100, 20),
5234 ]));
5235
5236 let (mut agent, handle) =
5237 create_agent_with_reduced_tools(provider, tmp.path().to_path_buf());
5238 drop(handle);
5239
5240 agent
5241 .run("Read the same file repeatedly".to_string())
5242 .await
5243 .unwrap();
5244
5245 let third = agent
5246 .messages
5247 .iter()
5248 .find_map(|m| match m {
5249 Message::ToolResult(r) if r.tool_call_id == "call_3" => Some(r),
5250 _ => None,
5251 })
5252 .expect("third tool result");
5253 let fourth = agent
5254 .messages
5255 .iter()
5256 .find_map(|m| match m {
5257 Message::ToolResult(r) if r.tool_call_id == "call_4" => Some(r),
5258 _ => None,
5259 })
5260 .expect("fourth tool result");
5261
5262 let third_text = third
5263 .content
5264 .iter()
5265 .filter_map(|b| match b {
5266 ContentBlock::Text { text } => Some(text.as_str()),
5267 _ => None,
5268 })
5269 .collect::<Vec<_>>()
5270 .join("\n");
5271 let fourth_text = fourth
5272 .content
5273 .iter()
5274 .filter_map(|b| match b {
5275 ContentBlock::Text { text } => Some(text.as_str()),
5276 _ => None,
5277 })
5278 .collect::<Vec<_>>()
5279 .join("\n");
5280
5281 assert!(third_text.contains("Warning: identical tool call repeated 3 times"));
5282 assert!(fourth.is_error);
5283 assert!(fourth_text.contains("Blocked: identical tool call repeated 4 times"));
5284 assert_eq!(
5285 agent
5286 .messages
5287 .iter()
5288 .filter(|message| matches!(message, Message::User(_)))
5289 .count(),
5290 1,
5291 "agent should stop after repeated-action block rather than enqueueing more follow-ups"
5292 );
5293 }
5294
/// A blocked-repeat error result must be recognised as a repeated action.
#[test]
fn tool_results_indicate_repeated_action_detects_blocked_repeat_message() {
    let blocked_text =
        "Blocked: identical tool call repeated 4 times in a row for 'read'.".to_string();
    let results = [imp_llm::ToolResultMessage {
        tool_call_id: "call_repeat".to_string(),
        tool_name: "read".to_string(),
        content: vec![ContentBlock::Text { text: blocked_text }],
        is_error: true,
        details: serde_json::Value::Null,
        timestamp: 0,
    }];

    assert!(tool_results_indicate_repeated_action(&results));
}
5311
5312 #[tokio::test]
5315 async fn agent_bash_runs_command() {
5316 let tmp = tempfile::tempdir().unwrap();
5317 let provider = Arc::new(MockProvider::new(vec![
5318 tool_call_response(
5319 "call_bash",
5320 "bash",
5321 serde_json::json!({"command": "echo hello && echo world"}),
5322 100,
5323 20,
5324 ),
5325 text_response("Done", 100, 20),
5326 ]));
5327
5328 let (mut agent, handle) = create_agent_with_tools(provider, tmp.path().to_path_buf());
5329 drop(handle);
5330
5331 agent.run("Run a command".to_string()).await.unwrap();
5332
5333 let bash_result = agent
5335 .messages
5336 .iter()
5337 .find_map(|m| match m {
5338 Message::ToolResult(r) if r.tool_call_id == "call_bash" => Some(r),
5339 _ => None,
5340 })
5341 .expect("should have a bash tool result");
5342 let bash_text = bash_result
5343 .content
5344 .iter()
5345 .find_map(|b| match b {
5346 ContentBlock::Text { text } => Some(text.as_str()),
5347 _ => None,
5348 })
5349 .unwrap();
5350 assert!(
5351 bash_text.contains("hello"),
5352 "bash output should contain 'hello', got: {bash_text}"
5353 );
5354 assert!(
5355 bash_text.contains("world"),
5356 "bash output should contain 'world', got: {bash_text}"
5357 );
5358
5359 assert_eq!(bash_result.details["exit_code"], 0);
5361 }
5362
5363 #[tokio::test]
5366 async fn agent_handles_tool_error_gracefully() {
5367 let tmp = tempfile::tempdir().unwrap();
5368 let provider = Arc::new(MockProvider::new(vec![
5369 tool_call_response(
5370 "call_read",
5371 "read",
5372 serde_json::json!({"path": "nonexistent.txt"}),
5373 100,
5374 20,
5375 ),
5376 text_response("File not found, let me try something else", 100, 20),
5377 ]));
5378
5379 let (mut agent, handle) = create_agent_with_tools(provider, tmp.path().to_path_buf());
5380 drop(handle);
5381
5382 agent.run("Read a file".to_string()).await.unwrap();
5383
5384 let read_result = agent
5386 .messages
5387 .iter()
5388 .find_map(|m| match m {
5389 Message::ToolResult(r) if r.tool_call_id == "call_read" => Some(r),
5390 _ => None,
5391 })
5392 .expect("should have a read tool result");
5393 assert!(
5394 read_result.is_error,
5395 "reading nonexistent file should produce an error result"
5396 );
5397
5398 let assistant_count = agent
5400 .messages
5401 .iter()
5402 .filter(|m| matches!(m, Message::Assistant(_)))
5403 .count();
5404 assert_eq!(
5405 assistant_count, 2,
5406 "agent should have 2 turns: error + recovery"
5407 );
5408
5409 }
5411}
5412
5413#[cfg(test)]
5416mod mode_tests {
5417 use super::*;
5418 use std::path::PathBuf;
5419 use std::pin::Pin;
5420 use std::sync::Arc;
5421
5422 use async_trait::async_trait;
5423 use futures_core::Stream;
5424 use imp_llm::auth::{ApiKey, AuthStore};
5425 use imp_llm::model::ModelMeta;
5426 use imp_llm::provider::Provider;
5427 use tokio::sync::Mutex;
5428
5429 struct MockProvider {
5432 responses: Mutex<Vec<Vec<imp_llm::StreamEvent>>>,
5433 }
5434
5435 impl MockProvider {
5436 fn new(responses: Vec<Vec<imp_llm::StreamEvent>>) -> Self {
5437 Self {
5438 responses: Mutex::new(responses),
5439 }
5440 }
5441 }
5442
5443 #[async_trait]
5444 impl Provider for MockProvider {
5445 fn stream(
5446 &self,
5447 _model: &imp_llm::Model,
5448 _context: imp_llm::Context,
5449 _options: imp_llm::RequestOptions,
5450 _api_key: &str,
5451 ) -> Pin<Box<dyn Stream<Item = imp_llm::Result<imp_llm::StreamEvent>> + Send>> {
5452 let mut responses = self.responses.try_lock().expect("MockProvider lock");
5453 let events = if responses.is_empty() {
5454 vec![imp_llm::StreamEvent::Error {
5455 error: "No more mock responses".to_string(),
5456 }]
5457 } else {
5458 responses.remove(0)
5459 };
5460 Box::pin(futures::stream::iter(events.into_iter().map(Ok)))
5461 }
5462
5463 async fn resolve_auth(&self, _auth: &AuthStore) -> imp_llm::Result<ApiKey> {
5464 Ok("mock-key".to_string())
5465 }
5466
5467 fn id(&self) -> &str {
5468 "mock"
5469 }
5470
5471 fn models(&self) -> &[imp_llm::model::ModelMeta] {
5472 &[]
5473 }
5474 }
5475
5476 fn test_model(provider: Arc<dyn Provider>) -> imp_llm::Model {
5477 imp_llm::Model {
5478 meta: ModelMeta {
5479 id: "test-model".to_string(),
5480 provider: "mock".to_string(),
5481 name: "Test Model".to_string(),
5482 context_window: 200_000,
5483 max_output_tokens: 16_384,
5484 pricing: imp_llm::model::ModelPricing {
5485 input_per_mtok: 3.0,
5486 output_per_mtok: 15.0,
5487 cache_read_per_mtok: 0.3,
5488 cache_write_per_mtok: 3.75,
5489 },
5490 capabilities: imp_llm::model::Capabilities {
5491 reasoning: true,
5492 images: false,
5493 tool_use: true,
5494 },
5495 },
5496 provider,
5497 }
5498 }
5499
5500 fn text_response(text: &str, input: u32, output: u32) -> Vec<imp_llm::StreamEvent> {
5501 vec![
5502 imp_llm::StreamEvent::MessageStart {
5503 model: "test-model".to_string(),
5504 },
5505 imp_llm::StreamEvent::TextDelta {
5506 text: text.to_string(),
5507 },
5508 imp_llm::StreamEvent::MessageEnd {
5509 message: imp_llm::AssistantMessage {
5510 content: vec![imp_llm::ContentBlock::Text {
5511 text: text.to_string(),
5512 }],
5513 usage: Some(imp_llm::Usage {
5514 input_tokens: input,
5515 output_tokens: output,
5516 cache_read_tokens: 0,
5517 cache_write_tokens: 0,
5518 }),
5519 stop_reason: imp_llm::StopReason::EndTurn,
5520 timestamp: 1000,
5521 },
5522 },
5523 ]
5524 }
5525
5526 fn tool_call_response(
5527 call_id: &str,
5528 tool_name: &str,
5529 args: serde_json::Value,
5530 input: u32,
5531 output: u32,
5532 ) -> Vec<imp_llm::StreamEvent> {
5533 vec![
5534 imp_llm::StreamEvent::MessageStart {
5535 model: "test-model".to_string(),
5536 },
5537 imp_llm::StreamEvent::ToolCall {
5538 id: call_id.to_string(),
5539 name: tool_name.to_string(),
5540 arguments: args.clone(),
5541 },
5542 imp_llm::StreamEvent::MessageEnd {
5543 message: imp_llm::AssistantMessage {
5544 content: vec![imp_llm::ContentBlock::ToolCall {
5545 id: call_id.to_string(),
5546 name: tool_name.to_string(),
5547 arguments: args,
5548 }],
5549 usage: Some(imp_llm::Usage {
5550 input_tokens: input,
5551 output_tokens: output,
5552 cache_read_tokens: 0,
5553 cache_write_tokens: 0,
5554 }),
5555 stop_reason: imp_llm::StopReason::ToolUse,
5556 timestamp: 1000,
5557 },
5558 },
5559 ]
5560 }
5561
5562 async fn collect_events(mut handle: AgentHandle) -> Vec<AgentEvent> {
5563 let mut events = Vec::new();
5564 while let Some(event) = handle.event_rx.recv().await {
5565 events.push(event);
5566 }
5567 events
5568 }
5569
5570 struct EchoTool;
5573
5574 #[async_trait]
5575 impl crate::tools::Tool for EchoTool {
5576 fn name(&self) -> &str {
5577 "echo"
5578 }
5579 fn label(&self) -> &str {
5580 "Echo"
5581 }
5582 fn description(&self) -> &str {
5583 "Echoes back the input"
5584 }
5585 fn parameters(&self) -> serde_json::Value {
5586 serde_json::json!({
5587 "type": "object",
5588 "properties": { "text": { "type": "string" } },
5589 "required": ["text"]
5590 })
5591 }
5592 fn is_readonly(&self) -> bool {
5593 true
5594 }
5595 async fn execute(
5596 &self,
5597 _call_id: &str,
5598 params: serde_json::Value,
5599 _ctx: crate::tools::ToolContext,
5600 ) -> crate::error::Result<crate::tools::ToolOutput> {
5601 let text = params["text"].as_str().unwrap_or("no text");
5602 Ok(crate::tools::ToolOutput::text(format!("echo: {text}")))
5603 }
5604 }
5605
5606 struct NamedWriteTool(&'static str);
5607
5608 #[async_trait]
5609 impl crate::tools::Tool for NamedWriteTool {
5610 fn name(&self) -> &str {
5611 self.0
5612 }
5613 fn label(&self) -> &str {
5614 self.0
5615 }
5616 fn description(&self) -> &str {
5617 "A write tool"
5618 }
5619 fn parameters(&self) -> serde_json::Value {
5620 serde_json::json!({"type": "object", "properties": {"data": {"type": "string"}}})
5621 }
5622 fn is_readonly(&self) -> bool {
5623 false
5624 }
5625 async fn execute(
5626 &self,
5627 _call_id: &str,
5628 _params: serde_json::Value,
5629 _ctx: crate::tools::ToolContext,
5630 ) -> crate::error::Result<crate::tools::ToolOutput> {
5631 Ok(crate::tools::ToolOutput::text("written"))
5632 }
5633 }
5634
5635 fn single_text_model(text: &str) -> Arc<MockProvider> {
5636 Arc::new(MockProvider::new(vec![text_response(text, 50, 10)]))
5637 }
5638
5639 #[tokio::test]
5641 async fn agent_mode_enforcement_full_registers_all_tools() {
5642 use crate::config::AgentMode;
5643
5644 let provider = single_text_model("ok");
5645 let model = test_model(provider);
5646 let (mut agent, _handle) = Agent::new(model, PathBuf::from("/tmp"));
5647 agent.mode = AgentMode::Full;
5648
5649 agent.tools.register(Arc::new(EchoTool)); agent.tools.register(Arc::new(NamedWriteTool("write")));
5652
5653 assert!(
5655 agent.tools.get("echo").is_some(),
5656 "echo should be registered"
5657 );
5658 assert!(
5659 agent.tools.get("write").is_some(),
5660 "write should be registered"
5661 );
5662 assert!(agent.mode.allows_tool("echo"));
5663 assert!(agent.mode.allows_tool("write"));
5664 assert!(agent.mode.allows_tool("any_future_tool"));
5665 }
5666
/// Filtering a registry with `AgentMode::Orchestrator::allows_tool` removes every
/// tool registered here, including `echo`.
#[test]
fn agent_mode_enforcement_orchestrator_excludes_write_tools() {
    use crate::config::AgentMode;
    use crate::tools::ToolRegistry;

    let mut registry = ToolRegistry::new();
    registry.register(Arc::new(EchoTool));
    for name in ["write", "edit", "bash"] {
        registry.register(Arc::new(NamedWriteTool(name)));
    }

    // Keep only the tools the orchestrator mode permits.
    let mode = AgentMode::Orchestrator;
    registry.retain(|name| mode.allows_tool(name));

    assert!(
        registry.get("write").is_none(),
        "write must be filtered out"
    );
    assert!(registry.get("edit").is_none(), "edit must be filtered out");
    assert!(registry.get("bash").is_none(), "bash must be filtered out");
    assert!(registry.get("echo").is_none(), "echo must be filtered out");
}
5693
5694 #[tokio::test]
5696 async fn agent_mode_enforcement_guard_blocks_disallowed() {
5697 use crate::config::AgentMode;
5698
5699 let provider = Arc::new(MockProvider::new(vec![
5700 tool_call_response(
5702 "call_1",
5703 "write",
5704 serde_json::json!({"data": "content"}),
5705 50,
5706 10,
5707 ),
5708 text_response("Understood, I cannot write directly.", 50, 10),
5710 ]));
5711
5712 let model = test_model(provider);
5713 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
5714 agent.mode = AgentMode::Orchestrator;
5715 agent.tools.register(Arc::new(NamedWriteTool("write")));
5717
5718 let events_task = tokio::spawn(collect_events(handle));
5719 agent.run("Write something".to_string()).await.unwrap();
5720 drop(agent);
5721
5722 let events = events_task.await.unwrap();
5723
5724 let tool_end = events
5726 .iter()
5727 .find(|e| matches!(e, AgentEvent::ToolExecutionEnd { .. }));
5728 assert!(tool_end.is_some(), "should have a ToolExecutionEnd event");
5729
5730 if let Some(AgentEvent::ToolExecutionEnd { result, .. }) = tool_end {
5731 assert!(result.is_error, "mode guard should produce an error result");
5732 let text = result.content.iter().find_map(|c| {
5733 if let ContentBlock::Text { text } = c {
5734 Some(text.as_str())
5735 } else {
5736 None
5737 }
5738 });
5739 let text = text.expect("error result should have text");
5740 assert!(
5741 text.contains("write") && text.contains("mode"),
5742 "error should name the tool and mention mode, got: {text}"
5743 );
5744 }
5745 }
5746
5747 #[tokio::test]
5749 async fn agent_mode_enforcement_guard_allows_permitted() {
5750 use crate::config::AgentMode;
5751
5752 let provider = Arc::new(MockProvider::new(vec![
5753 tool_call_response(
5755 "call_1",
5756 "echo",
5757 serde_json::json!({"text": "hello"}),
5758 50,
5759 10,
5760 ),
5761 text_response("Echo succeeded", 50, 10),
5762 ]));
5763
5764 let model = test_model(provider);
5765 let (mut agent, handle) = Agent::new(model, PathBuf::from("/tmp"));
5766 agent.mode = AgentMode::Full;
5768 agent.tools.register(Arc::new(EchoTool));
5769
5770 let events_task = tokio::spawn(collect_events(handle));
5771 agent.run("Echo something".to_string()).await.unwrap();
5772 drop(agent);
5773
5774 let events = events_task.await.unwrap();
5775
5776 let tool_end = events
5778 .iter()
5779 .find(|e| matches!(e, AgentEvent::ToolExecutionEnd { .. }));
5780 assert!(tool_end.is_some());
5781
5782 if let Some(AgentEvent::ToolExecutionEnd { result, .. }) = tool_end {
5783 assert!(!result.is_error, "permitted tool should succeed");
5784 }
5785 }
5786
/// The orchestrator system prompt lists the read-only `read` tool and omits the
/// write, edit and bash tools.
#[test]
fn agent_mode_enforcement_system_prompt_filters() {
    use crate::config::AgentMode;
    use crate::system_prompt::{assemble, AssembleParams};
    use crate::tools::ToolRegistry;

    let mut registry = ToolRegistry::new();
    registry.register(Arc::new(NamedWriteTool("write")));
    registry.register(Arc::new(NamedWriteTool("edit")));
    registry.register(Arc::new(NamedWriteTool("bash")));

    /// Minimal read-only tool named `read`, local to this test.
    struct ReadTool;

    #[async_trait]
    impl crate::tools::Tool for ReadTool {
        fn name(&self) -> &str {
            "read"
        }
        fn label(&self) -> &str {
            "Read"
        }
        fn description(&self) -> &str {
            "Read a file"
        }
        fn parameters(&self) -> serde_json::Value {
            serde_json::json!({"type": "object"})
        }
        fn is_readonly(&self) -> bool {
            true
        }
        async fn execute(
            &self,
            _: &str,
            _: serde_json::Value,
            _: crate::tools::ToolContext,
        ) -> crate::error::Result<crate::tools::ToolOutput> {
            Ok(crate::tools::ToolOutput::text(""))
        }
    }
    registry.register(Arc::new(ReadTool));

    let mode = AgentMode::Orchestrator;
    // `&registry`: the source had the mis-encoded `®istry` (an `&reg` entity
    // artefact), which does not compile.
    let result = assemble(&AssembleParams {
        tools: &registry,
        agents_md: &[],
        skills: &[],
        facts: &[],
        project_memory_status: None,
        personality: None,
        soul: None,
        task: None,
        role: None,
        mode: &mode,
        memory: None,
        user_profile: None,
        cwd: None,
        learning_enabled: false,
        guardrail_profile: None,
    });

    assert!(
        result.text.contains("- read:"),
        "read should be in orchestrator prompt"
    );

    assert!(
        !result.text.contains("- write:"),
        "write must not appear in orchestrator prompt"
    );
    assert!(
        !result.text.contains("- edit:"),
        "edit must not appear in orchestrator prompt"
    );
    assert!(
        !result.text.contains("- bash:"),
        "bash must not appear in orchestrator prompt"
    );
}
5868
/// Each `AgentMode` adds (or leaves out) its own mode instructions in the assembled prompt.
///
/// All four prompts are assembled from identical, empty inputs, with only the `mode` field
/// varying. Any difference between them therefore comes from the mode alone.
#[test]
fn agent_mode_enforcement_system_prompt_instructions() {
    use crate::config::AgentMode;
    use crate::system_prompt::{assemble, AssembleParams};
    use crate::tools::ToolRegistry;

    let registry = ToolRegistry::new();

    // The only place that builds the otherwise-identical parameters. A new `AssembleParams`
    // field therefore needs one edit here instead of four. The closure returns an owned copy
    // of the prompt text, so callers hold no borrow of the parameters or the mode.
    let prompt_for = |mode: &AgentMode| -> String {
        assemble(&AssembleParams {
            tools: &registry,
            agents_md: &[],
            skills: &[],
            facts: &[],
            project_memory_status: None,
            personality: None,
            soul: None,
            task: None,
            role: None,
            mode,
            memory: None,
            user_profile: None,
            cwd: None,
            learning_enabled: false,
            guardrail_profile: None,
        })
        .text
        .to_string()
    };

    let full = prompt_for(&AgentMode::Full);
    assert!(
        !full.contains("orchestrator"),
        "Full mode should not mention orchestrator"
    );
    assert!(
        !full.contains("You are a worker agent."),
        "Full mode should not include worker mode instructions"
    );

    let orchestrator = prompt_for(&AgentMode::Orchestrator);
    assert!(
        orchestrator.contains("orchestrator"),
        "orchestrator prompt should contain mode instructions, got: {orchestrator}"
    );

    let worker = prompt_for(&AgentMode::Worker);
    assert!(
        worker.contains("worker"),
        "worker prompt should contain mode instructions, got: {worker}"
    );

    // NOTE(review): the `"read"` fallback is a very loose check. It also matches words such as
    // "already" or "thread". Consider pinning an exact reviewer-mode phrase instead.
    let reviewer = prompt_for(&AgentMode::Reviewer);
    assert!(
        reviewer.contains("reviewer") || reviewer.contains("read"),
        "reviewer prompt should contain mode instructions, got: {reviewer}"
    );
}
5976}