1#![allow(unused_doc_comments)]
2
3pub mod config;
11pub mod helpers;
13pub mod queues;
15pub mod retry;
17pub mod stream_outcome;
19pub mod streaming;
21pub mod tool_exec;
23pub mod ttsr;
25
26use crate::agent::ProviderResolver;
28use crate::compaction::{CompactedContext, CompactionEvent};
29use crate::events::AgentEvent;
30use crate::recovery::{CircuitBreaker, CircuitBreakerConfig};
31use crate::state::TokenSource;
32use crate::{state::SharedState, tools::ToolContext, tools::ToolRegistry};
33use anyhow::{Error, Result};
34pub use config::{AfterToolCallHook, AgentLoopConfig, BeforeToolCallHook, ToolExecutionMode};
35use oxi_ai::{
36 CompactionManager as OxCompactionManager, CompactionStrategy, ContentBlock, LlmCompactor,
37 Message, Provider, StopReason, TextContent, UserMessage,
38};
39use parking_lot::RwLock;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
42use std::time::Instant;
43
44use self::helpers::{sanitize_orphaned_tool_results, should_stop_after_turn};
45use self::queues::{
46 clear_all_queues, clear_follow_up_queue, clear_steering_queue, drain_follow_up_queue,
47 drain_steering_queue, try_push_follow_up, try_push_steering,
48};
49use self::retry::{
50 auto_retry_attempt_method, cancel_auto_retry, handle_retryable_error, is_retryable_error,
51};
52use self::streaming::stream_assistant_response;
53use self::tool_exec::execute_tool_calls;
54
55pub use self::stream_outcome::StreamOutcome;
/// Shared event sink through which the loop reports every [`AgentEvent`].
///
/// Reference-counted so it can be cloned cheaply (e.g. into `run_loop`) and
/// borrowed by the streaming, tool-execution and retry helpers.
type EmitFn = Arc<dyn Fn(AgentEvent) + Send + Sync>;
57
/// Drives an agent session: streams assistant turns from the provider, runs the
/// tool calls they request, injects steering/follow-up messages, handles
/// auto-retry and rule interrupts, and compacts the context when needed.
pub struct AgentLoop {
    /// Provider for this loop; also handed to the `LlmCompactor` installed in
    /// `new_with_resolver`.
    provider: Arc<dyn Provider>,
    /// Loop configuration (model id, compaction, workspace, limits, ...).
    config: AgentLoopConfig,
    /// Tool registry; presumably consulted by the `tool_exec` helpers (not
    /// referenced directly in this file).
    tools: Arc<ToolRegistry>,
    /// Shared session state. Its messages seed each run; they are replaced by
    /// the run's result in `run_messages` and after a successful compaction.
    state: SharedState,
    /// Decides when to compact and performs compaction (see `maybe_compact`).
    compaction_manager: OxCompactionManager,
    /// Optional hook for the tool-execution helpers, run before a tool call.
    before_tool_call: Option<BeforeToolCallHook>,
    /// Optional hook run after a tool call; it can set `terminate` to stop the
    /// tool-calling loop.
    after_tool_call: Option<AfterToolCallHook>,
    /// Steering messages pushed by `steer`; drained at run start and after every turn.
    steering_queue: RwLock<Vec<Message>>,
    /// Follow-up messages pushed by `follow_up`; drained once the inner turn loop settles.
    follow_up_queue: RwLock<Vec<Message>>,
    /// Session id copied from the config; attached to events and the `ToolContext`.
    session_id: Option<String>,
    /// Current auto-retry attempt; reset to 0 after a turn that completes without error.
    auto_retry_attempt: AtomicUsize,
    /// Auto-retry cancellation flag, presumably set by `cancel_auto_retry` (see `retry`).
    auto_retry_cancel: AtomicBool,
    /// NOTE(review): presumably wakes a sleeping auto-retry early; confirm in `retry`.
    auto_retry_notify: tokio::sync::Notify,
    /// Circuit breaker built with the default config; not referenced in this file.
    circuit_breaker: CircuitBreaker,
    /// Stop flag set by `cancel`; the turn loop checks it after every turn.
    external_stop: Arc<AtomicBool>,
    /// Optional externally-owned cancellation flag, consulted by `is_cancelled`.
    cancel_signal: Option<Arc<AtomicBool>>,
    /// Resolves `config.model_id` (compactor setup and `resolve_model`).
    resolver: Arc<dyn ProviderResolver>,
    /// Optional callback polled once per turn; each returned string is queued
    /// as a steering user message.
    steering_hook: Option<Arc<dyn Fn() -> Vec<String> + Send + Sync>>,
    /// Optional callback polled once per turn; each returned string is queued
    /// as a follow-up user message.
    follow_up_hook: Option<Arc<dyn Fn() -> Vec<String> + Send + Sync>>,
    /// Optional rule engine (see `ttsr`): passed to the streaming helper and
    /// consulted when building the compaction instruction.
    ttsr_engine: Option<Arc<ttsr::TtsrEngine>>,
}
92
93impl AgentLoop {
94 pub fn new_with_resolver(
98 provider: Arc<dyn Provider>,
99 config: AgentLoopConfig,
100 tools: Arc<ToolRegistry>,
101 state: SharedState,
102 resolver: Arc<dyn ProviderResolver>,
103 ) -> Self {
104 let mut compaction_manager =
105 OxCompactionManager::new(config.compaction_strategy.clone(), config.context_window);
106
107 if config.compaction_strategy != CompactionStrategy::Disabled {
108 let model = resolver.resolve_model(&config.model_id);
109 if let Some(model) = model {
110 let llm_compactor =
111 Arc::new(LlmCompactor::new(model.clone(), Arc::clone(&provider)));
112 compaction_manager.set_compactor(llm_compactor);
113 }
114 }
115
116 Self {
117 provider,
118 config: config.clone(),
119 tools,
120 state,
121 compaction_manager,
122 before_tool_call: None,
123 after_tool_call: None,
124 steering_queue: RwLock::new(Vec::new()),
125 follow_up_queue: RwLock::new(Vec::new()),
126 session_id: config.session_id.clone(),
127 auto_retry_attempt: AtomicUsize::new(0),
128 auto_retry_cancel: AtomicBool::new(false),
129 auto_retry_notify: tokio::sync::Notify::new(),
130 circuit_breaker: CircuitBreaker::new(CircuitBreakerConfig::default()),
131 external_stop: Arc::new(AtomicBool::new(false)),
132 cancel_signal: None,
133 resolver,
134 steering_hook: None,
135 follow_up_hook: None,
136 ttsr_engine: config.ttsr_engine.clone(),
137 }
138 }
139
140 pub fn new(
142 provider: Arc<dyn Provider>,
143 config: AgentLoopConfig,
144 tools: Arc<ToolRegistry>,
145 state: SharedState,
146 ) -> Self {
147 use crate::agent::GlobalProviderResolver;
148 Self::new_with_resolver(
149 provider,
150 config,
151 tools,
152 state,
153 Arc::new(GlobalProviderResolver),
154 )
155 }
156
157 pub fn with_before_tool_call(mut self, hook: BeforeToolCallHook) -> Self {
160 self.before_tool_call = Some(hook);
161 self
162 }
163
164 pub fn with_after_tool_call(mut self, hook: AfterToolCallHook) -> Self {
167 self.after_tool_call = Some(hook);
168 self
169 }
170
171 pub fn steer(&self, message: Message) {
177 if !try_push_steering(self, message) {
178 tracing::warn!("Steering message dropped — queue at capacity");
179 }
180 }
181
182 pub fn follow_up(&self, message: Message) {
188 if !try_push_follow_up(self, message) {
189 tracing::warn!("Follow-up message dropped — queue at capacity");
190 }
191 }
192
193 pub fn clear_steering_queue(&self) {
196 clear_steering_queue(self);
197 }
198
199 pub fn clear_follow_up_queue(&self) {
202 clear_follow_up_queue(self);
203 }
204
205 pub fn clear_all_queues(&self) {
207 clear_all_queues(self);
208 }
209
210 fn drain_steering_queue(&self) -> Vec<Message> {
211 drain_steering_queue(self)
212 }
213
214 #[cfg_attr(test, allow(dead_code))]
217 pub(crate) fn build_tool_context(&self) -> ToolContext {
218 let workspace = self
219 .config
220 .workspace_dir
221 .clone()
222 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
223 ToolContext {
224 workspace_dir: workspace,
225 root_dir: self.config.workspace_dir.clone(),
226 session_id: self.session_id.clone(),
227 snapshot_store: self.config.snapshot_store.clone(),
228 memory: self.config.memory.clone(),
229 url_resolver: self.config.url_resolver.clone(),
230 todo: self.config.todo.clone(),
231 agent_pool: self.config.agent_pool.clone(),
232 lsp: self.config.lsp.clone(),
233 subagent_runner: self.config.subagent_runner.clone(),
234 subagent_depth: self.config.subagent_depth,
235 }
236 }
237
238 fn maybe_truncate_tool_result(
246 &self,
247 mut result: oxi_ai::ToolResultMessage,
248 ) -> oxi_ai::ToolResultMessage {
249 let Some(max_bytes) = self.config.max_tool_result_bytes else {
250 return result;
251 };
252
253 for block in &mut result.content {
254 if let oxi_ai::ContentBlock::Text(tc) = block
255 && tc.text.len() > max_bytes
256 {
257 let omitted = tc.text.len() - max_bytes;
258 tc.text.truncate(max_bytes);
259 tc.text.push_str(&format!(
260 "\n\n... [truncated: {omitted} bytes omitted, \
261 use read/grep for full content]"
262 ));
263 }
264 }
265
266 result
267 }
268
269 fn drain_follow_up_queue(&self) -> Vec<Message> {
270 drain_follow_up_queue(self)
271 }
272
273 pub fn cancel_auto_retry(&self) {
277 cancel_auto_retry(self);
278 }
279
280 pub fn auto_retry_attempt(&self) -> usize {
283 auto_retry_attempt_method(self)
284 }
285
286 pub fn state(&self) -> &SharedState {
289 &self.state
290 }
291
292 pub fn external_stop(&self) -> &Arc<AtomicBool> {
294 &self.external_stop
295 }
296
297 pub fn set_cancel_signal(&mut self, flag: Arc<AtomicBool>) {
302 self.cancel_signal = Some(flag);
303 }
304
305 pub fn cancel_signal(&self) -> Option<Arc<AtomicBool>> {
310 self.cancel_signal.as_ref().map(Arc::clone)
311 }
312
313 pub fn is_cancelled(&self) -> bool {
316 if self.external_stop.load(Ordering::SeqCst) {
317 return true;
318 }
319 self.cancel_signal
320 .as_ref()
321 .is_some_and(|f| f.load(Ordering::SeqCst))
322 }
323 pub fn cancel(&self) {
328 self.external_stop.store(true, Ordering::SeqCst);
329 }
330
331 pub fn set_steering_hook(&mut self, hook: Arc<dyn Fn() -> Vec<String> + Send + Sync>) {
334 self.steering_hook = Some(hook);
335 }
336
337 pub fn set_follow_up_hook(&mut self, hook: Arc<dyn Fn() -> Vec<String> + Send + Sync>) {
340 self.follow_up_hook = Some(hook);
341 }
342
343 fn poll_external_queues(&self) {
346 if let Some(ref hook) = self.steering_hook {
347 for msg_text in hook() {
348 self.steer(Message::User(UserMessage::new(msg_text)));
349 }
350 }
351 if let Some(ref hook) = self.follow_up_hook {
352 for msg_text in hook() {
353 self.follow_up(Message::User(UserMessage::new(msg_text)));
354 }
355 }
356 }
357
358 pub async fn run(
361 &self,
362 prompt: String,
363 emit: impl Fn(AgentEvent) + Send + Sync + 'static,
364 ) -> Result<Vec<AgentEvent>> {
365 let message = Message::User(UserMessage::new(prompt));
366 let emit = Arc::new(emit);
367 self.run_messages(vec![message], emit).await
368 }
369
370 pub async fn run_mut<S: Send + std::fmt::Debug + 'static>(
401 &self,
402 prompt: String,
403 state: S,
404 emit: impl FnMut(AgentEvent, &mut S) + Send + 'static,
405 ) -> Result<(Vec<AgentEvent>, S)> {
406 let emit_fnmut = Arc::new(parking_lot::Mutex::new(emit));
407 let state_arc = Arc::new(parking_lot::Mutex::new(state));
408
409 let state_for_closure = Arc::clone(&state_arc);
411
412 let emit_fn: EmitFn = Arc::new(move |event: AgentEvent| {
413 let mut cb = emit_fnmut.lock();
414 let mut s = state_for_closure.lock();
415 cb(event, &mut s);
416 });
417
418 let events = self.run_inner(prompt, emit_fn).await?;
419
420 let mutex = Arc::try_unwrap(state_arc)
424 .expect("run_mut: state Arc still has multiple owners after run");
425 Ok((events, mutex.into_inner()))
426 }
427
428 async fn run_inner(&self, prompt: String, emit: EmitFn) -> Result<Vec<AgentEvent>> {
430 let message = Message::User(UserMessage::new(prompt));
431 self.run_messages(vec![message], emit).await
432 }
433
434 pub async fn run_messages(
437 &self,
438 prompts: Vec<Message>,
439 emit: EmitFn,
440 ) -> Result<Vec<AgentEvent>> {
441 let mut all_events = Vec::new();
442
443 let state_messages = self.state.get_state().messages.clone();
444 let mut all_messages = state_messages;
445 all_messages.extend(prompts.clone());
446
447 tracing::info!(session_id = ?self.session_id, "AgentLoop starting");
448 emit(AgentEvent::AgentStart {
449 prompts: prompts.clone(),
450 session_id: self.session_id.clone(),
451 });
452 all_events.push(AgentEvent::AgentStart {
453 prompts: prompts.clone(),
454 session_id: self.session_id.clone(),
455 });
456
457 let (result_messages, events) = self.run_loop(prompts, emit.clone()).await?;
458
459 all_events.extend(events);
460
461 let stop_reason = result_messages.last().and_then(|m| {
462 if let Message::Assistant(a) = m {
463 Some(format!("{:?}", a.stop_reason))
464 } else {
465 None
466 }
467 });
468
469 tracing::info!(session_id = ?self.session_id, "AgentLoop run_messages complete");
470
471 self.state.update(|s| {
473 s.replace_messages(result_messages.clone());
474 });
475
476 emit(AgentEvent::AgentEnd {
477 messages: result_messages.clone(),
478 stop_reason: stop_reason.clone(),
479 session_id: self.session_id.clone(),
480 });
481 all_events.push(AgentEvent::AgentEnd {
482 messages: result_messages.clone(),
483 stop_reason,
484 session_id: self.session_id.clone(),
485 });
486
487 Ok(all_events)
488 }
489
490 pub async fn continue_loop(
493 &self,
494 emit: impl Fn(AgentEvent) + Send + Sync + 'static,
495 ) -> Result<Vec<AgentEvent>> {
496 let emit = Arc::new(emit);
497 let mut all_events = Vec::new();
498
499 tracing::info!(session_id = ?self.session_id, "AgentLoop continuing");
500 emit(AgentEvent::AgentStart {
501 prompts: vec![],
502 session_id: self.session_id.clone(),
503 });
504 all_events.push(AgentEvent::AgentStart {
505 prompts: vec![],
506 session_id: self.session_id.clone(),
507 });
508
509 let (result_messages, events) = self.run_loop(vec![], emit.clone()).await?;
510
511 all_events.extend(events);
512
513 let stop_reason = result_messages.last().and_then(|m| {
514 if let Message::Assistant(a) = m {
515 Some(format!("{:?}", a.stop_reason))
516 } else {
517 None
518 }
519 });
520
521 tracing::info!(session_id = ?self.session_id, "AgentLoop continue_loop complete");
522 emit(AgentEvent::AgentEnd {
523 messages: result_messages.clone(),
524 stop_reason: stop_reason.clone(),
525 session_id: self.session_id.clone(),
526 });
527 all_events.push(AgentEvent::AgentEnd {
528 messages: result_messages.clone(),
529 stop_reason,
530 session_id: self.session_id.clone(),
531 });
532
533 Ok(all_events)
534 }
535
536 fn process_steering_messages(
538 &self,
539 pending_messages: &mut Vec<Message>,
540 messages: &mut Vec<Message>,
541 new_messages: &mut Vec<Message>,
542 events: &mut Vec<AgentEvent>,
543 emit: &EmitFn,
544 ) {
545 if pending_messages.is_empty() {
546 return;
547 }
548 for message in pending_messages.drain(..) {
549 emit(AgentEvent::SteeringMessage {
550 message: message.clone(),
551 });
552 emit(AgentEvent::MessageStart {
553 message: message.clone(),
554 });
555 emit(AgentEvent::MessageEnd {
556 message: message.clone(),
557 });
558 events.push(AgentEvent::SteeringMessage {
559 message: message.clone(),
560 });
561 events.push(AgentEvent::MessageStart {
562 message: message.clone(),
563 });
564 events.push(AgentEvent::MessageEnd {
565 message: message.clone(),
566 });
567 messages.push(message.clone());
568 new_messages.push(message);
569 }
570 }
571
572 async fn handle_streaming_error(
574 &self,
575 e: anyhow::Error,
576 messages: &mut Vec<Message>,
577 new_messages: &mut Vec<Message>,
578 events: &mut Vec<AgentEvent>,
579 emit: &EmitFn,
580 turn_number: u32,
581 ) -> (Vec<Message>, Vec<AgentEvent>) {
582 let err_msg = format!("{}", e);
583 tracing::error!(session_id = ?self.session_id, "Unexpected streaming error: {}", err_msg);
584
585 let mut error_asst = oxi_ai::AssistantMessage::new(
586 oxi_ai::Api::OpenAiCompletions,
587 "agent",
588 &self.config.model_id,
589 );
590 error_asst.stop_reason = StopReason::Error;
591 error_asst
592 .content
593 .push(ContentBlock::Text(TextContent::new(format!(
594 "⚠ {}",
595 err_msg
596 ))));
597
598 new_messages.push(Message::Assistant(error_asst.clone()));
599 messages.push(Message::Assistant(error_asst.clone()));
600
601 emit(AgentEvent::MessageStart {
602 message: Message::Assistant(error_asst.clone()),
603 });
604 emit(AgentEvent::MessageEnd {
605 message: Message::Assistant(error_asst.clone()),
606 });
607 emit(AgentEvent::Error {
608 message: err_msg.clone(),
609 session_id: self.session_id.clone(),
610 });
611
612 emit(AgentEvent::TurnEnd {
613 turn_number,
614 assistant_message: Message::Assistant(error_asst.clone()),
615 tool_results: vec![],
616 });
617 events.push(AgentEvent::TurnEnd {
618 turn_number,
619 assistant_message: Message::Assistant(error_asst),
620 tool_results: vec![],
621 });
622 (messages.clone(), events.clone())
624 }
625
626 async fn run_loop(
627 &self,
628 initial_prompts: Vec<Message>,
629 emit: EmitFn,
630 ) -> Result<(Vec<Message>, Vec<AgentEvent>)> {
631 tracing::info!("[AGENT-LOOP] run_loop started");
632 let mut messages = self.state.get_state().messages.clone();
633 messages.extend(initial_prompts.clone());
634
635 let mut new_messages: Vec<Message> = initial_prompts;
636 let mut events = Vec::new();
637 let mut turn_number: u32 = 0;
638 let mut first_turn = true;
639
640 let mut pending_messages: Vec<Message> = self.drain_steering_queue();
641
642 loop {
643 tracing::info!(
644 "[AGENT-LOOP] Top of loop, has_more_tool_calls={}, pending_messages={}",
645 true,
646 pending_messages.is_empty()
647 );
648 let mut has_more_tool_calls = true;
649
650 while has_more_tool_calls || !pending_messages.is_empty() {
651 if !first_turn {
652 turn_number += 1;
653 emit(AgentEvent::TurnStart { turn_number });
654 events.push(AgentEvent::TurnStart { turn_number });
655 } else {
656 first_turn = false;
657 turn_number = 1;
658 emit(AgentEvent::TurnStart { turn_number });
659 events.push(AgentEvent::TurnStart { turn_number });
660 }
661
662 if !pending_messages.is_empty() {
663 self.process_steering_messages(
664 &mut pending_messages,
665 &mut messages,
666 &mut new_messages,
667 &mut events,
668 &emit,
669 );
670 }
671
672 self.poll_external_queues();
675
676 self.maybe_compact(&mut messages, turn_number as usize, &emit)
677 .await;
678
679 tracing::info!("[AGENT-LOOP] About to call stream_assistant_response");
680 let ttsr = self.ttsr_engine.as_deref();
681 let outcome = stream_assistant_response(self, &mut messages, &emit, ttsr).await;
682
683 let assistant_message = match outcome {
684 StreamOutcome::Complete(msg) => msg,
685 StreamOutcome::Error {
686 message: _message,
687 detail,
688 } => {
689 let is_tool_ordering_error = detail.contains("tool")
692 && (detail.contains("must be a response")
693 || detail.contains("preceding")
694 || detail.contains("tool_calls"));
695
696 if is_tool_ordering_error {
697 let removed = sanitize_orphaned_tool_results(&mut messages);
698 tracing::warn!(
699 session_id = ?self.session_id,
700 removed,
701 detail = %detail,
702 "Message-ordering error detected, removed orphaned tool results, retrying"
703 );
704 if removed > 0 {
705 emit(AgentEvent::Error {
707 message: format!(
708 "⚠ Provider rejected message order: {}. Removed {} orphaned tool results, retrying…",
709 detail, removed
710 ),
711 session_id: self.session_id.clone(),
712 });
713 continue; }
715 }
716
717 return Ok(self
719 .handle_streaming_error(
720 anyhow::anyhow!("Provider stream error: {}", detail),
721 &mut messages,
722 &mut new_messages,
723 &mut events,
724 &emit,
725 turn_number,
726 )
727 .await);
728 }
729 StreamOutcome::Cancelled(msg) => {
730 emit(AgentEvent::TurnEnd {
731 turn_number,
732 assistant_message: Message::Assistant(msg.clone()),
733 tool_results: vec![],
734 });
735 return Ok((messages, events));
736 }
737 StreamOutcome::RuleInterrupt { partial, rule } => {
738 tracing::info!("RuleInterrupt: '{}' violated, retrying", rule.name);
739 emit(AgentEvent::TtsrInterrupt {
740 rule_name: rule.name.clone(),
741 session_id: self.session_id.clone(),
742 });
743 messages.push(Message::Assistant(partial));
744 let interrupt_body = format!(
745 "<system-interrupt reason=\"rule_violation\" rule=\"{name}\">\n\
746 Your output was interrupted because it violated a project rule.\n\
747 Comply with: {content}\n</system-interrupt>",
748 name = rule.name,
749 content = rule.content
750 );
751 messages.push(Message::user(interrupt_body));
752 continue;
753 }
754 };
755
756 new_messages.push(Message::Assistant(assistant_message.clone()));
757
758 if matches!(assistant_message.stop_reason, StopReason::Error) {
759 if is_retryable_error(&assistant_message) {
760 let did_retry =
761 handle_retryable_error(self, &assistant_message, &mut messages, &emit)
762 .await;
763 if did_retry {
764 emit(AgentEvent::TurnEnd {
765 turn_number,
766 assistant_message: Message::Assistant(assistant_message.clone()),
767 tool_results: vec![],
768 });
769 events.push(AgentEvent::TurnEnd {
770 turn_number,
771 assistant_message: Message::Assistant(assistant_message.clone()),
772 tool_results: vec![],
773 });
774 has_more_tool_calls = true;
775 continue;
776 }
777 }
778
779 emit(AgentEvent::TurnEnd {
780 turn_number,
781 assistant_message: Message::Assistant(assistant_message.clone()),
782 tool_results: vec![],
783 });
784 events.push(AgentEvent::TurnEnd {
785 turn_number,
786 assistant_message: Message::Assistant(assistant_message.clone()),
787 tool_results: vec![],
788 });
789 return Ok((messages, events));
790 }
791 if matches!(assistant_message.stop_reason, StopReason::Aborted) {
792 if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
793 emit(AgentEvent::AutoRetryEnd {
794 success: true,
795 attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
796 final_error: None,
797 });
798 self.auto_retry_attempt.store(0, Ordering::Relaxed);
799 }
800
801 emit(AgentEvent::TurnEnd {
802 turn_number,
803 assistant_message: Message::Assistant(assistant_message.clone()),
804 tool_results: vec![],
805 });
806 events.push(AgentEvent::TurnEnd {
807 turn_number,
808 assistant_message: Message::Assistant(assistant_message.clone()),
809 tool_results: vec![],
810 });
811 return Ok((messages, events));
812 }
813
814 if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
815 emit(AgentEvent::AutoRetryEnd {
816 success: true,
817 attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
818 final_error: None,
819 });
820 self.auto_retry_attempt.store(0, Ordering::Relaxed);
821 }
822
823 let tool_calls = helpers::extract_tool_calls(&assistant_message);
824 tracing::info!(
825 "[AGENT-LOOP] extract_tool_calls found {} calls, stop_reason={:?}",
826 tool_calls.len(),
827 assistant_message.stop_reason
828 );
829
830 let mut tool_results: Vec<oxi_ai::ToolResultMessage> = Vec::new();
831 has_more_tool_calls = false;
832
833 if !tool_calls.is_empty() {
834 tracing::info!("[AGENT-LOOP] Executing {} tool calls", tool_calls.len());
835 let ctx = self.build_tool_context();
836 let executed_batch = match execute_tool_calls(
837 self,
838 &mut messages,
839 &assistant_message,
840 tool_calls,
841 &emit,
842 &ctx,
843 )
844 .await
845 {
846 Ok(batch) => batch,
847 Err(e) => {
848 tracing::error!(session_id = ?self.session_id, "Tool execution error: {}", e);
851 emit(AgentEvent::Error {
852 message: format!("Tool execution error: {}", e),
853 session_id: self.session_id.clone(),
854 });
855 emit(AgentEvent::TurnEnd {
856 turn_number,
857 assistant_message: Message::Assistant(assistant_message.clone()),
858 tool_results: vec![],
859 });
860 events.push(AgentEvent::TurnEnd {
861 turn_number,
862 assistant_message: Message::Assistant(assistant_message.clone()),
863 tool_results: vec![],
864 });
865 return Ok((messages, events));
866 }
867 };
868
869 tool_results = executed_batch.messages;
870 has_more_tool_calls = !executed_batch.terminate;
871
872 if executed_batch.terminate {
873 tracing::warn!(
874 session_id = ?self.session_id,
875 "Tool batch terminated early (terminate flag set by after_tool_call hook). \
876 This halts the tool-calling loop. If this is unexpected, \
877 check after_tool_call hooks for unintended terminate: true."
878 );
879 }
880
881 for result in &tool_results {
882 let result = self.maybe_truncate_tool_result(result.clone());
883 messages.push(Message::ToolResult(result.clone()));
884 new_messages.push(Message::ToolResult(result));
885 }
886 }
887
888 emit(AgentEvent::TurnEnd {
889 turn_number,
890 assistant_message: Message::Assistant(assistant_message.clone()),
891 tool_results: tool_results.clone(),
892 });
893 events.push(AgentEvent::TurnEnd {
894 turn_number,
895 assistant_message: Message::Assistant(assistant_message.clone()),
896 tool_results: tool_results.clone(),
897 });
898
899 if should_stop_after_turn(&self.external_stop) {
900 tracing::info!("[AGENT-LOOP] external_stop, ending loop");
901 return Ok((messages, events));
902 }
903
904 pending_messages = self.drain_steering_queue();
905 tracing::info!(
906 "[AGENT-LOOP] TurnEnd complete, pending_messages={}, has_more_tool_calls={}",
907 !pending_messages.is_empty(),
908 has_more_tool_calls
909 );
910
911 if self.external_stop.load(Ordering::SeqCst) {
914 tracing::info!(
915 "[AGENT-LOOP] external_stop set after steering drain, ending loop"
916 );
917 return Ok((messages, events));
918 }
919 }
920
921 let late_steering = self.drain_steering_queue();
925 if !late_steering.is_empty() {
926 tracing::info!(
927 count = late_steering.len(),
928 "[AGENT-LOOP] Caught late steering messages after inner loop exit"
929 );
930 pending_messages = late_steering;
931 continue;
932 }
933
934 let follow_up_messages = self.drain_follow_up_queue();
935 if !follow_up_messages.is_empty() {
936 pending_messages = follow_up_messages;
937 continue;
938 }
939
940 let final_steering = self.drain_steering_queue();
943 if !final_steering.is_empty() {
944 pending_messages = final_steering;
945 continue;
946 }
947
948 break;
949 }
950
951 Ok((messages, events))
952 }
953
954 fn build_compaction_instruction(&self) -> Option<String> {
957 let base = self.config.compaction_instruction.as_deref();
958 let injected = self
959 .ttsr_engine
960 .as_ref()
961 .map(|e| e.injected_records())
962 .unwrap_or_default();
963 if injected.is_empty() {
964 return base.map(|s| s.to_string());
965 }
966 let mut instr = base.map(|s| s.to_string()).unwrap_or_default();
967 instr.push_str("\n\nThe following rules have already been enforced in this session and corrections applied. Do NOT violate them again:");
968 for (name, _turn) in &injected {
969 instr.push_str(&format!("\n- {name}"));
970 }
971 Some(instr)
972 }
973
974 async fn maybe_compact(&self, messages: &mut Vec<Message>, iteration: usize, emit: &EmitFn) {
975 let snapshot = self.state.get_state();
991 let (context_tokens, source_label) = match snapshot.current_token_source() {
992 TokenSource::Real(n) => (n, "provider-reported"),
993 TokenSource::Heuristic(n) => (n, "bytes/4 heuristic (cold start)"),
994 TokenSource::None => (0, "empty"),
995 };
996 if let Some(div) = snapshot.last_estimate_divergence
1001 && div > 2.0
1002 {
1003 tracing::warn!(
1004 session_id = ?self.session_id,
1005 divergence = div,
1006 reported = snapshot.last_input_tokens.unwrap_or(0),
1007 estimate = snapshot.last_estimate_at_report.unwrap_or(0),
1008 "Token-count heuristic (bytes/4) diverges from provider-reported usage \
1009 by >2x; CompactionStrategy::Threshold decisions are using the \
1010 provider-reported count (issue #28 gap 2)."
1011 );
1012 }
1013 drop(snapshot);
1014
1015 if !self
1016 .compaction_manager
1017 .should_compact(context_tokens, iteration)
1018 {
1019 return;
1020 }
1021
1022 emit(AgentEvent::Compaction {
1023 event: CompactionEvent::Triggered {
1024 context_tokens,
1025 iteration,
1026 source: source_label.to_string(),
1027 },
1028 });
1029
1030 let messages_to_compact: Vec<Message> = messages.to_vec();
1031 let instruction = self.build_compaction_instruction();
1032
1033 match self
1034 .compaction_manager
1035 .compact_if_needed(
1036 &messages_to_compact,
1037 instruction.as_deref(),
1038 context_tokens,
1039 iteration,
1040 )
1041 .await
1042 {
1043 Ok(Some(compacted)) => {
1044 let start = Instant::now();
1045 let message_count = compacted.compacted_count;
1046
1047 emit(AgentEvent::Compaction {
1048 event: CompactionEvent::Started { message_count },
1049 });
1050
1051 let kept_messages = compacted.kept_messages;
1052 let summary = compacted.summary;
1053 let compacted_count = compacted.compacted_count;
1054
1055 *messages = kept_messages;
1056
1057 let state_msgs = messages.clone();
1058 self.state.update(|s| {
1059 s.replace_messages(state_msgs);
1060 });
1061
1062 let compacted_ctx = CompactedContext {
1063 summary,
1064 kept_messages: Vec::new(),
1065 compacted_count,
1066 };
1067 emit(AgentEvent::Compaction {
1068 event: CompactionEvent::Completed {
1069 result: compacted_ctx.clone(),
1070 duration_ms: start.elapsed().as_millis() as u64,
1071 },
1072 });
1073
1074 if let Some(ref hook) = self.config.on_compaction {
1076 match hook(compacted_ctx).await {
1077 Ok(()) => {
1078 tracing::debug!("Compaction hook completed successfully");
1079 }
1080 Err(e) => {
1081 tracing::warn!(error = %e, "Compaction hook failed");
1082 }
1083 }
1084 }
1085 }
1086 Ok(None) => {}
1087 Err(e) => {
1088 emit(AgentEvent::Compaction {
1089 event: CompactionEvent::Failed {
1090 error: e.to_string(),
1091 },
1092 });
1093 }
1094 }
1095 }
1096
1097 fn resolve_model(&self) -> Result<oxi_ai::Model> {
1098 self.resolver
1099 .resolve_model(&self.config.model_id)
1100 .ok_or_else(|| Error::msg(format!("Model not found: {}", self.config.model_id)))
1101 }
1102}
1103
#[cfg(test)]
mod session_id_wiring_tests {
    // Checks that the loop's session id is propagated into the `ToolContext`.
    use super::*;
    use crate::ProviderResolver;
    use crate::agent_loop::config::AgentLoopConfig;
    use crate::state::SharedState;
    use crate::tools::ToolRegistry;
    use oxi_ai::{
        CompactionStrategy, Context, Model, Provider, ProviderError, StreamOptions, StreamResult,
    };
    use std::future::Future;
    use std::pin::Pin;

