1use std::collections::HashMap;
24use std::time::Instant;
25
26use futures::StreamExt;
27
28use crate::chat::{ChatMessage, ChatResponse, ContentBlock, StopReason, ToolCall, ToolResult};
29use crate::error::LlmError;
30use crate::provider::{ChatParams, DynProvider};
31use crate::stream::{ChatStream, StreamEvent};
32use crate::usage::Usage;
33
34use super::LoopDepth;
35use super::ToolRegistry;
36use super::approval::approve_calls;
37use super::config::{
38 LoopEvent, StopContext, StopDecision, TerminationReason, ToolLoopConfig, ToolLoopResult,
39};
40use super::execution::execute_with_events;
41use super::loop_detection::{IterationSnapshot, LoopDetectionState, handle_loop_detection};
42use super::loop_resumable::LoopCommand;
43
/// Result of running one iteration of the tool loop.
pub(crate) enum IterationOutcome {
    /// Tool calls from the model response were executed and their results
    /// were appended to the conversation; the loop has not finished.
    ToolsExecuted {
        /// Tool calls taken from the model response.
        tool_calls: Vec<ToolCall>,
        /// Results returned by tool execution, after result post-processing.
        results: Vec<ToolResult>,
        /// Content blocks of the response other than the tool calls.
        assistant_content: Vec<ContentBlock>,
        /// Value of the iteration counter when the tools ran.
        iteration: u32,
        /// Usage accumulated by the loop so far.
        total_usage: Usage,
    },
    /// The loop finished and recorded a termination reason.
    Completed(CompletedData),
    /// The loop finished because of an error.
    Error(ErrorData),
}
60
/// Payload of [`IterationOutcome::Completed`].
pub(crate) struct CompletedData {
    /// Response the loop finished with (may be an empty response).
    pub response: ChatResponse,
    /// Why the loop stopped.
    pub termination_reason: TerminationReason,
    /// Iteration counter at the time the loop finished.
    pub iterations: u32,
    /// Usage accumulated over the whole loop.
    pub total_usage: Usage,
}
68
/// Payload of [`IterationOutcome::Error`].
pub(crate) struct ErrorData {
    /// The error that ended the loop.
    pub error: LlmError,
    /// Iteration counter at the time of the error.
    pub iterations: u32,
    /// Usage accumulated before the error occurred.
    pub total_usage: Usage,
}
75
/// Result of `LoopCore::start_iteration`.
pub(crate) enum StartOutcome {
    /// The provider call succeeded; the caller consumes this stream and then
    /// hands the assembled response to `LoopCore::finish_iteration`.
    Stream(ChatStream),
    /// The iteration ended before a stream was obtained (precondition hit,
    /// iteration limit, or provider error). Boxed to keep this enum small.
    Terminal(Box<IterationOutcome>),
}
90
/// Mutable state of a tool loop, advanced one iteration at a time.
pub(crate) struct LoopCore<Ctx: LoopDepth + Send + Sync + 'static> {
    /// Chat parameters sent to the provider; `params.messages` grows as the
    /// loop appends assistant and tool-result messages.
    pub(crate) params: ChatParams,
    /// Loop configuration (limits, callbacks, result post-processors, masking).
    config: ToolLoopConfig,
    /// Context passed to tool execution, created with the caller's depth + 1.
    nested_ctx: Ctx,
    /// Sum of the usage of every response processed so far.
    total_usage: Usage,
    /// Number of iterations started.
    iterations: u32,
    /// Running count of tool results returned by tool execution.
    tool_calls_executed: usize,
    /// Results of the most recent tool execution (exposed to `stop_when`).
    last_tool_results: Vec<ToolResult>,
    /// State carried between iterations for loop detection.
    loop_state: LoopDetectionState,
    /// Creation time of this core; the timeout is measured from here.
    start_time: Instant,
    /// Set once the loop has finished, successfully or with an error.
    finished: bool,
    /// Command stored by `resume`, consumed at the start of the next iteration.
    pending_command: Option<LoopCommand>,
    /// Result recorded when the loop finished.
    final_result: Option<ToolLoopResult>,
    /// Depth-limit error detected in `new`, reported by the first iteration.
    depth_error: Option<LlmError>,
    /// Events buffered until the caller calls `drain_events`.
    events: Vec<LoopEvent>,
    /// Bookkeeping for tool-result messages, used for observation masking.
    tool_result_meta: Vec<ToolResultMeta>,
}
128
/// Bookkeeping entry for one tool-result message appended by the loop.
struct ToolResultMeta {
    /// Index of the message in `params.messages` at the time it was pushed.
    message_index: usize,
    /// Iteration during which the result was produced.
    iteration: u32,
    /// Whether the message content has already been replaced by a placeholder.
    masked: bool,
}
138
139impl<Ctx: LoopDepth + Send + Sync + 'static> LoopCore<Ctx> {
140 pub(crate) fn new(params: ChatParams, config: ToolLoopConfig, ctx: &Ctx) -> Self {
145 let current_depth = ctx.loop_depth();
146 let depth_error = config.max_depth.and_then(|max_depth| {
147 if current_depth >= max_depth {
148 Some(LlmError::MaxDepthExceeded {
149 current: current_depth,
150 limit: max_depth,
151 })
152 } else {
153 None
154 }
155 });
156
157 let nested_ctx = ctx.with_depth(current_depth + 1);
158
159 Self {
160 params,
161 config,
162 nested_ctx,
163 total_usage: Usage::default(),
164 iterations: 0,
165 tool_calls_executed: 0,
166 last_tool_results: Vec::new(),
167 loop_state: LoopDetectionState::default(),
168 start_time: Instant::now(),
169 finished: false,
170 pending_command: None,
171 final_result: None,
172 depth_error,
173 events: Vec::new(),
174 tool_result_meta: Vec::new(),
175 }
176 }
177
178 pub(crate) async fn start_iteration(&mut self, provider: &dyn DynProvider) -> StartOutcome {
197 if let Some(outcome) = self.check_preconditions() {
199 return StartOutcome::Terminal(Box::new(outcome));
200 }
201
202 self.iterations += 1;
203
204 self.events.push(LoopEvent::IterationStart {
206 iteration: self.iterations,
207 message_count: self.params.messages.len(),
208 });
209
210 if self.iterations > self.config.max_iterations {
212 return StartOutcome::Terminal(Box::new(self.finish(
213 ChatResponse::empty(),
214 TerminationReason::MaxIterations {
215 limit: self.config.max_iterations,
216 },
217 )));
218 }
219
220 self.mask_old_observations();
222
223 match provider.stream_boxed(&self.params).await {
225 Ok(stream) => StartOutcome::Stream(stream),
226 Err(e) => StartOutcome::Terminal(Box::new(self.finish_error(e))),
227 }
228 }
229
230 pub(crate) async fn finish_iteration(
242 &mut self,
243 response: ChatResponse,
244 registry: &ToolRegistry<Ctx>,
245 ) -> IterationOutcome {
246 self.total_usage += &response.usage;
247
248 let call_refs: Vec<&ToolCall> = response.tool_calls();
250 if let Some(outcome) = self.check_termination(&response, &call_refs) {
251 return outcome;
252 }
253
254 self.execute_tools(registry, response).await
256 }
257
258 pub(crate) async fn do_iteration(
265 &mut self,
266 provider: &dyn DynProvider,
267 registry: &ToolRegistry<Ctx>,
268 ) -> IterationOutcome {
269 let stream = match self.start_iteration(provider).await {
270 StartOutcome::Stream(s) => s,
271 StartOutcome::Terminal(outcome) => return *outcome,
272 };
273
274 let response = collect_stream(stream).await;
275 match response {
276 Ok(resp) => self.finish_iteration(resp, registry).await,
277 Err(e) => self.finish_error(e),
278 }
279 }
280
    /// Returns all buffered events and leaves the internal buffer empty.
    pub(crate) fn drain_events(&mut self) -> Vec<LoopEvent> {
        std::mem::take(&mut self.events)
    }
