1#![allow(unused_doc_comments)]
2
3pub mod config;
11pub mod helpers;
13pub mod queues;
15pub mod retry;
17pub mod streaming;
19pub mod tool_exec;
21
22use crate::agent::ProviderResolver;
24use crate::compaction::{CompactedContext, CompactionEvent};
25use crate::events::AgentEvent;
26use crate::recovery::{CircuitBreaker, CircuitBreakerConfig};
27use crate::{state::SharedState, tools::ToolContext, tools::ToolRegistry};
28use anyhow::{Error, Result};
29pub use config::{AfterToolCallHook, AgentLoopConfig, BeforeToolCallHook, ToolExecutionMode};
30use oxi_ai::{
31 estimate_tokens, CompactionManager as OxCompactionManager, CompactionStrategy, ContentBlock,
32 LlmCompactor, Message, Provider, StopReason, TextContent, UserMessage,
33};
34use parking_lot::RwLock;
35use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
36use std::sync::Arc;
37use std::time::Instant;
38
39use self::helpers::should_stop_after_turn;
40use self::queues::{
41 clear_all_queues, clear_follow_up_queue, clear_steering_queue, drain_follow_up_queue,
42 drain_steering_queue,
43};
44use self::retry::{
45 auto_retry_attempt_method, cancel_auto_retry, handle_retryable_error, is_retryable_error,
46};
47use self::streaming::stream_assistant_response;
48use self::tool_exec::execute_tool_calls;
49
50type EmitFn = Arc<dyn Fn(AgentEvent) + Send + Sync>;
51
52pub struct AgentLoop {
54 provider: Arc<dyn Provider>,
55 config: AgentLoopConfig,
56 tools: Arc<ToolRegistry>,
57 state: SharedState,
58 compaction_manager: OxCompactionManager,
59 before_tool_call: Option<BeforeToolCallHook>,
60 after_tool_call: Option<AfterToolCallHook>,
61 steering_queue: RwLock<Vec<Message>>,
62 follow_up_queue: RwLock<Vec<Message>>,
63 session_id: Option<String>,
64 auto_retry_attempt: AtomicUsize,
65 auto_retry_cancel: AtomicBool,
66 circuit_breaker: CircuitBreaker,
67 external_stop: Arc<AtomicBool>,
70 resolver: Arc<dyn ProviderResolver>,
72}
73
74impl AgentLoop {
75 pub fn new_with_resolver(
78 provider: Arc<dyn Provider>,
79 config: AgentLoopConfig,
80 tools: Arc<ToolRegistry>,
81 state: SharedState,
82 resolver: Arc<dyn ProviderResolver>,
83 ) -> Self {
84 let mut compaction_manager =
85 OxCompactionManager::new(config.compaction_strategy.clone(), config.context_window);
86
87 if config.compaction_strategy != CompactionStrategy::Disabled {
88 let model = resolver.resolve_model(&config.model_id);
89 if let Some(model) = model {
90 let llm_compactor =
91 Arc::new(LlmCompactor::new(model.clone(), Arc::clone(&provider)));
92 compaction_manager.set_compactor(llm_compactor);
93 }
94 }
95
96 Self {
97 provider,
98 config: config.clone(),
99 tools,
100 state,
101 compaction_manager,
102 before_tool_call: None,
103 after_tool_call: None,
104 steering_queue: RwLock::new(Vec::new()),
105 follow_up_queue: RwLock::new(Vec::new()),
106 session_id: config.session_id.clone(),
107 auto_retry_attempt: AtomicUsize::new(0),
108 auto_retry_cancel: AtomicBool::new(false),
109 circuit_breaker: CircuitBreaker::new(CircuitBreakerConfig::default()),
110 external_stop: Arc::new(AtomicBool::new(false)),
111 resolver,
112 }
113 }
114
115 pub fn new(
117 provider: Arc<dyn Provider>,
118 config: AgentLoopConfig,
119 tools: Arc<ToolRegistry>,
120 state: SharedState,
121 ) -> Self {
122 use crate::agent::GlobalProviderResolver;
123 Self::new_with_resolver(
124 provider,
125 config,
126 tools,
127 state,
128 Arc::new(GlobalProviderResolver),
129 )
130 }
131
132 pub fn with_before_tool_call(mut self, hook: BeforeToolCallHook) -> Self {
134 self.before_tool_call = Some(hook);
135 self
136 }
137
138 pub fn with_after_tool_call(mut self, hook: AfterToolCallHook) -> Self {
140 self.after_tool_call = Some(hook);
141 self
142 }
143
144 pub fn steer(&self, message: Message) {
146 self.steering_queue.write().push(message);
147 }
148
149 pub fn follow_up(&self, message: Message) {
151 self.follow_up_queue.write().push(message);
152 }
153
154 pub fn clear_steering_queue(&self) {
156 clear_steering_queue(self);
157 }
158
159 pub fn clear_follow_up_queue(&self) {
161 clear_follow_up_queue(self);
162 }
163
164 pub fn clear_all_queues(&self) {
166 clear_all_queues(self);
167 }
168
169 fn drain_steering_queue(&self) -> Vec<Message> {
170 drain_steering_queue(self)
171 }
172
173 fn build_tool_context(&self) -> ToolContext {
176 let workspace = self
177 .config
178 .workspace_dir
179 .clone()
180 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
181 ToolContext {
182 workspace_dir: workspace,
183 root_dir: self.config.workspace_dir.clone(),
184 session_id: self.session_id.clone(),
185 }
186 }
187
188 fn drain_follow_up_queue(&self) -> Vec<Message> {
189 drain_follow_up_queue(self)
190 }
191
192 pub fn cancel_auto_retry(&self) {
194 cancel_auto_retry(self);
195 }
196
197 pub fn auto_retry_attempt(&self) -> usize {
199 auto_retry_attempt_method(self)
200 }
201
202 pub fn state(&self) -> &SharedState {
205 &self.state
206 }
207
208 pub fn external_stop(&self) -> &Arc<AtomicBool> {
210 &self.external_stop
211 }
212
213 pub async fn run(
215 &self,
216 prompt: String,
217 emit: impl Fn(AgentEvent) + Send + Sync + 'static,
218 ) -> Result<Vec<AgentEvent>> {
219 let message = Message::User(UserMessage::new(prompt));
220 let emit = Arc::new(emit);
221 self.run_messages(vec![message], emit).await
222 }
223
224 pub async fn run_messages(
226 &self,
227 prompts: Vec<Message>,
228 emit: EmitFn,
229 ) -> Result<Vec<AgentEvent>> {
230 let mut all_events = Vec::new();
231
232 let state_messages = self.state.get_state().messages.clone();
233 let mut all_messages = state_messages;
234 all_messages.extend(prompts.clone());
235
236 tracing::info!(session_id = ?self.session_id, "AgentLoop starting");
237 emit(AgentEvent::AgentStart {
238 prompts: prompts.clone(),
239 session_id: self.session_id.clone(),
240 });
241 all_events.push(AgentEvent::AgentStart {
242 prompts: prompts.clone(),
243 session_id: self.session_id.clone(),
244 });
245
246 let (result_messages, events) = self.run_loop(prompts, emit.clone()).await?;
247
248 all_events.extend(events);
249
250 let stop_reason = result_messages.last().and_then(|m| {
251 if let Message::Assistant(a) = m {
252 Some(format!("{:?}", a.stop_reason))
253 } else {
254 None
255 }
256 });
257
258 tracing::info!(session_id = ?self.session_id, "AgentLoop run_messages complete");
259
260 self.state.update(|s| {
262 s.replace_messages(result_messages.clone());
263 });
264
265 emit(AgentEvent::AgentEnd {
266 messages: result_messages.clone(),
267 stop_reason: stop_reason.clone(),
268 session_id: self.session_id.clone(),
269 });
270 all_events.push(AgentEvent::AgentEnd {
271 messages: result_messages.clone(),
272 stop_reason,
273 session_id: self.session_id.clone(),
274 });
275
276 Ok(all_events)
277 }
278
279 pub async fn continue_loop(
281 &self,
282 emit: impl Fn(AgentEvent) + Send + Sync + 'static,
283 ) -> Result<Vec<AgentEvent>> {
284 let emit = Arc::new(emit);
285 let mut all_events = Vec::new();
286
287 tracing::info!(session_id = ?self.session_id, "AgentLoop continuing");
288 emit(AgentEvent::AgentStart {
289 prompts: vec![],
290 session_id: self.session_id.clone(),
291 });
292 all_events.push(AgentEvent::AgentStart {
293 prompts: vec![],
294 session_id: self.session_id.clone(),
295 });
296
297 let (result_messages, events) = self.run_loop(vec![], emit.clone()).await?;
298
299 all_events.extend(events);
300
301 let stop_reason = result_messages.last().and_then(|m| {
302 if let Message::Assistant(a) = m {
303 Some(format!("{:?}", a.stop_reason))
304 } else {
305 None
306 }
307 });
308
309 tracing::info!(session_id = ?self.session_id, "AgentLoop continue_loop complete");
310 emit(AgentEvent::AgentEnd {
311 messages: result_messages.clone(),
312 stop_reason: stop_reason.clone(),
313 session_id: self.session_id.clone(),
314 });
315 all_events.push(AgentEvent::AgentEnd {
316 messages: result_messages.clone(),
317 stop_reason,
318 session_id: self.session_id.clone(),
319 });
320
321 Ok(all_events)
322 }
323
324 fn process_steering_messages(
326 &self,
327 pending_messages: &mut Vec<Message>,
328 messages: &mut Vec<Message>,
329 new_messages: &mut Vec<Message>,
330 events: &mut Vec<AgentEvent>,
331 emit: &EmitFn,
332 ) {
333 if pending_messages.is_empty() {
334 return;
335 }
336 for message in pending_messages.drain(..) {
337 emit(AgentEvent::SteeringMessage {
338 message: message.clone(),
339 });
340 emit(AgentEvent::MessageStart {
341 message: message.clone(),
342 });
343 emit(AgentEvent::MessageEnd {
344 message: message.clone(),
345 });
346 events.push(AgentEvent::SteeringMessage {
347 message: message.clone(),
348 });
349 events.push(AgentEvent::MessageStart {
350 message: message.clone(),
351 });
352 events.push(AgentEvent::MessageEnd {
353 message: message.clone(),
354 });
355 messages.push(message.clone());
356 new_messages.push(message);
357 }
358 }
359
360 async fn handle_streaming_error(
362 &self,
363 e: anyhow::Error,
364 messages: &mut Vec<Message>,
365 new_messages: &mut Vec<Message>,
366 events: &mut Vec<AgentEvent>,
367 emit: &EmitFn,
368 turn_number: u32,
369 ) -> (Vec<Message>, Vec<AgentEvent>) {
370 let err_msg = format!("{}", e);
371 tracing::error!(session_id = ?self.session_id, "Unexpected streaming error: {}", err_msg);
372
373 let mut error_asst = oxi_ai::AssistantMessage::new(
374 oxi_ai::Api::OpenAiCompletions,
375 "agent",
376 &self.config.model_id,
377 );
378 error_asst.stop_reason = StopReason::Error;
379 error_asst
380 .content
381 .push(ContentBlock::Text(TextContent::new(format!(
382 "⚠ {}",
383 err_msg
384 ))));
385
386 new_messages.push(Message::Assistant(error_asst.clone()));
387 messages.push(Message::Assistant(error_asst.clone()));
388
389 emit(AgentEvent::MessageStart {
390 message: Message::Assistant(error_asst.clone()),
391 });
392 emit(AgentEvent::MessageEnd {
393 message: Message::Assistant(error_asst.clone()),
394 });
395 emit(AgentEvent::Error {
396 message: err_msg.clone(),
397 session_id: self.session_id.clone(),
398 });
399
400 emit(AgentEvent::TurnEnd {
401 turn_number,
402 assistant_message: Message::Assistant(error_asst.clone()),
403 tool_results: vec![],
404 });
405 events.push(AgentEvent::TurnEnd {
406 turn_number,
407 assistant_message: Message::Assistant(error_asst),
408 tool_results: vec![],
409 });
410 (messages.clone(), events.clone())
412 }
413
414 async fn run_loop(
415 &self,
416 initial_prompts: Vec<Message>,
417 emit: EmitFn,
418 ) -> Result<(Vec<Message>, Vec<AgentEvent>)> {
419 tracing::info!("[AGENT-LOOP] run_loop started");
420 let mut messages = self.state.get_state().messages.clone();
421 messages.extend(initial_prompts.clone());
422
423 let mut new_messages: Vec<Message> = initial_prompts;
424 let mut events = Vec::new();
425 let mut turn_number: u32 = 0;
426 let mut first_turn = true;
427
428 let mut pending_messages: Vec<Message> = self.drain_steering_queue();
429
430 loop {
431 tracing::info!(
432 "[AGENT-LOOP] Top of loop, has_more_tool_calls={}, pending_messages={}",
433 true,
434 pending_messages.is_empty()
435 );
436 let mut has_more_tool_calls = true;
437
438 while has_more_tool_calls || !pending_messages.is_empty() {
439 if !first_turn {
440 turn_number += 1;
441 emit(AgentEvent::TurnStart { turn_number });
442 events.push(AgentEvent::TurnStart { turn_number });
443 } else {
444 first_turn = false;
445 turn_number = 1;
446 emit(AgentEvent::TurnStart { turn_number });
447 events.push(AgentEvent::TurnStart { turn_number });
448 }
449
450 if !pending_messages.is_empty() {
451 self.process_steering_messages(
452 &mut pending_messages,
453 &mut messages,
454 &mut new_messages,
455 &mut events,
456 &emit,
457 );
458 }
459
460 self.maybe_compact(&mut messages, turn_number as usize, &emit)
461 .await;
462
463 tracing::info!("[AGENT-LOOP] About to call stream_assistant_response");
464 let assistant_message =
465 match stream_assistant_response(self, &mut messages, &emit).await {
466 Ok(msg) => msg,
467 Err(e) => {
468 return Ok(self
469 .handle_streaming_error(
470 e,
471 &mut messages,
472 &mut new_messages,
473 &mut events,
474 &emit,
475 turn_number,
476 )
477 .await);
478 }
479 };
480
481 new_messages.push(Message::Assistant(assistant_message.clone()));
482
483 if matches!(assistant_message.stop_reason, StopReason::Error) {
484 if is_retryable_error(&assistant_message) {
485 let did_retry =
486 handle_retryable_error(self, &assistant_message, &mut messages, &emit)
487 .await;
488 if did_retry {
489 emit(AgentEvent::TurnEnd {
490 turn_number,
491 assistant_message: Message::Assistant(assistant_message.clone()),
492 tool_results: vec![],
493 });
494 events.push(AgentEvent::TurnEnd {
495 turn_number,
496 assistant_message: Message::Assistant(assistant_message.clone()),
497 tool_results: vec![],
498 });
499 has_more_tool_calls = true;
500 continue;
501 }
502 }
503
504 emit(AgentEvent::TurnEnd {
505 turn_number,
506 assistant_message: Message::Assistant(assistant_message.clone()),
507 tool_results: vec![],
508 });
509 events.push(AgentEvent::TurnEnd {
510 turn_number,
511 assistant_message: Message::Assistant(assistant_message.clone()),
512 tool_results: vec![],
513 });
514 return Ok((messages, events));
515 }
516 if matches!(assistant_message.stop_reason, StopReason::Aborted) {
517 if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
518 emit(AgentEvent::AutoRetryEnd {
519 success: true,
520 attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
521 final_error: None,
522 });
523 self.auto_retry_attempt.store(0, Ordering::Relaxed);
524 }
525
526 emit(AgentEvent::TurnEnd {
527 turn_number,
528 assistant_message: Message::Assistant(assistant_message.clone()),
529 tool_results: vec![],
530 });
531 events.push(AgentEvent::TurnEnd {
532 turn_number,
533 assistant_message: Message::Assistant(assistant_message.clone()),
534 tool_results: vec![],
535 });
536 return Ok((messages, events));
537 }
538
539 if self.auto_retry_attempt.load(Ordering::Relaxed) > 0 {
540 emit(AgentEvent::AutoRetryEnd {
541 success: true,
542 attempt: self.auto_retry_attempt.load(Ordering::Relaxed),
543 final_error: None,
544 });
545 self.auto_retry_attempt.store(0, Ordering::Relaxed);
546 }
547
548 let tool_calls = helpers::extract_tool_calls(&assistant_message);
549 tracing::info!(
550 "[AGENT-LOOP] extract_tool_calls found {} calls, stop_reason={:?}",
551 tool_calls.len(),
552 assistant_message.stop_reason
553 );
554
555 let mut tool_results: Vec<oxi_ai::ToolResultMessage> = Vec::new();
556 has_more_tool_calls = false;
557
558 if !tool_calls.is_empty() {
559 tracing::info!("[AGENT-LOOP] Executing {} tool calls", tool_calls.len());
560 let ctx = self.build_tool_context();
561 let executed_batch = match execute_tool_calls(
562 self,
563 &mut messages,
564 &assistant_message,
565 tool_calls,
566 &emit,
567 &ctx,
568 )
569 .await
570 {
571 Ok(batch) => batch,
572 Err(e) => {
573 tracing::error!(session_id = ?self.session_id, "Tool execution error: {}", e);
576 emit(AgentEvent::Error {
577 message: format!("Tool execution error: {}", e),
578 session_id: self.session_id.clone(),
579 });
580 emit(AgentEvent::TurnEnd {
581 turn_number,
582 assistant_message: Message::Assistant(assistant_message.clone()),
583 tool_results: vec![],
584 });
585 events.push(AgentEvent::TurnEnd {
586 turn_number,
587 assistant_message: Message::Assistant(assistant_message.clone()),
588 tool_results: vec![],
589 });
590 return Ok((messages, events));
591 }
592 };
593
594 tool_results = executed_batch.messages;
595 has_more_tool_calls = !executed_batch.terminate;
596
597 for result in &tool_results {
598 messages.push(Message::ToolResult(result.clone()));
599 new_messages.push(Message::ToolResult(result.clone()));
600 }
601 }
602
603 emit(AgentEvent::TurnEnd {
604 turn_number,
605 assistant_message: Message::Assistant(assistant_message.clone()),
606 tool_results: tool_results.clone(),
607 });
608 events.push(AgentEvent::TurnEnd {
609 turn_number,
610 assistant_message: Message::Assistant(assistant_message.clone()),
611 tool_results: tool_results.clone(),
612 });
613
614 if should_stop_after_turn(
615 &messages,
616 &assistant_message,
617 self.config.max_iterations,
618 &self.external_stop,
619 turn_number as usize,
620 ) {
621 tracing::info!("[AGENT-LOOP] should_stop_after_turn=true, ending loop");
622 return Ok((messages, events));
623 }
624
625 pending_messages = self.drain_steering_queue();
626 tracing::info!(
627 "[AGENT-LOOP] TurnEnd complete, pending_messages={}, has_more_tool_calls={}",
628 !pending_messages.is_empty(),
629 has_more_tool_calls
630 );
631 }
632
633 let follow_up_messages = self.drain_follow_up_queue();
634 if !follow_up_messages.is_empty() {
635 pending_messages = follow_up_messages;
636 continue;
637 }
638
639 break;
640 }
641
642 Ok((messages, events))
643 }
644
645 async fn maybe_compact(&self, messages: &mut Vec<Message>, iteration: usize, emit: &EmitFn) {
646 let context_text = serde_json::to_string(&*messages).unwrap_or_default();
647 let context_tokens = estimate_tokens(&context_text);
648
649 if !self
650 .compaction_manager
651 .should_compact(context_tokens, iteration)
652 {
653 return;
654 }
655
656 emit(AgentEvent::Compaction {
657 event: CompactionEvent::Triggered {
658 context_tokens,
659 iteration,
660 },
661 });
662
663 let messages_to_compact: Vec<Message> = messages.to_vec();
664 let instruction = self.config.compaction_instruction.as_deref();
665
666 match self
667 .compaction_manager
668 .compact_if_needed(&messages_to_compact, instruction, context_tokens, iteration)
669 .await
670 {
671 Ok(Some(compacted)) => {
672 let start = Instant::now();
673 let message_count = compacted.compacted_count;
674
675 emit(AgentEvent::Compaction {
676 event: CompactionEvent::Started { message_count },
677 });
678
679 let kept_messages = compacted.kept_messages;
680 let summary = compacted.summary;
681 let compacted_count = compacted.compacted_count;
682
683 *messages = kept_messages;
684
685 let state_msgs = messages.clone();
686 self.state.update(|s| {
687 s.replace_messages(state_msgs);
688 });
689
690 let compacted_ctx = CompactedContext {
691 summary,
692 kept_messages: Vec::new(),
693 compacted_count,
694 };
695 emit(AgentEvent::Compaction {
696 event: CompactionEvent::Completed {
697 result: compacted_ctx,
698 duration_ms: start.elapsed().as_millis() as u64,
699 },
700 });
701 }
702 Ok(None) => {}
703 Err(e) => {
704 emit(AgentEvent::Compaction {
705 event: CompactionEvent::Failed {
706 error: e.to_string(),
707 },
708 });
709 }
710 }
711 }
712
713 fn resolve_model(&self) -> Result<oxi_ai::Model> {
714 self.resolver
715 .resolve_model(&self.config.model_id)
716 .ok_or_else(|| Error::msg(format!("Model not found: {}", self.config.model_id)))
717 }
718}