    // Provider that must never be asked to stream in these tests.
    struct NopProvider;
    impl Provider for NopProvider {
        fn stream<'a>(
            &'a self,
            _model: &'a Model,
            _context: &'a Context,
            _options: Option<StreamOptions>,
        ) -> Pin<Box<dyn Future<Output = StreamResult> + Send + 'a>> {
            Box::pin(async {
                Err(ProviderError::NotImplemented(
                    "session-id wiring tests never stream".to_string(),
                ))
            })
        }
        fn name(&self) -> &str {
            "nop"
        }
    }

    // Resolver that knows no providers or models.
    struct NullResolver;
    impl ProviderResolver for NullResolver {
        fn resolve_provider(&self, _name: &str) -> Option<Arc<dyn Provider>> {
            None
        }
        fn resolve_model(&self, _model_id: &str) -> Option<Model> {
            None
        }
    }

    // Builds a loop whose config differs from the defaults only in the fields
    // these tests care about. Using `..Default::default()` keeps the fixture
    // compiling when `AgentLoopConfig` gains new fields.
    fn loop_with(session_id: Option<String>) -> AgentLoop {
        let config = AgentLoopConfig {
            model_id: "test/model".to_string(),
            compaction_strategy: CompactionStrategy::Disabled,
            session_id,
            ..Default::default()
        };
        AgentLoop::new_with_resolver(
            Arc::new(NopProvider),
            config,
            Arc::new(ToolRegistry::new()),
            SharedState::new(),
            Arc::new(NullResolver),
        )
    }

    #[test]
    fn tool_context_inherits_session_id_when_set() {
        let loop_ = loop_with(Some("proc-test-session-id".to_string()));
        let ctx = loop_.build_tool_context();
        assert_eq!(
            ctx.session_id.as_deref(),
            Some("proc-test-session-id"),
            "ToolContext.session_id must inherit AgentConfig.session_id"
        );
    }

    #[test]
    fn tool_context_session_id_defaults_to_none() {
        let loop_ = loop_with(None);
        let ctx = loop_.build_tool_context();
        assert!(
            ctx.session_id.is_none(),
            "default ToolContext.session_id should be None"
        );
    }
}
1218
#[cfg(test)]
mod truncation_tests {
    // Tests for `AgentLoop::maybe_truncate_tool_result`.
    use super::*;
    use crate::agent::ProviderResolver;
    use oxi_ai::{
        ContentBlock, Context, Model, Provider, ProviderError, StreamOptions, StreamResult,
        TextContent, ToolResultMessage,
    };
    use std::future::Future;
    use std::pin::Pin;

