Skip to main content

oxi_agent/agent_loop/
mod.rs

1#![allow(unused_doc_comments)]
2
3//! Agent loop — the main request/response cycle driver.
4//!
5//! Coordinates the interaction between the agent, provider, tools, and
6//! state management. Handles streaming, tool execution, retry logic,
7//! and compaction events.
8
9/// Agent-loop configuration.
10pub mod config;
11/// Miscellaneous helper functions.
12pub mod helpers;
13/// Internal message/event queues.
14pub mod queues;
15/// Retry logic for the agent loop.
16pub mod retry;
17/// Stream outcome types for TTSR integration.
18pub mod stream_outcome;
19/// Streaming response handling.
20pub mod streaming;
21/// Tool execution strategies.
22pub mod tool_exec;
23/// Time-Traveling Stream Rules engine.
24pub mod ttsr;
25
26// Re-export for sibling module access
27use crate::agent::ProviderResolver;
28use crate::compaction::{CompactedContext, CompactionEvent};
29use crate::events::AgentEvent;
30use crate::recovery::{CircuitBreaker, CircuitBreakerConfig};
31use crate::state::TokenSource;
32use crate::{state::SharedState, tools::ToolContext, tools::ToolRegistry};
33use anyhow::{Error, Result};
34pub use config::{AfterToolCallHook, AgentLoopConfig, BeforeToolCallHook, ToolExecutionMode};
35use oxi_ai::{
36    CompactionManager as OxCompactionManager, CompactionStrategy, ContentBlock, LlmCompactor,
37    Message, Provider, StopReason, TextContent, UserMessage,
38};
39use parking_lot::RwLock;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
42use std::time::Instant;
43
44use self::helpers::{sanitize_orphaned_tool_results, should_stop_after_turn};
45use self::queues::{
46    clear_all_queues, clear_follow_up_queue, clear_steering_queue, drain_follow_up_queue,
47    drain_steering_queue, try_push_follow_up, try_push_steering,
48};
49use self::retry::{
50    auto_retry_attempt_method, cancel_auto_retry, handle_retryable_error, is_retryable_error,
51};
52use self::streaming::stream_assistant_response;
53use self::tool_exec::execute_tool_calls;
54
55pub use self::stream_outcome::StreamOutcome;
56type EmitFn = Arc<dyn Fn(AgentEvent) + Send + Sync>;
57
58/// AgentLoop.
59pub struct AgentLoop {
60    provider: Arc<dyn Provider>,
61    config: AgentLoopConfig,
62    tools: Arc<ToolRegistry>,
63    state: SharedState,
64    compaction_manager: OxCompactionManager,
65    before_tool_call: Option<BeforeToolCallHook>,
66    after_tool_call: Option<AfterToolCallHook>,
67    steering_queue: RwLock<Vec<Message>>,
68    follow_up_queue: RwLock<Vec<Message>>,
69    session_id: Option<String>,
70    auto_retry_attempt: AtomicUsize,
71    auto_retry_cancel: AtomicBool,
72    /// Notify used to wake up the auto-retry sleep immediately when cancelled.
73    auto_retry_notify: tokio::sync::Notify,
74    circuit_breaker: CircuitBreaker,
75    /// External stop flag — when set, should_stop_after_turn returns true.
76    /// Used by Agent to forward the should_stop_flag from AgentHooks.
77    external_stop: Arc<AtomicBool>,
78    /// Direct cancel signal shared with `Agent::cancel_flag`.
79    /// Set by `Agent::cancel()` and checked by the streaming loop's periodic
80    /// timer so cancellation is detected even when no stream events arrive.
81    cancel_signal: Option<Arc<AtomicBool>>,
82    /// Provider/model resolver for isolated model lookups.
83    resolver: Arc<dyn ProviderResolver>,
84    /// Steering hook from AgentHooks — polled each turn to drain new messages
85    /// from AgentSession's queue into AgentLoop's internal steering_queue.
86    steering_hook: Option<Arc<dyn Fn() -> Vec<String> + Send + Sync>>,
87    /// Follow-up hook from AgentHooks — same as steering but for follow-ups.
88    follow_up_hook: Option<Arc<dyn Fn() -> Vec<String> + Send + Sync>>,
89    /// TTSR engine for stream rule checking.
90    ttsr_engine: Option<Arc<ttsr::TtsrEngine>>,
91}
92
93impl AgentLoop {
94    /// Creates a new `AgentLoop` with an explicit provider resolver.
95    /// Use this when the model ID needs to be resolved to a provider+model pair
96    /// using custom logic (e.g., per-session routing).
97    pub fn new_with_resolver(
98        provider: Arc<dyn Provider>,
99        config: AgentLoopConfig,
100        tools: Arc<ToolRegistry>,
101        state: SharedState,
102        resolver: Arc<dyn ProviderResolver>,
103    ) -> Self {
104        let mut compaction_manager =
105            OxCompactionManager::new(config.compaction_strategy.clone(), config.context_window);
106
107        if config.compaction_strategy != CompactionStrategy::Disabled {
108            let model = resolver.resolve_model(&config.model_id);
109            if let Some(model) = model {
110                let llm_compactor =
111                    Arc::new(LlmCompactor::new(model.clone(), Arc::clone(&provider)));
112                compaction_manager.set_compactor(llm_compactor);
113            }
114        }
115
116        Self {
117            provider,
118            config: config.clone(),
119            tools,
120            state,
121            compaction_manager,
122            before_tool_call: None,
123            after_tool_call: None,
124            steering_queue: RwLock::new(Vec::new()),
125            follow_up_queue: RwLock::new(Vec::new()),
126            session_id: config.session_id.clone(),
127            auto_retry_attempt: AtomicUsize::new(0),
128            auto_retry_cancel: AtomicBool::new(false),
129            auto_retry_notify: tokio::sync::Notify::new(),
130            circuit_breaker: CircuitBreaker::new(CircuitBreakerConfig::default()),
131            external_stop: Arc::new(AtomicBool::new(false)),
132            cancel_signal: None,
133            resolver,
134            steering_hook: None,
135            follow_up_hook: None,
136            ttsr_engine: config.ttsr_engine.clone(),
137        }
138    }
139
140    /// Create a new AgentLoop using the global resolver (backward compat).
141    pub fn new(
142        provider: Arc<dyn Provider>,
143        config: AgentLoopConfig,
144        tools: Arc<ToolRegistry>,
145        state: SharedState,
146    ) -> Self {
147        use crate::agent::GlobalProviderResolver;
148        Self::new_with_resolver(
149            provider,
150            config,
151            tools,
152            state,
153            Arc::new(GlobalProviderResolver),
154        )
155    }
156
157    /// Registers a hook called before every tool execution.
158    /// The hook can inspect and modify tool arguments, or reject the call entirely.
159    pub fn with_before_tool_call(mut self, hook: BeforeToolCallHook) -> Self {
160        self.before_tool_call = Some(hook);
161        self
162    }
163
164    /// Registers a hook called after every tool execution.
165    /// The hook receives the tool name, arguments, and result.
166    pub fn with_after_tool_call(mut self, hook: AfterToolCallHook) -> Self {
167        self.after_tool_call = Some(hook);
168        self
169    }
170
171    /// Inject a steering message into the agent loop.
172    ///
173    /// Steering messages are processed at the start of each turn, before the
174    /// next LLM call. If the steering queue is at capacity (256 messages), the
175    /// message is dropped and a warning is logged.
176    pub fn steer(&self, message: Message) {
177        if !try_push_steering(self, message) {
178            tracing::warn!("Steering message dropped — queue at capacity");
179        }
180    }
181
182    /// Enqueue a follow-up message to continue the conversation after all
183    /// tool calls in the current batch are complete.
184    ///
185    /// If the follow-up queue is at capacity (64 messages), the message is
186    /// dropped and a warning is logged.
187    pub fn follow_up(&self, message: Message) {
188        if !try_push_follow_up(self, message) {
189            tracing::warn!("Follow-up message dropped — queue at capacity");
190        }
191    }
192
193    /// Removes all pending steering messages from the queue.
194    /// See [`steer()`](Self::steer) for an explanation of steering messages.
195    pub fn clear_steering_queue(&self) {
196        clear_steering_queue(self);
197    }
198
199    /// Removes all pending follow-up messages from the queue.
200    /// See [`follow_up()`](Self::follow_up) for an explanation of follow-up messages.
201    pub fn clear_follow_up_queue(&self) {
202        clear_follow_up_queue(self);
203    }
204
205    /// Removes all pending messages from both the steering and follow-up queues.
206    pub fn clear_all_queues(&self) {
207        clear_all_queues(self);
208    }
209
210    fn drain_steering_queue(&self) -> Vec<Message> {
211        drain_steering_queue(self)
212    }
213
214    /// Build a ToolContext from the agent loop config.
215    /// Uses workspace_dir from config if set, otherwise falls back to current directory.
216    #[cfg_attr(test, allow(dead_code))]
217    pub(crate) fn build_tool_context(&self) -> ToolContext {
218        let workspace = self
219            .config
220            .workspace_dir
221            .clone()
222            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
223        ToolContext {
224            workspace_dir: workspace,
225            root_dir: self.config.workspace_dir.clone(),
226            session_id: self.session_id.clone(),
227            snapshot_store: self.config.snapshot_store.clone(),
228            memory: self.config.memory.clone(),
229            url_resolver: self.config.url_resolver.clone(),
230            todo: self.config.todo.clone(),
231            agent_pool: self.config.agent_pool.clone(),
232            lsp: self.config.lsp.clone(),
233            subagent_runner: self.config.subagent_runner.clone(),
234            subagent_depth: self.config.subagent_depth,
235        }
236    }
237
238    /// Truncate a tool result's text content if it exceeds
239    /// `config.max_tool_result_bytes` (issue #28 gap 1).
240    ///
241    /// When the limit is `None`, the result is returned unchanged.
242    /// When set, any `ContentBlock::Text` block whose `text` field
243    /// exceeds the limit is truncated to the limit and a marker is
244    /// appended so the model knows content was omitted.
245    fn maybe_truncate_tool_result(
246        &self,
247        mut result: oxi_ai::ToolResultMessage,
248    ) -> oxi_ai::ToolResultMessage {
249        let Some(max_bytes) = self.config.max_tool_result_bytes else {
250            return result;
251        };
252
253        for block in &mut result.content {
254            if let oxi_ai::ContentBlock::Text(tc) = block
255                && tc.text.len() > max_bytes
256            {
257                let omitted = tc.text.len() - max_bytes;
258                tc.text.truncate(max_bytes);
259                tc.text.push_str(&format!(
260                    "\n\n... [truncated: {omitted} bytes omitted, \
261                     use read/grep for full content]"
262                ));
263            }
264        }
265
266        result
267    }
268
269    fn drain_follow_up_queue(&self) -> Vec<Message> {
270        drain_follow_up_queue(self)
271    }
272
273    /// Cancels any in-progress auto-retry countdown.
274    /// After calling this, the agent will not automatically retry
275    /// on the next turn.
276    pub fn cancel_auto_retry(&self) {
277        cancel_auto_retry(self);
278    }
279
280    /// Returns the current auto-retry attempt number (0-based).
281    /// Useful for displaying retry status in the UI.
282    pub fn auto_retry_attempt(&self) -> usize {
283        auto_retry_attempt_method(self)
284    }
285
286    /// Get a reference to the shared state.
287    /// Used by Agent to sync state after loop execution.
288    pub fn state(&self) -> &SharedState {
289        &self.state
290    }
291
292    /// Get the external stop flag.
293    pub fn external_stop(&self) -> &Arc<AtomicBool> {
294        &self.external_stop
295    }
296
297    /// Sets a shared cancel signal (typically `Agent::cancel_flag`).
298    /// The streaming loop checks this in its periodic wake-up timer,
299    /// ensuring cancellation is detected even when the provider stream
300    /// produces no events (e.g. waiting for first token).
301    pub fn set_cancel_signal(&mut self, flag: Arc<AtomicBool>) {
302        self.cancel_signal = Some(flag);
303    }
304
305    /// Returns a clone of the loop's cancel-signal flag, if one has been
306    /// installed via [`Self::set_cancel_signal`]. Used by tool execution
307    /// to bridge the loop's `AtomicBool` cancellation into the per-tool
308    /// `oneshot::Receiver` cancellation channel (audit finding F-8).
309    pub fn cancel_signal(&self) -> Option<Arc<AtomicBool>> {
310        self.cancel_signal.as_ref().map(Arc::clone)
311    }
312
313    /// Returns true if cancellation has been requested via either
314    /// `external_stop` or the direct `cancel_signal`.
315    pub fn is_cancelled(&self) -> bool {
316        if self.external_stop.load(Ordering::SeqCst) {
317            return true;
318        }
319        self.cancel_signal
320            .as_ref()
321            .is_some_and(|f| f.load(Ordering::SeqCst))
322    }
323    /// Request cancellation from outside the loop (e.g. Ctrl+C).
324    /// Sets the `external_stop` flag which causes the streaming loop
325    /// to abort on its next periodic check (~500ms) and the agent loop
326    /// to exit after the current turn.
327    pub fn cancel(&self) {
328        self.external_stop.store(true, Ordering::SeqCst);
329    }
330
331    /// Set the steering hook — called each turn to drain new messages
332    /// from the session's steering queue into the loop's internal queue.
333    pub fn set_steering_hook(&mut self, hook: Arc<dyn Fn() -> Vec<String> + Send + Sync>) {
334        self.steering_hook = Some(hook);
335    }
336
337    /// Set the follow-up hook — called each turn to drain new messages
338    /// from the session's follow-up queue into the loop's internal queue.
339    pub fn set_follow_up_hook(&mut self, hook: Arc<dyn Fn() -> Vec<String> + Send + Sync>) {
340        self.follow_up_hook = Some(hook);
341    }
342
343    /// Poll the steering/follow-up hooks and inject new messages
344    /// into the internal queues.
345    fn poll_external_queues(&self) {
346        if let Some(ref hook) = self.steering_hook {
347            for msg_text in hook() {
348                self.steer(Message::User(UserMessage::new(msg_text)));
349            }
350        }
351        if let Some(ref hook) = self.follow_up_hook {
352            for msg_text in hook() {
353                self.follow_up(Message::User(UserMessage::new(msg_text)));
354            }
355        }
356    }
357
358    /// Runs the agent loop with a single user prompt.
359    /// Convenience wrapper around [`run_messages()`](Self::run_messages).
360    pub async fn run(
361        &self,
362        prompt: String,
363        emit: impl Fn(AgentEvent) + Send + Sync + 'static,
364    ) -> Result<Vec<AgentEvent>> {
365        let message = Message::User(UserMessage::new(prompt));
366        let emit = Arc::new(emit);
367        self.run_messages(vec![message], emit).await
368    }
369
370    /// Run with an `FnMut` callback and mutable state — no `Arc<Mutex<>>` needed.
371    ///
372    /// Unlike [`run()`](Self::run), which takes `Fn`, this method accepts `FnMut`
373    /// and a user-provided state value `S`. The callback receives `&mut S` on each
374    /// event, so you can accumulate results without any locking overhead.
375    ///
376    /// Returns the collected events **and** the final state value.
377    ///
378    /// # Example
379    ///
380    /// ```ignore
381    /// #[derive(Default)]
382    /// struct MyState { steps: usize, output: String }
383    ///
384    /// let (events, state) = agent_loop.run_mut(
385    ///     "do something".into(),
386    ///     MyState::default(),
387    ///     |event, s| {
388    ///         match event {
389    ///             AgentEvent::ToolExecutionEnd { is_error: false, .. } => s.steps += 1,
390    ///             AgentEvent::AgentEnd { messages, .. } => {
391    ///                 if let Some(Message::Assistant(a)) = messages.last() {
392    ///                     s.output = a.text_content();
393    ///                 }
394    ///             }
395    ///             _ => {}
396    ///         }
397    ///     },
398    /// ).await?;
399    /// ```
400    pub async fn run_mut<S: Send + std::fmt::Debug + 'static>(
401        &self,
402        prompt: String,
403        state: S,
404        emit: impl FnMut(AgentEvent, &mut S) + Send + 'static,
405    ) -> Result<(Vec<AgentEvent>, S)> {
406        let emit_fnmut = Arc::new(parking_lot::Mutex::new(emit));
407        let state_arc = Arc::new(parking_lot::Mutex::new(state));
408
409        // Clone the Arc for the closure; the original stays for recovery after run.
410        let state_for_closure = Arc::clone(&state_arc);
411
412        let emit_fn: EmitFn = Arc::new(move |event: AgentEvent| {
413            let mut cb = emit_fnmut.lock();
414            let mut s = state_for_closure.lock();
415            cb(event, &mut s);
416        });
417
418        let events = self.run_inner(prompt, emit_fn).await?;
419
420        // Recover the state. After run_inner completes, emit_fn is dropped,
421        // releasing the last Arc clone. Arc::try_unwrap should succeed since
422        // only our `state_arc` reference remains.
423        let mutex = Arc::try_unwrap(state_arc)
424            .expect("run_mut: state Arc still has multiple owners after run");
425        Ok((events, mutex.into_inner()))
426    }
427
428    /// Internal: create the initial user message and delegate to run_messages.
429    async fn run_inner(&self, prompt: String, emit: EmitFn) -> Result<Vec<AgentEvent>> {
430        let message = Message::User(UserMessage::new(prompt));
431        self.run_messages(vec![message], emit).await
432    }
433
434    /// Runs the agent loop with a list of pre-constructed messages.
435    /// This is the primary entry point for executing agent turns.
436    pub async fn run_messages(
437        &self,
438        prompts: Vec<Message>,
439        emit: EmitFn,
440    ) -> Result<Vec<AgentEvent>> {
441        let mut all_events = Vec::new();
442
443        let state_messages = self.state.get_state().messages.clone();
444        let mut all_messages = state_messages;
445        all_messages.extend(prompts.clone());
446
447        tracing::info!(session_id = ?self.session_id, "AgentLoop starting");
448        emit(AgentEvent::AgentStart {
449            prompts: prompts.clone(),
450            session_id: self.session_id.clone(),
451        });
452        all_events.push(AgentEvent::AgentStart {
453            prompts: prompts.clone(),
454            session_id: self.session_id.clone(),
455        });
456
457        let (result_messages, events) = self.run_loop(prompts, emit.clone()).await?;
458
459        all_events.extend(events);
460
461        let stop_reason = result_messages.last().and_then(|m| {
462            if let Message::Assistant(a) = m {
463                Some(format!("{:?}", a.stop_reason))
464            } else {
465                None
466            }
467        });
468
469        tracing::info!(session_id = ?self.session_id, "AgentLoop run_messages complete");
470
471        // Sync messages back to shared state
472        self.state.update(|s| {
473            s.replace_messages(result_messages.clone());
474        });
475
476        emit(AgentEvent::AgentEnd {
477            messages: result_messages.clone(),
478            stop_reason: stop_reason.clone(),
479            session_id: self.session_id.clone(),
480        });
481        all_events.push(AgentEvent::AgentEnd {
482            messages: result_messages.clone(),
483            stop_reason,
484            session_id: self.session_id.clone(),
485        });
486
487        Ok(all_events)
488    }
489
490    /// Resumes the agent loop after a previous turn ended in a paused state
491    /// (e.g., waiting for user confirmation). Emits events to the provided callback.
492    pub async fn continue_loop(
493        &self,
494        emit: impl Fn(AgentEvent) + Send + Sync + 'static,
495    ) -> Result<Vec<AgentEvent>> {
496        let emit = Arc::new(emit);
497        let mut all_events = Vec::new();
498
499        tracing::info!(session_id = ?self.session_id, "AgentLoop continuing");
500        emit(AgentEvent::AgentStart {
501            prompts: vec![],
502            session_id: self.session_id.clone(),
503        });
504        all_events.push(AgentEvent::AgentStart {
505            prompts: vec![],
506            session_id: self.session_id.clone(),
507        });
508
509        let (result_messages, events) = self.run_loop(vec![], emit.clone()).await?;
510
511        all_events.extend(events);
512
513        let stop_reason = result_messages.last().and_then(|m| {
514            if let Message::Assistant(a) = m {
515                Some(format!("{:?}", a.stop_reason))
516            } else {
517                None
518            }
519        });
520
521        tracing::info!(session_id = ?self.session_id, "AgentLoop continue_loop complete");
522        emit(AgentEvent::AgentEnd {
523            messages: result_messages.clone(),
524            stop_reason: stop_reason.clone(),
525            session_id: self.session_id.clone(),
526        });
527        all_events.push(AgentEvent::AgentEnd {
528            messages: result_messages.clone(),
529            stop_reason,
530            session_id: self.session_id.clone(),
531        });
532
533        Ok(all_events)
534    }
535
536    /// Process pending steering messages, emitting events and appending to message history.
537    fn process_steering_messages(
538        &self,
539        pending_messages: &mut Vec<Message>,
540        messages: &mut Vec<Message>,
541        new_messages: &mut Vec<Message>,
542        events: &mut Vec<AgentEvent>,
543        emit: &EmitFn,
544    ) {
545        if pending_messages.is_empty() {
546            return;
547        }
548        for message in pending_messages.drain(..) {
549            emit(AgentEvent::SteeringMessage {
550                message: message.clone(),
551            });
552            emit(AgentEvent::MessageStart {
553                message: message.clone(),
554            });
555            emit(AgentEvent::MessageEnd {
556                message: message.clone(),
557            });
558            events.push(AgentEvent::SteeringMessage {
559                message: message.clone(),
560            });
561            events.push(AgentEvent::MessageStart {
562                message: message.clone(),
563            });
564            events.push(AgentEvent::MessageEnd {
565                message: message.clone(),
566            });
567            messages.push(message.clone());
568            new_messages.push(message);
569        }
570    }
571
572    /// Handle a streaming error by synthesizing an error message and completing the turn.
573    async fn handle_streaming_error(
574        &self,
575        e: anyhow::Error,
576        messages: &mut Vec<Message>,
577        new_messages: &mut Vec<Message>,
578        events: &mut Vec<AgentEvent>,
579        emit: &EmitFn,
580        turn_number: u32,
581    ) -> (Vec<Message>, Vec<AgentEvent>) {
582        let err_msg = format!("{}", e);
583        tracing::error!(session_id = ?self.session_id, "Unexpected streaming error: {}", err_msg);
584
585        let mut error_asst = oxi_ai::AssistantMessage::new(
586            oxi_ai::Api::OpenAiCompletions,
587            "agent",
588            &self.config.model_id,
589        );
590        error_asst.stop_reason = StopReason::Error;
591        error_asst
592            .content
593            .push(ContentBlock::Text(TextContent::new(format!(
594                "⚠ {}",
595                err_msg
596            ))));
597
598        new_messages.push(Message::Assistant(error_asst.clone()));
599        messages.push(Message::Assistant(error_asst.clone()));
600
601        emit(AgentEvent::MessageStart {
602            message: Message::Assistant(error_asst.clone()),
603        });
604        emit(AgentEvent::MessageEnd {
605            message: Message::Assistant(error_asst.clone()),
606        });
607        emit(AgentEvent::Error {
608            message: err_msg.clone(),
609            session_id: self.session_id.clone(),
610        });
611
612        emit(AgentEvent::TurnEnd {
613            turn_number,
614            assistant_message: Message::Assistant(error_asst.clone()),
615            tool_results: vec![],
616        });
617        events.push(AgentEvent::TurnEnd {
618            turn_number,
619            assistant_message: Message::Assistant(error_asst),
620            tool_results: vec![],
621        });
622        // Return Ok — lifecycle is complete
623        (messages.clone(), events.clone())
624    }
625
626    async fn run_loop(
627        &self,
628        initial_prompts: Vec<Message>,
629        emit: EmitFn,
630    ) -> Result<(Vec<Message>, Vec<AgentEvent>)> {
631        tracing::info!("[AGENT-LOOP] run_loop started");
632        let mut messages = self.state.get_state().messages.clone();
633        messages.extend(initial_prompts.clone());
634
635        let mut new_messages: Vec<Message> = initial_prompts;
636        let mut events = Vec::new();
637        let mut turn_number: u32 = 0;
638        let mut first_turn = true;
639
640        let mut pending_messages: Vec<Message> = self.drain_steering_queue();
641
642        loop {
643            tracing::info!(
644                "[AGENT-LOOP] Top of loop, has_more_tool_calls={}, pending_messages={}",
645                true,
646                pending_messages.is_empty()
647            );
648            let mut has_more_tool_calls = true;
649
650            while has_more_tool_calls || !pending_messages.is_empty() {
651                if !first_turn {
652                    turn_number += 1;
653                    emit(AgentEvent::TurnStart { turn_number });
654                    events.push(AgentEvent::TurnStart { turn_number });
655                } else {
656                    first_turn = false;
657                    turn_number = 1;
658                    emit(AgentEvent::TurnStart { turn_number });
659                    events.push(AgentEvent::TurnStart { turn_number });
660                }
661
662                if !pending_messages.is_empty() {
663                    self.process_steering_messages(
664                        &mut pending_messages,
665                        &mut messages,
666                        &mut new_messages,
667                        &mut events,
668                        &emit,
669                    );
670                }
671
672                // Poll external hooks each turn to drain new steering/follow-up
673                // messages injected since the last turn.
674                self.poll_external_queues();
675
676                self.maybe_compact(&mut messages, turn_number as usize, &emit)
677                    .await;
678
679                tracing::info!("[AGENT-LOOP] About to call stream_assistant_response");
680                let ttsr = self.ttsr_engine.as_deref();
681                let outcome = stream_assistant_response(self, &mut messages, &emit, ttsr).await;
682
683                let assistant_message = match outcome {
684                    StreamOutcome::Complete(msg) => msg,
685                    StreamOutcome::Error {
686                        message: _message,
687                        detail,
688                    } => {
689                        // Check for message-ordering errors that can be recovered
690                        // by removing orphaned tool results.
691                        let is_tool_ordering_error = detail.contains("tool")
692                            && (detail.contains("must be a response")
693                                || detail.contains("preceding")
694                                || detail.contains("tool_calls"));
695
696                        if is_tool_ordering_error {
697                            let removed = sanitize_orphaned_tool_results(&mut messages);
698                            tracing::warn!(
699                                session_id = ?self.session_id,
700                                removed,
701                                detail = %detail,
702                                "Message-ordering error detected, removed orphaned tool results, retrying"
703                            );
704                            if removed > 0 {
705                                // Don't push the error message to history; retry the turn.
706                                emit(AgentEvent::Error {
707                                    message: format!(
708                                        "⚠ Provider rejected message order: {}. Removed {} orphaned tool results, retrying…",
709                                        detail, removed
710                                    ),
711                                    session_id: self.session_id.clone(),
712                                });
713                                continue; // Retry the turn with sanitized messages
714                            }
715                        }
716
717                        // Unrecoverable — fall through to error handler.
718                        return Ok(self
719                            .handle_streaming_error(
720                                anyhow::anyhow!("Provider stream error: {}", detail),
721                                &mut messages,
722                                &mut new_messages,
723                                &mut events,
724                                &emit,
725                                turn_number,
726                            )
727                            .await);
728                    }
729                    StreamOutcome::Cancelled(msg) => {
730                        emit(AgentEvent::TurnEnd {
731                            turn_number,
732                            assistant_message: Message::Assistant(msg.clone()),
733                            tool_results: vec![],
734                        });
735                        return Ok((messages, events));
736                    }
737                    StreamOutcome::RuleInterrupt { partial, rule } => {
738                        tracing::info!("RuleInterrupt: '{}' violated, retrying", rule.name);
739                        emit(AgentEvent::TtsrInterrupt {
740                            rule_name: rule.name.clone(),
741                            session_id: self.session_id.clone(),
742                        });
743                        messages.push(Message::Assistant(partial));
744                        let interrupt_body = format!(
745                            "<system-interrupt reason=\"rule_violation\" rule=\"{name}\">\n\
746                             Your output was interrupted because it violated a project rule.\n\
747                             Comply with: {content}\n</system-interrupt>",
748                            name = rule.name,
749                            content = rule.content
750                        );
751                        messages.push(Message::user(interrupt_body));
752                        continue;
753                    }
754                };
755
756                new_messages.push(Message::Assistant(assistant_message.clone()));
757
758                if matches!(assistant_message.stop_reason, StopReason::Error) {
759                    if is_retryable_error(&assistant_message) {
760                        let did_retry =
761                            handle_retryable_error(self, &assistant_message, &mut messages, &emit)
762                                .await;
763                        if did_retry {
764                            emit(AgentEvent::TurnEnd {
765                                turn_number,
766                                assistant_message: Message::Assistant(assistant_message.clone()),
767                                tool_results: vec![],
768                            });
769                            events.push(AgentEvent::TurnEnd {
770                                turn_number,
771                                assistant_message: Message::Assistant(assistant_message.clone()),
772                                tool_results: vec![],
773                            });
774                            has_more_tool_calls = true;
775                            continue;
776                        }
777                    }
778
779                    emit(AgentEvent::TurnEnd {
780                        turn_number,
781                        assistant_message: Message::Assistant(assistant_message.clone()),
782                        tool_results: vec![],
783                    });
784                    events.push(AgentEvent::TurnEnd {
785                        turn_number,
786                        assistant_message: Message::Assistant(assistant_message.clone()),
787                        tool_results: vec![],
788                    });
789                    return Ok((messages, events));
790                }
791                if matches!(assistant_message.stop_reason, StopReason::Aborted) {
792                    if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
793                        emit(AgentEvent::AutoRetryEnd {
794                            success: true,
795                            attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
796                            final_error: None,
797                        });
798                        self.auto_retry_attempt.store(0, Ordering::Relaxed);
799                    }
800
801                    emit(AgentEvent::TurnEnd {
802                        turn_number,
803                        assistant_message: Message::Assistant(assistant_message.clone()),
804                        tool_results: vec![],
805                    });
806                    events.push(AgentEvent::TurnEnd {
807                        turn_number,
808                        assistant_message: Message::Assistant(assistant_message.clone()),
809                        tool_results: vec![],
810                    });
811                    return Ok((messages, events));
812                }
813
814                if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
815                    emit(AgentEvent::AutoRetryEnd {
816                        success: true,
817                        attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
818                        final_error: None,
819                    });
820                    self.auto_retry_attempt.store(0, Ordering::Relaxed);
821                }
822
823                let tool_calls = helpers::extract_tool_calls(&assistant_message);
824                tracing::info!(
825                    "[AGENT-LOOP] extract_tool_calls found {} calls, stop_reason={:?}",
826                    tool_calls.len(),
827                    assistant_message.stop_reason
828                );
829
830                let mut tool_results: Vec<oxi_ai::ToolResultMessage> = Vec::new();
831                has_more_tool_calls = false;
832
833                if !tool_calls.is_empty() {
834                    tracing::info!("[AGENT-LOOP] Executing {} tool calls", tool_calls.len());
835                    let ctx = self.build_tool_context();
836                    let executed_batch = match execute_tool_calls(
837                        self,
838                        &mut messages,
839                        &assistant_message,
840                        tool_calls,
841                        &emit,
842                        &ctx,
843                    )
844                    .await
845                    {
846                        Ok(batch) => batch,
847                        Err(e) => {
848                            // Tool execution failed — emit TurnEnd and return Ok.
849                            // The lifecycle must always complete.
850                            tracing::error!(session_id = ?self.session_id, "Tool execution error: {}", e);
851                            emit(AgentEvent::Error {
852                                message: format!("Tool execution error: {}", e),
853                                session_id: self.session_id.clone(),
854                            });
855                            emit(AgentEvent::TurnEnd {
856                                turn_number,
857                                assistant_message: Message::Assistant(assistant_message.clone()),
858                                tool_results: vec![],
859                            });
860                            events.push(AgentEvent::TurnEnd {
861                                turn_number,
862                                assistant_message: Message::Assistant(assistant_message.clone()),
863                                tool_results: vec![],
864                            });
865                            return Ok((messages, events));
866                        }
867                    };
868
869                    tool_results = executed_batch.messages;
870                    has_more_tool_calls = !executed_batch.terminate;
871
872                    if executed_batch.terminate {
873                        tracing::warn!(
874                            session_id = ?self.session_id,
875                            "Tool batch terminated early (terminate flag set by after_tool_call hook). \
876                             This halts the tool-calling loop. If this is unexpected, \
877                             check after_tool_call hooks for unintended terminate: true."
878                        );
879                    }
880
881                    for result in &tool_results {
882                        let result = self.maybe_truncate_tool_result(result.clone());
883                        messages.push(Message::ToolResult(result.clone()));
884                        new_messages.push(Message::ToolResult(result));
885                    }
886                }
887
888                emit(AgentEvent::TurnEnd {
889                    turn_number,
890                    assistant_message: Message::Assistant(assistant_message.clone()),
891                    tool_results: tool_results.clone(),
892                });
893                events.push(AgentEvent::TurnEnd {
894                    turn_number,
895                    assistant_message: Message::Assistant(assistant_message.clone()),
896                    tool_results: tool_results.clone(),
897                });
898
899                if should_stop_after_turn(&self.external_stop) {
900                    tracing::info!("[AGENT-LOOP] external_stop, ending loop");
901                    return Ok((messages, events));
902                }
903
904                pending_messages = self.drain_steering_queue();
905                tracing::info!(
906                    "[AGENT-LOOP] TurnEnd complete, pending_messages={}, has_more_tool_calls={}",
907                    !pending_messages.is_empty(),
908                    has_more_tool_calls
909                );
910
911                // Early stop check: if external_stop was set (e.g. Ctrl+C),
912                // don't process steering messages from the next turn.
913                if self.external_stop.load(Ordering::SeqCst) {
914                    tracing::info!(
915                        "[AGENT-LOOP] external_stop set after steering drain, ending loop"
916                    );
917                    return Ok((messages, events));
918                }
919            }
920
921            // Re-check steering queue after the inner while loop exits.
922            // This closes the race window where steer() is called between the
923            // last drain_steering_queue() and the while-exit condition check.
924            let late_steering = self.drain_steering_queue();
925            if !late_steering.is_empty() {
926                tracing::info!(
927                    count = late_steering.len(),
928                    "[AGENT-LOOP] Caught late steering messages after inner loop exit"
929                );
930                pending_messages = late_steering;
931                continue;
932            }
933
934            let follow_up_messages = self.drain_follow_up_queue();
935            if !follow_up_messages.is_empty() {
936                pending_messages = follow_up_messages;
937                continue;
938            }
939
940            // Final check: one more steering drain after follow-up to catch
941            // messages injected during the follow-up drain window.
942            let final_steering = self.drain_steering_queue();
943            if !final_steering.is_empty() {
944                pending_messages = final_steering;
945                continue;
946            }
947
948            break;
949        }
950
951        Ok((messages, events))
952    }
953
954    /// Build the compaction instruction, appending injected TTSR rule
955    /// names so that the model remembers rules already enforced.
956    fn build_compaction_instruction(&self) -> Option<String> {
957        let base = self.config.compaction_instruction.as_deref();
958        let injected = self
959            .ttsr_engine
960            .as_ref()
961            .map(|e| e.injected_records())
962            .unwrap_or_default();
963        if injected.is_empty() {
964            return base.map(|s| s.to_string());
965        }
966        let mut instr = base.map(|s| s.to_string()).unwrap_or_default();
967        instr.push_str("\n\nThe following rules have already been enforced in this session and corrections applied. Do NOT violate them again:");
968        for (name, _turn) in &injected {
969            instr.push_str(&format!("\n- {name}"));
970        }
971        Some(instr)
972    }
973
974    async fn maybe_compact(&self, messages: &mut Vec<Message>, iteration: usize, emit: &EmitFn) {
975        // Decide the context-size value to drive compaction with. Prefer
976        // the provider-reported `last_input_tokens` (ground truth) over
977        // the legacy `bytes/4` heuristic. The heuristic can undercount
978        // by 3-4× on token-dense content (base64, JSON, CJK) and is the
979        // reason `CompactionStrategy::Threshold` was effectively a no-op
980        // in issue #28's failure (35k estimated vs 122k actual).
981        //
982        // The provider count lags by exactly one turn: `maybe_compact`
983        // runs at the **top** of a turn, before streaming. So the
984        // `last_input_tokens` we read here reflects the *previous*
985        // turn's `Done` event. The drift is at most the size of the
986        // tool results the model is about to receive, which is small
987        // relative to the failure-mode drift the heuristic suffers.
988        // For turn 1 there is no prior count, so we fall back to the
989        // heuristic on cold start.
990        let snapshot = self.state.get_state();
991        let (context_tokens, source_label) = match snapshot.current_token_source() {
992            TokenSource::Real(n) => (n, "provider-reported"),
993            TokenSource::Heuristic(n) => (n, "bytes/4 heuristic (cold start)"),
994            TokenSource::None => (0, "empty"),
995        };
996        // Surface heuristic drift as a warning when the operator has
997        // observed at least one provider count and it diverges from
998        // the estimate by more than 2×. This is the diagnostic path
999        // from #28's "Proposed fix" option 3.
1000        if let Some(div) = snapshot.last_estimate_divergence
1001            && div > 2.0
1002        {
1003            tracing::warn!(
1004                session_id = ?self.session_id,
1005                divergence = div,
1006                reported = snapshot.last_input_tokens.unwrap_or(0),
1007                estimate = snapshot.last_estimate_at_report.unwrap_or(0),
1008                "Token-count heuristic (bytes/4) diverges from provider-reported usage \
1009                 by >2x; CompactionStrategy::Threshold decisions are using the \
1010                 provider-reported count (issue #28 gap 2)."
1011            );
1012        }
1013        drop(snapshot);
1014
1015        if !self
1016            .compaction_manager
1017            .should_compact(context_tokens, iteration)
1018        {
1019            return;
1020        }
1021
1022        emit(AgentEvent::Compaction {
1023            event: CompactionEvent::Triggered {
1024                context_tokens,
1025                iteration,
1026                source: source_label.to_string(),
1027            },
1028        });
1029
1030        let messages_to_compact: Vec<Message> = messages.to_vec();
1031        let instruction = self.build_compaction_instruction();
1032
1033        match self
1034            .compaction_manager
1035            .compact_if_needed(
1036                &messages_to_compact,
1037                instruction.as_deref(),
1038                context_tokens,
1039                iteration,
1040            )
1041            .await
1042        {
1043            Ok(Some(compacted)) => {
1044                let start = Instant::now();
1045                let message_count = compacted.compacted_count;
1046
1047                emit(AgentEvent::Compaction {
1048                    event: CompactionEvent::Started { message_count },
1049                });
1050
1051                let kept_messages = compacted.kept_messages;
1052                let summary = compacted.summary;
1053                let compacted_count = compacted.compacted_count;
1054
1055                *messages = kept_messages;
1056
1057                let state_msgs = messages.clone();
1058                self.state.update(|s| {
1059                    s.replace_messages(state_msgs);
1060                });
1061
1062                let compacted_ctx = CompactedContext {
1063                    summary,
1064                    kept_messages: Vec::new(),
1065                    compacted_count,
1066                };
1067                emit(AgentEvent::Compaction {
1068                    event: CompactionEvent::Completed {
1069                        result: compacted_ctx.clone(),
1070                        duration_ms: start.elapsed().as_millis() as u64,
1071                    },
1072                });
1073
1074                // Async compaction hook — awaited, not fire-and-forget.
1075                if let Some(ref hook) = self.config.on_compaction {
1076                    match hook(compacted_ctx).await {
1077                        Ok(()) => {
1078                            tracing::debug!("Compaction hook completed successfully");
1079                        }
1080                        Err(e) => {
1081                            tracing::warn!(error = %e, "Compaction hook failed");
1082                        }
1083                    }
1084                }
1085            }
1086            Ok(None) => {}
1087            Err(e) => {
1088                emit(AgentEvent::Compaction {
1089                    event: CompactionEvent::Failed {
1090                        error: e.to_string(),
1091                    },
1092                });
1093            }
1094        }
1095    }
1096
1097    fn resolve_model(&self) -> Result<oxi_ai::Model> {
1098        self.resolver
1099            .resolve_model(&self.config.model_id)
1100            .ok_or_else(|| Error::msg(format!("Model not found: {}", self.config.model_id)))
1101    }
1102}
1103
1104#[cfg(test)]
1105mod session_id_wiring_tests {
1106    //! Regression coverage for the #13 fix.
1107    //! `build_tool_context` is private; testing it here keeps the test in the
1108    //! same module so it can reach private surface. We never stream — the
1109    //! nop provider only exists to satisfy `AgentLoop::new_with_resolver`.
1110    use super::*;
1111    use crate::ProviderResolver;
1112    use crate::agent_loop::config::AgentLoopConfig;
1113    use crate::config::ToolExecutionMode;
1114    use crate::state::SharedState;
1115    use crate::tools::ToolRegistry;
1116    use oxi_ai::{
1117        CompactionStrategy, Context, Model, Provider, ProviderError, StreamOptions, StreamResult,
1118    };
1119    use std::future::Future;
1120    use std::pin::Pin;
1121
1122    struct NopProvider;
1123    impl Provider for NopProvider {
1124        fn stream<'a>(
1125            &'a self,
1126            _model: &'a Model,
1127            _context: &'a Context,
1128            _options: Option<StreamOptions>,
1129        ) -> Pin<Box<dyn Future<Output = StreamResult> + Send + 'a>> {
1130            Box::pin(async {
1131                Err(ProviderError::NotImplemented(
1132                    "session-id wiring tests never stream".to_string(),
1133                ))
1134            })
1135        }
1136        fn name(&self) -> &str {
1137            "nop"
1138        }
1139    }
1140
1141    struct NullResolver;
1142    impl ProviderResolver for NullResolver {
1143        fn resolve_provider(&self, _name: &str) -> Option<Arc<dyn Provider>> {
1144            None
1145        }
1146        fn resolve_model(&self, _model_id: &str) -> Option<Model> {
1147            None
1148        }
1149    }
1150
1151    fn loop_with(session_id: Option<String>) -> AgentLoop {
1152        let config = AgentLoopConfig {
1153            model_id: "test/model".to_string(),
1154            system_prompt: None,
1155            temperature: 1.0,
1156            max_tokens: 4096,
1157            tool_execution: ToolExecutionMode::Sequential,
1158            compaction_strategy: CompactionStrategy::Disabled,
1159            compaction_instruction: None,
1160            context_window: 128_000,
1161            session_id,
1162            transport: None,
1163            compact_on_start: false,
1164            max_retry_delay_ms: None,
1165            auto_retry_enabled: true,
1166            auto_retry_max_attempts: 3,
1167            auto_retry_base_delay_ms: 1000,
1168            api_key: None,
1169            workspace_dir: None,
1170            provider_options: None,
1171            on_compaction: None,
1172            snapshot_store: None,
1173            memory: None,
1174            url_resolver: None,
1175            todo: None,
1176            agent_pool: None,
1177            lsp: None,
1178            ttsr_engine: None,
1179            subagent_runner: None,
1180            subagent_depth: 0,
1181            max_tool_result_bytes: None,
1182        };
1183        AgentLoop::new_with_resolver(
1184            Arc::new(NopProvider),
1185            config,
1186            Arc::new(ToolRegistry::new()),
1187            SharedState::new(),
1188            Arc::new(NullResolver),
1189        )
1190    }
1191
1192    /// Regression for defect #13: `AgentLoopConfig.session_id` MUST flow into
1193    /// `ToolContext.session_id`. Before the fix, the field was hardcoded to
1194    /// `None`, so the `issue` tool received an empty caller id and bypassed
1195    /// all ownership/liveness checks (two agents could both `start` the same
1196    /// issue and the last writer silently won).
1197    #[test]
1198    fn tool_context_inherits_session_id_when_set() {
1199        let loop_ = loop_with(Some("proc-test-session-id".to_string()));
1200        let ctx = loop_.build_tool_context();
1201        assert_eq!(
1202            ctx.session_id.as_deref(),
1203            Some("proc-test-session-id"),
1204            "ToolContext.session_id must inherit AgentConfig.session_id"
1205        );
1206    }
1207
1208    #[test]
1209    fn tool_context_session_id_defaults_to_none() {
1210        let loop_ = loop_with(None);
1211        let ctx = loop_.build_tool_context();
1212        assert!(
1213            ctx.session_id.is_none(),
1214            "default ToolContext.session_id should be None"
1215        );
1216    }
1217}
1218
1219// ── Gap 1: tool-result truncation tests (issue #28) ──────────────────
1220
1221#[cfg(test)]
1222mod truncation_tests {
1223    use super::*;
1224    use crate::agent::ProviderResolver;
1225    use oxi_ai::{
1226        ContentBlock, Context, Model, Provider, ProviderError, StreamOptions, StreamResult,
1227        TextContent, ToolResultMessage,
1228    };
1229    use std::future::Future;
1230    use std::pin::Pin;
1231
1232    struct NopProvider;
1233    impl Provider for NopProvider {
1234        fn stream<'a>(
1235            &'a self,
1236            _model: &'a Model,
1237            _context: &'a Context,
1238            _options: Option<StreamOptions>,
1239        ) -> Pin<Box<dyn Future<Output = StreamResult> + Send + 'a>> {
1240            Box::pin(async {
1241                Err(ProviderError::NotImplemented(
1242                    "truncation tests never stream".to_string(),
1243                ))
1244            })
1245        }
1246        fn name(&self) -> &str {
1247            "nop"
1248        }
1249    }
1250
1251    struct NullResolver;
1252    impl ProviderResolver for NullResolver {
1253        fn resolve_provider(&self, _name: &str) -> Option<Arc<dyn Provider>> {
1254            None
1255        }
1256        fn resolve_model(&self, _model_id: &str) -> Option<Model> {
1257            None
1258        }
1259    }
1260
1261    fn make_result(text: &str) -> ToolResultMessage {
1262        ToolResultMessage::new(
1263            "tc_test".to_string(),
1264            "test_tool",
1265            vec![ContentBlock::Text(TextContent::new(text.to_string()))],
1266        )
1267    }
1268
1269    fn loop_with_limit(limit: Option<usize>) -> AgentLoop {
1270        let config = AgentLoopConfig {
1271            model_id: "test/model".to_string(),
1272            max_tool_result_bytes: limit,
1273            ..Default::default()
1274        };
1275        AgentLoop::new_with_resolver(
1276            Arc::new(NopProvider),
1277            config,
1278            Arc::new(ToolRegistry::new()),
1279            SharedState::new(),
1280            Arc::new(NullResolver),
1281        )
1282    }
1283
1284    #[test]
1285    fn truncate_passthrough_when_none() {
1286        let loop_ = loop_with_limit(None);
1287        let result = make_result(&"x".repeat(10_000));
1288        let truncated = loop_.maybe_truncate_tool_result(result);
1289        if let ContentBlock::Text(tc) = &truncated.content[0] {
1290            assert_eq!(tc.text.len(), 10_000);
1291            assert!(!tc.text.contains("truncated"));
1292        }
1293    }
1294
1295    #[test]
1296    fn truncate_passthrough_when_under_limit() {
1297        let loop_ = loop_with_limit(Some(1000));
1298        let result = make_result(&"x".repeat(500));
1299        let truncated = loop_.maybe_truncate_tool_result(result);
1300        if let ContentBlock::Text(tc) = &truncated.content[0] {
1301            assert_eq!(tc.text.len(), 500);
1302            assert!(!tc.text.contains("truncated"));
1303        }
1304    }
1305
1306    #[test]
1307    fn truncate_applies_when_over_limit() {
1308        let loop_ = loop_with_limit(Some(100));
1309        let result = make_result(&"x".repeat(500));
1310        let truncated = loop_.maybe_truncate_tool_result(result);
1311        if let ContentBlock::Text(tc) = &truncated.content[0] {
1312            assert!(
1313                tc.text.len() < 500,
1314                "text not truncated: {} bytes",
1315                tc.text.len()
1316            );
1317            assert!(tc.text.contains("truncated"), "missing truncation marker");
1318            assert!(tc.text.contains("400 bytes omitted"));
1319        }
1320    }
1321}