ailoop_core/stream.rs
1//! Engine event vocabulary: [`StreamChunk`], [`FinishReason`], and
2//! [`Usage`].
3
4use std::{ops::Add, sync::Arc};
5
6use crate::{Message, RunId, StepId, ToolResultContent};
7
8/// Event the engine emits as a run progresses.
9///
10/// Variants split into two families:
11///
12/// 1. **Provider stream events**, surfaced once per turn —
13/// `TextDelta`, `ToolCall*`, `Reasoning*`, `RedactedReasoningBlock`,
14/// `TurnFinished`. Adapters lower wire deltas into these. Started/
15/// Finished pairs always nest cleanly: a `ToolCallFinished` arrives
16/// before any other tool call's `Started`.
17/// 2. **Engine lifecycle events**, synthesized by the engine itself
18/// around the provider stream — `RunStarted`, `StepStarted`,
19/// `StepFinished`, `ToolResult`, `RunFinished`, `HistoryCompacted`.
20///
21/// Every chunk reaches every [`crate::ChatMiddleware::on_chunk`]; the
22/// engine also drives its own assistant-history reconstruction off
23/// these events, so middlewares that override
24/// [`crate::ChatMiddleware::on_chunk_mut`] can rewrite them in flight
25/// to influence what gets persisted.
26#[derive(Debug)]
27#[non_exhaustive]
28pub enum StreamChunk {
29 /// Incremental visible text from the model. Concatenate deltas in
30 /// arrival order to reconstruct the assistant text block.
31 TextDelta {
32 /// New text appended this delta. Empty deltas are legal.
33 delta: String,
34 },
35 /// A new tool call has begun. The model has emitted the tool name
36 /// but no arguments yet. Pair with the matching
37 /// [`Self::ToolCallFinished`] (same `id`) once the call is fully
38 /// assembled.
39 ToolCallStarted {
40 /// Provider-assigned id; mirrors back as `call_id` on the
41 /// [`Self::ToolResult`] the engine emits after execution.
42 id: String,
43 /// Tool name as registered in the request's `tools` list.
44 name: String,
45 },
46 /// Incremental tool-call argument JSON. Concatenate deltas in
47 /// arrival order to rebuild the `args` JSON for live UIs;
48 /// engines that only need the final structure can ignore these
49 /// and read [`Self::ToolCallFinished::args`] instead.
50 ToolCallArgsDelta {
51 /// Tool call id; matches the originating
52 /// [`Self::ToolCallStarted::id`].
53 id: String,
54 /// JSON fragment appended this delta.
55 delta: String,
56 },
57 /// A tool call is fully assembled and ready to execute. The
58 /// engine invokes the tool after this chunk and emits a
59 /// [`Self::ToolResult`] when execution completes.
60 ToolCallFinished {
61 /// Tool call id; matches the originating
62 /// [`Self::ToolCallStarted::id`].
63 id: String,
64 /// Tool name, repeated for convenience so consumers do not
65 /// have to track the originating `Started` chunk.
66 name: String,
67 /// Final, parsed JSON arguments.
68 args: serde_json::Value,
69 },
70 /// Incremental reasoning text. Same accumulation contract as
71 /// [`Self::TextDelta`], but feeds an
72 /// [`crate::AssistantBlock::Reasoning`] block instead of
73 /// [`crate::AssistantBlock::Text`]. Some providers (Anthropic
74 /// extended thinking) require the assembled reasoning to be
75 /// replayed verbatim on subsequent turns when tools are involved.
76 ReasoningDelta {
77 /// New reasoning text appended this delta.
78 delta: String,
79 },
80 /// End of a visible reasoning block. Carries the provider signature when
81 /// applicable (Anthropic extended thinking); other providers may emit
82 /// `None`. Engines should pair this with the accumulated reasoning text
83 /// to materialize an `AssistantBlock::Reasoning`.
84 ReasoningFinished {
85 /// Provider signature for the reasoning block (Anthropic
86 /// extended thinking). Persist alongside the reasoning text
87 /// in [`crate::AssistantBlock::Reasoning`]; replay verbatim on
88 /// subsequent turns when tools are involved.
89 signature: Option<String>,
90 },
91 /// A complete redacted reasoning block delivered atomically. `data` is
92 /// opaque provider material that must be replayed verbatim on the next
93 /// request. Engines should materialize `AssistantBlock::RedactedReasoning`
94 /// directly from this chunk; no deltas are emitted around it.
95 RedactedReasoningBlock {
96 /// Verbatim provider payload; treat as opaque bytes.
97 data: String,
98 },
99 /// End of a single provider turn. Equivalent to a Chat
100 /// Completions `finish_reason` plus the final `usage`. Multiple
101 /// turns can fire per run when the model is in a tool-use loop.
102 TurnFinished {
103 /// Why the model stopped this turn.
104 reason: FinishReason,
105 /// Token counters reported by the provider for this turn.
106 usage: Usage,
107 /// Provider-reported service tier for the turn (Anthropic:
108 /// `"standard"` / `"priority"` / `"batch"`). `None` when the
109 /// provider does not surface one. Per-turn rather than
110 /// aggregated because it is a categorical label, not a counter.
111 service_tier: Option<String>,
112 },
113
114 // Extend
115 /// Engine has accepted the run; emitted exactly once per run
116 /// before the first provider call.
117 RunStarted {
118 /// Identifier shared by every chunk this run produces.
119 run_id: RunId,
120 },
121 /// Engine is starting a step (one provider turn plus the tool
122 /// calls it triggers).
123 StepStarted {
124 /// Run this step belongs to.
125 run_id: RunId,
126 /// Identifier shared by every chunk this step produces.
127 step_id: StepId,
128 /// 0-based iteration number; bounded by
129 /// [`crate::RunConfig::max_iterations`].
130 iteration: usize,
131 },
132 /// Engine has finished a step. Includes the cumulative messages
133 /// added to history so far, so observers can snapshot
134 /// mid-conversation without waiting for [`Self::RunFinished`].
135 StepFinished {
136 /// Run this step belongs to.
137 run_id: RunId,
138 /// Step that just finished.
139 step_id: StepId,
140 /// 0-based iteration number, matching [`Self::StepStarted::iteration`].
141 iteration: usize,
142 /// All messages this run has appended to history so far,
143 /// shared so observers can read without cloning the vector.
144 new_messages_so_far: Arc<Vec<Message>>,
145 },
146 /// A tool finished executing and produced a reply. Emitted
147 /// **after** [`Self::ToolCallFinished`] and before the next
148 /// provider turn picks up the result.
149 ToolResult {
150 /// Run that owns the tool call.
151 run_id: RunId,
152 /// Step that owns the tool call.
153 step_id: StepId,
154 /// Matches [`Self::ToolCallFinished::id`].
155 call_id: String,
156 /// Tool reply, with `is_error` preserved for the next provider
157 /// turn.
158 content: ToolResultContent,
159 },
160 /// Engine has finished the run. Emitted exactly once per run,
161 /// even on aborts and middleware terminations.
162 RunFinished {
163 /// Run that just finished.
164 run_id: RunId,
165 /// Why the run ended.
166 reason: FinishReason,
167 /// Token totals across every turn in the run.
168 usage: Usage,
169 /// All messages this run added to history. On abort, partial
170 /// tool results are preserved so the next run sees a
171 /// consistent shape.
172 new_messages: Vec<Message>,
173 },
174 /// Emitted by `Conversation::stream` (not the engine) when history
175 /// compaction ran before the request was sent. Carries message
176 /// counts from before/after compaction and the strategy's name so
177 /// observability middlewares can report what was dropped.
178 HistoryCompacted {
179 /// Run for which compaction ran. Shared with the
180 /// engine-emitted chunks of the same run.
181 run_id: RunId,
182 /// Number of messages in history before compaction.
183 before_count: usize,
184 /// Number of messages in history after compaction.
185 after_count: usize,
186 /// Name reported by the strategy
187 /// (`CompactionStrategy::name()`), e.g. `"truncate"` or
188 /// `"summarize"`.
189 strategy: &'static str,
190 },
191}
192
/// Reason a provider turn (or an entire run) ended.
///
/// Implements `PartialEq`/`Eq` so finish reasons can be compared
/// directly (`reason == FinishReason::EndTurn`, `assert_eq!`). For the
/// `Aborted` and `Other` variants the carried string takes part in the
/// comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum FinishReason {
    /// Model produced a complete reply with no tool call. The natural
    /// terminator of a run.
    EndTurn,
    /// Model emitted at least one tool call. The engine continues the
    /// run by executing tools and issuing the next turn.
    ToolUse,
    /// Model stopped because [`crate::ChatRequest::max_tokens`] was
    /// reached. The reply is partial.
    MaxTokens,
    /// Model emitted one of the configured
    /// [`crate::ChatRequest::stop_sequences`].
    StopSequence,
    /// Run was terminated outside the model: cancellation token,
    /// timeout, [`crate::HookAction::Terminate`], or
    /// [`crate::ToolDecision::Terminate`]. The string carries a
    /// human-readable reason (`"cancelled by caller"`,
    /// `"timeout: ..."`, the middleware-supplied `reason`, etc.).
    /// The engine guarantees this is the *only* finish reason ever
    /// surfaced for caller-initiated stops — `Err` results are
    /// reserved for transport errors.
    Aborted(String),
    /// Provider reported a finish reason the adapter did not map to
    /// one of the typed variants. The string carries that reason.
    /// Treat as terminal.
    Other(String),
}
222
/// Token counters reported by the provider for a turn.
///
/// Aggregated to the run level by the engine and surfaced on
/// [`StreamChunk::RunFinished`] / [`StreamChunk::TurnFinished`]. Fields
/// not surfaced by a given provider stay at `0`.
///
/// Two values combine field by field through `+` and `+=`. Each
/// counter saturates at `u32::MAX`, so aggregation neither panics in
/// debug builds nor wraps around in release builds.
#[derive(Debug, Default, Clone, Copy)]
#[non_exhaustive]
pub struct Usage {
    /// Total prompt tokens charged this turn (cached + uncached).
    pub input_tokens: u32,
    /// Tokens generated by the model this turn.
    pub output_tokens: u32,
    /// Subset of `input_tokens` that were served from prompt cache
    /// rather than recomputed. Zero when the provider does not
    /// support prompt caching or when this turn missed the cache.
    pub cached_input_tokens: u32,
    /// Total tokens written to a cache during this turn. When the
    /// provider reports a TTL breakdown (Anthropic), this equals the sum
    /// of [`Self::cache_creation_5m_tokens`] + [`Self::cache_creation_1h_tokens`].
    /// When only the legacy flat field is reported, the breakdown stays
    /// at zero and only this total is populated.
    pub cache_creation_input_tokens: u32,
    /// Cache writes with a 5-minute TTL (Anthropic ephemeral default).
    /// Zero when the provider does not surface a TTL breakdown.
    pub cache_creation_5m_tokens: u32,
    /// Cache writes with a 1-hour TTL (Anthropic explicit ttl="1h").
    /// Zero when the provider does not surface a TTL breakdown.
    pub cache_creation_1h_tokens: u32,
}

impl Add for Usage {
    type Output = Usage;

    /// Returns the field-wise sum of `self` and `other`, saturating
    /// each counter at `u32::MAX`.
    fn add(mut self, other: Usage) -> Usage {
        // Delegate to `AddAssign` so `+` and `+=` cannot drift apart.
        self += other;
        self
    }
}

impl std::ops::AddAssign for Usage {
    /// Adds every counter of `other` onto `self` in place, saturating
    /// each counter at `u32::MAX` instead of overflowing.
    fn add_assign(&mut self, other: Usage) {
        self.input_tokens = self.input_tokens.saturating_add(other.input_tokens);
        self.output_tokens = self.output_tokens.saturating_add(other.output_tokens);
        self.cached_input_tokens = self
            .cached_input_tokens
            .saturating_add(other.cached_input_tokens);
        self.cache_creation_input_tokens = self
            .cache_creation_input_tokens
            .saturating_add(other.cache_creation_input_tokens);
        self.cache_creation_5m_tokens = self
            .cache_creation_5m_tokens
            .saturating_add(other.cache_creation_5m_tokens);
        self.cache_creation_1h_tokens = self
            .cache_creation_1h_tokens
            .saturating_add(other.cache_creation_1h_tokens);
    }
}