Skip to main content

mermaid_cli/providers/
ctx.rs

//! Per-call context passed to providers and tool executors.
//!
//! The two structs below are the single point where per-turn
//! cancellation, progress reporting, and session identity meet a
//! specific provider call. Everything a model or tool adapter needs
//! to participate in structured concurrency is here.
//!
//! - `StreamContext` is handed to `ModelProvider::chat()`. It
//!   carries the cancellation token for the turn and a bounded mpsc
//!   sink for streaming events. The adapter `select!`s on
//!   `token.cancelled()` inside its read loop and awaits
//!   `sink.send(event)` — if the main loop falls behind, the `await`
//!   applies natural backpressure and the provider's TCP buffer fills
//!   instead of the channel growing unbounded.
//!
//! - `ExecContext` is handed to `ToolExecutor::execute()`. It carries
//!   the same token (so Ctrl+C cancels tools too) plus a progress sink
//!   and identifiers so the reducer can match results to the call that
//!   produced them.
20
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use tokio::sync::mpsc;
25use tokio_util::sync::CancellationToken;
26
27use crate::domain::{ToolCallId, TurnId};
28use crate::models::tool_call::ToolCall as ModelToolCall;
29use crate::models::{ChatMessage, ReasoningChunk, TokenUsage};
30
31/// What a `ModelProvider::chat()` receives.
32#[derive(Debug)]
33pub struct StreamContext {
34    pub token: CancellationToken,
35    pub sink: mpsc::Sender<StreamEvent>,
36    pub turn: TurnId,
37}
38
39impl StreamContext {
40    pub fn new(token: CancellationToken, sink: mpsc::Sender<StreamEvent>, turn: TurnId) -> Self {
41        Self { token, sink, turn }
42    }
43}
44
45/// One event emitted during a streaming model call. Adapters MUST
46/// emit exactly one `Done` at the end of a successful stream. `Text`
47/// and `Reasoning` may interleave. `ToolCall` events typically arrive
48/// near the end but the contract is "before `Done`".
49#[derive(Debug, Clone)]
50pub enum StreamEvent {
51    Text(String),
52    Reasoning(ReasoningChunk),
53    ToolCall(ModelToolCall),
54    /// Optional — some providers emit a signature mid-stream
55    /// (Anthropic). Adapters that only have it at the end can attach
56    /// it to `Done` instead.
57    ThinkingSignature(String),
58    /// Stream complete. Carries final token usage (None if unknown)
59    /// and any terminal thinking signature.
60    Done {
61        usage: Option<TokenUsage>,
62        thinking_signature: Option<String>,
63    },
64}
65
66/// Final response returned by `ModelProvider::chat()` after the
67/// stream drains. Carries what the reducer can't derive from the
68/// stream events themselves — token usage and the opaque thinking
69/// signature needed for Anthropic extended-thinking continuation.
70#[derive(Debug, Clone)]
71pub struct FinalResponse {
72    pub usage: Option<TokenUsage>,
73    pub thinking_signature: Option<String>,
74    pub tool_calls: Vec<ModelToolCall>,
75}
76
77/// What a `ToolExecutor::execute()` receives.
78#[derive(Debug)]
79pub struct ExecContext {
80    pub token: CancellationToken,
81    pub progress: mpsc::Sender<ProgressEvent>,
82    pub call_id: ToolCallId,
83    pub turn: TurnId,
84    pub workdir: PathBuf,
85    /// Parent session's `app::Config`. Needed by `SubagentTool` so the
86    /// child reducer uses the same Ollama host, reasoning prefs, MCP
87    /// servers, etc. Other tools don't consult it — keeping it as a
88    /// typed field (rather than a global) means the dependency is
89    /// explicit in the signature.
90    pub config: Arc<crate::app::Config>,
91    /// Parent session's active model id (e.g. `"anthropic/claude-opus-4-7"`).
92    /// Subagents inherit this so they hit the same provider.
93    pub model_id: String,
94}
95
96impl ExecContext {
97    pub fn new(
98        token: CancellationToken,
99        progress: mpsc::Sender<ProgressEvent>,
100        call_id: ToolCallId,
101        turn: TurnId,
102        workdir: PathBuf,
103        config: Arc<crate::app::Config>,
104        model_id: String,
105    ) -> Self {
106        Self {
107            token,
108            progress,
109            call_id,
110            turn,
111            workdir,
112            config,
113            model_id,
114        }
115    }
116}
117
118/// Tool-side progress event. The reducer already knows `ToolStarted`
119/// and `ToolFinished`; this carries everything in between (streaming
120/// subprocess output, long-running download status, multimodal
121/// artifacts like inline screenshots, and nested activity from
122/// subagents).
123#[derive(Debug, Clone)]
124pub enum ProgressEvent {
125    /// Partial stdout/stderr chunk.
126    Output(String),
127    /// Arbitrary status string for display.
128    Status(String),
129    /// Byte-count progress for long downloads/transfers. `total` is
130    /// None when the producer doesn't know the final size.
131    Bytes { done: u64, total: Option<u64> },
132    /// Binary artifact produced mid-execution (screenshot preview,
133    /// generated file, etc.). MIME string determines routing in the
134    /// reducer — `image/*` attaches inline to the active assistant
135    /// message; anything else lands on the status line as a label.
136    Artifact {
137        mime: String,
138        data: Vec<u8>,
139        caption: Option<String>,
140    },
141    /// A child subagent just started or finished a tool call. Carries
142    /// the CHILD's call identity + tool name + phase so the parent UI
143    /// can surface it without needing to recurse into the child's
144    /// event vocabulary.
145    SubagentToolCall {
146        child_call_id: ToolCallId,
147        tool_name: String,
148        phase: SubagentPhase,
149    },
150    /// A chunk of assistant text produced by a child subagent. Mostly
151    /// UI flavor — lets the parent status line show what the sub is
152    /// "saying" in real time.
153    SubagentText(String),
154}
155
/// Lifecycle stage of a subagent's tool call, as seen by the parent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubagentPhase {
    /// The child has begun the tool call.
    Started,
    /// The tool call finished (reported separately from `Errored`).
    Finished,
    /// The tool call ended with an error.
    Errored,
}
163
164/// Narrow shim from the reducer's `ChatRequest` to the adapter-facing
165/// messages. Providers often want to mutate the last assistant
166/// message (e.g. Anthropic cache_control injection); this helper
167/// clones the slice as owned so the provider can do that without
168/// fighting the borrow checker.
169pub fn clone_messages(msgs: &[ChatMessage]) -> Vec<ChatMessage> {
170    msgs.to_vec()
171}
172
173/// Builder that lets tests construct a pair of `StreamContext` +
174/// receiver without needing a runtime. Used by provider unit tests
175/// and by integration harnesses in C9.
176pub fn test_stream_context(turn: TurnId) -> (StreamContext, mpsc::Receiver<StreamEvent>) {
177    let token = CancellationToken::new();
178    let (tx, rx) = mpsc::channel(64);
179    (StreamContext::new(token, tx, turn), rx)
180}
181
182/// Builder counterpart for `ExecContext`. Uses a default `Config` and
183/// empty `model_id` — tests that specifically exercise subagent model
184/// inheritance should construct `ExecContext::new` directly with their
185/// chosen values.
186pub fn test_exec_context(
187    turn: TurnId,
188    call_id: ToolCallId,
189    workdir: PathBuf,
190) -> (ExecContext, mpsc::Receiver<ProgressEvent>) {
191    let token = CancellationToken::new();
192    let (tx, rx) = mpsc::channel(64);
193    let config = Arc::new(crate::app::Config::default());
194    (
195        ExecContext::new(token, tx, call_id, turn, workdir, config, String::new()),
196        rx,
197    )
198}
199
200/// Convenience: build a Send+Sync sharable sink for tests.
201pub fn arc_sink(tx: mpsc::Sender<StreamEvent>) -> Arc<mpsc::Sender<StreamEvent>> {
202    Arc::new(tx)
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[tokio::test]
    async fn stream_context_carries_token_and_turn() {
        let (ctx, _rx) = test_stream_context(TurnId(5));
        assert_eq!(ctx.turn, TurnId(5));
        assert!(!ctx.token.is_cancelled());
    }

    #[tokio::test]
    async fn stream_event_round_trips_through_channel() {
        let (ctx, mut rx) = test_stream_context(TurnId(3));
        ctx.sink
            .send(StreamEvent::Text("hello".to_string()))
            .await
            .expect("send");
        match rx.recv().await.expect("recv") {
            StreamEvent::Text(s) => assert_eq!(s, "hello"),
            // Print the actual event so a failure is diagnosable.
            other => panic!("expected StreamEvent::Text, got {:?}", other),
        }
    }

    // Plain `#[test]`: the helper is documented as not needing a runtime.
    #[test]
    fn exec_context_test_helper_fills_defaults() {
        let (ctx, _rx) = test_exec_context(TurnId(1), ToolCallId(2), PathBuf::from("/tmp"));
        assert_eq!(ctx.turn, TurnId(1));
        assert_eq!(ctx.workdir, PathBuf::from("/tmp"));
        assert!(ctx.model_id.is_empty());
        assert!(!ctx.token.is_cancelled());
    }

    #[tokio::test]
    async fn exec_context_propagates_cancel_signal() {
        let (ctx, _rx) = test_exec_context(TurnId(1), ToolCallId(2), PathBuf::from("/tmp"));
        let token = ctx.token.clone();
        let canceller = tokio::spawn(async move {
            token.cancel();
        });
        // Resolves only once the spawned task has fired the shared token.
        ctx.token.cancelled().await;
        assert!(ctx.token.is_cancelled());
        // Join the task so a panic inside it fails this test instead of
        // being silently discarded with the dropped handle.
        canceller.await.expect("cancel task panicked");
    }

    #[tokio::test]
    async fn progress_event_round_trips_through_channel() {
        let (ctx, mut rx) = test_exec_context(TurnId(1), ToolCallId(2), PathBuf::from("/tmp"));
        ctx.progress
            .send(ProgressEvent::Status("halfway".to_string()))
            .await
            .expect("send");
        match rx.recv().await.expect("recv") {
            ProgressEvent::Status(s) => assert_eq!(s, "halfway"),
            other => panic!("expected ProgressEvent::Status, got {:?}", other),
        }
    }
}