Skip to main content

oxi_agent/agent_loop/
mod.rs

1#![allow(unused_doc_comments)]
2
3//! Agent loop — the main request/response cycle driver.
4//!
5//! Coordinates the interaction between the agent, provider, tools, and
6//! state management. Handles streaming, tool execution, retry logic,
7//! and compaction events.
8
9/// Agent-loop configuration.
10pub mod config;
11/// Miscellaneous helper functions.
12pub mod helpers;
13/// Internal message/event queues.
14pub mod queues;
15/// Retry logic for the agent loop.
16pub mod retry;
17/// Stream outcome types for TTSR integration.
18pub mod stream_outcome;
19/// Streaming response handling.
20pub mod streaming;
21/// Tool execution strategies.
22pub mod tool_exec;
23/// Time-Traveling Stream Rules engine.
24pub mod ttsr;
25
26// Re-export for sibling module access
27use crate::agent::ProviderResolver;
28use crate::compaction::{CompactedContext, CompactionEvent};
29use crate::events::AgentEvent;
30use crate::recovery::{CircuitBreaker, CircuitBreakerConfig};
31use crate::{state::SharedState, tools::ToolContext, tools::ToolRegistry};
32use anyhow::{Error, Result};
33pub use config::{AfterToolCallHook, AgentLoopConfig, BeforeToolCallHook, ToolExecutionMode};
34use oxi_ai::{
35    CompactionManager as OxCompactionManager, CompactionStrategy, ContentBlock, LlmCompactor,
36    Message, Provider, StopReason, TextContent, UserMessage, estimate_tokens,
37};
38use parking_lot::RwLock;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41use std::time::Instant;
42
43use self::helpers::{sanitize_orphaned_tool_results, should_stop_after_turn};
44use self::queues::{
45    clear_all_queues, clear_follow_up_queue, clear_steering_queue, drain_follow_up_queue,
46    drain_steering_queue, try_push_follow_up, try_push_steering,
47};
48use self::retry::{
49    auto_retry_attempt_method, cancel_auto_retry, handle_retryable_error, is_retryable_error,
50};
51use self::streaming::stream_assistant_response;
52use self::tool_exec::execute_tool_calls;
53
54pub use self::stream_outcome::StreamOutcome;
/// Shared, thread-safe callback through which the loop reports each
/// [`AgentEvent`] to the caller.
type EmitFn = Arc<dyn Fn(AgentEvent) + Send + Sync>;
56
/// Driver for the agent's request/response cycle.
///
/// Bundles the provider handle, loop configuration, tool registry, shared
/// state, compaction manager, optional tool-call hooks, the steering and
/// follow-up queues, and the retry/cancellation flags consulted while
/// running turns.
pub struct AgentLoop {
    /// Provider handle; also handed to the LLM compactor at construction.
    provider: Arc<dyn Provider>,
    /// Loop configuration supplied at construction.
    config: AgentLoopConfig,
    /// Registry of tools available to the loop.
    tools: Arc<ToolRegistry>,
    /// Shared state; the message history is read from it when a run starts
    /// and written back by `run_messages`.
    state: SharedState,
    /// Compaction manager built from the configured strategy and context window.
    compaction_manager: OxCompactionManager,
    /// Optional hook installed via `with_before_tool_call`.
    before_tool_call: Option<BeforeToolCallHook>,
    /// Optional hook installed via `with_after_tool_call`.
    after_tool_call: Option<AfterToolCallHook>,
    /// Pending steering messages (see `steer`).
    steering_queue: RwLock<Vec<Message>>,
    /// Pending follow-up messages (see `follow_up`).
    follow_up_queue: RwLock<Vec<Message>>,
    /// Copy of `config.session_id`; attached to log lines and emitted events.
    session_id: Option<String>,
    /// Auto-retry attempt counter; the loop treats a value above zero as
    /// "a retry sequence is in progress" and resets it to zero afterwards.
    auto_retry_attempt: AtomicUsize,
    /// Auto-retry cancellation flag — presumably set by `cancel_auto_retry`;
    /// the logic lives in the `retry` module (confirm there).
    auto_retry_cancel: AtomicBool,
    /// Notify used to wake up the auto-retry sleep immediately when cancelled.
    auto_retry_notify: tokio::sync::Notify,
    /// Circuit breaker, created with `CircuitBreakerConfig::default()`.
    circuit_breaker: CircuitBreaker,
    /// External stop flag — when set, should_stop_after_turn returns true.
    /// Used by Agent to forward the should_stop_flag from AgentHooks.
    external_stop: Arc<AtomicBool>,
    /// Direct cancel signal shared with `Agent::cancel_flag`.
    /// Set by `Agent::cancel()` and checked by the streaming loop's periodic
    /// timer so cancellation is detected even when no stream events arrive.
    cancel_signal: Option<Arc<AtomicBool>>,
    /// Provider/model resolver for isolated model lookups.
    resolver: Arc<dyn ProviderResolver>,
    /// Steering hook from AgentHooks — polled each turn to drain new messages
    /// from AgentSession's queue into AgentLoop's internal steering_queue.
    steering_hook: Option<Arc<dyn Fn() -> Vec<String> + Send + Sync>>,
    /// Follow-up hook from AgentHooks — same as steering but for follow-ups.
    follow_up_hook: Option<Arc<dyn Fn() -> Vec<String> + Send + Sync>>,
    /// TTSR engine for stream rule checking.
    ttsr_engine: Option<Arc<ttsr::TtsrEngine>>,
}
91
92impl AgentLoop {
93    /// Creates a new `AgentLoop` with an explicit provider resolver.
94    /// Use this when the model ID needs to be resolved to a provider+model pair
95    /// using custom logic (e.g., per-session routing).
96    pub fn new_with_resolver(
97        provider: Arc<dyn Provider>,
98        config: AgentLoopConfig,
99        tools: Arc<ToolRegistry>,
100        state: SharedState,
101        resolver: Arc<dyn ProviderResolver>,
102    ) -> Self {
103        let mut compaction_manager =
104            OxCompactionManager::new(config.compaction_strategy.clone(), config.context_window);
105
106        if config.compaction_strategy != CompactionStrategy::Disabled {
107            let model = resolver.resolve_model(&config.model_id);
108            if let Some(model) = model {
109                let llm_compactor =
110                    Arc::new(LlmCompactor::new(model.clone(), Arc::clone(&provider)));
111                compaction_manager.set_compactor(llm_compactor);
112            }
113        }
114
115        Self {
116            provider,
117            config: config.clone(),
118            tools,
119            state,
120            compaction_manager,
121            before_tool_call: None,
122            after_tool_call: None,
123            steering_queue: RwLock::new(Vec::new()),
124            follow_up_queue: RwLock::new(Vec::new()),
125            session_id: config.session_id.clone(),
126            auto_retry_attempt: AtomicUsize::new(0),
127            auto_retry_cancel: AtomicBool::new(false),
128            auto_retry_notify: tokio::sync::Notify::new(),
129            circuit_breaker: CircuitBreaker::new(CircuitBreakerConfig::default()),
130            external_stop: Arc::new(AtomicBool::new(false)),
131            cancel_signal: None,
132            resolver,
133            steering_hook: None,
134            follow_up_hook: None,
135            ttsr_engine: config.ttsr_engine.clone(),
136        }
137    }
138
139    /// Create a new AgentLoop using the global resolver (backward compat).
140    pub fn new(
141        provider: Arc<dyn Provider>,
142        config: AgentLoopConfig,
143        tools: Arc<ToolRegistry>,
144        state: SharedState,
145    ) -> Self {
146        use crate::agent::GlobalProviderResolver;
147        Self::new_with_resolver(
148            provider,
149            config,
150            tools,
151            state,
152            Arc::new(GlobalProviderResolver),
153        )
154    }
155
    /// Registers a hook called before every tool execution.
    /// The hook can inspect and modify tool arguments, or reject the call entirely.
    ///
    /// Builder-style: consumes `self` and returns it with the hook installed,
    /// replacing any hook registered earlier.
    pub fn with_before_tool_call(mut self, hook: BeforeToolCallHook) -> Self {
        self.before_tool_call = Some(hook);
        self
    }
162
    /// Registers a hook called after every tool execution.
    /// The hook receives the tool name, arguments, and result.
    ///
    /// Builder-style: consumes `self` and returns it with the hook installed,
    /// replacing any hook registered earlier.
    pub fn with_after_tool_call(mut self, hook: AfterToolCallHook) -> Self {
        self.after_tool_call = Some(hook);
        self
    }
169
170    /// Inject a steering message into the agent loop.
171    ///
172    /// Steering messages are processed at the start of each turn, before the
173    /// next LLM call. If the steering queue is at capacity (256 messages), the
174    /// message is dropped and a warning is logged.
175    pub fn steer(&self, message: Message) {
176        if !try_push_steering(self, message) {
177            tracing::warn!("Steering message dropped — queue at capacity");
178        }
179    }
180
181    /// Enqueue a follow-up message to continue the conversation after all
182    /// tool calls in the current batch are complete.
183    ///
184    /// If the follow-up queue is at capacity (64 messages), the message is
185    /// dropped and a warning is logged.
186    pub fn follow_up(&self, message: Message) {
187        if !try_push_follow_up(self, message) {
188            tracing::warn!("Follow-up message dropped — queue at capacity");
189        }
190    }
191
    /// Removes all pending steering messages from the queue.
    /// See [`steer()`](Self::steer) for an explanation of steering messages.
    ///
    /// Delegates to the `clear_steering_queue` helper in the `queues` module.
    pub fn clear_steering_queue(&self) {
        clear_steering_queue(self);
    }
197
    /// Removes all pending follow-up messages from the queue.
    /// See [`follow_up()`](Self::follow_up) for an explanation of follow-up messages.
    ///
    /// Delegates to the `clear_follow_up_queue` helper in the `queues` module.
    pub fn clear_follow_up_queue(&self) {
        clear_follow_up_queue(self);
    }
203
    /// Removes all pending messages from both the steering and follow-up queues.
    ///
    /// Delegates to the `clear_all_queues` helper in the `queues` module.
    pub fn clear_all_queues(&self) {
        clear_all_queues(self);
    }
208
    /// Thin wrapper over the `drain_steering_queue` helper in the `queues`
    /// module; returns whatever messages that helper hands back.
    ///
    /// Presumably this removes and returns every queued steering message,
    /// leaving the queue empty — confirm against `queues`.
    fn drain_steering_queue(&self) -> Vec<Message> {
        drain_steering_queue(self)
    }
212
213    /// Build a ToolContext from the agent loop config.
214    /// Uses workspace_dir from config if set, otherwise falls back to current directory.
215    #[cfg_attr(test, allow(dead_code))]
216    pub(crate) fn build_tool_context(&self) -> ToolContext {
217        let workspace = self
218            .config
219            .workspace_dir
220            .clone()
221            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
222        ToolContext {
223            workspace_dir: workspace,
224            root_dir: self.config.workspace_dir.clone(),
225            session_id: self.session_id.clone(),
226            snapshot_store: self.config.snapshot_store.clone(),
227            memory: self.config.memory.clone(),
228            url_resolver: self.config.url_resolver.clone(),
229            todo: self.config.todo.clone(),
230            agent_pool: self.config.agent_pool.clone(),
231            lsp: self.config.lsp.clone(),
232        }
233    }
234
    /// Thin wrapper over the `drain_follow_up_queue` helper in the `queues`
    /// module; returns whatever messages that helper hands back.
    ///
    /// Presumably this removes and returns every queued follow-up message,
    /// leaving the queue empty — confirm against `queues`.
    fn drain_follow_up_queue(&self) -> Vec<Message> {
        drain_follow_up_queue(self)
    }
238
    /// Cancels any in-progress auto-retry countdown.
    /// After calling this, the agent will not automatically retry
    /// on the next turn.
    ///
    /// Delegates to the `cancel_auto_retry` helper in the `retry` module.
    pub fn cancel_auto_retry(&self) {
        cancel_auto_retry(self);
    }
245
    /// Returns the current auto-retry attempt number (0-based).
    /// Useful for displaying retry status in the UI.
    ///
    /// The value is read through the `auto_retry_attempt_method` helper in
    /// the `retry` module. The loop itself treats any value above zero as
    /// "a retry sequence is in progress".
    pub fn auto_retry_attempt(&self) -> usize {
        auto_retry_attempt_method(self)
    }
251
    /// Returns a reference to the shared state this loop reads its message
    /// history from.
    /// Used by Agent to sync state after loop execution.
    pub fn state(&self) -> &SharedState {
        &self.state
    }
257
    /// Returns the external stop flag.
    ///
    /// This is the flag that [`cancel()`](Self::cancel) sets and that
    /// [`is_cancelled()`](Self::is_cancelled) checks first.
    pub fn external_stop(&self) -> &Arc<AtomicBool> {
        &self.external_stop
    }
262
    /// Sets a shared cancel signal (typically `Agent::cancel_flag`).
    /// The streaming loop checks this in its periodic wake-up timer,
    /// ensuring cancellation is detected even when the provider stream
    /// produces no events (e.g. waiting for first token).
    ///
    /// Replaces any signal installed by an earlier call.
    pub fn set_cancel_signal(&mut self, flag: Arc<AtomicBool>) {
        self.cancel_signal = Some(flag);
    }
270
271    /// Returns a clone of the loop's cancel-signal flag, if one has been
272    /// installed via [`Self::set_cancel_signal`]. Used by tool execution
273    /// to bridge the loop's `AtomicBool` cancellation into the per-tool
274    /// `oneshot::Receiver` cancellation channel (audit finding F-8).
275    pub fn cancel_signal(&self) -> Option<Arc<AtomicBool>> {
276        self.cancel_signal.as_ref().map(Arc::clone)
277    }
278
279    /// Returns true if cancellation has been requested via either
280    /// `external_stop` or the direct `cancel_signal`.
281    pub fn is_cancelled(&self) -> bool {
282        if self.external_stop.load(Ordering::SeqCst) {
283            return true;
284        }
285        self.cancel_signal
286            .as_ref()
287            .is_some_and(|f| f.load(Ordering::SeqCst))
288    }
    /// Request cancellation from outside the loop (e.g. Ctrl+C).
    /// Sets the `external_stop` flag which causes the streaming loop
    /// to abort on its next periodic check (~500ms) and the agent loop
    /// to exit after the current turn.
    ///
    /// This only ever sets the flag; nothing in this method clears it again.
    pub fn cancel(&self) {
        self.external_stop.store(true, Ordering::SeqCst);
    }
296
    /// Set the steering hook — called each turn to drain new messages
    /// from the session's steering queue into the loop's internal queue.
    ///
    /// The hook returns plain strings; each one is wrapped in a user message
    /// when the loop polls it. Replaces any previously set hook.
    pub fn set_steering_hook(&mut self, hook: Arc<dyn Fn() -> Vec<String> + Send + Sync>) {
        self.steering_hook = Some(hook);
    }
302
    /// Set the follow-up hook — called each turn to drain new messages
    /// from the session's follow-up queue into the loop's internal queue.
    ///
    /// The hook returns plain strings; each one is wrapped in a user message
    /// when the loop polls it. Replaces any previously set hook.
    pub fn set_follow_up_hook(&mut self, hook: Arc<dyn Fn() -> Vec<String> + Send + Sync>) {
        self.follow_up_hook = Some(hook);
    }
308
309    /// Poll the steering/follow-up hooks and inject new messages
310    /// into the internal queues.
311    fn poll_external_queues(&self) {
312        if let Some(ref hook) = self.steering_hook {
313            for msg_text in hook() {
314                self.steer(Message::User(UserMessage::new(msg_text)));
315            }
316        }
317        if let Some(ref hook) = self.follow_up_hook {
318            for msg_text in hook() {
319                self.follow_up(Message::User(UserMessage::new(msg_text)));
320            }
321        }
322    }
323
324    /// Runs the agent loop with a single user prompt.
325    /// Convenience wrapper around [`run_messages()`](Self::run_messages).
326    pub async fn run(
327        &self,
328        prompt: String,
329        emit: impl Fn(AgentEvent) + Send + Sync + 'static,
330    ) -> Result<Vec<AgentEvent>> {
331        let message = Message::User(UserMessage::new(prompt));
332        let emit = Arc::new(emit);
333        self.run_messages(vec![message], emit).await
334    }
335
336    /// Run with an `FnMut` callback and mutable state — no `Arc<Mutex<>>` needed.
337    ///
338    /// Unlike [`run()`](Self::run), which takes `Fn`, this method accepts `FnMut`
339    /// and a user-provided state value `S`. The callback receives `&mut S` on each
340    /// event, so you can accumulate results without any locking overhead.
341    ///
342    /// Returns the collected events **and** the final state value.
343    ///
344    /// # Example
345    ///
346    /// ```ignore
347    /// #[derive(Default)]
348    /// struct MyState { steps: usize, output: String }
349    ///
350    /// let (events, state) = agent_loop.run_mut(
351    ///     "do something".into(),
352    ///     MyState::default(),
353    ///     |event, s| {
354    ///         match event {
355    ///             AgentEvent::ToolExecutionEnd { is_error: false, .. } => s.steps += 1,
356    ///             AgentEvent::AgentEnd { messages, .. } => {
357    ///                 if let Some(Message::Assistant(a)) = messages.last() {
358    ///                     s.output = a.text_content();
359    ///                 }
360    ///             }
361    ///             _ => {}
362    ///         }
363    ///     },
364    /// ).await?;
365    /// ```
366    pub async fn run_mut<S: Send + std::fmt::Debug + 'static>(
367        &self,
368        prompt: String,
369        state: S,
370        emit: impl FnMut(AgentEvent, &mut S) + Send + 'static,
371    ) -> Result<(Vec<AgentEvent>, S)> {
372        let emit_fnmut = Arc::new(parking_lot::Mutex::new(emit));
373        let state_arc = Arc::new(parking_lot::Mutex::new(state));
374
375        // Clone the Arc for the closure; the original stays for recovery after run.
376        let state_for_closure = Arc::clone(&state_arc);
377
378        let emit_fn: EmitFn = Arc::new(move |event: AgentEvent| {
379            let mut cb = emit_fnmut.lock();
380            let mut s = state_for_closure.lock();
381            cb(event, &mut s);
382        });
383
384        let events = self.run_inner(prompt, emit_fn).await?;
385
386        // Recover the state. After run_inner completes, emit_fn is dropped,
387        // releasing the last Arc clone. Arc::try_unwrap should succeed since
388        // only our `state_arc` reference remains.
389        let mutex = Arc::try_unwrap(state_arc)
390            .expect("run_mut: state Arc still has multiple owners after run");
391        Ok((events, mutex.into_inner()))
392    }
393
394    /// Internal: create the initial user message and delegate to run_messages.
395    async fn run_inner(&self, prompt: String, emit: EmitFn) -> Result<Vec<AgentEvent>> {
396        let message = Message::User(UserMessage::new(prompt));
397        self.run_messages(vec![message], emit).await
398    }
399
400    /// Runs the agent loop with a list of pre-constructed messages.
401    /// This is the primary entry point for executing agent turns.
402    pub async fn run_messages(
403        &self,
404        prompts: Vec<Message>,
405        emit: EmitFn,
406    ) -> Result<Vec<AgentEvent>> {
407        let mut all_events = Vec::new();
408
409        let state_messages = self.state.get_state().messages.clone();
410        let mut all_messages = state_messages;
411        all_messages.extend(prompts.clone());
412
413        tracing::info!(session_id = ?self.session_id, "AgentLoop starting");
414        emit(AgentEvent::AgentStart {
415            prompts: prompts.clone(),
416            session_id: self.session_id.clone(),
417        });
418        all_events.push(AgentEvent::AgentStart {
419            prompts: prompts.clone(),
420            session_id: self.session_id.clone(),
421        });
422
423        let (result_messages, events) = self.run_loop(prompts, emit.clone()).await?;
424
425        all_events.extend(events);
426
427        let stop_reason = result_messages.last().and_then(|m| {
428            if let Message::Assistant(a) = m {
429                Some(format!("{:?}", a.stop_reason))
430            } else {
431                None
432            }
433        });
434
435        tracing::info!(session_id = ?self.session_id, "AgentLoop run_messages complete");
436
437        // Sync messages back to shared state
438        self.state.update(|s| {
439            s.replace_messages(result_messages.clone());
440        });
441
442        emit(AgentEvent::AgentEnd {
443            messages: result_messages.clone(),
444            stop_reason: stop_reason.clone(),
445            session_id: self.session_id.clone(),
446        });
447        all_events.push(AgentEvent::AgentEnd {
448            messages: result_messages.clone(),
449            stop_reason,
450            session_id: self.session_id.clone(),
451        });
452
453        Ok(all_events)
454    }
455
456    /// Resumes the agent loop after a previous turn ended in a paused state
457    /// (e.g., waiting for user confirmation). Emits events to the provided callback.
458    pub async fn continue_loop(
459        &self,
460        emit: impl Fn(AgentEvent) + Send + Sync + 'static,
461    ) -> Result<Vec<AgentEvent>> {
462        let emit = Arc::new(emit);
463        let mut all_events = Vec::new();
464
465        tracing::info!(session_id = ?self.session_id, "AgentLoop continuing");
466        emit(AgentEvent::AgentStart {
467            prompts: vec![],
468            session_id: self.session_id.clone(),
469        });
470        all_events.push(AgentEvent::AgentStart {
471            prompts: vec![],
472            session_id: self.session_id.clone(),
473        });
474
475        let (result_messages, events) = self.run_loop(vec![], emit.clone()).await?;
476
477        all_events.extend(events);
478
479        let stop_reason = result_messages.last().and_then(|m| {
480            if let Message::Assistant(a) = m {
481                Some(format!("{:?}", a.stop_reason))
482            } else {
483                None
484            }
485        });
486
487        tracing::info!(session_id = ?self.session_id, "AgentLoop continue_loop complete");
488        emit(AgentEvent::AgentEnd {
489            messages: result_messages.clone(),
490            stop_reason: stop_reason.clone(),
491            session_id: self.session_id.clone(),
492        });
493        all_events.push(AgentEvent::AgentEnd {
494            messages: result_messages.clone(),
495            stop_reason,
496            session_id: self.session_id.clone(),
497        });
498
499        Ok(all_events)
500    }
501
502    /// Process pending steering messages, emitting events and appending to message history.
503    fn process_steering_messages(
504        &self,
505        pending_messages: &mut Vec<Message>,
506        messages: &mut Vec<Message>,
507        new_messages: &mut Vec<Message>,
508        events: &mut Vec<AgentEvent>,
509        emit: &EmitFn,
510    ) {
511        if pending_messages.is_empty() {
512            return;
513        }
514        for message in pending_messages.drain(..) {
515            emit(AgentEvent::SteeringMessage {
516                message: message.clone(),
517            });
518            emit(AgentEvent::MessageStart {
519                message: message.clone(),
520            });
521            emit(AgentEvent::MessageEnd {
522                message: message.clone(),
523            });
524            events.push(AgentEvent::SteeringMessage {
525                message: message.clone(),
526            });
527            events.push(AgentEvent::MessageStart {
528                message: message.clone(),
529            });
530            events.push(AgentEvent::MessageEnd {
531                message: message.clone(),
532            });
533            messages.push(message.clone());
534            new_messages.push(message);
535        }
536    }
537
538    /// Handle a streaming error by synthesizing an error message and completing the turn.
539    async fn handle_streaming_error(
540        &self,
541        e: anyhow::Error,
542        messages: &mut Vec<Message>,
543        new_messages: &mut Vec<Message>,
544        events: &mut Vec<AgentEvent>,
545        emit: &EmitFn,
546        turn_number: u32,
547    ) -> (Vec<Message>, Vec<AgentEvent>) {
548        let err_msg = format!("{}", e);
549        tracing::error!(session_id = ?self.session_id, "Unexpected streaming error: {}", err_msg);
550
551        let mut error_asst = oxi_ai::AssistantMessage::new(
552            oxi_ai::Api::OpenAiCompletions,
553            "agent",
554            &self.config.model_id,
555        );
556        error_asst.stop_reason = StopReason::Error;
557        error_asst
558            .content
559            .push(ContentBlock::Text(TextContent::new(format!(
560                "⚠ {}",
561                err_msg
562            ))));
563
564        new_messages.push(Message::Assistant(error_asst.clone()));
565        messages.push(Message::Assistant(error_asst.clone()));
566
567        emit(AgentEvent::MessageStart {
568            message: Message::Assistant(error_asst.clone()),
569        });
570        emit(AgentEvent::MessageEnd {
571            message: Message::Assistant(error_asst.clone()),
572        });
573        emit(AgentEvent::Error {
574            message: err_msg.clone(),
575            session_id: self.session_id.clone(),
576        });
577
578        emit(AgentEvent::TurnEnd {
579            turn_number,
580            assistant_message: Message::Assistant(error_asst.clone()),
581            tool_results: vec![],
582        });
583        events.push(AgentEvent::TurnEnd {
584            turn_number,
585            assistant_message: Message::Assistant(error_asst),
586            tool_results: vec![],
587        });
588        // Return Ok — lifecycle is complete
589        (messages.clone(), events.clone())
590    }
591
592    async fn run_loop(
593        &self,
594        initial_prompts: Vec<Message>,
595        emit: EmitFn,
596    ) -> Result<(Vec<Message>, Vec<AgentEvent>)> {
597        tracing::info!("[AGENT-LOOP] run_loop started");
598        let mut messages = self.state.get_state().messages.clone();
599        messages.extend(initial_prompts.clone());
600
601        let mut new_messages: Vec<Message> = initial_prompts;
602        let mut events = Vec::new();
603        let mut turn_number: u32 = 0;
604        let mut first_turn = true;
605
606        let mut pending_messages: Vec<Message> = self.drain_steering_queue();
607
608        loop {
609            tracing::info!(
610                "[AGENT-LOOP] Top of loop, has_more_tool_calls={}, pending_messages={}",
611                true,
612                pending_messages.is_empty()
613            );
614            let mut has_more_tool_calls = true;
615
616            while has_more_tool_calls || !pending_messages.is_empty() {
617                if !first_turn {
618                    turn_number += 1;
619                    emit(AgentEvent::TurnStart { turn_number });
620                    events.push(AgentEvent::TurnStart { turn_number });
621                } else {
622                    first_turn = false;
623                    turn_number = 1;
624                    emit(AgentEvent::TurnStart { turn_number });
625                    events.push(AgentEvent::TurnStart { turn_number });
626                }
627
628                if !pending_messages.is_empty() {
629                    self.process_steering_messages(
630                        &mut pending_messages,
631                        &mut messages,
632                        &mut new_messages,
633                        &mut events,
634                        &emit,
635                    );
636                }
637
638                // Poll external hooks each turn to drain new steering/follow-up
639                // messages injected since the last turn.
640                self.poll_external_queues();
641
642                self.maybe_compact(&mut messages, turn_number as usize, &emit)
643                    .await;
644
645                tracing::info!("[AGENT-LOOP] About to call stream_assistant_response");
646                let ttsr = self.ttsr_engine.as_deref();
647                let outcome = stream_assistant_response(self, &mut messages, &emit, ttsr).await;
648
649                let assistant_message = match outcome {
650                    StreamOutcome::Complete(msg) => msg,
651                    StreamOutcome::Error {
652                        message: _message,
653                        detail,
654                    } => {
655                        // Check for message-ordering errors that can be recovered
656                        // by removing orphaned tool results.
657                        let is_tool_ordering_error = detail.contains("tool")
658                            && (detail.contains("must be a response")
659                                || detail.contains("preceding")
660                                || detail.contains("tool_calls"));
661
662                        if is_tool_ordering_error {
663                            let removed = sanitize_orphaned_tool_results(&mut messages);
664                            tracing::warn!(
665                                session_id = ?self.session_id,
666                                removed,
667                                detail = %detail,
668                                "Message-ordering error detected, removed orphaned tool results, retrying"
669                            );
670                            if removed > 0 {
671                                // Don't push the error message to history; retry the turn.
672                                emit(AgentEvent::Error {
673                                    message: format!(
674                                        "⚠ Provider rejected message order: {}. Removed {} orphaned tool results, retrying…",
675                                        detail, removed
676                                    ),
677                                    session_id: self.session_id.clone(),
678                                });
679                                continue; // Retry the turn with sanitized messages
680                            }
681                        }
682
683                        // Unrecoverable — fall through to error handler.
684                        return Ok(self
685                            .handle_streaming_error(
686                                anyhow::anyhow!("Provider stream error: {}", detail),
687                                &mut messages,
688                                &mut new_messages,
689                                &mut events,
690                                &emit,
691                                turn_number,
692                            )
693                            .await);
694                    }
695                    StreamOutcome::Cancelled(msg) => {
696                        emit(AgentEvent::TurnEnd {
697                            turn_number,
698                            assistant_message: Message::Assistant(msg.clone()),
699                            tool_results: vec![],
700                        });
701                        return Ok((messages, events));
702                    }
703                    StreamOutcome::RuleInterrupt { partial, rule } => {
704                        tracing::info!("RuleInterrupt: '{}' violated, retrying", rule.name);
705                        emit(AgentEvent::TtsrInterrupt {
706                            rule_name: rule.name.clone(),
707                            session_id: self.session_id.clone(),
708                        });
709                        messages.push(Message::Assistant(partial));
710                        let interrupt_body = format!(
711                            "<system-interrupt reason=\"rule_violation\" rule=\"{name}\">\n\
712                             Your output was interrupted because it violated a project rule.\n\
713                             Comply with: {content}\n</system-interrupt>",
714                            name = rule.name,
715                            content = rule.content
716                        );
717                        messages.push(Message::user(interrupt_body));
718                        continue;
719                    }
720                };
721
722                new_messages.push(Message::Assistant(assistant_message.clone()));
723
724                if matches!(assistant_message.stop_reason, StopReason::Error) {
725                    if is_retryable_error(&assistant_message) {
726                        let did_retry =
727                            handle_retryable_error(self, &assistant_message, &mut messages, &emit)
728                                .await;
729                        if did_retry {
730                            emit(AgentEvent::TurnEnd {
731                                turn_number,
732                                assistant_message: Message::Assistant(assistant_message.clone()),
733                                tool_results: vec![],
734                            });
735                            events.push(AgentEvent::TurnEnd {
736                                turn_number,
737                                assistant_message: Message::Assistant(assistant_message.clone()),
738                                tool_results: vec![],
739                            });
740                            has_more_tool_calls = true;
741                            continue;
742                        }
743                    }
744
745                    emit(AgentEvent::TurnEnd {
746                        turn_number,
747                        assistant_message: Message::Assistant(assistant_message.clone()),
748                        tool_results: vec![],
749                    });
750                    events.push(AgentEvent::TurnEnd {
751                        turn_number,
752                        assistant_message: Message::Assistant(assistant_message.clone()),
753                        tool_results: vec![],
754                    });
755                    return Ok((messages, events));
756                }
757                if matches!(assistant_message.stop_reason, StopReason::Aborted) {
758                    if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
759                        emit(AgentEvent::AutoRetryEnd {
760                            success: true,
761                            attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
762                            final_error: None,
763                        });
764                        self.auto_retry_attempt.store(0, Ordering::Relaxed);
765                    }
766
767                    emit(AgentEvent::TurnEnd {
768                        turn_number,
769                        assistant_message: Message::Assistant(assistant_message.clone()),
770                        tool_results: vec![],
771                    });
772                    events.push(AgentEvent::TurnEnd {
773                        turn_number,
774                        assistant_message: Message::Assistant(assistant_message.clone()),
775                        tool_results: vec![],
776                    });
777                    return Ok((messages, events));
778                }
779
780                if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
781                    emit(AgentEvent::AutoRetryEnd {
782                        success: true,
783                        attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
784                        final_error: None,
785                    });
786                    self.auto_retry_attempt.store(0, Ordering::Relaxed);
787                }
788
789                let tool_calls = helpers::extract_tool_calls(&assistant_message);
790                tracing::info!(
791                    "[AGENT-LOOP] extract_tool_calls found {} calls, stop_reason={:?}",
792                    tool_calls.len(),
793                    assistant_message.stop_reason
794                );
795
796                let mut tool_results: Vec<oxi_ai::ToolResultMessage> = Vec::new();
797                has_more_tool_calls = false;
798
799                if !tool_calls.is_empty() {
800                    tracing::info!("[AGENT-LOOP] Executing {} tool calls", tool_calls.len());
801                    let ctx = self.build_tool_context();
802                    let executed_batch = match execute_tool_calls(
803                        self,
804                        &mut messages,
805                        &assistant_message,
806                        tool_calls,
807                        &emit,
808                        &ctx,
809                    )
810                    .await
811                    {
812                        Ok(batch) => batch,
813                        Err(e) => {
814                            // Tool execution failed — emit TurnEnd and return Ok.
815                            // The lifecycle must always complete.
816                            tracing::error!(session_id = ?self.session_id, "Tool execution error: {}", e);
817                            emit(AgentEvent::Error {
818                                message: format!("Tool execution error: {}", e),
819                                session_id: self.session_id.clone(),
820                            });
821                            emit(AgentEvent::TurnEnd {
822                                turn_number,
823                                assistant_message: Message::Assistant(assistant_message.clone()),
824                                tool_results: vec![],
825                            });
826                            events.push(AgentEvent::TurnEnd {
827                                turn_number,
828                                assistant_message: Message::Assistant(assistant_message.clone()),
829                                tool_results: vec![],
830                            });
831                            return Ok((messages, events));
832                        }
833                    };
834
835                    tool_results = executed_batch.messages;
836                    has_more_tool_calls = !executed_batch.terminate;
837
838                    if executed_batch.terminate {
839                        tracing::warn!(
840                            session_id = ?self.session_id,
841                            "Tool batch terminated early (terminate flag set by after_tool_call hook). \
842                             This halts the tool-calling loop. If this is unexpected, \
843                             check after_tool_call hooks for unintended terminate: true."
844                        );
845                    }
846
847                    for result in &tool_results {
848                        messages.push(Message::ToolResult(result.clone()));
849                        new_messages.push(Message::ToolResult(result.clone()));
850                    }
851                }
852
853                emit(AgentEvent::TurnEnd {
854                    turn_number,
855                    assistant_message: Message::Assistant(assistant_message.clone()),
856                    tool_results: tool_results.clone(),
857                });
858                events.push(AgentEvent::TurnEnd {
859                    turn_number,
860                    assistant_message: Message::Assistant(assistant_message.clone()),
861                    tool_results: tool_results.clone(),
862                });
863
864                if should_stop_after_turn(&self.external_stop) {
865                    tracing::info!("[AGENT-LOOP] external_stop, ending loop");
866                    return Ok((messages, events));
867                }
868
869                pending_messages = self.drain_steering_queue();
870                tracing::info!(
871                    "[AGENT-LOOP] TurnEnd complete, pending_messages={}, has_more_tool_calls={}",
872                    !pending_messages.is_empty(),
873                    has_more_tool_calls
874                );
875
876                // Early stop check: if external_stop was set (e.g. Ctrl+C),
877                // don't process steering messages from the next turn.
878                if self.external_stop.load(Ordering::SeqCst) {
879                    tracing::info!(
880                        "[AGENT-LOOP] external_stop set after steering drain, ending loop"
881                    );
882                    return Ok((messages, events));
883                }
884            }
885
886            // Re-check steering queue after the inner while loop exits.
887            // This closes the race window where steer() is called between the
888            // last drain_steering_queue() and the while-exit condition check.
889            let late_steering = self.drain_steering_queue();
890            if !late_steering.is_empty() {
891                tracing::info!(
892                    count = late_steering.len(),
893                    "[AGENT-LOOP] Caught late steering messages after inner loop exit"
894                );
895                pending_messages = late_steering;
896                continue;
897            }
898
899            let follow_up_messages = self.drain_follow_up_queue();
900            if !follow_up_messages.is_empty() {
901                pending_messages = follow_up_messages;
902                continue;
903            }
904
905            // Final check: one more steering drain after follow-up to catch
906            // messages injected during the follow-up drain window.
907            let final_steering = self.drain_steering_queue();
908            if !final_steering.is_empty() {
909                pending_messages = final_steering;
910                continue;
911            }
912
913            break;
914        }
915
916        Ok((messages, events))
917    }
918
919    /// Build the compaction instruction, appending injected TTSR rule
920    /// names so that the model remembers rules already enforced.
921    fn build_compaction_instruction(&self) -> Option<String> {
922        let base = self.config.compaction_instruction.as_deref();
923        let injected = self
924            .ttsr_engine
925            .as_ref()
926            .map(|e| e.injected_records())
927            .unwrap_or_default();
928        if injected.is_empty() {
929            return base.map(|s| s.to_string());
930        }
931        let mut instr = base.map(|s| s.to_string()).unwrap_or_default();
932        instr.push_str("\n\nThe following rules have already been enforced in this session and corrections applied. Do NOT violate them again:");
933        for (name, _turn) in &injected {
934            instr.push_str(&format!("\n- {name}"));
935        }
936        Some(instr)
937    }
938
939    async fn maybe_compact(&self, messages: &mut Vec<Message>, iteration: usize, emit: &EmitFn) {
940        let context_text = serde_json::to_string(&*messages).unwrap_or_default();
941        let context_tokens = estimate_tokens(&context_text);
942
943        if !self
944            .compaction_manager
945            .should_compact(context_tokens, iteration)
946        {
947            return;
948        }
949
950        emit(AgentEvent::Compaction {
951            event: CompactionEvent::Triggered {
952                context_tokens,
953                iteration,
954            },
955        });
956
957        let messages_to_compact: Vec<Message> = messages.to_vec();
958        let instruction = self.build_compaction_instruction();
959
960        match self
961            .compaction_manager
962            .compact_if_needed(
963                &messages_to_compact,
964                instruction.as_deref(),
965                context_tokens,
966                iteration,
967            )
968            .await
969        {
970            Ok(Some(compacted)) => {
971                let start = Instant::now();
972                let message_count = compacted.compacted_count;
973
974                emit(AgentEvent::Compaction {
975                    event: CompactionEvent::Started { message_count },
976                });
977
978                let kept_messages = compacted.kept_messages;
979                let summary = compacted.summary;
980                let compacted_count = compacted.compacted_count;
981
982                *messages = kept_messages;
983
984                let state_msgs = messages.clone();
985                self.state.update(|s| {
986                    s.replace_messages(state_msgs);
987                });
988
989                let compacted_ctx = CompactedContext {
990                    summary,
991                    kept_messages: Vec::new(),
992                    compacted_count,
993                };
994                emit(AgentEvent::Compaction {
995                    event: CompactionEvent::Completed {
996                        result: compacted_ctx.clone(),
997                        duration_ms: start.elapsed().as_millis() as u64,
998                    },
999                });
1000
1001                // Async compaction hook — awaited, not fire-and-forget.
1002                if let Some(ref hook) = self.config.on_compaction {
1003                    match hook(compacted_ctx).await {
1004                        Ok(()) => {
1005                            tracing::debug!("Compaction hook completed successfully");
1006                        }
1007                        Err(e) => {
1008                            tracing::warn!(error = %e, "Compaction hook failed");
1009                        }
1010                    }
1011                }
1012            }
1013            Ok(None) => {}
1014            Err(e) => {
1015                emit(AgentEvent::Compaction {
1016                    event: CompactionEvent::Failed {
1017                        error: e.to_string(),
1018                    },
1019                });
1020            }
1021        }
1022    }
1023
1024    fn resolve_model(&self) -> Result<oxi_ai::Model> {
1025        self.resolver
1026            .resolve_model(&self.config.model_id)
1027            .ok_or_else(|| Error::msg(format!("Model not found: {}", self.config.model_id)))
1028    }
1029}
1030
#[cfg(test)]
mod session_id_wiring_tests {
    //! Regression coverage for the #13 fix.
    //!
    //! `build_tool_context` is private; keeping the tests in this module lets
    //! them reach that private surface. Nothing here ever streams — the nop
    //! provider only exists to satisfy `AgentLoop::new_with_resolver`.
    use super::*;
    use crate::ProviderResolver;
    use crate::agent_loop::config::AgentLoopConfig;
    use crate::config::ToolExecutionMode;
    use crate::state::SharedState;
    use crate::tools::ToolRegistry;
    use oxi_ai::{
        CompactionStrategy, Context, Model, Provider, ProviderError, StreamOptions, StreamResult,
    };
    use std::future::Future;
    use std::pin::Pin;