287
288 fn check_preconditions(&mut self) -> Option<IterationOutcome> {
291 if let Some(error) = self.depth_error.take() {
293 return Some(self.finish_error(error));
294 }
295
296 if self.finished {
298 return Some(self.make_terminal_outcome());
299 }
300
301 if let Some(command) = self.pending_command.take() {
303 match command {
304 LoopCommand::Continue => {}
305 LoopCommand::InjectMessages(messages) => {
306 self.params.messages.extend(messages);
307 }
308 LoopCommand::Stop(reason) => {
309 return Some(self.finish(
310 ChatResponse::empty(),
311 TerminationReason::StopCondition { reason },
312 ));
313 }
314 }
315 }
316
317 if let Some(limit) = self.config.timeout {
319 if self.start_time.elapsed() >= limit {
320 return Some(
321 self.finish(ChatResponse::empty(), TerminationReason::Timeout { limit }),
322 );
323 }
324 }
325
326 None
327 }
328
329 fn check_termination(
332 &mut self,
333 response: &ChatResponse,
334 call_refs: &[&ToolCall],
335 ) -> Option<IterationOutcome> {
336 if let Some(ref stop_fn) = self.config.stop_when {
338 let ctx = StopContext {
339 iteration: self.iterations,
340 response,
341 total_usage: &self.total_usage,
342 tool_calls_executed: self.tool_calls_executed,
343 last_tool_results: &self.last_tool_results,
344 };
345 match stop_fn(&ctx) {
346 StopDecision::Continue => {}
347 StopDecision::Stop => {
348 return Some(self.finish(
349 response.clone(),
350 TerminationReason::StopCondition { reason: None },
351 ));
352 }
353 StopDecision::StopWithReason(reason) => {
354 return Some(self.finish(
355 response.clone(),
356 TerminationReason::StopCondition {
357 reason: Some(reason),
358 },
359 ));
360 }
361 }
362 }
363
364 if call_refs.is_empty() || response.stop_reason != StopReason::ToolUse {
366 return Some(self.finish(response.clone(), TerminationReason::Complete));
367 }
368
369 if self.iterations > self.config.max_iterations {
372 return Some(self.finish(
373 response.clone(),
374 TerminationReason::MaxIterations {
375 limit: self.config.max_iterations,
376 },
377 ));
378 }
379
380 let snap = IterationSnapshot {
382 response,
383 call_refs,
384 iterations: self.iterations,
385 total_usage: &self.total_usage,
386 config: &self.config,
387 };
388 if let Some(result) = handle_loop_detection(
389 &mut self.loop_state,
390 &snap,
391 &mut self.params.messages,
392 &mut self.events,
393 ) {
394 return Some(self.finish(result.response, result.termination_reason));
395 }
396
397 None
398 }
399
400 async fn execute_tools(
403 &mut self,
404 registry: &ToolRegistry<Ctx>,
405 response: ChatResponse,
406 ) -> IterationOutcome {
407 let (calls, other_content) = response.partition_content();
408
409 let outcome_calls = calls.clone();
412
413 let mut msg_content = other_content.clone();
417 msg_content.extend(calls.iter().map(|c| ContentBlock::ToolCall(c.clone())));
418 self.params.messages.push(ChatMessage {
419 role: crate::chat::ChatRole::Assistant,
420 content: msg_content,
421 });
422
423 let (approved_calls, denied_results) = approve_calls(calls, &self.config);
425 let exec_result = execute_with_events(
426 registry,
427 approved_calls,
428 denied_results,
429 self.config.parallel_tool_execution,
430 &self.nested_ctx,
431 )
432 .await;
433
434 self.events.extend(exec_result.events);
435
436 let mut results = exec_result.results;
437 self.tool_calls_executed += results.len();
438
439 self.postprocess_results(&mut results, &outcome_calls).await;
441
442 self.last_tool_results.clone_from(&results);
443
444 for result in &results {
446 let idx = self.params.messages.len();
447 self.params
448 .messages
449 .push(ChatMessage::tool_result_full(result.clone()));
450 self.tool_result_meta.push(ToolResultMeta {
451 message_index: idx,
452 iteration: self.iterations,
453 masked: false,
454 });
455 }
456
457 IterationOutcome::ToolsExecuted {
458 tool_calls: outcome_calls,
459 results,
460 assistant_content: other_content,
461 iteration: self.iterations,
462 total_usage: self.total_usage.clone(),
463 }
464 }
465
466 async fn postprocess_results(&mut self, results: &mut [ToolResult], calls: &[ToolCall]) {
474 let has_processor = self.config.result_processor.is_some();
475 let has_extractor = self.config.result_extractor.is_some();
476 let has_cacher = self.config.result_cacher.is_some();
477
478 if !has_processor && !has_extractor && !has_cacher {
479 return;
480 }
481
482 let call_id_to_name: HashMap<&str, &str> = calls
484 .iter()
485 .map(|c| (c.id.as_str(), c.name.as_str()))
486 .collect();
487
488 let user_query: String = self
490 .params
491 .messages
492 .iter()
493 .rev()
494 .find_map(|m| {
495 if m.role == crate::chat::ChatRole::User {
496 m.content.iter().find_map(|b| match b {
497 ContentBlock::Text(t) => Some(t.clone()),
498 _ => None,
499 })
500 } else {
501 None
502 }
503 })
504 .unwrap_or_default();
505
506 for result in results.iter_mut() {
507 let tool_name = call_id_to_name
508 .get(result.tool_call_id.as_str())
509 .copied()
510 .unwrap_or("unknown");
511
512 if result.is_error {
513 continue;
514 }
515
516 if let Some(ref processor) = self.config.result_processor {
518 let processed = processor.process(tool_name, &result.content);
519 if processed.was_processed {
520 self.events.push(LoopEvent::ToolResultProcessed {
521 tool_name: tool_name.to_string(),
522 original_tokens: processed.original_tokens_est,
523 processed_tokens: processed.processed_tokens_est,
524 });
525 result.content = processed.content;
526 }
527 }
528
529 if let Some(ref extractor) = self.config.result_extractor {
531 let tokens = crate::context::estimate_tokens(&result.content);
532 if tokens > extractor.extraction_threshold() {
533 if let Some(extracted) = extractor
534 .extract(tool_name, &result.content, &user_query)
535 .await
536 {
537 self.events.push(LoopEvent::ToolResultExtracted {
538 tool_name: tool_name.to_string(),
539 original_tokens: extracted.original_tokens_est,
540 extracted_tokens: extracted.extracted_tokens_est,
541 });
542 result.content = extracted.content;
543 }
544 }
545 }
546
547 if let Some(ref cacher) = self.config.result_cacher {
549 let tokens = crate::context::estimate_tokens(&result.content);
550 if tokens > cacher.inline_threshold() {
551 if let Some(cached) = cacher.cache(tool_name, &result.content) {
552 self.events.push(LoopEvent::ToolResultCached {
553 tool_name: tool_name.to_string(),
554 original_tokens: cached.original_tokens_est,
555 summary_tokens: cached.summary_tokens_est,
556 });
557 result.content = cached.summary;
558 }
559 }
560 }
561 }
562 }
563
564 fn finish(
570 &mut self,
571 response: ChatResponse,
572 termination_reason: TerminationReason,
573 ) -> IterationOutcome {
574 self.finished = true;
575 let usage = self.total_usage.clone();
576 let result = ToolLoopResult {
577 response: response.clone(),
578 iterations: self.iterations,
579 total_usage: usage.clone(),
580 termination_reason: termination_reason.clone(),
581 };
582 self.final_result = Some(result.clone());
583 self.events.push(LoopEvent::Done(result));
584
585 IterationOutcome::Completed(CompletedData {
586 response,
587 termination_reason,
588 iterations: self.iterations,
589 total_usage: usage,
590 })
591 }
592
593 pub(crate) fn finish_error(&mut self, error: LlmError) -> IterationOutcome {
595 self.finished = true;
596 let usage = self.total_usage.clone();
597 self.final_result = Some(ToolLoopResult {
598 response: ChatResponse::empty(),
599 iterations: self.iterations,
600 total_usage: usage.clone(),
601 termination_reason: TerminationReason::Complete,
602 });
603 IterationOutcome::Error(ErrorData {
604 error,
605 iterations: self.iterations,
606 total_usage: usage,
607 })
608 }
609
610 fn make_terminal_outcome(&self) -> IterationOutcome {
612 if let Some(ref result) = self.final_result {
613 IterationOutcome::Completed(CompletedData {
614 response: result.response.clone(),
615 termination_reason: result.termination_reason.clone(),
616 iterations: result.iterations,
617 total_usage: result.total_usage.clone(),
618 })
619 } else {
620 IterationOutcome::Completed(CompletedData {
621 response: ChatResponse::empty(),
622 termination_reason: TerminationReason::Complete,
623 iterations: self.iterations,
624 total_usage: self.total_usage.clone(),
625 })
626 }
627 }
628
629 fn mask_old_observations(&mut self) {
638 let Some(masking_config) = self.config.masking else {
639 return;
640 };
641
642 let force_mask = self
644 .config
645 .force_mask_iterations
646 .as_ref()
647 .and_then(|fm| fm.lock().ok())
648 .map(|set| set.clone());
649
650 let has_force_masks = force_mask.as_ref().is_some_and(|s| !s.is_empty());
653 if !has_force_masks && self.iterations <= masking_config.max_iterations_to_keep {
654 return;
655 }
656
657 let cutoff = self
658 .iterations
659 .saturating_sub(masking_config.max_iterations_to_keep);
660 let mut masked_count: usize = 0;
661 let mut tokens_saved: u32 = 0;
662
663 for meta in &mut self.tool_result_meta {
664 if meta.masked {
665 continue;
666 }
667
668 let is_old = meta.iteration <= cutoff;
670 let is_forced = force_mask
671 .as_ref()
672 .is_some_and(|s| s.contains(&meta.iteration));
673
674 if !is_old && !is_forced {
675 continue;
676 }
677
678 let msg = &self.params.messages[meta.message_index];
679
680 let (tool_call_id, content, is_error) = match msg.content.first() {
682 Some(ContentBlock::ToolResult(tr)) => {
683 (tr.tool_call_id.clone(), &tr.content, tr.is_error)
684 }
685 _ => continue,
686 };
687
688 if is_error {
690 continue;
691 }
692
693 let content_tokens = crate::context::estimate_tokens(content);
694 if content_tokens < masking_config.min_tokens_to_mask {
695 continue;
696 }
697
698 let placeholder = format!(
700 "[Masked — tool result from iteration {iter}, ~{content_tokens} tokens. \
701 Use result_cache tool if available, or re-invoke tool.]",
702 iter = meta.iteration,
703 );
704 let placeholder_tokens = crate::context::estimate_tokens(&placeholder);
705
706 self.params.messages[meta.message_index] = ChatMessage::tool_result_full(ToolResult {
708 tool_call_id,
709 content: placeholder,
710 is_error: false,
711 });
712
713 meta.masked = true;
714 masked_count += 1;
715 tokens_saved += content_tokens.saturating_sub(placeholder_tokens);
716 }
717
718 if masked_count > 0 {
719 self.events.push(LoopEvent::ObservationsMasked {
720 masked_count,
721 tokens_saved,
722 });
723 }
724 }
725
726 pub(crate) fn resume(&mut self, command: LoopCommand) {
730 if !self.finished {
731 self.pending_command = Some(command);
732 }
733 }
734
    /// Returns the current conversation messages.
    pub(crate) fn messages(&self) -> &[ChatMessage] {
        &self.params.messages
    }
