Skip to main content

ailoop_core/
stream.rs

1//! Engine event vocabulary: [`StreamChunk`], [`FinishReason`], and
2//! [`Usage`].
3
4use std::{ops::Add, sync::Arc};
5
6use crate::{Message, RunId, StepId, ToolResultContent};
7
8/// Event the engine emits as a run progresses.
9///
10/// Variants split into two families:
11///
12/// 1. **Provider stream events**, surfaced once per turn —
13///    `TextDelta`, `ToolCall*`, `Reasoning*`, `RedactedReasoningBlock`,
14///    `TurnFinished`. Adapters lower wire deltas into these. Started/
15///    Finished pairs always nest cleanly: a `ToolCallFinished` arrives
16///    before any other tool call's `Started`.
17/// 2. **Engine lifecycle events**, synthesized by the engine itself
18///    around the provider stream — `RunStarted`, `StepStarted`,
19///    `StepFinished`, `ToolResult`, `RunFinished`, `HistoryCompacted`.
20///
21/// Every chunk reaches every [`crate::ChatMiddleware::on_chunk`]; the
22/// engine also drives its own assistant-history reconstruction off
23/// these events, so middlewares that override
24/// [`crate::ChatMiddleware::on_chunk_mut`] can rewrite them in flight
25/// to influence what gets persisted.
26#[derive(Debug)]
27#[non_exhaustive]
28pub enum StreamChunk {
29    /// Incremental visible text from the model. Concatenate deltas in
30    /// arrival order to reconstruct the assistant text block.
31    TextDelta {
32        /// New text appended this delta. Empty deltas are legal.
33        delta: String,
34    },
35    /// A new tool call has begun. The model has emitted the tool name
36    /// but no arguments yet. Pair with the matching
37    /// [`Self::ToolCallFinished`] (same `id`) once the call is fully
38    /// assembled.
39    ToolCallStarted {
40        /// Provider-assigned id; mirrors back as `call_id` on the
41        /// [`Self::ToolResult`] the engine emits after execution.
42        id: String,
43        /// Tool name as registered in the request's `tools` list.
44        name: String,
45    },
46    /// Incremental tool-call argument JSON. Concatenate deltas in
47    /// arrival order to rebuild the `args` JSON for live UIs;
48    /// engines that only need the final structure can ignore these
49    /// and read [`Self::ToolCallFinished::args`] instead.
50    ToolCallArgsDelta {
51        /// Tool call id; matches the originating
52        /// [`Self::ToolCallStarted::id`].
53        id: String,
54        /// JSON fragment appended this delta.
55        delta: String,
56    },
57    /// A tool call is fully assembled and ready to execute. The
58    /// engine invokes the tool after this chunk and emits a
59    /// [`Self::ToolResult`] when execution completes.
60    ToolCallFinished {
61        /// Tool call id; matches the originating
62        /// [`Self::ToolCallStarted::id`].
63        id: String,
64        /// Tool name, repeated for convenience so consumers do not
65        /// have to track the originating `Started` chunk.
66        name: String,
67        /// Final, parsed JSON arguments.
68        args: serde_json::Value,
69    },
70    /// Incremental reasoning text. Same accumulation contract as
71    /// [`Self::TextDelta`], but feeds an
72    /// [`crate::AssistantBlock::Reasoning`] block instead of
73    /// [`crate::AssistantBlock::Text`]. Some providers (Anthropic
74    /// extended thinking) require the assembled reasoning to be
75    /// replayed verbatim on subsequent turns when tools are involved.
76    ReasoningDelta {
77        /// New reasoning text appended this delta.
78        delta: String,
79    },
80    /// End of a visible reasoning block. Carries the provider signature when
81    /// applicable (Anthropic extended thinking); other providers may emit
82    /// `None`. Engines should pair this with the accumulated reasoning text
83    /// to materialize an `AssistantBlock::Reasoning`.
84    ReasoningFinished {
85        /// Provider signature for the reasoning block (Anthropic
86        /// extended thinking). Persist alongside the reasoning text
87        /// in [`crate::AssistantBlock::Reasoning`]; replay verbatim on
88        /// subsequent turns when tools are involved.
89        signature: Option<String>,
90    },
91    /// A complete redacted reasoning block delivered atomically. `data` is
92    /// opaque provider material that must be replayed verbatim on the next
93    /// request. Engines should materialize `AssistantBlock::RedactedReasoning`
94    /// directly from this chunk; no deltas are emitted around it.
95    RedactedReasoningBlock {
96        /// Verbatim provider payload; treat as opaque bytes.
97        data: String,
98    },
99    /// End of a single provider turn. Equivalent to a Chat
100    /// Completions `finish_reason` plus the final `usage`. Multiple
101    /// turns can fire per run when the model is in a tool-use loop.
102    TurnFinished {
103        /// Why the model stopped this turn.
104        reason: FinishReason,
105        /// Token counters reported by the provider for this turn.
106        usage: Usage,
107        /// Provider-reported service tier for the turn (Anthropic:
108        /// `"standard"` / `"priority"` / `"batch"`). `None` when the
109        /// provider does not surface one. Per-turn rather than
110        /// aggregated because it is a categorical label, not a counter.
111        service_tier: Option<String>,
112    },
113
114    // Extend
115    /// Engine has accepted the run; emitted exactly once per run
116    /// before the first provider call.
117    RunStarted {
118        /// Identifier shared by every chunk this run produces.
119        run_id: RunId,
120    },
121    /// Engine is starting a step (one provider turn plus the tool
122    /// calls it triggers).
123    StepStarted {
124        /// Run this step belongs to.
125        run_id: RunId,
126        /// Identifier shared by every chunk this step produces.
127        step_id: StepId,
128        /// 0-based iteration number; bounded by
129        /// [`crate::RunConfig::max_iterations`].
130        iteration: usize,
131    },
132    /// Engine has finished a step. Includes the cumulative messages
133    /// added to history so far, so observers can snapshot
134    /// mid-conversation without waiting for [`Self::RunFinished`].
135    StepFinished {
136        /// Run this step belongs to.
137        run_id: RunId,
138        /// Step that just finished.
139        step_id: StepId,
140        /// 0-based iteration number, matching [`Self::StepStarted::iteration`].
141        iteration: usize,
142        /// All messages this run has appended to history so far,
143        /// shared so observers can read without cloning the vector.
144        new_messages_so_far: Arc<Vec<Message>>,
145    },
146    /// A tool finished executing and produced a reply. Emitted
147    /// **after** [`Self::ToolCallFinished`] and before the next
148    /// provider turn picks up the result.
149    ToolResult {
150        /// Run that owns the tool call.
151        run_id: RunId,
152        /// Step that owns the tool call.
153        step_id: StepId,
154        /// Matches [`Self::ToolCallFinished::id`].
155        call_id: String,
156        /// Tool reply, with `is_error` preserved for the next provider
157        /// turn.
158        content: ToolResultContent,
159    },
160    /// Engine has finished the run. Emitted exactly once per run,
161    /// even on aborts and middleware terminations.
162    RunFinished {
163        /// Run that just finished.
164        run_id: RunId,
165        /// Why the run ended.
166        reason: FinishReason,
167        /// Token totals across every turn in the run.
168        usage: Usage,
169        /// All messages this run added to history. On abort, partial
170        /// tool results are preserved so the next run sees a
171        /// consistent shape.
172        new_messages: Vec<Message>,
173    },
174    /// Emitted by `Conversation::stream` (not the engine) when history
175    /// compaction ran before the request was sent. Carries message
176    /// counts from before/after compaction and the strategy's name so
177    /// observability middlewares can report what was dropped.
178    HistoryCompacted {
179        /// Run for which compaction ran. Shared with the
180        /// engine-emitted chunks of the same run.
181        run_id: RunId,
182        /// Number of messages in history before compaction.
183        before_count: usize,
184        /// Number of messages in history after compaction.
185        after_count: usize,
186        /// Name reported by the strategy
187        /// (`CompactionStrategy::name()`), e.g. `"truncate"` or
188        /// `"summarize"`.
189        strategy: &'static str,
190    },
191}
192
/// Reason a provider turn (or an entire run) ended.
///
/// Implements `PartialEq`/`Eq`, so callers can test for a specific outcome
/// with `==` (e.g. `reason == FinishReason::EndTurn`) instead of a
/// `matches!` or `match`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum FinishReason {
    /// Model produced a complete reply with no tool call. This is the
    /// natural terminator of a run.
    EndTurn,
    /// Model emitted at least one tool call. The engine continues the
    /// run by executing tools and issuing the next turn.
    ToolUse,
    /// Model stopped because [`crate::ChatRequest::max_tokens`] was
    /// reached. The reply is partial.
    MaxTokens,
    /// Model emitted one of the configured
    /// [`crate::ChatRequest::stop_sequences`].
    StopSequence,
    /// Run was terminated outside the model: cancellation token,
    /// timeout, [`crate::HookAction::Terminate`], or
    /// [`crate::ToolDecision::Terminate`]. The string carries a
    /// human-readable reason (`"cancelled by caller"`,
    /// `"timeout: ..."`, the middleware-supplied `reason`, etc.).
    /// The engine guarantees this is the *only* finish reason ever
    /// surfaced for caller-initiated stops; `Err` results are
    /// reserved for transport errors.
    Aborted(String),
    /// Provider reported a finish reason that the adapter did not map to
    /// one of the typed variants. Treat it as terminal.
    Other(String),
}
222
/// Token counters reported by the provider for a turn.
///
/// Surfaced per turn on [`StreamChunk::TurnFinished`], and summed across
/// every turn of a run by the engine for [`StreamChunk::RunFinished`].
/// Fields not surfaced by a given provider stay at `0`.
///
/// `+` and `+=` combine two reports field by field. Each sum saturates at
/// `u32::MAX` rather than overflowing. Plain `u32` addition would panic in
/// debug builds and silently wrap in release builds, so long-running
/// aggregations (many turns, many runs) could otherwise crash or report
/// nonsense.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct Usage {
    /// Total prompt tokens charged this turn (cached + uncached).
    pub input_tokens: u32,
    /// Tokens generated by the model this turn.
    pub output_tokens: u32,
    /// Subset of `input_tokens` that was served from the prompt cache
    /// rather than recomputed. Zero when the provider does not
    /// support prompt caching or when this turn missed the cache.
    pub cached_input_tokens: u32,
    /// Total tokens written to a cache during this turn. When the
    /// provider reports a TTL breakdown (Anthropic), this equals the sum
    /// of [`Self::cache_creation_5m_tokens`] + [`Self::cache_creation_1h_tokens`].
    /// When only the legacy flat field is reported, the breakdown stays
    /// at zero and only this total is populated.
    pub cache_creation_input_tokens: u32,
    /// Cache writes with a 5-minute TTL (Anthropic ephemeral default).
    /// Zero when the provider does not surface a TTL breakdown.
    pub cache_creation_5m_tokens: u32,
    /// Cache writes with a 1-hour TTL (Anthropic explicit ttl="1h").
    /// Zero when the provider does not surface a TTL breakdown.
    pub cache_creation_1h_tokens: u32,
}

impl Add for Usage {
    type Output = Usage;

    /// Field-wise sum of two usage reports, saturating at `u32::MAX`.
    fn add(self, other: Usage) -> Usage {
        Usage {
            input_tokens: self.input_tokens.saturating_add(other.input_tokens),
            output_tokens: self.output_tokens.saturating_add(other.output_tokens),
            cached_input_tokens: self
                .cached_input_tokens
                .saturating_add(other.cached_input_tokens),
            cache_creation_input_tokens: self
                .cache_creation_input_tokens
                .saturating_add(other.cache_creation_input_tokens),
            cache_creation_5m_tokens: self
                .cache_creation_5m_tokens
                .saturating_add(other.cache_creation_5m_tokens),
            cache_creation_1h_tokens: self
                .cache_creation_1h_tokens
                .saturating_add(other.cache_creation_1h_tokens),
        }
    }
}

impl std::ops::AddAssign for Usage {
    /// In-place field-wise saturating sum.
    fn add_assign(&mut self, other: Usage) {
        // `Usage` is `Copy`, so delegating to `Add` is free and keeps the
        // two operators from drifting apart when fields are added.
        *self = *self + other;
    }
}