mermaid_cli/providers/ctx.rs
1//! Per-call context passed to providers and tool executors.
2//!
3//! The two structs below are the single point where per-turn
4//! cancellation + progress reporting + session identity meet a
5//! specific provider call. Everything a model or tool adapter needs
6//! to participate in structured concurrency is here.
7//!
8//! - `StreamContext` is handed to a `ModelProvider::chat()`. It
9//! carries the cancellation token for the turn and a bounded mpsc
10//! sink for streaming events. The adapter `select!`s on
11//! `token.cancelled()` inside its read loop and awaits
12//! `sink.send(event)` — if the main loop is drowning, the `await`
13//! applies natural backpressure and the provider's TCP buffer fills
14//! instead of the channel growing unbounded.
15//!
16//! - `ExecContext` is handed to a `ToolExecutor::execute()`. Same
17//! token (so Ctrl+C cancels tools too) plus a progress sink and
18//! identifiers so the reducer can match results to the call that
19//! produced them.
20
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use tokio::sync::mpsc;
25use tokio_util::sync::CancellationToken;
26
27use crate::domain::{ToolCallId, TurnId};
28use crate::models::tool_call::ToolCall as ModelToolCall;
29use crate::models::{ChatMessage, ReasoningChunk, TokenUsage};
30
31/// What a `ModelProvider::chat()` receives.
32#[derive(Debug)]
33pub struct StreamContext {
34 pub token: CancellationToken,
35 pub sink: mpsc::Sender<StreamEvent>,
36 pub turn: TurnId,
37}
38
39impl StreamContext {
40 pub fn new(token: CancellationToken, sink: mpsc::Sender<StreamEvent>, turn: TurnId) -> Self {
41 Self { token, sink, turn }
42 }
43}
44
45/// One event emitted during a streaming model call. Adapters MUST
46/// emit exactly one `Done` at the end of a successful stream. `Text`
47/// and `Reasoning` may interleave. `ToolCall` events typically arrive
48/// near the end but the contract is "before `Done`".
49#[derive(Debug, Clone)]
50pub enum StreamEvent {
51 Text(String),
52 Reasoning(ReasoningChunk),
53 ToolCall(ModelToolCall),
54 /// Optional — some providers emit a signature mid-stream
55 /// (Anthropic). Adapters that only have it at the end can attach
56 /// it to `Done` instead.
57 ThinkingSignature(String),
58 /// Stream complete. Carries final token usage (None if unknown)
59 /// and any terminal thinking signature.
60 Done {
61 usage: Option<TokenUsage>,
62 thinking_signature: Option<String>,
63 },
64}
65
66/// Final response returned by `ModelProvider::chat()` after the
67/// stream drains. Carries what the reducer can't derive from the
68/// stream events themselves — token usage and the opaque thinking
69/// signature needed for Anthropic extended-thinking continuation.
70#[derive(Debug, Clone)]
71pub struct FinalResponse {
72 pub usage: Option<TokenUsage>,
73 pub thinking_signature: Option<String>,
74 pub tool_calls: Vec<ModelToolCall>,
75}
76
77/// What a `ToolExecutor::execute()` receives.
78#[derive(Debug)]
79pub struct ExecContext {
80 pub token: CancellationToken,
81 pub progress: mpsc::Sender<ProgressEvent>,
82 pub call_id: ToolCallId,
83 pub turn: TurnId,
84 pub workdir: PathBuf,
85 /// Parent session's `app::Config`. Needed by `SubagentTool` so the
86 /// child reducer uses the same Ollama host, reasoning prefs, MCP
87 /// servers, etc. Other tools don't consult it — keeping it as a
88 /// typed field (rather than a global) means the dependency is
89 /// explicit in the signature.
90 pub config: Arc<crate::app::Config>,
91 /// Parent session's active model id (e.g. `"anthropic/claude-opus-4-7"`).
92 /// Subagents inherit this so they hit the same provider.
93 pub model_id: String,
94}
95
96impl ExecContext {
97 pub fn new(
98 token: CancellationToken,
99 progress: mpsc::Sender<ProgressEvent>,
100 call_id: ToolCallId,
101 turn: TurnId,
102 workdir: PathBuf,
103 config: Arc<crate::app::Config>,
104 model_id: String,
105 ) -> Self {
106 Self {
107 token,
108 progress,
109 call_id,
110 turn,
111 workdir,
112 config,
113 model_id,
114 }
115 }
116}
117
118/// Tool-side progress event. The reducer already knows `ToolStarted`
119/// and `ToolFinished`; this carries everything in between (streaming
120/// subprocess output, long-running download status, multimodal
121/// artifacts like inline screenshots, and nested activity from
122/// subagents).
123#[derive(Debug, Clone)]
124pub enum ProgressEvent {
125 /// Partial stdout/stderr chunk.
126 Output(String),
127 /// Arbitrary status string for display.
128 Status(String),
129 /// Byte-count progress for long downloads/transfers. `total` is
130 /// None when the producer doesn't know the final size.
131 Bytes { done: u64, total: Option<u64> },
132 /// Binary artifact produced mid-execution (screenshot preview,
133 /// generated file, etc.). MIME string determines routing in the
134 /// reducer — `image/*` attaches inline to the active assistant
135 /// message; anything else lands on the status line as a label.
136 Artifact {
137 mime: String,
138 data: Vec<u8>,
139 caption: Option<String>,
140 },
141 /// A child subagent just started or finished a tool call. Carries
142 /// the CHILD's call identity + tool name + phase so the parent UI
143 /// can surface it without needing to recurse into the child's
144 /// event vocabulary.
145 SubagentToolCall {
146 child_call_id: ToolCallId,
147 tool_name: String,
148 phase: SubagentPhase,
149 },
150 /// A chunk of assistant text produced by a child subagent. Mostly
151 /// UI flavor — lets the parent status line show what the sub is
152 /// "saying" in real time.
153 SubagentText(String),
154}
155
/// Where a subagent's tool call stands, as seen by the parent.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubagentPhase {
    /// The child's tool call has begun.
    Started,
    /// The child's tool call has completed.
    Finished,
    /// The child's tool call ended in an error.
    Errored,
}
163
164/// Narrow shim from the reducer's `ChatRequest` to the adapter-facing
165/// messages. Providers often want to mutate the last assistant
166/// message (e.g. Anthropic cache_control injection); this helper
167/// clones the slice as owned so the provider can do that without
168/// fighting the borrow checker.
169pub fn clone_messages(msgs: &[ChatMessage]) -> Vec<ChatMessage> {
170 msgs.to_vec()
171}
172
173/// Builder that lets tests construct a pair of `StreamContext` +
174/// receiver without needing a runtime. Used by provider unit tests
175/// and by integration harnesses in C9.
176pub fn test_stream_context(turn: TurnId) -> (StreamContext, mpsc::Receiver<StreamEvent>) {
177 let token = CancellationToken::new();
178 let (tx, rx) = mpsc::channel(64);
179 (StreamContext::new(token, tx, turn), rx)
180}
181
182/// Builder counterpart for `ExecContext`. Uses a default `Config` and
183/// empty `model_id` — tests that specifically exercise subagent model
184/// inheritance should construct `ExecContext::new` directly with their
185/// chosen values.
186pub fn test_exec_context(
187 turn: TurnId,
188 call_id: ToolCallId,
189 workdir: PathBuf,
190) -> (ExecContext, mpsc::Receiver<ProgressEvent>) {
191 let token = CancellationToken::new();
192 let (tx, rx) = mpsc::channel(64);
193 let config = Arc::new(crate::app::Config::default());
194 (
195 ExecContext::new(token, tx, call_id, turn, workdir, config, String::new()),
196 rx,
197 )
198}
199
200/// Convenience: build a Send+Sync sharable sink for tests.
201pub fn arc_sink(tx: mpsc::Sender<StreamEvent>) -> Arc<mpsc::Sender<StreamEvent>> {
202 Arc::new(tx)
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// A freshly built stream context reports the turn it was given and
    /// starts out not cancelled.
    #[tokio::test]
    async fn stream_context_carries_token_and_turn() {
        let (stream_ctx, _events) = test_stream_context(TurnId(5));
        assert_eq!(stream_ctx.turn, TurnId(5));
        assert!(!stream_ctx.token.is_cancelled());
    }

    /// Cancelling a clone of the context's token from another task is
    /// observed through the context's own token.
    #[tokio::test]
    async fn exec_context_propagates_cancel_signal() {
        let workdir = PathBuf::from("/tmp");
        let (exec_ctx, _progress) = test_exec_context(TurnId(1), ToolCallId(2), workdir);
        let canceller = exec_ctx.token.clone();
        tokio::spawn(async move {
            canceller.cancel();
        });
        // Suspend until the spawned task has fired the cancellation.
        exec_ctx.token.cancelled().await;
        assert!(exec_ctx.token.is_cancelled());
    }

    /// An event sent on the context's progress sink comes out of the
    /// paired receiver with its payload intact.
    #[tokio::test]
    async fn progress_event_round_trips_through_channel() {
        let workdir = PathBuf::from("/tmp");
        let (exec_ctx, mut progress_rx) = test_exec_context(TurnId(1), ToolCallId(2), workdir);
        let outgoing = ProgressEvent::Status("halfway".to_string());
        exec_ctx.progress.send(outgoing).await.expect("send");
        let incoming = progress_rx.recv().await.expect("recv");
        if let ProgressEvent::Status(text) = incoming {
            assert_eq!(text, "halfway");
        } else {
            panic!("wrong variant");
        }
    }
}