Skip to main content

oxi_agent/agent_loop/
mod.rs

1#![allow(unused_doc_comments)]
2
3//! Agent loop — the main request/response cycle driver.
4//!
5//! Coordinates the interaction between the agent, provider, tools, and
6//! state management. Handles streaming, tool execution, retry logic,
7//! and compaction events.
8
9/// Agent-loop configuration.
10pub mod config;
11/// Miscellaneous helper functions.
12pub mod helpers;
13/// Internal message/event queues.
14pub mod queues;
15/// Retry logic for the agent loop.
16pub mod retry;
17/// Streaming response handling.
18pub mod streaming;
19/// Tool execution strategies.
20pub mod tool_exec;
21
22// Re-export for sibling module access
23use crate::agent::ProviderResolver;
24use crate::compaction::{CompactedContext, CompactionEvent};
25use crate::events::AgentEvent;
26use crate::recovery::{CircuitBreaker, CircuitBreakerConfig};
27use crate::{state::SharedState, tools::ToolContext, tools::ToolRegistry};
28use anyhow::{Error, Result};
29pub use config::{AfterToolCallHook, AgentLoopConfig, BeforeToolCallHook, ToolExecutionMode};
30use oxi_ai::{
31    estimate_tokens, CompactionManager as OxCompactionManager, CompactionStrategy, ContentBlock,
32    LlmCompactor, Message, Provider, StopReason, TextContent, UserMessage,
33};
34use parking_lot::RwLock;
35use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
36use std::sync::Arc;
37use std::time::Instant;
38
39use self::helpers::should_stop_after_turn;
40use self::queues::{
41    clear_all_queues, clear_follow_up_queue, clear_steering_queue, drain_follow_up_queue,
42    drain_steering_queue,
43};
44use self::retry::{
45    auto_retry_attempt_method, cancel_auto_retry, handle_retryable_error, is_retryable_error,
46};
47use self::streaming::stream_assistant_response;
48use self::tool_exec::execute_tool_calls;
49
50type EmitFn = Arc<dyn Fn(AgentEvent) + Send + Sync>;
51
/// Driver for the agent's request/response cycle.
///
/// Owns everything a run needs: the provider, the loop configuration, the tool
/// registry, the shared conversation state, the compaction manager, optional
/// tool-call hooks, and the steering / follow-up message queues.
pub struct AgentLoop {
    /// Provider handed to the LLM compactor at construction time.
    /// NOTE(review): presumably also used by the `streaming` submodule — confirm.
    provider: Arc<dyn Provider>,
    /// Loop configuration; this file reads the model id, compaction settings,
    /// workspace directory, max iterations and session id from it.
    config: AgentLoopConfig,
    /// Tool registry. Not read in this file — presumably consumed by `tool_exec`.
    tools: Arc<ToolRegistry>,
    /// Shared state; the message history is read from it at the start of a run
    /// and replaced after `run_messages` and after a successful compaction.
    state: SharedState,
    /// Decides when to compact the context and performs the compaction.
    compaction_manager: OxCompactionManager,
    /// Optional hook installed via `with_before_tool_call`.
    before_tool_call: Option<BeforeToolCallHook>,
    /// Optional hook installed via `with_after_tool_call`.
    after_tool_call: Option<AfterToolCallHook>,
    /// Messages queued by `steer`.
    steering_queue: RwLock<Vec<Message>>,
    /// Messages queued by `follow_up`.
    follow_up_queue: RwLock<Vec<Message>>,
    /// Copy of `config.session_id`, attached to lifecycle events and log lines.
    session_id: Option<String>,
    /// Auto-retry attempt counter; `run_loop` resets it to 0 once a response
    /// that is not an error arrives while it is non-zero.
    auto_retry_attempt: AtomicUsize,
    /// Initialised to `false`. Not touched in this file — presumably set by
    /// `cancel_auto_retry` in the `retry` submodule; confirm there.
    auto_retry_cancel: AtomicBool,
    /// Circuit breaker built from the default config. Not used in this file.
    circuit_breaker: CircuitBreaker,
    /// External stop flag — when set, should_stop_after_turn returns true.
    /// Used by Agent to forward the should_stop_flag from AgentHooks.
    external_stop: Arc<AtomicBool>,
    /// Provider/model resolver for isolated model lookups.
    resolver: Arc<dyn ProviderResolver>,
}
73
74impl AgentLoop {
75    /// TODO.
76    /// Create a new AgentLoop with an explicit resolver.
77    pub fn new_with_resolver(
78        provider: Arc<dyn Provider>,
79        config: AgentLoopConfig,
80        tools: Arc<ToolRegistry>,
81        state: SharedState,
82        resolver: Arc<dyn ProviderResolver>,
83    ) -> Self {
84        let mut compaction_manager =
85            OxCompactionManager::new(config.compaction_strategy.clone(), config.context_window);
86
87        if config.compaction_strategy != CompactionStrategy::Disabled {
88            let model = resolver.resolve_model(&config.model_id);
89            if let Some(model) = model {
90                let llm_compactor =
91                    Arc::new(LlmCompactor::new(model.clone(), Arc::clone(&provider)));
92                compaction_manager.set_compactor(llm_compactor);
93            }
94        }
95
96        Self {
97            provider,
98            config: config.clone(),
99            tools,
100            state,
101            compaction_manager,
102            before_tool_call: None,
103            after_tool_call: None,
104            steering_queue: RwLock::new(Vec::new()),
105            follow_up_queue: RwLock::new(Vec::new()),
106            session_id: config.session_id.clone(),
107            auto_retry_attempt: AtomicUsize::new(0),
108            auto_retry_cancel: AtomicBool::new(false),
109            circuit_breaker: CircuitBreaker::new(CircuitBreakerConfig::default()),
110            external_stop: Arc::new(AtomicBool::new(false)),
111            resolver,
112        }
113    }
114
115    /// Create a new AgentLoop using the global resolver (backward compat).
116    pub fn new(
117        provider: Arc<dyn Provider>,
118        config: AgentLoopConfig,
119        tools: Arc<ToolRegistry>,
120        state: SharedState,
121    ) -> Self {
122        use crate::agent::GlobalProviderResolver;
123        Self::new_with_resolver(
124            provider,
125            config,
126            tools,
127            state,
128            Arc::new(GlobalProviderResolver),
129        )
130    }
131
132    /// TODO: document this function.
133    pub fn with_before_tool_call(mut self, hook: BeforeToolCallHook) -> Self {
134        self.before_tool_call = Some(hook);
135        self
136    }
137
138    /// TODO: document this function.
139    pub fn with_after_tool_call(mut self, hook: AfterToolCallHook) -> Self {
140        self.after_tool_call = Some(hook);
141        self
142    }
143
144    /// TODO: document this function.
145    pub fn steer(&self, message: Message) {
146        self.steering_queue.write().push(message);
147    }
148
149    /// TODO: document this function.
150    pub fn follow_up(&self, message: Message) {
151        self.follow_up_queue.write().push(message);
152    }
153
154    /// TODO: document this function.
155    pub fn clear_steering_queue(&self) {
156        clear_steering_queue(self);
157    }
158
159    /// TODO: document this function.
160    pub fn clear_follow_up_queue(&self) {
161        clear_follow_up_queue(self);
162    }
163
164    /// TODO: document this function.
165    pub fn clear_all_queues(&self) {
166        clear_all_queues(self);
167    }
168
169    fn drain_steering_queue(&self) -> Vec<Message> {
170        drain_steering_queue(self)
171    }
172
173    /// Build a ToolContext from the agent loop config.
174    /// Uses workspace_dir from config if set, otherwise falls back to current directory.
175    fn build_tool_context(&self) -> ToolContext {
176        let workspace = self
177            .config
178            .workspace_dir
179            .clone()
180            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
181        ToolContext {
182            workspace_dir: workspace,
183            root_dir: self.config.workspace_dir.clone(),
184            session_id: self.session_id.clone(),
185        }
186    }
187
188    fn drain_follow_up_queue(&self) -> Vec<Message> {
189        drain_follow_up_queue(self)
190    }
191
192    /// TODO: document this function.
193    pub fn cancel_auto_retry(&self) {
194        cancel_auto_retry(self);
195    }
196
197    /// TODO: document this function.
198    pub fn auto_retry_attempt(&self) -> usize {
199        auto_retry_attempt_method(self)
200    }
201
202    /// Get a reference to the shared state.
203    /// Used by Agent to sync state after loop execution.
204    pub fn state(&self) -> &SharedState {
205        &self.state
206    }
207
208    /// Get the external stop flag.
209    pub fn external_stop(&self) -> &Arc<AtomicBool> {
210        &self.external_stop
211    }
212
213    /// TODO: document this function.
214    pub async fn run(
215        &self,
216        prompt: String,
217        emit: impl Fn(AgentEvent) + Send + Sync + 'static,
218    ) -> Result<Vec<AgentEvent>> {
219        let message = Message::User(UserMessage::new(prompt));
220        let emit = Arc::new(emit);
221        self.run_messages(vec![message], emit).await
222    }
223
224    /// TODO: document this function.
225    pub async fn run_messages(
226        &self,
227        prompts: Vec<Message>,
228        emit: EmitFn,
229    ) -> Result<Vec<AgentEvent>> {
230        let mut all_events = Vec::new();
231
232        let state_messages = self.state.get_state().messages.clone();
233        let mut all_messages = state_messages;
234        all_messages.extend(prompts.clone());
235
236        tracing::info!(session_id = ?self.session_id, "AgentLoop starting");
237        emit(AgentEvent::AgentStart {
238            prompts: prompts.clone(),
239            session_id: self.session_id.clone(),
240        });
241        all_events.push(AgentEvent::AgentStart {
242            prompts: prompts.clone(),
243            session_id: self.session_id.clone(),
244        });
245
246        let (result_messages, events) = self.run_loop(prompts, emit.clone()).await?;
247
248        all_events.extend(events);
249
250        let stop_reason = result_messages.last().and_then(|m| {
251            if let Message::Assistant(a) = m {
252                Some(format!("{:?}", a.stop_reason))
253            } else {
254                None
255            }
256        });
257
258        tracing::info!(session_id = ?self.session_id, "AgentLoop run_messages complete");
259
260        // Sync messages back to shared state
261        self.state.update(|s| {
262            s.replace_messages(result_messages.clone());
263        });
264
265        emit(AgentEvent::AgentEnd {
266            messages: result_messages.clone(),
267            stop_reason: stop_reason.clone(),
268            session_id: self.session_id.clone(),
269        });
270        all_events.push(AgentEvent::AgentEnd {
271            messages: result_messages.clone(),
272            stop_reason,
273            session_id: self.session_id.clone(),
274        });
275
276        Ok(all_events)
277    }
278
279    /// TODO: document this function.
280    pub async fn continue_loop(
281        &self,
282        emit: impl Fn(AgentEvent) + Send + Sync + 'static,
283    ) -> Result<Vec<AgentEvent>> {
284        let emit = Arc::new(emit);
285        let mut all_events = Vec::new();
286
287        tracing::info!(session_id = ?self.session_id, "AgentLoop continuing");
288        emit(AgentEvent::AgentStart {
289            prompts: vec![],
290            session_id: self.session_id.clone(),
291        });
292        all_events.push(AgentEvent::AgentStart {
293            prompts: vec![],
294            session_id: self.session_id.clone(),
295        });
296
297        let (result_messages, events) = self.run_loop(vec![], emit.clone()).await?;
298
299        all_events.extend(events);
300
301        let stop_reason = result_messages.last().and_then(|m| {
302            if let Message::Assistant(a) = m {
303                Some(format!("{:?}", a.stop_reason))
304            } else {
305                None
306            }
307        });
308
309        tracing::info!(session_id = ?self.session_id, "AgentLoop continue_loop complete");
310        emit(AgentEvent::AgentEnd {
311            messages: result_messages.clone(),
312            stop_reason: stop_reason.clone(),
313            session_id: self.session_id.clone(),
314        });
315        all_events.push(AgentEvent::AgentEnd {
316            messages: result_messages.clone(),
317            stop_reason,
318            session_id: self.session_id.clone(),
319        });
320
321        Ok(all_events)
322    }
323
324    /// Process pending steering messages, emitting events and appending to message history.
325    fn process_steering_messages(
326        &self,
327        pending_messages: &mut Vec<Message>,
328        messages: &mut Vec<Message>,
329        new_messages: &mut Vec<Message>,
330        events: &mut Vec<AgentEvent>,
331        emit: &EmitFn,
332    ) {
333        if pending_messages.is_empty() {
334            return;
335        }
336        for message in pending_messages.drain(..) {
337            emit(AgentEvent::SteeringMessage {
338                message: message.clone(),
339            });
340            emit(AgentEvent::MessageStart {
341                message: message.clone(),
342            });
343            emit(AgentEvent::MessageEnd {
344                message: message.clone(),
345            });
346            events.push(AgentEvent::SteeringMessage {
347                message: message.clone(),
348            });
349            events.push(AgentEvent::MessageStart {
350                message: message.clone(),
351            });
352            events.push(AgentEvent::MessageEnd {
353                message: message.clone(),
354            });
355            messages.push(message.clone());
356            new_messages.push(message);
357        }
358    }
359
360    /// Handle a streaming error by synthesizing an error message and completing the turn.
361    async fn handle_streaming_error(
362        &self,
363        e: anyhow::Error,
364        messages: &mut Vec<Message>,
365        new_messages: &mut Vec<Message>,
366        events: &mut Vec<AgentEvent>,
367        emit: &EmitFn,
368        turn_number: u32,
369    ) -> (Vec<Message>, Vec<AgentEvent>) {
370        let err_msg = format!("{}", e);
371        tracing::error!(session_id = ?self.session_id, "Unexpected streaming error: {}", err_msg);
372
373        let mut error_asst = oxi_ai::AssistantMessage::new(
374            oxi_ai::Api::OpenAiCompletions,
375            "agent",
376            &self.config.model_id,
377        );
378        error_asst.stop_reason = StopReason::Error;
379        error_asst
380            .content
381            .push(ContentBlock::Text(TextContent::new(format!(
382                "⚠ {}",
383                err_msg
384            ))));
385
386        new_messages.push(Message::Assistant(error_asst.clone()));
387        messages.push(Message::Assistant(error_asst.clone()));
388
389        emit(AgentEvent::MessageStart {
390            message: Message::Assistant(error_asst.clone()),
391        });
392        emit(AgentEvent::MessageEnd {
393            message: Message::Assistant(error_asst.clone()),
394        });
395        emit(AgentEvent::Error {
396            message: err_msg.clone(),
397            session_id: self.session_id.clone(),
398        });
399
400        emit(AgentEvent::TurnEnd {
401            turn_number,
402            assistant_message: Message::Assistant(error_asst.clone()),
403            tool_results: vec![],
404        });
405        events.push(AgentEvent::TurnEnd {
406            turn_number,
407            assistant_message: Message::Assistant(error_asst),
408            tool_results: vec![],
409        });
410        // Return Ok — lifecycle is complete
411        (messages.clone(), events.clone())
412    }
413
414    async fn run_loop(
415        &self,
416        initial_prompts: Vec<Message>,
417        emit: EmitFn,
418    ) -> Result<(Vec<Message>, Vec<AgentEvent>)> {
419        tracing::info!("[AGENT-LOOP] run_loop started");
420        let mut messages = self.state.get_state().messages.clone();
421        messages.extend(initial_prompts.clone());
422
423        let mut new_messages: Vec<Message> = initial_prompts;
424        let mut events = Vec::new();
425        let mut turn_number: u32 = 0;
426        let mut first_turn = true;
427
428        let mut pending_messages: Vec<Message> = self.drain_steering_queue();
429
430        loop {
431            tracing::info!(
432                "[AGENT-LOOP] Top of loop, has_more_tool_calls={}, pending_messages={}",
433                true,
434                pending_messages.is_empty()
435            );
436            let mut has_more_tool_calls = true;
437
438            while has_more_tool_calls || !pending_messages.is_empty() {
439                if !first_turn {
440                    turn_number += 1;
441                    emit(AgentEvent::TurnStart { turn_number });
442                    events.push(AgentEvent::TurnStart { turn_number });
443                } else {
444                    first_turn = false;
445                    turn_number = 1;
446                    emit(AgentEvent::TurnStart { turn_number });
447                    events.push(AgentEvent::TurnStart { turn_number });
448                }
449
450                if !pending_messages.is_empty() {
451                    self.process_steering_messages(
452                        &mut pending_messages,
453                        &mut messages,
454                        &mut new_messages,
455                        &mut events,
456                        &emit,
457                    );
458                }
459
460                self.maybe_compact(&mut messages, turn_number as usize, &emit)
461                    .await;
462
463                tracing::info!("[AGENT-LOOP] About to call stream_assistant_response");
464                let assistant_message =
465                    match stream_assistant_response(self, &mut messages, &emit).await {
466                        Ok(msg) => msg,
467                        Err(e) => {
468                            return Ok(self
469                                .handle_streaming_error(
470                                    e,
471                                    &mut messages,
472                                    &mut new_messages,
473                                    &mut events,
474                                    &emit,
475                                    turn_number,
476                                )
477                                .await);
478                        }
479                    };
480
481                new_messages.push(Message::Assistant(assistant_message.clone()));
482
483                if matches!(assistant_message.stop_reason, StopReason::Error) {
484                    if is_retryable_error(&assistant_message) {
485                        let did_retry =
486                            handle_retryable_error(self, &assistant_message, &mut messages, &emit)
487                                .await;
488                        if did_retry {
489                            emit(AgentEvent::TurnEnd {
490                                turn_number,
491                                assistant_message: Message::Assistant(assistant_message.clone()),
492                                tool_results: vec![],
493                            });
494                            events.push(AgentEvent::TurnEnd {
495                                turn_number,
496                                assistant_message: Message::Assistant(assistant_message.clone()),
497                                tool_results: vec![],
498                            });
499                            has_more_tool_calls = true;
500                            continue;
501                        }
502                    }
503
504                    emit(AgentEvent::TurnEnd {
505                        turn_number,
506                        assistant_message: Message::Assistant(assistant_message.clone()),
507                        tool_results: vec![],
508                    });
509                    events.push(AgentEvent::TurnEnd {
510                        turn_number,
511                        assistant_message: Message::Assistant(assistant_message.clone()),
512                        tool_results: vec![],
513                    });
514                    return Ok((messages, events));
515                }
516                if matches!(assistant_message.stop_reason, StopReason::Aborted) {
517                    if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
518                        emit(AgentEvent::AutoRetryEnd {
519                            success: true,
520                            attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
521                            final_error: None,
522                        });
523                        self.auto_retry_attempt.store(0, Ordering::Relaxed);
524                    }
525
526                    emit(AgentEvent::TurnEnd {
527                        turn_number,
528                        assistant_message: Message::Assistant(assistant_message.clone()),
529                        tool_results: vec![],
530                    });
531                    events.push(AgentEvent::TurnEnd {
532                        turn_number,
533                        assistant_message: Message::Assistant(assistant_message.clone()),
534                        tool_results: vec![],
535                    });
536                    return Ok((messages, events));
537                }
538
539                if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
540                    emit(AgentEvent::AutoRetryEnd {
541                        success: true,
542                        attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
543                        final_error: None,
544                    });
545                    self.auto_retry_attempt.store(0, Ordering::Relaxed);
546                }
547
548                let tool_calls = helpers::extract_tool_calls(&assistant_message);
549                tracing::info!(
550                    "[AGENT-LOOP] extract_tool_calls found {} calls, stop_reason={:?}",
551                    tool_calls.len(),
552                    assistant_message.stop_reason
553                );
554
555                let mut tool_results: Vec<oxi_ai::ToolResultMessage> = Vec::new();
556                has_more_tool_calls = false;
557
558                if !tool_calls.is_empty() {
559                    tracing::info!("[AGENT-LOOP] Executing {} tool calls", tool_calls.len());
560                    let ctx = self.build_tool_context();
561                    let executed_batch = match execute_tool_calls(
562                        self,
563                        &mut messages,
564                        &assistant_message,
565                        tool_calls,
566                        &emit,
567                        &ctx,
568                    )
569                    .await
570                    {
571                        Ok(batch) => batch,
572                        Err(e) => {
573                            // Tool execution failed — emit TurnEnd and return Ok.
574                            // The lifecycle must always complete.
575                            tracing::error!(session_id = ?self.session_id, "Tool execution error: {}", e);
576                            emit(AgentEvent::Error {
577                                message: format!("Tool execution error: {}", e),
578                                session_id: self.session_id.clone(),
579                            });
580                            emit(AgentEvent::TurnEnd {
581                                turn_number,
582                                assistant_message: Message::Assistant(assistant_message.clone()),
583                                tool_results: vec![],
584                            });
585                            events.push(AgentEvent::TurnEnd {
586                                turn_number,
587                                assistant_message: Message::Assistant(assistant_message.clone()),
588                                tool_results: vec![],
589                            });
590                            return Ok((messages, events));
591                        }
592                    };
593
594                    tool_results = executed_batch.messages;
595                    has_more_tool_calls = !executed_batch.terminate;
596
597                    for result in &tool_results {
598                        messages.push(Message::ToolResult(result.clone()));
599                        new_messages.push(Message::ToolResult(result.clone()));
600                    }
601                }
602
603                emit(AgentEvent::TurnEnd {
604                    turn_number,
605                    assistant_message: Message::Assistant(assistant_message.clone()),
606                    tool_results: tool_results.clone(),
607                });
608                events.push(AgentEvent::TurnEnd {
609                    turn_number,
610                    assistant_message: Message::Assistant(assistant_message.clone()),
611                    tool_results: tool_results.clone(),
612                });
613
614                if should_stop_after_turn(
615                    &messages,
616                    &assistant_message,
617                    self.config.max_iterations,
618                    &self.external_stop,
619                    turn_number as usize,
620                ) {
621                    tracing::info!("[AGENT-LOOP] should_stop_after_turn=true, ending loop");
622                    return Ok((messages, events));
623                }
624
625                pending_messages = self.drain_steering_queue();
626                tracing::info!(
627                    "[AGENT-LOOP] TurnEnd complete, pending_messages={}, has_more_tool_calls={}",
628                    !pending_messages.is_empty(),
629                    has_more_tool_calls
630                );
631            }
632
633            let follow_up_messages = self.drain_follow_up_queue();
634            if !follow_up_messages.is_empty() {
635                pending_messages = follow_up_messages;
636                continue;
637            }
638
639            break;
640        }
641
642        Ok((messages, events))
643    }
644
645    async fn maybe_compact(&self, messages: &mut Vec<Message>, iteration: usize, emit: &EmitFn) {
646        let context_text = serde_json::to_string(&*messages).unwrap_or_default();
647        let context_tokens = estimate_tokens(&context_text);
648
649        if !self
650            .compaction_manager
651            .should_compact(context_tokens, iteration)
652        {
653            return;
654        }
655
656        emit(AgentEvent::Compaction {
657            event: CompactionEvent::Triggered {
658                context_tokens,
659                iteration,
660            },
661        });
662
663        let messages_to_compact: Vec<Message> = messages.to_vec();
664        let instruction = self.config.compaction_instruction.as_deref();
665
666        match self
667            .compaction_manager
668            .compact_if_needed(&messages_to_compact, instruction, context_tokens, iteration)
669            .await
670        {
671            Ok(Some(compacted)) => {
672                let start = Instant::now();
673                let message_count = compacted.compacted_count;
674
675                emit(AgentEvent::Compaction {
676                    event: CompactionEvent::Started { message_count },
677                });
678
679                let kept_messages = compacted.kept_messages;
680                let summary = compacted.summary;
681                let compacted_count = compacted.compacted_count;
682
683                *messages = kept_messages;
684
685                let state_msgs = messages.clone();
686                self.state.update(|s| {
687                    s.replace_messages(state_msgs);
688                });
689
690                let compacted_ctx = CompactedContext {
691                    summary,
692                    kept_messages: Vec::new(),
693                    compacted_count,
694                };
695                emit(AgentEvent::Compaction {
696                    event: CompactionEvent::Completed {
697                        result: compacted_ctx,
698                        duration_ms: start.elapsed().as_millis() as u64,
699                    },
700                });
701            }
702            Ok(None) => {}
703            Err(e) => {
704                emit(AgentEvent::Compaction {
705                    event: CompactionEvent::Failed {
706                        error: e.to_string(),
707                    },
708                });
709            }
710        }
711    }
712
713    fn resolve_model(&self) -> Result<oxi_ai::Model> {
714        self.resolver
715            .resolve_model(&self.config.model_id)
716            .ok_or_else(|| Error::msg(format!("Model not found: {}", self.config.model_id)))
717    }
718}