1#![allow(unused_doc_comments)]
2
3pub mod config;
11pub mod helpers;
13pub mod queues;
15pub mod retry;
17pub mod stream_outcome;
19pub mod streaming;
21pub mod tool_exec;
23pub mod ttsr;
25
26use crate::agent::ProviderResolver;
28use crate::compaction::{CompactedContext, CompactionEvent};
29use crate::events::AgentEvent;
30use crate::recovery::{CircuitBreaker, CircuitBreakerConfig};
31use crate::{state::SharedState, tools::ToolContext, tools::ToolRegistry};
32use anyhow::{Error, Result};
33pub use config::{AfterToolCallHook, AgentLoopConfig, BeforeToolCallHook, ToolExecutionMode};
34use oxi_ai::{
35 CompactionManager as OxCompactionManager, CompactionStrategy, ContentBlock, LlmCompactor,
36 Message, Provider, StopReason, TextContent, UserMessage, estimate_tokens,
37};
38use parking_lot::RwLock;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41use std::time::Instant;
42
43use self::helpers::{sanitize_orphaned_tool_results, should_stop_after_turn};
44use self::queues::{
45 clear_all_queues, clear_follow_up_queue, clear_steering_queue, drain_follow_up_queue,
46 drain_steering_queue, try_push_follow_up, try_push_steering,
47};
48use self::retry::{
49 auto_retry_attempt_method, cancel_auto_retry, handle_retryable_error, is_retryable_error,
50};
51use self::streaming::stream_assistant_response;
52use self::tool_exec::execute_tool_calls;
53
54pub use self::stream_outcome::StreamOutcome;
/// Shared, thread-safe event callback; the loop invokes it with each [`AgentEvent`] it emits.
type EmitFn = Arc<dyn Fn(AgentEvent) + Send + Sync>;
56
/// Drives a conversation with an LLM provider: streams assistant turns, executes
/// tool calls, compacts context and reports progress as [`AgentEvent`]s.
pub struct AgentLoop {
    /// Provider handed to the LLM compactor at construction.
    /// NOTE(review): presumably also used by the `streaming` module — confirm there.
    provider: Arc<dyn Provider>,
    /// Loop configuration (model id, compaction settings, workspace, ...).
    config: AgentLoopConfig,
    /// Registry of tools available to the agent.
    tools: Arc<ToolRegistry>,
    /// Shared state; its message history seeds each run and is replaced after
    /// `run_messages` and after a successful compaction.
    state: SharedState,
    /// Decides whether to compact and performs the compaction (see `maybe_compact`).
    compaction_manager: OxCompactionManager,
    /// Optional hook installed through `with_before_tool_call`.
    before_tool_call: Option<BeforeToolCallHook>,
    /// Optional hook installed through `with_after_tool_call`.
    after_tool_call: Option<AfterToolCallHook>,
    /// Steering messages waiting to be injected; accessed through the `queues` helpers.
    steering_queue: RwLock<Vec<Message>>,
    /// Follow-up messages processed once the inner turn loop goes idle; accessed
    /// through the `queues` helpers.
    follow_up_queue: RwLock<Vec<Message>>,
    /// Copy of `config.session_id`; attached to events, logs and the tool context.
    session_id: Option<String>,
    /// Auto-retry attempt counter; a non-zero value is reported via `AutoRetryEnd`
    /// and reset to zero by `run_loop` once a turn no longer ends in an error.
    auto_retry_attempt: AtomicUsize,
    /// Initialised to `false`. NOTE(review): presumably set by `cancel_auto_retry` — confirm in `retry`.
    auto_retry_cancel: AtomicBool,
    /// NOTE(review): presumably wakes a pending retry back-off — confirm in `retry`.
    auto_retry_notify: tokio::sync::Notify,
    /// Circuit breaker built with the default configuration; not referenced in this file.
    circuit_breaker: CircuitBreaker,
    /// Stop flag owned by the loop; set by `cancel` and checked after every turn.
    external_stop: Arc<AtomicBool>,
    /// Optional caller-provided cancellation flag, consulted by `is_cancelled`.
    cancel_signal: Option<Arc<AtomicBool>>,
    /// Resolves model ids to models (see `resolve_model`).
    resolver: Arc<dyn ProviderResolver>,
    /// Optional source of steering texts, polled once per turn.
    steering_hook: Option<Arc<dyn Fn() -> Vec<String> + Send + Sync>>,
    /// Optional source of follow-up texts, polled once per turn.
    follow_up_hook: Option<Arc<dyn Fn() -> Vec<String> + Send + Sync>>,
    /// Copy of `config.ttsr_engine`; passed to streaming and consulted when building
    /// the compaction instruction.
    ttsr_engine: Option<Arc<ttsr::TtsrEngine>>,
}
91
92impl AgentLoop {
93 pub fn new_with_resolver(
97 provider: Arc<dyn Provider>,
98 config: AgentLoopConfig,
99 tools: Arc<ToolRegistry>,
100 state: SharedState,
101 resolver: Arc<dyn ProviderResolver>,
102 ) -> Self {
103 let mut compaction_manager =
104 OxCompactionManager::new(config.compaction_strategy.clone(), config.context_window);
105
106 if config.compaction_strategy != CompactionStrategy::Disabled {
107 let model = resolver.resolve_model(&config.model_id);
108 if let Some(model) = model {
109 let llm_compactor =
110 Arc::new(LlmCompactor::new(model.clone(), Arc::clone(&provider)));
111 compaction_manager.set_compactor(llm_compactor);
112 }
113 }
114
115 Self {
116 provider,
117 config: config.clone(),
118 tools,
119 state,
120 compaction_manager,
121 before_tool_call: None,
122 after_tool_call: None,
123 steering_queue: RwLock::new(Vec::new()),
124 follow_up_queue: RwLock::new(Vec::new()),
125 session_id: config.session_id.clone(),
126 auto_retry_attempt: AtomicUsize::new(0),
127 auto_retry_cancel: AtomicBool::new(false),
128 auto_retry_notify: tokio::sync::Notify::new(),
129 circuit_breaker: CircuitBreaker::new(CircuitBreakerConfig::default()),
130 external_stop: Arc::new(AtomicBool::new(false)),
131 cancel_signal: None,
132 resolver,
133 steering_hook: None,
134 follow_up_hook: None,
135 ttsr_engine: config.ttsr_engine.clone(),
136 }
137 }
138
139 pub fn new(
141 provider: Arc<dyn Provider>,
142 config: AgentLoopConfig,
143 tools: Arc<ToolRegistry>,
144 state: SharedState,
145 ) -> Self {
146 use crate::agent::GlobalProviderResolver;
147 Self::new_with_resolver(
148 provider,
149 config,
150 tools,
151 state,
152 Arc::new(GlobalProviderResolver),
153 )
154 }
155
    /// Builder-style setter: installs `hook` as the before-tool-call hook,
    /// replacing any previously installed one, and returns the loop.
    pub fn with_before_tool_call(mut self, hook: BeforeToolCallHook) -> Self {
        self.before_tool_call = Some(hook);
        self
    }
162
    /// Builder-style setter: installs `hook` as the after-tool-call hook,
    /// replacing any previously installed one, and returns the loop.
    pub fn with_after_tool_call(mut self, hook: AfterToolCallHook) -> Self {
        self.after_tool_call = Some(hook);
        self
    }
169
170 pub fn steer(&self, message: Message) {
176 if !try_push_steering(self, message) {
177 tracing::warn!("Steering message dropped — queue at capacity");
178 }
179 }
180
181 pub fn follow_up(&self, message: Message) {
187 if !try_push_follow_up(self, message) {
188 tracing::warn!("Follow-up message dropped — queue at capacity");
189 }
190 }
191
    /// Clears the steering queue by delegating to `queues::clear_steering_queue`.
    pub fn clear_steering_queue(&self) {
        clear_steering_queue(self);
    }
197
    /// Clears the follow-up queue by delegating to `queues::clear_follow_up_queue`.
    pub fn clear_follow_up_queue(&self) {
        clear_follow_up_queue(self);
    }
203
    /// Clears the loop's message queues by delegating to `queues::clear_all_queues`.
    pub fn clear_all_queues(&self) {
        clear_all_queues(self);
    }
208
    /// Returns the steering messages handed back by `queues::drain_steering_queue`.
    /// NOTE(review): presumably this also empties the queue — confirm in `queues`.
    fn drain_steering_queue(&self) -> Vec<Message> {
        drain_steering_queue(self)
    }
212
213 #[cfg_attr(test, allow(dead_code))]
216 pub(crate) fn build_tool_context(&self) -> ToolContext {
217 let workspace = self
218 .config
219 .workspace_dir
220 .clone()
221 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
222 ToolContext {
223 workspace_dir: workspace,
224 root_dir: self.config.workspace_dir.clone(),
225 session_id: self.session_id.clone(),
226 snapshot_store: self.config.snapshot_store.clone(),
227 memory: self.config.memory.clone(),
228 url_resolver: self.config.url_resolver.clone(),
229 todo: self.config.todo.clone(),
230 agent_pool: self.config.agent_pool.clone(),
231 lsp: self.config.lsp.clone(),
232 }
233 }
234
    /// Returns the follow-up messages handed back by `queues::drain_follow_up_queue`.
    /// NOTE(review): presumably this also empties the queue — confirm in `queues`.
    fn drain_follow_up_queue(&self) -> Vec<Message> {
        drain_follow_up_queue(self)
    }
238
    /// Requests cancellation of an in-progress auto-retry by delegating to
    /// `retry::cancel_auto_retry`.
    pub fn cancel_auto_retry(&self) {
        cancel_auto_retry(self);
    }
245
    /// Returns the value reported by `retry::auto_retry_attempt_method`.
    /// NOTE(review): presumably the current auto-retry attempt counter — confirm in `retry`.
    pub fn auto_retry_attempt(&self) -> usize {
        auto_retry_attempt_method(self)
    }
251
    /// Borrows the shared state this loop reads its history from and writes results to.
    pub fn state(&self) -> &SharedState {
        &self.state
    }
257
    /// Borrows the loop-owned stop flag; callers may clone the `Arc` and set it to
    /// request a stop, which the loop observes after the current turn.
    pub fn external_stop(&self) -> &Arc<AtomicBool> {
        &self.external_stop
    }
262
    /// Installs a caller-owned cancellation flag, replacing any previous one.
    ///
    /// The flag is consulted by [`is_cancelled`](Self::is_cancelled).
    /// NOTE(review): the turn loop in this file checks only `external_stop`;
    /// presumably other modules call `is_cancelled` — confirm.
    pub fn set_cancel_signal(&mut self, flag: Arc<AtomicBool>) {
        self.cancel_signal = Some(flag);
    }
270
271 pub fn cancel_signal(&self) -> Option<Arc<AtomicBool>> {
276 self.cancel_signal.as_ref().map(Arc::clone)
277 }
278
279 pub fn is_cancelled(&self) -> bool {
282 if self.external_stop.load(Ordering::SeqCst) {
283 return true;
284 }
285 self.cancel_signal
286 .as_ref()
287 .is_some_and(|f| f.load(Ordering::SeqCst))
288 }
    /// Requests a stop by setting the loop-owned `external_stop` flag.
    ///
    /// Nothing in this file clears the flag again.
    pub fn cancel(&self) {
        self.external_stop.store(true, Ordering::SeqCst);
    }
296
    /// Installs a hook that supplies steering texts; it is called once per turn and
    /// every returned string is queued as a user steering message.
    pub fn set_steering_hook(&mut self, hook: Arc<dyn Fn() -> Vec<String> + Send + Sync>) {
        self.steering_hook = Some(hook);
    }
302
    /// Installs a hook that supplies follow-up texts; it is called once per turn and
    /// every returned string is queued as a user follow-up message.
    pub fn set_follow_up_hook(&mut self, hook: Arc<dyn Fn() -> Vec<String> + Send + Sync>) {
        self.follow_up_hook = Some(hook);
    }
308
309 fn poll_external_queues(&self) {
312 if let Some(ref hook) = self.steering_hook {
313 for msg_text in hook() {
314 self.steer(Message::User(UserMessage::new(msg_text)));
315 }
316 }
317 if let Some(ref hook) = self.follow_up_hook {
318 for msg_text in hook() {
319 self.follow_up(Message::User(UserMessage::new(msg_text)));
320 }
321 }
322 }
323
324 pub async fn run(
327 &self,
328 prompt: String,
329 emit: impl Fn(AgentEvent) + Send + Sync + 'static,
330 ) -> Result<Vec<AgentEvent>> {
331 let message = Message::User(UserMessage::new(prompt));
332 let emit = Arc::new(emit);
333 self.run_messages(vec![message], emit).await
334 }
335
336 pub async fn run_mut<S: Send + std::fmt::Debug + 'static>(
367 &self,
368 prompt: String,
369 state: S,
370 emit: impl FnMut(AgentEvent, &mut S) + Send + 'static,
371 ) -> Result<(Vec<AgentEvent>, S)> {
372 let emit_fnmut = Arc::new(parking_lot::Mutex::new(emit));
373 let state_arc = Arc::new(parking_lot::Mutex::new(state));
374
375 let state_for_closure = Arc::clone(&state_arc);
377
378 let emit_fn: EmitFn = Arc::new(move |event: AgentEvent| {
379 let mut cb = emit_fnmut.lock();
380 let mut s = state_for_closure.lock();
381 cb(event, &mut s);
382 });
383
384 let events = self.run_inner(prompt, emit_fn).await?;
385
386 let mutex = Arc::try_unwrap(state_arc)
390 .expect("run_mut: state Arc still has multiple owners after run");
391 Ok((events, mutex.into_inner()))
392 }
393
394 async fn run_inner(&self, prompt: String, emit: EmitFn) -> Result<Vec<AgentEvent>> {
396 let message = Message::User(UserMessage::new(prompt));
397 self.run_messages(vec![message], emit).await
398 }
399
400 pub async fn run_messages(
403 &self,
404 prompts: Vec<Message>,
405 emit: EmitFn,
406 ) -> Result<Vec<AgentEvent>> {
407 let mut all_events = Vec::new();
408
409 let state_messages = self.state.get_state().messages.clone();
410 let mut all_messages = state_messages;
411 all_messages.extend(prompts.clone());
412
413 tracing::info!(session_id = ?self.session_id, "AgentLoop starting");
414 emit(AgentEvent::AgentStart {
415 prompts: prompts.clone(),
416 session_id: self.session_id.clone(),
417 });
418 all_events.push(AgentEvent::AgentStart {
419 prompts: prompts.clone(),
420 session_id: self.session_id.clone(),
421 });
422
423 let (result_messages, events) = self.run_loop(prompts, emit.clone()).await?;
424
425 all_events.extend(events);
426
427 let stop_reason = result_messages.last().and_then(|m| {
428 if let Message::Assistant(a) = m {
429 Some(format!("{:?}", a.stop_reason))
430 } else {
431 None
432 }
433 });
434
435 tracing::info!(session_id = ?self.session_id, "AgentLoop run_messages complete");
436
437 self.state.update(|s| {
439 s.replace_messages(result_messages.clone());
440 });
441
442 emit(AgentEvent::AgentEnd {
443 messages: result_messages.clone(),
444 stop_reason: stop_reason.clone(),
445 session_id: self.session_id.clone(),
446 });
447 all_events.push(AgentEvent::AgentEnd {
448 messages: result_messages.clone(),
449 stop_reason,
450 session_id: self.session_id.clone(),
451 });
452
453 Ok(all_events)
454 }
455
456 pub async fn continue_loop(
459 &self,
460 emit: impl Fn(AgentEvent) + Send + Sync + 'static,
461 ) -> Result<Vec<AgentEvent>> {
462 let emit = Arc::new(emit);
463 let mut all_events = Vec::new();
464
465 tracing::info!(session_id = ?self.session_id, "AgentLoop continuing");
466 emit(AgentEvent::AgentStart {
467 prompts: vec![],
468 session_id: self.session_id.clone(),
469 });
470 all_events.push(AgentEvent::AgentStart {
471 prompts: vec![],
472 session_id: self.session_id.clone(),
473 });
474
475 let (result_messages, events) = self.run_loop(vec![], emit.clone()).await?;
476
477 all_events.extend(events);
478
479 let stop_reason = result_messages.last().and_then(|m| {
480 if let Message::Assistant(a) = m {
481 Some(format!("{:?}", a.stop_reason))
482 } else {
483 None
484 }
485 });
486
487 tracing::info!(session_id = ?self.session_id, "AgentLoop continue_loop complete");
488 emit(AgentEvent::AgentEnd {
489 messages: result_messages.clone(),
490 stop_reason: stop_reason.clone(),
491 session_id: self.session_id.clone(),
492 });
493 all_events.push(AgentEvent::AgentEnd {
494 messages: result_messages.clone(),
495 stop_reason,
496 session_id: self.session_id.clone(),
497 });
498
499 Ok(all_events)
500 }
501
502 fn process_steering_messages(
504 &self,
505 pending_messages: &mut Vec<Message>,
506 messages: &mut Vec<Message>,
507 new_messages: &mut Vec<Message>,
508 events: &mut Vec<AgentEvent>,
509 emit: &EmitFn,
510 ) {
511 if pending_messages.is_empty() {
512 return;
513 }
514 for message in pending_messages.drain(..) {
515 emit(AgentEvent::SteeringMessage {
516 message: message.clone(),
517 });
518 emit(AgentEvent::MessageStart {
519 message: message.clone(),
520 });
521 emit(AgentEvent::MessageEnd {
522 message: message.clone(),
523 });
524 events.push(AgentEvent::SteeringMessage {
525 message: message.clone(),
526 });
527 events.push(AgentEvent::MessageStart {
528 message: message.clone(),
529 });
530 events.push(AgentEvent::MessageEnd {
531 message: message.clone(),
532 });
533 messages.push(message.clone());
534 new_messages.push(message);
535 }
536 }
537
538 async fn handle_streaming_error(
540 &self,
541 e: anyhow::Error,
542 messages: &mut Vec<Message>,
543 new_messages: &mut Vec<Message>,
544 events: &mut Vec<AgentEvent>,
545 emit: &EmitFn,
546 turn_number: u32,
547 ) -> (Vec<Message>, Vec<AgentEvent>) {
548 let err_msg = format!("{}", e);
549 tracing::error!(session_id = ?self.session_id, "Unexpected streaming error: {}", err_msg);
550
551 let mut error_asst = oxi_ai::AssistantMessage::new(
552 oxi_ai::Api::OpenAiCompletions,
553 "agent",
554 &self.config.model_id,
555 );
556 error_asst.stop_reason = StopReason::Error;
557 error_asst
558 .content
559 .push(ContentBlock::Text(TextContent::new(format!(
560 "⚠ {}",
561 err_msg
562 ))));
563
564 new_messages.push(Message::Assistant(error_asst.clone()));
565 messages.push(Message::Assistant(error_asst.clone()));
566
567 emit(AgentEvent::MessageStart {
568 message: Message::Assistant(error_asst.clone()),
569 });
570 emit(AgentEvent::MessageEnd {
571 message: Message::Assistant(error_asst.clone()),
572 });
573 emit(AgentEvent::Error {
574 message: err_msg.clone(),
575 session_id: self.session_id.clone(),
576 });
577
578 emit(AgentEvent::TurnEnd {
579 turn_number,
580 assistant_message: Message::Assistant(error_asst.clone()),
581 tool_results: vec![],
582 });
583 events.push(AgentEvent::TurnEnd {
584 turn_number,
585 assistant_message: Message::Assistant(error_asst),
586 tool_results: vec![],
587 });
588 (messages.clone(), events.clone())
590 }
591
592 async fn run_loop(
593 &self,
594 initial_prompts: Vec<Message>,
595 emit: EmitFn,
596 ) -> Result<(Vec<Message>, Vec<AgentEvent>)> {
597 tracing::info!("[AGENT-LOOP] run_loop started");
598 let mut messages = self.state.get_state().messages.clone();
599 messages.extend(initial_prompts.clone());
600
601 let mut new_messages: Vec<Message> = initial_prompts;
602 let mut events = Vec::new();
603 let mut turn_number: u32 = 0;
604 let mut first_turn = true;
605
606 let mut pending_messages: Vec<Message> = self.drain_steering_queue();
607
608 loop {
609 tracing::info!(
610 "[AGENT-LOOP] Top of loop, has_more_tool_calls={}, pending_messages={}",
611 true,
612 pending_messages.is_empty()
613 );
614 let mut has_more_tool_calls = true;
615
616 while has_more_tool_calls || !pending_messages.is_empty() {
617 if !first_turn {
618 turn_number += 1;
619 emit(AgentEvent::TurnStart { turn_number });
620 events.push(AgentEvent::TurnStart { turn_number });
621 } else {
622 first_turn = false;
623 turn_number = 1;
624 emit(AgentEvent::TurnStart { turn_number });
625 events.push(AgentEvent::TurnStart { turn_number });
626 }
627
628 if !pending_messages.is_empty() {
629 self.process_steering_messages(
630 &mut pending_messages,
631 &mut messages,
632 &mut new_messages,
633 &mut events,
634 &emit,
635 );
636 }
637
638 self.poll_external_queues();
641
642 self.maybe_compact(&mut messages, turn_number as usize, &emit)
643 .await;
644
645 tracing::info!("[AGENT-LOOP] About to call stream_assistant_response");
646 let ttsr = self.ttsr_engine.as_deref();
647 let outcome = stream_assistant_response(self, &mut messages, &emit, ttsr).await;
648
649 let assistant_message = match outcome {
650 StreamOutcome::Complete(msg) => msg,
651 StreamOutcome::Error {
652 message: _message,
653 detail,
654 } => {
655 let is_tool_ordering_error = detail.contains("tool")
658 && (detail.contains("must be a response")
659 || detail.contains("preceding")
660 || detail.contains("tool_calls"));
661
662 if is_tool_ordering_error {
663 let removed = sanitize_orphaned_tool_results(&mut messages);
664 tracing::warn!(
665 session_id = ?self.session_id,
666 removed,
667 detail = %detail,
668 "Message-ordering error detected, removed orphaned tool results, retrying"
669 );
670 if removed > 0 {
671 emit(AgentEvent::Error {
673 message: format!(
674 "⚠ Provider rejected message order: {}. Removed {} orphaned tool results, retrying…",
675 detail, removed
676 ),
677 session_id: self.session_id.clone(),
678 });
679 continue; }
681 }
682
683 return Ok(self
685 .handle_streaming_error(
686 anyhow::anyhow!("Provider stream error: {}", detail),
687 &mut messages,
688 &mut new_messages,
689 &mut events,
690 &emit,
691 turn_number,
692 )
693 .await);
694 }
695 StreamOutcome::Cancelled(msg) => {
696 emit(AgentEvent::TurnEnd {
697 turn_number,
698 assistant_message: Message::Assistant(msg.clone()),
699 tool_results: vec![],
700 });
701 return Ok((messages, events));
702 }
703 StreamOutcome::RuleInterrupt { partial, rule } => {
704 tracing::info!("RuleInterrupt: '{}' violated, retrying", rule.name);
705 emit(AgentEvent::TtsrInterrupt {
706 rule_name: rule.name.clone(),
707 session_id: self.session_id.clone(),
708 });
709 messages.push(Message::Assistant(partial));
710 let interrupt_body = format!(
711 "<system-interrupt reason=\"rule_violation\" rule=\"{name}\">\n\
712 Your output was interrupted because it violated a project rule.\n\
713 Comply with: {content}\n</system-interrupt>",
714 name = rule.name,
715 content = rule.content
716 );
717 messages.push(Message::user(interrupt_body));
718 continue;
719 }
720 };
721
722 new_messages.push(Message::Assistant(assistant_message.clone()));
723
724 if matches!(assistant_message.stop_reason, StopReason::Error) {
725 if is_retryable_error(&assistant_message) {
726 let did_retry =
727 handle_retryable_error(self, &assistant_message, &mut messages, &emit)
728 .await;
729 if did_retry {
730 emit(AgentEvent::TurnEnd {
731 turn_number,
732 assistant_message: Message::Assistant(assistant_message.clone()),
733 tool_results: vec![],
734 });
735 events.push(AgentEvent::TurnEnd {
736 turn_number,
737 assistant_message: Message::Assistant(assistant_message.clone()),
738 tool_results: vec![],
739 });
740 has_more_tool_calls = true;
741 continue;
742 }
743 }
744
745 emit(AgentEvent::TurnEnd {
746 turn_number,
747 assistant_message: Message::Assistant(assistant_message.clone()),
748 tool_results: vec![],
749 });
750 events.push(AgentEvent::TurnEnd {
751 turn_number,
752 assistant_message: Message::Assistant(assistant_message.clone()),
753 tool_results: vec![],
754 });
755 return Ok((messages, events));
756 }
757 if matches!(assistant_message.stop_reason, StopReason::Aborted) {
758 if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
759 emit(AgentEvent::AutoRetryEnd {
760 success: true,
761 attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
762 final_error: None,
763 });
764 self.auto_retry_attempt.store(0, Ordering::Relaxed);
765 }
766
767 emit(AgentEvent::TurnEnd {
768 turn_number,
769 assistant_message: Message::Assistant(assistant_message.clone()),
770 tool_results: vec![],
771 });
772 events.push(AgentEvent::TurnEnd {
773 turn_number,
774 assistant_message: Message::Assistant(assistant_message.clone()),
775 tool_results: vec![],
776 });
777 return Ok((messages, events));
778 }
779
780 if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
781 emit(AgentEvent::AutoRetryEnd {
782 success: true,
783 attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
784 final_error: None,
785 });
786 self.auto_retry_attempt.store(0, Ordering::Relaxed);
787 }
788
789 let tool_calls = helpers::extract_tool_calls(&assistant_message);
790 tracing::info!(
791 "[AGENT-LOOP] extract_tool_calls found {} calls, stop_reason={:?}",
792 tool_calls.len(),
793 assistant_message.stop_reason
794 );
795
796 let mut tool_results: Vec<oxi_ai::ToolResultMessage> = Vec::new();
797 has_more_tool_calls = false;
798
799 if !tool_calls.is_empty() {
800 tracing::info!("[AGENT-LOOP] Executing {} tool calls", tool_calls.len());
801 let ctx = self.build_tool_context();
802 let executed_batch = match execute_tool_calls(
803 self,
804 &mut messages,
805 &assistant_message,
806 tool_calls,
807 &emit,
808 &ctx,
809 )
810 .await
811 {
812 Ok(batch) => batch,
813 Err(e) => {
814 tracing::error!(session_id = ?self.session_id, "Tool execution error: {}", e);
817 emit(AgentEvent::Error {
818 message: format!("Tool execution error: {}", e),
819 session_id: self.session_id.clone(),
820 });
821 emit(AgentEvent::TurnEnd {
822 turn_number,
823 assistant_message: Message::Assistant(assistant_message.clone()),
824 tool_results: vec![],
825 });
826 events.push(AgentEvent::TurnEnd {
827 turn_number,
828 assistant_message: Message::Assistant(assistant_message.clone()),
829 tool_results: vec![],
830 });
831 return Ok((messages, events));
832 }
833 };
834
835 tool_results = executed_batch.messages;
836 has_more_tool_calls = !executed_batch.terminate;
837
838 if executed_batch.terminate {
839 tracing::warn!(
840 session_id = ?self.session_id,
841 "Tool batch terminated early (terminate flag set by after_tool_call hook). \
842 This halts the tool-calling loop. If this is unexpected, \
843 check after_tool_call hooks for unintended terminate: true."
844 );
845 }
846
847 for result in &tool_results {
848 messages.push(Message::ToolResult(result.clone()));
849 new_messages.push(Message::ToolResult(result.clone()));
850 }
851 }
852
853 emit(AgentEvent::TurnEnd {
854 turn_number,
855 assistant_message: Message::Assistant(assistant_message.clone()),
856 tool_results: tool_results.clone(),
857 });
858 events.push(AgentEvent::TurnEnd {
859 turn_number,
860 assistant_message: Message::Assistant(assistant_message.clone()),
861 tool_results: tool_results.clone(),
862 });
863
864 if should_stop_after_turn(&self.external_stop) {
865 tracing::info!("[AGENT-LOOP] external_stop, ending loop");
866 return Ok((messages, events));
867 }
868
869 pending_messages = self.drain_steering_queue();
870 tracing::info!(
871 "[AGENT-LOOP] TurnEnd complete, pending_messages={}, has_more_tool_calls={}",
872 !pending_messages.is_empty(),
873 has_more_tool_calls
874 );
875
876 if self.external_stop.load(Ordering::SeqCst) {
879 tracing::info!(
880 "[AGENT-LOOP] external_stop set after steering drain, ending loop"
881 );
882 return Ok((messages, events));
883 }
884 }
885
886 let late_steering = self.drain_steering_queue();
890 if !late_steering.is_empty() {
891 tracing::info!(
892 count = late_steering.len(),
893 "[AGENT-LOOP] Caught late steering messages after inner loop exit"
894 );
895 pending_messages = late_steering;
896 continue;
897 }
898
899 let follow_up_messages = self.drain_follow_up_queue();
900 if !follow_up_messages.is_empty() {
901 pending_messages = follow_up_messages;
902 continue;
903 }
904
905 let final_steering = self.drain_steering_queue();
908 if !final_steering.is_empty() {
909 pending_messages = final_steering;
910 continue;
911 }
912
913 break;
914 }
915
916 Ok((messages, events))
917 }
918
919 fn build_compaction_instruction(&self) -> Option<String> {
922 let base = self.config.compaction_instruction.as_deref();
923 let injected = self
924 .ttsr_engine
925 .as_ref()
926 .map(|e| e.injected_records())
927 .unwrap_or_default();
928 if injected.is_empty() {
929 return base.map(|s| s.to_string());
930 }
931 let mut instr = base.map(|s| s.to_string()).unwrap_or_default();
932 instr.push_str("\n\nThe following rules have already been enforced in this session and corrections applied. Do NOT violate them again:");
933 for (name, _turn) in &injected {
934 instr.push_str(&format!("\n- {name}"));
935 }
936 Some(instr)
937 }
938
939 async fn maybe_compact(&self, messages: &mut Vec<Message>, iteration: usize, emit: &EmitFn) {
940 let context_text = serde_json::to_string(&*messages).unwrap_or_default();
941 let context_tokens = estimate_tokens(&context_text);
942
943 if !self
944 .compaction_manager
945 .should_compact(context_tokens, iteration)
946 {
947 return;
948 }
949
950 emit(AgentEvent::Compaction {
951 event: CompactionEvent::Triggered {
952 context_tokens,
953 iteration,
954 },
955 });
956
957 let messages_to_compact: Vec<Message> = messages.to_vec();
958 let instruction = self.build_compaction_instruction();
959
960 match self
961 .compaction_manager
962 .compact_if_needed(
963 &messages_to_compact,
964 instruction.as_deref(),
965 context_tokens,
966 iteration,
967 )
968 .await
969 {
970 Ok(Some(compacted)) => {
971 let start = Instant::now();
972 let message_count = compacted.compacted_count;
973
974 emit(AgentEvent::Compaction {
975 event: CompactionEvent::Started { message_count },
976 });
977
978 let kept_messages = compacted.kept_messages;
979 let summary = compacted.summary;
980 let compacted_count = compacted.compacted_count;
981
982 *messages = kept_messages;
983
984 let state_msgs = messages.clone();
985 self.state.update(|s| {
986 s.replace_messages(state_msgs);
987 });
988
989 let compacted_ctx = CompactedContext {
990 summary,
991 kept_messages: Vec::new(),
992 compacted_count,
993 };
994 emit(AgentEvent::Compaction {
995 event: CompactionEvent::Completed {
996 result: compacted_ctx.clone(),
997 duration_ms: start.elapsed().as_millis() as u64,
998 },
999 });
1000
1001 if let Some(ref hook) = self.config.on_compaction {
1003 match hook(compacted_ctx).await {
1004 Ok(()) => {
1005 tracing::debug!("Compaction hook completed successfully");
1006 }
1007 Err(e) => {
1008 tracing::warn!(error = %e, "Compaction hook failed");
1009 }
1010 }
1011 }
1012 }
1013 Ok(None) => {}
1014 Err(e) => {
1015 emit(AgentEvent::Compaction {
1016 event: CompactionEvent::Failed {
1017 error: e.to_string(),
1018 },
1019 });
1020 }
1021 }
1022 }
1023
1024 fn resolve_model(&self) -> Result<oxi_ai::Model> {
1025 self.resolver
1026 .resolve_model(&self.config.model_id)
1027 .ok_or_else(|| Error::msg(format!("Model not found: {}", self.config.model_id)))
1028 }
1029}
1030
/// Tests that the session id configured on the loop reaches the `ToolContext`.
#[cfg(test)]
mod session_id_wiring_tests {
    use super::*;
    use crate::ProviderResolver;
    use crate::agent_loop::config::AgentLoopConfig;
    use crate::config::ToolExecutionMode;
    use crate::state::SharedState;
    use crate::tools::ToolRegistry;
    use oxi_ai::{
        CompactionStrategy, Context, Model, Provider, ProviderError, StreamOptions, StreamResult,
    };
    use std::future::Future;
    use std::pin::Pin;