739
    /// Returns mutable access to the conversation messages.
    ///
    /// NOTE(review): observation masking stores message indices recorded when
    /// tool results were appended; removing or reordering messages through
    /// this handle can leave those indices stale.
    pub(crate) fn messages_mut(&mut self) -> &mut Vec<ChatMessage> {
        &mut self.params.messages
    }
744
    /// Returns the usage accumulated over all responses processed so far.
    pub(crate) fn total_usage(&self) -> &Usage {
        &self.total_usage
    }
749
    /// Returns the number of iterations started so far.
    pub(crate) fn iterations(&self) -> u32 {
        self.iterations
    }
754
    /// Returns `true` once the loop has finished, successfully or with an error.
    pub(crate) fn is_finished(&self) -> bool {
        self.finished
    }
759
760 pub(crate) fn into_result(self) -> ToolLoopResult {
762 self.final_result.unwrap_or_else(|| ToolLoopResult {
763 response: ChatResponse::empty(),
764 iterations: self.iterations,
765 total_usage: self.total_usage,
766 termination_reason: TerminationReason::Complete,
767 })
768 }
769}
770
771pub(crate) async fn collect_stream(mut stream: ChatStream) -> Result<ChatResponse, LlmError> {
779 let mut text = String::new();
780 let mut tool_calls: Vec<ToolCall> = Vec::new();
781 let mut usage = Usage::default();
782 let mut stop_reason = StopReason::EndTurn;
783
784 while let Some(event) = stream.next().await {
785 match event? {
786 StreamEvent::TextDelta(t) => text.push_str(&t),
787 StreamEvent::ToolCallComplete { call, .. } => tool_calls.push(call),
788 StreamEvent::Usage(u) => usage += &u,
789 StreamEvent::Done { stop_reason: sr } => stop_reason = sr,
790 _ => {}
792 }
793 }
794
795 let mut content = Vec::new();
796 if !text.is_empty() {
797 content.push(ContentBlock::Text(text));
798 }
799 for call in tool_calls {
800 content.push(ContentBlock::ToolCall(call));
801 }
802
803 Ok(ChatResponse {
804 content,
805 usage,
806 stop_reason,
807 model: String::new(),
808 metadata: HashMap::new(),
809 })
810}
811
812impl ChatMessage {
815 pub fn tool_result_full(result: ToolResult) -> Self {
817 Self {
818 role: crate::chat::ChatRole::Tool,
819 content: vec![ContentBlock::ToolResult(result)],
820 }
821 }
822}
823
824impl<Ctx: LoopDepth + Send + Sync + 'static> std::fmt::Debug for LoopCore<Ctx> {
825 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826 f.debug_struct("LoopCore")
827 .field("iterations", &self.iterations)
828 .field("tool_calls_executed", &self.tool_calls_executed)
829 .field("finished", &self.finished)
830 .field("has_pending_command", &self.pending_command.is_some())
831 .field("buffered_events", &self.events.len())
832 .finish_non_exhaustive()
833 }
834}