Skip to main content

llm_stack/tool/
loop_core.rs

1//! Shared iteration engine for all tool loop variants.
2//!
3//! `LoopCore<Ctx>` holds all mutable state for a tool loop and provides the
4//! iteration methods. The caller provides `&dyn DynProvider` and
5//! `&ToolRegistry<Ctx>` for each operation — `LoopCore` has no opinion on
6//! ownership (borrowed vs `Arc`).
7//!
8//! # Streaming-first design
9//!
10//! The fundamental LLM call uses `stream_boxed()`. Non-streaming callers
11//! collect the stream internally via [`do_iteration`](LoopCore::do_iteration).
12//! Streaming callers use [`start_iteration`](LoopCore::start_iteration) to
13//! obtain the raw `ChatStream`, drive it themselves (yielding events to the
14//! caller), then hand the accumulated response back via
15//! [`finish_iteration`](LoopCore::finish_iteration).
16//!
17//! # Event buffer
18//!
19//! `LoopCore` accumulates [`LoopEvent`]s in an internal buffer as it performs
20//! each phase. Callers drain the buffer via [`drain_events`](LoopCore::drain_events)
21//! to forward events to consumers. This replaces the old `on_event` callback.
22
23use std::collections::HashMap;
24use std::time::Instant;
25
26use futures::StreamExt;
27
28use crate::chat::{ChatMessage, ChatResponse, ContentBlock, StopReason, ToolCall, ToolResult};
29use crate::error::LlmError;
30use crate::provider::{ChatParams, DynProvider};
31use crate::stream::{ChatStream, StreamEvent};
32use crate::usage::Usage;
33
34use super::LoopDepth;
35use super::ToolRegistry;
36use super::approval::approve_calls;
37use super::config::{
38    LoopEvent, StopContext, StopDecision, TerminationReason, ToolLoopConfig, ToolLoopResult,
39};
40use super::execution::execute_with_events;
41use super::loop_detection::{IterationSnapshot, LoopDetectionState, handle_loop_detection};
42use super::loop_resumable::LoopCommand;
43
44// ── IterationOutcome ────────────────────────────────────────────────
45
/// Intermediate result from `do_iteration` / `finish_iteration`. Holds
/// owned data (no borrows on the core) so the caller can construct its
/// own `TurnResult` with a fresh `&mut self` afterwards.
pub(crate) enum IterationOutcome {
    /// Tools ran for this iteration and their results were appended to the
    /// conversation. The loop has not terminated.
    ToolsExecuted {
        /// Tool calls taken from the assistant response for this iteration.
        tool_calls: Vec<ToolCall>,
        /// Tool results after the post-processing pipeline has run.
        results: Vec<ToolResult>,
        /// The assistant response's content blocks other than tool calls.
        assistant_content: Vec<ContentBlock>,
        /// Iteration counter value at the time the tools were executed.
        iteration: u32,
        /// Usage accumulated across all iterations so far.
        total_usage: Usage,
    },
    /// The loop terminated; see [`CompletedData::termination_reason`].
    Completed(CompletedData),
    /// The loop terminated because of an error.
    Error(ErrorData),
}
60
/// Owned data for a completed iteration.
pub(crate) struct CompletedData {
    /// Final response; an empty response when the loop ended before an LLM
    /// call (stop command, timeout, max iterations).
    pub response: ChatResponse,
    /// Why the loop stopped.
    pub termination_reason: TerminationReason,
    /// Value of the iteration counter when the loop finished.
    pub iterations: u32,
    /// Usage accumulated across all iterations.
    pub total_usage: Usage,
}
68
/// Owned data for a failed iteration.
pub(crate) struct ErrorData {
    /// The error that ended the loop.
    pub error: LlmError,
    /// Value of the iteration counter when the error occurred.
    pub iterations: u32,
    /// Usage accumulated up to the point of failure.
    pub total_usage: Usage,
}
75
76// ── StartOutcome ────────────────────────────────────────────────────
77
/// Result of [`LoopCore::start_iteration`].
///
/// Either the LLM stream is ready to be consumed, or the iteration ended
/// early (precondition failure, max iterations, timeout, etc.).
pub(crate) enum StartOutcome {
    /// The LLM stream is ready. The caller should drive it to completion,
    /// accumulate the response, then call `finish_iteration`.
    Stream(ChatStream),
    /// The iteration ended without a usable stream: a precondition failed
    /// (depth error, already finished, timeout, stop command, max
    /// iterations) or the provider returned an error when opening the
    /// stream. Boxed to keep this enum small.
    Terminal(Box<IterationOutcome>),
}
90
91// ── LoopCore ────────────────────────────────────────────────────────
92
/// Shared mutable state for all tool loop variants.
///
/// Contains the iteration logic used by all loop implementations.
/// Methods that need the LLM provider or tool registry accept them by
/// reference, so this struct works with both borrowed and Arc-owned
/// callers.
///
/// # Two-phase iteration
///
/// 1. [`start_iteration`](Self::start_iteration) — pre-checks, opens the
///    LLM stream via `stream_boxed()`.
/// 2. [`finish_iteration`](Self::finish_iteration) — given the accumulated
///    `ChatResponse`, runs termination checks and tool execution.
///
/// For callers that don't need streaming, [`do_iteration`](Self::do_iteration)
/// composes both phases, collecting the stream internally.
pub(crate) struct LoopCore<Ctx: LoopDepth + Send + Sync + 'static> {
    /// Request parameters, including the growing conversation in
    /// `params.messages`.
    pub(crate) params: ChatParams,
    /// Loop configuration (limits, stop condition, result pipeline, masking).
    config: ToolLoopConfig,
    /// Context handed to tool execution, one depth level below the caller's.
    nested_ctx: Ctx,
    /// Usage summed over every response passed to `finish_iteration`.
    total_usage: Usage,
    /// Number of iterations started so far (incremented in `start_iteration`).
    iterations: u32,
    /// Running count of tool results produced by tool execution.
    tool_calls_executed: usize,
    /// Results from the most recent tool execution; exposed to `stop_when`.
    last_tool_results: Vec<ToolResult>,
    /// State carried between iterations by loop detection.
    loop_state: LoopDetectionState,
    /// Creation time of this core; the timeout is measured from here.
    start_time: Instant,
    /// Set once a terminal outcome (completed or error) has been produced.
    finished: bool,
    /// Command queued by `resume`, applied at the start of the next iteration.
    pending_command: Option<LoopCommand>,
    /// Cached terminal result, replayed on calls made after finishing.
    final_result: Option<ToolLoopResult>,
    /// Depth-limit error detected in `new`, reported on the first iteration.
    depth_error: Option<LlmError>,
    /// Buffered events awaiting `drain_events`.
    events: Vec<LoopEvent>,
    /// Maps message index → iteration number for tool result messages.
    /// Used by observation masking to determine which results are old.
    tool_result_meta: Vec<ToolResultMeta>,
}
128
/// Metadata for a tool result message in the conversation.
///
/// One entry is recorded per tool result appended to `params.messages`;
/// observation masking walks these entries to decide which results to
/// replace with placeholders.
///
/// Derives the common traits so entries can be debug-printed and compared
/// in diagnostics and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToolResultMeta {
    /// Index of this message in `params.messages`.
    message_index: usize,
    /// Iteration in which this tool result was added.
    iteration: u32,
    /// Whether this result has already been masked.
    masked: bool,
}
138
139impl<Ctx: LoopDepth + Send + Sync + 'static> LoopCore<Ctx> {
140    /// Create a new `LoopCore` with the given params, config, and context.
141    ///
142    /// If the context's depth already exceeds `config.max_depth`, the depth
143    /// error is stored and returned on the first iteration call.
144    pub(crate) fn new(params: ChatParams, config: ToolLoopConfig, ctx: &Ctx) -> Self {
145        let current_depth = ctx.loop_depth();
146        let depth_error = config.max_depth.and_then(|max_depth| {
147            if current_depth >= max_depth {
148                Some(LlmError::MaxDepthExceeded {
149                    current: current_depth,
150                    limit: max_depth,
151                })
152            } else {
153                None
154            }
155        });
156
157        let nested_ctx = ctx.with_depth(current_depth + 1);
158
159        Self {
160            params,
161            config,
162            nested_ctx,
163            total_usage: Usage::default(),
164            iterations: 0,
165            tool_calls_executed: 0,
166            last_tool_results: Vec::new(),
167            loop_state: LoopDetectionState::default(),
168            start_time: Instant::now(),
169            finished: false,
170            pending_command: None,
171            final_result: None,
172            depth_error,
173            events: Vec::new(),
174            tool_result_meta: Vec::new(),
175        }
176    }
177
178    // ── Two-phase iteration (streaming-first) ────────────────────
179
180    /// Phase 1: Pre-checks and LLM stream creation.
181    ///
182    /// Runs precondition guards (depth, finished, pending command, timeout,
183    /// max iterations), then calls `provider.stream_boxed()` to start the
184    /// LLM generation.
185    ///
186    /// Returns `StartOutcome::Stream` with the raw `ChatStream` if the LLM
187    /// call succeeded, or `StartOutcome::Terminal` if the iteration ended
188    /// early.
189    ///
190    /// Pushes `LoopEvent::IterationStart` into the event buffer on success.
191    ///
192    /// After obtaining the stream, the caller should:
193    /// 1. Consume all `StreamEvent`s (optionally forwarding them)
194    /// 2. Accumulate text, tool calls, and usage into a `ChatResponse`
195    /// 3. Call [`finish_iteration`](Self::finish_iteration) with the result
196    pub(crate) async fn start_iteration(&mut self, provider: &dyn DynProvider) -> StartOutcome {
197        // Phase 1: Pre-iteration guards
198        if let Some(outcome) = self.check_preconditions() {
199            return StartOutcome::Terminal(Box::new(outcome));
200        }
201
202        self.iterations += 1;
203
204        // Push iteration start event
205        self.events.push(LoopEvent::IterationStart {
206            iteration: self.iterations,
207            message_count: self.params.messages.len(),
208        });
209
210        // Max iterations check (after increment, before LLM call)
211        if self.iterations > self.config.max_iterations {
212            return StartOutcome::Terminal(Box::new(self.finish(
213                ChatResponse::empty(),
214                TerminationReason::MaxIterations {
215                    limit: self.config.max_iterations,
216                },
217            )));
218        }
219
220        // Observation masking: replace old tool results with placeholders
221        self.mask_old_observations();
222
223        // Start LLM stream
224        match provider.stream_boxed(&self.params).await {
225            Ok(stream) => StartOutcome::Stream(stream),
226            Err(e) => StartOutcome::Terminal(Box::new(self.finish_error(e))),
227        }
228    }
229
230    /// Phase 2: Post-stream termination checks and tool execution.
231    ///
232    /// Called after the caller has fully consumed the `ChatStream` from
233    /// `start_iteration` and assembled the accumulated `ChatResponse`.
234    ///
235    /// Runs: usage accounting → stop condition → natural completion →
236    /// loop detection → tool execution.
237    ///
238    /// Tool execution events (`ToolExecutionStart`, `ToolExecutionEnd`)
239    /// are pushed into the event buffer. Terminal outcomes push
240    /// `LoopEvent::Done`.
241    pub(crate) async fn finish_iteration(
242        &mut self,
243        response: ChatResponse,
244        registry: &ToolRegistry<Ctx>,
245    ) -> IterationOutcome {
246        self.total_usage += &response.usage;
247
248        // Termination checks
249        let call_refs: Vec<&ToolCall> = response.tool_calls();
250        if let Some(outcome) = self.check_termination(&response, &call_refs) {
251            return outcome;
252        }
253
254        // Execute tools
255        self.execute_tools(registry, response).await
256    }
257
258    // ── Composed iteration (non-streaming) ───────────────────────
259
260    /// Perform one full iteration: start → collect stream → finish.
261    ///
262    /// This is the non-streaming path. It calls `start_iteration`, collects
263    /// the entire stream into a `ChatResponse`, then calls `finish_iteration`.
264    pub(crate) async fn do_iteration(
265        &mut self,
266        provider: &dyn DynProvider,
267        registry: &ToolRegistry<Ctx>,
268    ) -> IterationOutcome {
269        let stream = match self.start_iteration(provider).await {
270            StartOutcome::Stream(s) => s,
271            StartOutcome::Terminal(outcome) => return *outcome,
272        };
273
274        let response = collect_stream(stream).await;
275        match response {
276            Ok(resp) => self.finish_iteration(resp, registry).await,
277            Err(e) => self.finish_error(e),
278        }
279    }
280
281    // ── Event buffer ────────────────────────────────────────────
282
283    /// Drain all buffered events, returning them and clearing the buffer.
284    pub(crate) fn drain_events(&mut self) -> Vec<LoopEvent> {
285        std::mem::take(&mut self.events)
286    }
287
288    // ── Pre-iteration guards ────────────────────────────────────
289
290    fn check_preconditions(&mut self) -> Option<IterationOutcome> {
291        // Depth error deferred from new()
292        if let Some(error) = self.depth_error.take() {
293            return Some(self.finish_error(error));
294        }
295
296        // Already finished — return cached terminal event
297        if self.finished {
298            return Some(self.make_terminal_outcome());
299        }
300
301        // Apply pending command from previous resume() call
302        if let Some(command) = self.pending_command.take() {
303            match command {
304                LoopCommand::Continue => {}
305                LoopCommand::InjectMessages(messages) => {
306                    self.params.messages.extend(messages);
307                }
308                LoopCommand::Stop(reason) => {
309                    return Some(self.finish(
310                        ChatResponse::empty(),
311                        TerminationReason::StopCondition { reason },
312                    ));
313                }
314            }
315        }
316
317        // Timeout
318        if let Some(limit) = self.config.timeout {
319            if self.start_time.elapsed() >= limit {
320                return Some(
321                    self.finish(ChatResponse::empty(), TerminationReason::Timeout { limit }),
322                );
323            }
324        }
325
326        None
327    }
328
329    // ── Post-response termination checks ────────────────────────
330
331    fn check_termination(
332        &mut self,
333        response: &ChatResponse,
334        call_refs: &[&ToolCall],
335    ) -> Option<IterationOutcome> {
336        // Custom stop condition
337        if let Some(ref stop_fn) = self.config.stop_when {
338            let ctx = StopContext {
339                iteration: self.iterations,
340                response,
341                total_usage: &self.total_usage,
342                tool_calls_executed: self.tool_calls_executed,
343                last_tool_results: &self.last_tool_results,
344            };
345            match stop_fn(&ctx) {
346                StopDecision::Continue => {}
347                StopDecision::Stop => {
348                    return Some(self.finish(
349                        response.clone(),
350                        TerminationReason::StopCondition { reason: None },
351                    ));
352                }
353                StopDecision::StopWithReason(reason) => {
354                    return Some(self.finish(
355                        response.clone(),
356                        TerminationReason::StopCondition {
357                            reason: Some(reason),
358                        },
359                    ));
360                }
361            }
362        }
363
364        // Natural completion (no tool calls)
365        if call_refs.is_empty() || response.stop_reason != StopReason::ToolUse {
366            return Some(self.finish(response.clone(), TerminationReason::Complete));
367        }
368
369        // Max iterations (second check — covers edge cases where start_iteration
370        // incremented but the check was at the boundary)
371        if self.iterations > self.config.max_iterations {
372            return Some(self.finish(
373                response.clone(),
374                TerminationReason::MaxIterations {
375                    limit: self.config.max_iterations,
376                },
377            ));
378        }
379
380        // Loop detection
381        let snap = IterationSnapshot {
382            response,
383            call_refs,
384            iterations: self.iterations,
385            total_usage: &self.total_usage,
386            config: &self.config,
387        };
388        if let Some(result) = handle_loop_detection(
389            &mut self.loop_state,
390            &snap,
391            &mut self.params.messages,
392            &mut self.events,
393        ) {
394            return Some(self.finish(result.response, result.termination_reason));
395        }
396
397        None
398    }
399
400    // ── Tool execution ──────────────────────────────────────────
401
402    async fn execute_tools(
403        &mut self,
404        registry: &ToolRegistry<Ctx>,
405        response: ChatResponse,
406    ) -> IterationOutcome {
407        let (calls, other_content) = response.partition_content();
408
409        // Clone calls once: we need them for the outcome AND the approval pipeline.
410        // The assistant message is built from refs to avoid a second clone.
411        let outcome_calls = calls.clone();
412
413        // Build the assistant message with text + tool-call blocks.
414        // Anthropic (and others) require tool_use blocks in the assistant message
415        // so that subsequent tool_result messages can reference them by ID.
416        let mut msg_content = other_content.clone();
417        msg_content.extend(calls.iter().map(|c| ContentBlock::ToolCall(c.clone())));
418        self.params.messages.push(ChatMessage {
419            role: crate::chat::ChatRole::Assistant,
420            content: msg_content,
421        });
422
423        // Approve and execute tools (consumes owned calls)
424        let (approved_calls, denied_results) = approve_calls(calls, &self.config);
425        let exec_result = execute_with_events(
426            registry,
427            approved_calls,
428            denied_results,
429            self.config.parallel_tool_execution,
430            &self.nested_ctx,
431        )
432        .await;
433
434        self.events.extend(exec_result.events);
435
436        let mut results = exec_result.results;
437        self.tool_calls_executed += results.len();
438
439        // Post-process, extract, and optionally cache tool results
440        self.postprocess_results(&mut results, &outcome_calls).await;
441
442        self.last_tool_results.clone_from(&results);
443
444        // Append tool results to conversation and record metadata for masking
445        for result in &results {
446            let idx = self.params.messages.len();
447            self.params
448                .messages
449                .push(ChatMessage::tool_result_full(result.clone()));
450            self.tool_result_meta.push(ToolResultMeta {
451                message_index: idx,
452                iteration: self.iterations,
453                masked: false,
454            });
455        }
456
457        IterationOutcome::ToolsExecuted {
458            tool_calls: outcome_calls,
459            results,
460            assistant_content: other_content,
461            iteration: self.iterations,
462            total_usage: self.total_usage.clone(),
463        }
464    }
465
466    // ── Result post-processing pipeline ────────────────────────
467
468    /// Three-stage post-processing pipeline for tool results.
469    ///
470    /// 1. **Structural pruning** — sync `ToolResultProcessor`
471    /// 2. **Semantic extraction** — async `ToolResultExtractor` (for large results)
472    /// 3. **Cache overflow** — sync `ToolResultCacher` (for still-large results)
473    async fn postprocess_results(&mut self, results: &mut [ToolResult], calls: &[ToolCall]) {
474        let has_processor = self.config.result_processor.is_some();
475        let has_extractor = self.config.result_extractor.is_some();
476        let has_cacher = self.config.result_cacher.is_some();
477
478        if !has_processor && !has_extractor && !has_cacher {
479            return;
480        }
481
482        // Build call_id → tool_name lookup from the original calls
483        let call_id_to_name: HashMap<&str, &str> = calls
484            .iter()
485            .map(|c| (c.id.as_str(), c.name.as_str()))
486            .collect();
487
488        // Extract the last user message for relevance-guided extraction
489        let user_query: String = self
490            .params
491            .messages
492            .iter()
493            .rev()
494            .find_map(|m| {
495                if m.role == crate::chat::ChatRole::User {
496                    m.content.iter().find_map(|b| match b {
497                        ContentBlock::Text(t) => Some(t.clone()),
498                        _ => None,
499                    })
500                } else {
501                    None
502                }
503            })
504            .unwrap_or_default();
505
506        for result in results.iter_mut() {
507            let tool_name = call_id_to_name
508                .get(result.tool_call_id.as_str())
509                .copied()
510                .unwrap_or("unknown");
511
512            if result.is_error {
513                continue;
514            }
515
516            // Stage 1: structural pruning via processor
517            if let Some(ref processor) = self.config.result_processor {
518                let processed = processor.process(tool_name, &result.content);
519                if processed.was_processed {
520                    self.events.push(LoopEvent::ToolResultProcessed {
521                        tool_name: tool_name.to_string(),
522                        original_tokens: processed.original_tokens_est,
523                        processed_tokens: processed.processed_tokens_est,
524                    });
525                    result.content = processed.content;
526                }
527            }
528
529            // Stage 2: semantic extraction via async extractor
530            if let Some(ref extractor) = self.config.result_extractor {
531                let tokens = crate::context::estimate_tokens(&result.content);
532                if tokens > extractor.extraction_threshold() {
533                    if let Some(extracted) = extractor
534                        .extract(tool_name, &result.content, &user_query)
535                        .await
536                    {
537                        self.events.push(LoopEvent::ToolResultExtracted {
538                            tool_name: tool_name.to_string(),
539                            original_tokens: extracted.original_tokens_est,
540                            extracted_tokens: extracted.extracted_tokens_est,
541                        });
542                        result.content = extracted.content;
543                    }
544                }
545            }
546
547            // Stage 3: cache overflow — store externally if still too large
548            if let Some(ref cacher) = self.config.result_cacher {
549                let tokens = crate::context::estimate_tokens(&result.content);
550                if tokens > cacher.inline_threshold() {
551                    if let Some(cached) = cacher.cache(tool_name, &result.content) {
552                        self.events.push(LoopEvent::ToolResultCached {
553                            tool_name: tool_name.to_string(),
554                            original_tokens: cached.original_tokens_est,
555                            summary_tokens: cached.summary_tokens_est,
556                        });
557                        result.content = cached.summary;
558                    }
559                }
560            }
561        }
562    }
563
564    // ── Terminal outcome helpers ─────────────────────────────────
565
566    /// Mark the loop as finished and return a `Completed` outcome.
567    ///
568    /// Also pushes `LoopEvent::Done` into the event buffer.
569    fn finish(
570        &mut self,
571        response: ChatResponse,
572        termination_reason: TerminationReason,
573    ) -> IterationOutcome {
574        self.finished = true;
575        let usage = self.total_usage.clone();
576        let result = ToolLoopResult {
577            response: response.clone(),
578            iterations: self.iterations,
579            total_usage: usage.clone(),
580            termination_reason: termination_reason.clone(),
581        };
582        self.final_result = Some(result.clone());
583        self.events.push(LoopEvent::Done(result));
584
585        IterationOutcome::Completed(CompletedData {
586            response,
587            termination_reason,
588            iterations: self.iterations,
589            total_usage: usage,
590        })
591    }
592
593    /// Mark the loop as finished and return an `Error` outcome.
594    pub(crate) fn finish_error(&mut self, error: LlmError) -> IterationOutcome {
595        self.finished = true;
596        let usage = self.total_usage.clone();
597        self.final_result = Some(ToolLoopResult {
598            response: ChatResponse::empty(),
599            iterations: self.iterations,
600            total_usage: usage.clone(),
601            termination_reason: TerminationReason::Complete,
602        });
603        IterationOutcome::Error(ErrorData {
604            error,
605            iterations: self.iterations,
606            total_usage: usage,
607        })
608    }
609
610    /// Build a terminal outcome from cached state (for repeated calls after finish).
611    fn make_terminal_outcome(&self) -> IterationOutcome {
612        if let Some(ref result) = self.final_result {
613            IterationOutcome::Completed(CompletedData {
614                response: result.response.clone(),
615                termination_reason: result.termination_reason.clone(),
616                iterations: result.iterations,
617                total_usage: result.total_usage.clone(),
618            })
619        } else {
620            IterationOutcome::Completed(CompletedData {
621                response: ChatResponse::empty(),
622                termination_reason: TerminationReason::Complete,
623                iterations: self.iterations,
624                total_usage: self.total_usage.clone(),
625            })
626        }
627    }
628
629    // ── Observation masking ──────────────────────────────────────
630
631    /// Replace old tool results with compact placeholders.
632    ///
633    /// Scans `tool_result_meta` for results from iterations older than
634    /// `config.masking.max_iterations_to_keep` and larger than
635    /// `config.masking.min_tokens_to_mask`, replacing their content
636    /// with a one-line placeholder.
637    fn mask_old_observations(&mut self) {
638        let Some(masking_config) = self.config.masking else {
639            return;
640        };
641
642        // Collect force-mask iterations (agent-directed via context_release)
643        let force_mask = self
644            .config
645            .force_mask_iterations
646            .as_ref()
647            .and_then(|fm| fm.lock().ok())
648            .map(|set| set.clone());
649
650        // Only mask starting from iteration 3+ (need enough history),
651        // unless there are force-masked iterations to process
652        let has_force_masks = force_mask.as_ref().is_some_and(|s| !s.is_empty());
653        if !has_force_masks && self.iterations <= masking_config.max_iterations_to_keep {
654            return;
655        }
656
657        let cutoff = self
658            .iterations
659            .saturating_sub(masking_config.max_iterations_to_keep);
660        let mut masked_count: usize = 0;
661        let mut tokens_saved: u32 = 0;
662
663        for meta in &mut self.tool_result_meta {
664            if meta.masked {
665                continue;
666            }
667
668            // Check: age-based OR force-masked
669            let is_old = meta.iteration <= cutoff;
670            let is_forced = force_mask
671                .as_ref()
672                .is_some_and(|s| s.contains(&meta.iteration));
673
674            if !is_old && !is_forced {
675                continue;
676            }
677
678            let msg = &self.params.messages[meta.message_index];
679
680            // Extract the tool result content and estimate tokens
681            let (tool_call_id, content, is_error) = match msg.content.first() {
682                Some(ContentBlock::ToolResult(tr)) => {
683                    (tr.tool_call_id.clone(), &tr.content, tr.is_error)
684                }
685                _ => continue,
686            };
687
688            // Don't mask errors (they're usually small and informative)
689            if is_error {
690                continue;
691            }
692
693            let content_tokens = crate::context::estimate_tokens(content);
694            if content_tokens < masking_config.min_tokens_to_mask {
695                continue;
696            }
697
698            // Build placeholder
699            let placeholder = format!(
700                "[Masked — tool result from iteration {iter}, ~{content_tokens} tokens. \
701                 Use result_cache tool if available, or re-invoke tool.]",
702                iter = meta.iteration,
703            );
704            let placeholder_tokens = crate::context::estimate_tokens(&placeholder);
705
706            // Replace the message content
707            self.params.messages[meta.message_index] = ChatMessage::tool_result_full(ToolResult {
708                tool_call_id,
709                content: placeholder,
710                is_error: false,
711            });
712
713            meta.masked = true;
714            masked_count += 1;
715            tokens_saved += content_tokens.saturating_sub(placeholder_tokens);
716        }
717
718        if masked_count > 0 {
719            self.events.push(LoopEvent::ObservationsMasked {
720                masked_count,
721                tokens_saved,
722            });
723        }
724    }
725
726    // ── Accessors ───────────────────────────────────────────────
727
728    /// Set a pending command for the next iteration.
729    pub(crate) fn resume(&mut self, command: LoopCommand) {
730        if !self.finished {
731            self.pending_command = Some(command);
732        }
733    }
734
735    /// Read-only access to conversation messages.
736    pub(crate) fn messages(&self) -> &[ChatMessage] {
737        &self.params.messages
738    }
739
740    /// Mutable access to conversation messages.
741    pub(crate) fn messages_mut(&mut self) -> &mut Vec<ChatMessage> {
742        &mut self.params.messages
743    }
744
745    /// Accumulated usage across all iterations.
746    pub(crate) fn total_usage(&self) -> &Usage {
747        &self.total_usage
748    }
749
750    /// Current iteration count.
751    pub(crate) fn iterations(&self) -> u32 {
752        self.iterations
753    }
754
755    /// Whether the loop has finished.
756    pub(crate) fn is_finished(&self) -> bool {
757        self.finished
758    }
759
760    /// Consume the core and return a `ToolLoopResult`.
761    pub(crate) fn into_result(self) -> ToolLoopResult {
762        self.final_result.unwrap_or_else(|| ToolLoopResult {
763            response: ChatResponse::empty(),
764            iterations: self.iterations,
765            total_usage: self.total_usage,
766            termination_reason: TerminationReason::Complete,
767        })
768    }
769}
770
771// ── Stream collector ────────────────────────────────────────────────
772
773/// Collect a `ChatStream` into a `ChatResponse`.
774///
775/// Consumes all events from the stream, accumulating text, tool calls,
776/// and usage into a single response. Used by `do_iteration` to bridge
777/// the streaming-first core to non-streaming callers.
778pub(crate) async fn collect_stream(mut stream: ChatStream) -> Result<ChatResponse, LlmError> {
779    let mut text = String::new();
780    let mut tool_calls: Vec<ToolCall> = Vec::new();
781    let mut usage = Usage::default();
782    let mut stop_reason = StopReason::EndTurn;
783
784    while let Some(event) = stream.next().await {
785        match event? {
786            StreamEvent::TextDelta(t) => text.push_str(&t),
787            StreamEvent::ToolCallComplete { call, .. } => tool_calls.push(call),
788            StreamEvent::Usage(u) => usage += &u,
789            StreamEvent::Done { stop_reason: sr } => stop_reason = sr,
790            // ToolCallStart, ToolCallDelta, ReasoningDelta — not needed for response
791            _ => {}
792        }
793    }
794
795    let mut content = Vec::new();
796    if !text.is_empty() {
797        content.push(ContentBlock::Text(text));
798    }
799    for call in tool_calls {
800        content.push(ContentBlock::ToolCall(call));
801    }
802
803    Ok(ChatResponse {
804        content,
805        usage,
806        stop_reason,
807        model: String::new(),
808        metadata: HashMap::new(),
809    })
810}
811
812// ── ChatMessage helper ──────────────────────────────────────────────
813
814impl ChatMessage {
815    /// Creates a tool result message from a [`ToolResult`].
816    pub fn tool_result_full(result: ToolResult) -> Self {
817        Self {
818            role: crate::chat::ChatRole::Tool,
819            content: vec![ContentBlock::ToolResult(result)],
820        }
821    }
822}
823
824impl<Ctx: LoopDepth + Send + Sync + 'static> std::fmt::Debug for LoopCore<Ctx> {
825    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826        f.debug_struct("LoopCore")
827            .field("iterations", &self.iterations)
828            .field("tool_calls_executed", &self.tool_calls_executed)
829            .field("finished", &self.finished)
830            .field("has_pending_command", &self.pending_command.is_some())
831            .field("buffered_events", &self.events.len())
832            .finish_non_exhaustive()
833    }
834}