    /// Provider stub whose `stream` always fails with `NotImplemented`.
    struct NopProvider;
    impl Provider for NopProvider {
        fn stream<'a>(
            &'a self,
            _model: &'a Model,
            _context: &'a Context,
            _options: Option<StreamOptions>,
        ) -> Pin<Box<dyn Future<Output = StreamResult> + Send + 'a>> {
            Box::pin(async {
                Err(ProviderError::NotImplemented(
                    "session-id wiring tests never stream".to_string(),
                ))
            })
        }
        fn name(&self) -> &str {
            "nop"
        }
    }

    /// Resolver stub that knows no providers and no models.
    struct NullResolver;
    impl ProviderResolver for NullResolver {
        fn resolve_provider(&self, _name: &str) -> Option<Arc<dyn Provider>> {
            None
        }
        fn resolve_model(&self, _model_id: &str) -> Option<Model> {
            None
        }
    }

    /// Build an `AgentLoop` on top of the stubs above, with compaction
    /// disabled and the given `session_id` placed in the config.
    fn loop_with(session_id: Option<String>) -> AgentLoop {
        let config = AgentLoopConfig {
            model_id: "test/model".to_string(),
            system_prompt: None,
            temperature: 1.0,
            max_tokens: 4096,
            tool_execution: ToolExecutionMode::Sequential,
            compaction_strategy: CompactionStrategy::Disabled,
            compaction_instruction: None,
            context_window: 128_000,
            session_id,
            transport: None,
            compact_on_start: false,
            max_retry_delay_ms: None,
            auto_retry_enabled: true,
            auto_retry_max_attempts: 3,
            auto_retry_base_delay_ms: 1000,
            api_key: None,
            workspace_dir: None,
            provider_options: None,
            on_compaction: None,
            snapshot_store: None,
            memory: None,
            url_resolver: None,
            todo: None,
            agent_pool: None,
            lsp: None,
            ttsr_engine: None,
        };
        AgentLoop::new_with_resolver(
            Arc::new(NopProvider),
            config,
            Arc::new(ToolRegistry::new()),
            SharedState::new(),
            Arc::new(NullResolver),
        )
    }

    /// Regression for defect #13: `AgentLoopConfig.session_id` MUST flow into
    /// `ToolContext.session_id`. Before the fix, the field was hardcoded to
    /// `None`, so the `issue` tool received an empty caller id and bypassed
    /// all ownership/liveness checks (two agents could both `start` the same
    /// issue and the last writer silently won).
    #[test]
    fn tool_context_inherits_session_id_when_set() {
        let loop_ = loop_with(Some("proc-test-session-id".to_string()));
        let ctx = loop_.build_tool_context();
        assert_eq!(
            ctx.session_id.as_deref(),
            Some("proc-test-session-id"),
            "ToolContext.session_id must inherit AgentLoopConfig.session_id"
        );
    }

    /// With no session id configured, the tool context must not carry one.
    #[test]
    fn tool_context_session_id_defaults_to_none() {
        let loop_ = loop_with(None);
        let ctx = loop_.build_tool_context();
        assert!(
            ctx.session_id.is_none(),
            "default ToolContext.session_id should be None"
        );
    }
}