    /// Provider stub whose `stream` always fails; these tests never stream.
    struct NopProvider;
    impl Provider for NopProvider {
        fn stream<'a>(
            &'a self,
            _model: &'a Model,
            _context: &'a Context,
            _options: Option<StreamOptions>,
        ) -> Pin<Box<dyn Future<Output = StreamResult> + Send + 'a>> {
            Box::pin(async {
                Err(ProviderError::NotImplemented(
                    "session-id wiring tests never stream".to_string(),
                ))
            })
        }
        fn name(&self) -> &str {
            "nop"
        }
    }

    /// Resolver stub that knows no providers and no models.
    struct NullResolver;
    impl ProviderResolver for NullResolver {
        fn resolve_provider(&self, _name: &str) -> Option<Arc<dyn Provider>> {
            None
        }
        fn resolve_model(&self, _model_id: &str) -> Option<Model> {
            None
        }
    }

    /// Builds an `AgentLoop` with compaction disabled, the stub provider and
    /// resolver, and the given `session_id`.
    fn loop_with(session_id: Option<String>) -> AgentLoop {
        let config = AgentLoopConfig {
            model_id: "test/model".to_string(),
            system_prompt: None,
            temperature: 1.0,
            max_tokens: 4096,
            tool_execution: ToolExecutionMode::Sequential,
            compaction_strategy: CompactionStrategy::Disabled,
            compaction_instruction: None,
            context_window: 128_000,
            session_id,
            transport: None,
            compact_on_start: false,
            max_retry_delay_ms: None,
            auto_retry_enabled: true,
            auto_retry_max_attempts: 3,
            auto_retry_base_delay_ms: 1000,
            api_key: None,
            workspace_dir: None,
            provider_options: None,
            on_compaction: None,
            snapshot_store: None,
            memory: None,
            url_resolver: None,
            todo: None,
            agent_pool: None,
            lsp: None,
            ttsr_engine: None,
        };
        AgentLoop::new_with_resolver(
            Arc::new(NopProvider),
            config,
            Arc::new(ToolRegistry::new()),
            SharedState::new(),
            Arc::new(NullResolver),
        )
    }

    /// A configured session id must be copied into the tool context.
    #[test]
    fn tool_context_inherits_session_id_when_set() {
        let loop_ = loop_with(Some("proc-test-session-id".to_string()));
        let ctx = loop_.build_tool_context();
        assert_eq!(
            ctx.session_id.as_deref(),
            Some("proc-test-session-id"),
            "ToolContext.session_id must inherit AgentConfig.session_id"
        );
    }

    /// Without a configured session id the tool context carries none.
    #[test]
    fn tool_context_session_id_defaults_to_none() {
        let loop_ = loop_with(None);
        let ctx = loop_.build_tool_context();
        assert!(
            ctx.session_id.is_none(),
            "default ToolContext.session_id should be None"
        );
    }
}