    // Provider that must never be asked to stream in these tests.
    struct NopProvider;
    impl Provider for NopProvider {
        fn stream<'a>(
            &'a self,
            _model: &'a Model,
            _context: &'a Context,
            _options: Option<StreamOptions>,
        ) -> Pin<Box<dyn Future<Output = StreamResult> + Send + 'a>> {
            Box::pin(async {
                Err(ProviderError::NotImplemented(
                    "truncation tests never stream".to_string(),
                ))
            })
        }
        fn name(&self) -> &str {
            "nop"
        }
    }

    // Resolver that knows no providers or models.
    struct NullResolver;
    impl ProviderResolver for NullResolver {
        fn resolve_provider(&self, _name: &str) -> Option<Arc<dyn Provider>> {
            None
        }
        fn resolve_model(&self, _model_id: &str) -> Option<Model> {
            None
        }
    }

    // A tool result holding a single text block.
    fn make_result(text: &str) -> ToolResultMessage {
        ToolResultMessage::new(
            "tc_test".to_string(),
            "test_tool",
            vec![ContentBlock::Text(TextContent::new(text.to_string()))],
        )
    }

    // Text of the first content block. Panics instead of letting the assertions
    // pass vacuously when that block is not text.
    fn text_of(result: &ToolResultMessage) -> &str {
        match &result.content[0] {
            ContentBlock::Text(tc) => &tc.text,
            _ => panic!("expected the first content block to be text"),
        }
    }

    fn loop_with_limit(limit: Option<usize>) -> AgentLoop {
        let config = AgentLoopConfig {
            model_id: "test/model".to_string(),
            max_tool_result_bytes: limit,
            ..Default::default()
        };
        AgentLoop::new_with_resolver(
            Arc::new(NopProvider),
            config,
            Arc::new(ToolRegistry::new()),
            SharedState::new(),
            Arc::new(NullResolver),
        )
    }

    #[test]
    fn truncate_passthrough_when_none() {
        let loop_ = loop_with_limit(None);
        let truncated = loop_.maybe_truncate_tool_result(make_result(&"x".repeat(10_000)));
        let text = text_of(&truncated);
        assert_eq!(text.len(), 10_000);
        assert!(!text.contains("truncated"));
    }

    #[test]
    fn truncate_passthrough_when_under_limit() {
        let loop_ = loop_with_limit(Some(1000));
        let truncated = loop_.maybe_truncate_tool_result(make_result(&"x".repeat(500)));
        let text = text_of(&truncated);
        assert_eq!(text.len(), 500);
        assert!(!text.contains("truncated"));
    }

    #[test]
    fn truncate_applies_when_over_limit() {
        let loop_ = loop_with_limit(Some(100));
        let truncated = loop_.maybe_truncate_tool_result(make_result(&"x".repeat(500)));
        let text = text_of(&truncated);
        assert!(text.len() < 500, "text not truncated: {} bytes", text.len());
        assert!(text.starts_with(&"x".repeat(100)), "kept prefix is wrong");
        assert!(text.contains("truncated"), "missing truncation marker");
        assert!(text.contains("400 bytes omitted"));
    }
}