Skip to main content

mermaid_cli/effect/
mod.rs

1//! The effect runner: dispatches `Cmd` values into tokio tasks.
2//!
3//! There are exactly two places in the codebase that spawn a tokio
4//! task: this module and tests. Everywhere else asks the
5//! reducer to return a `Cmd`, and the runner handles it. That
6//! centralization is what makes structured concurrency per turn
7//! actually work — nothing can accidentally spawn a detached task
8//! that outlives the turn it was started for.
9//!
10//! Architecture:
11//!
12//! ```text
13//!   main loop ── reducer ── Cmd ── dispatch ── EffectRunner
14//!                                                 ├── TurnScope(turn A) ── JoinSet
15//!                                                 ├── TurnScope(turn B) ── JoinSet
16//!                                                 └── detached effects (Save, Exit, …)
17//!                                                       ↓
18//!                                              Msg via mpsc::Sender<Msg>
19//!                                                       ↓
20//!                                                 main loop (next iteration)
21//! ```
22//!
23//! The runner dispatches every `Cmd` variant to a real handler —
24//! model streaming (`CallModel` → `ModelProvider::chat`), tool
25//! execution (`ExecuteTool` → `ToolExecutor::execute`), persistence
26//! (`SaveConversation`, `LoadConversation`, `PersistLastModel`,
27//! `PersistReasoningFor`, `RefreshInstructions`), MCP lifecycle
28//! (`InitMcpServers`, `StopMcpServer`), local side-effects
29//! (`WriteImageToTemp`, `OpenInSystem`, `PullOllamaModel`,
30//! `SetTerminalTitle`, `DismissStatusAfter`). Cancellation flows
31//! through `Cmd::CancelScope(TurnId)` → the scope's
32//! `CancellationToken`.
33
34mod middleware;
35mod turn_scope;
36
37use std::collections::HashMap;
38use std::path::PathBuf;
39use std::sync::Arc;
40use std::time::Instant;
41
42use tokio::sync::mpsc;
43
44use crate::app::Config;
45use crate::domain::{
46    Cmd, CompactionPolicy, CompactionRequest, CompactionResult, CompactionTrigger, Msg, TurnId,
47};
48use crate::models::{ModelError, TokenUsage};
49use crate::providers::ctx::{ExecContext, StreamContext};
50use crate::providers::model::ModelProvider;
51use crate::providers::{ProviderFactory, StreamEvent, ToolRegistry};
52
53pub use middleware::{DEFAULT_MAX_ATTEMPTS, retry_transient_http};
54pub use turn_scope::TurnScope;
55
/// Upper bound on how long a cancelled turn's scope is given to drain
/// before `drop_scope` stops waiting and emits `Msg::TurnCancelled`
/// anyway. Two seconds in normal builds.
#[cfg(not(test))]
const CANCEL_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);
/// Test-build variant of the drain timeout: 50 ms, so tests that
/// exercise the timeout path stay fast.
#[cfg(test)]
const CANCEL_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);

/// Single channel back to the reducer. `EffectRunner` holds the
/// sender; every spawned task clones this so it can emit `Msg` as
/// work progresses. Bounded capacity applies natural backpressure —
/// if the main loop can't keep up, the provider's streaming send
/// `.await`s and the whole pipeline throttles.
pub type MsgSender = mpsc::Sender<Msg>;

/// Bounded channel capacity for the effect → reducer stream. 512 is
/// generous — a single streaming chunk fits comfortably, and the
/// main loop drains at ~60 Hz so backlog rarely grows. Bigger wastes
/// RAM; smaller introduces spurious backpressure on bursty tool
/// output.
pub const MSG_CHANNEL_CAPACITY: usize = 512;
74
/// The runner. One instance per process, constructed by
/// `app::run` and consumed when the main loop exits.
///
/// Owns the sending half of the Msg channel, the per-turn task
/// scopes, the set of detached background tasks, and (optionally)
/// the provider and tool registries that handlers resolve through.
pub struct EffectRunner {
    /// Sending half of the effect → reducer channel. `dispatch`
    /// clones it into each spawned handler that reports back.
    msg_tx: MsgSender,
    /// Per-turn scopes. Populated lazily: the first `Cmd` bearing a
    /// TurnId creates a scope; `Cmd::CancelScope` tears it down.
    /// Empty (drained) scopes are reaped by `reap_empty_scopes`, which
    /// runs at the top of every `dispatch` call so the map stays
    /// bounded across long sessions (F12).
    scopes: HashMap<TurnId, TurnScope>,
    /// Detached work (saves, persists, MCP lifecycle) lives here.
    /// This one set never gets cancelled piecemeal — shutdown drains
    /// it during `EffectRunner::shutdown`.
    detached: tokio::task::JoinSet<()>,
    /// MCP manager handle is held elsewhere (`crate::mcp` has a
    /// `OnceLock` for its global manager); we just note workdir so
    /// handlers can construct absolute paths.
    workdir: PathBuf,
    /// Lazy provider registry. `CallModel` resolves through this.
    /// Tests that don't care about real providers leave this `None`
    /// and observe the fallback `UpstreamError` Msg; production
    /// construction via `with_bindings` sets it.
    providers: Option<Arc<ProviderFactory>>,
    /// Shared tool registry. See `providers` — same optionality
    /// rationale for unit tests.
    tools: Option<Arc<ToolRegistry>>,
}
102
103impl EffectRunner {
104    /// Create an unused runner. Pair with `msg_rx` from `channel()`.
105    pub fn new(msg_tx: MsgSender, workdir: PathBuf) -> Self {
106        Self {
107            msg_tx,
108            scopes: HashMap::new(),
109            detached: tokio::task::JoinSet::new(),
110            workdir,
111            providers: None,
112            tools: None,
113        }
114    }
115
116    /// Attach provider + tool registries. Production wiring uses
117    /// this; unit tests that don't need real dispatch can skip.
118    /// Without bindings, `CallModel` / `ExecuteTool` emit well-
119    /// formed error Msgs so the reducer still transitions cleanly.
120    pub fn with_bindings(
121        mut self,
122        providers: Arc<ProviderFactory>,
123        tools: Arc<ToolRegistry>,
124    ) -> Self {
125        self.providers = Some(providers);
126        self.tools = Some(tools);
127        self
128    }
129
130    /// Pair-constructor: returns both the runner and the receiving
131    /// end of the Msg channel. Preferred for production wiring
132    /// because it keeps the channel capacity constant in one place.
133    pub fn pair(workdir: PathBuf) -> (Self, mpsc::Receiver<Msg>) {
134        let (tx, rx) = mpsc::channel(MSG_CHANNEL_CAPACITY);
135        (Self::new(tx, workdir), rx)
136    }
137
138    /// Pair constructor that also wires the real provider factory +
139    /// tool registry. Used by `app::run_interactive`.
140    pub fn pair_with_bindings(
141        workdir: PathBuf,
142        config: Config,
143        tools: Arc<ToolRegistry>,
144    ) -> (Self, mpsc::Receiver<Msg>) {
145        let providers = Arc::new(ProviderFactory::new(config));
146        Self::pair_from(workdir, providers, tools)
147    }
148
149    /// Pair constructor that takes a pre-built `ProviderFactory`.
150    /// Used when the caller needs to share a `ProviderFactory` with
151    /// the `SubagentSpawner` so subagents can issue model calls
152    /// through the same cache.
153    pub fn pair_from(
154        workdir: PathBuf,
155        providers: Arc<ProviderFactory>,
156        tools: Arc<ToolRegistry>,
157    ) -> (Self, mpsc::Receiver<Msg>) {
158        let (tx, rx) = mpsc::channel(MSG_CHANNEL_CAPACITY);
159        (Self::new(tx, workdir).with_bindings(providers, tools), rx)
160    }
161
162    /// Construct a runner that shares a pre-derived cancellation
163    /// token for its turn scopes. Used by `SubagentSpawner` so the
164    /// child runner's work aborts as soon as the parent's `ctx.token`
165    /// fires.
166    pub fn new_child(
167        msg_tx: MsgSender,
168        workdir: PathBuf,
169        providers: Arc<ProviderFactory>,
170        tools: Arc<ToolRegistry>,
171    ) -> Self {
172        Self::new(msg_tx, workdir).with_bindings(providers, tools)
173    }
174
175    /// Get or create the scope for a turn. Idempotent. The scope is
176    /// retained until `CancelScope` tears it down or it naturally
177    /// drains.
178    fn scope_mut(&mut self, turn: TurnId) -> &mut TurnScope {
179        self.scopes
180            .entry(turn)
181            .or_insert_with(|| TurnScope::new(turn))
182    }
183
184    /// Drop the scope for a turn, signalling cancellation to every
185    /// child first. Safe to call for non-existent turns.
186    ///
187    /// After the scope is cancelled, a detached task moves it off the
188    /// runner, drains its `JoinSet` (so child tasks unwind), then emits
189    /// `Msg::TurnCancelled(turn)` so the reducer can transition
190    /// `Cancelling → Idle`. Without this terminal event the TUI would
191    /// stick in `Cancelling` — the reducer has no other way to learn
192    /// that the abort fully landed.
193    fn drop_scope(&mut self, turn: TurnId) {
194        if let Some(mut scope) = self.scopes.remove(&turn) {
195            scope.cancel();
196            let tx = self.msg_tx.clone();
197            self.detached.spawn(async move {
198                if tokio::time::timeout(CANCEL_DRAIN_TIMEOUT, scope.drain())
199                    .await
200                    .is_err()
201                {
202                    tracing::warn!(
203                        turn = %turn,
204                        timeout_ms = CANCEL_DRAIN_TIMEOUT.as_millis(),
205                        "cancel drain timed out; aborting remaining scoped tasks"
206                    );
207                }
208                let _ = tx.send(Msg::TurnCancelled(turn)).await;
209            });
210        }
211    }
212
213    /// Number of active per-turn scopes. Tests use this to observe
214    /// lifecycle without racing on internal state.
215    pub fn scope_count(&self) -> usize {
216        self.scopes.len()
217    }
218
219    /// F12: remove scope entries whose `JoinSet` is empty — every
220    /// child task has completed, so the scope is just an orphan key
221    /// in the map. Called at the top of `dispatch` so the map stays
222    /// bounded over long sessions. Cheap: one linear walk, no async.
223    ///
224    /// `JoinSet::is_empty` only returns true after completed tasks are
225    /// harvested via `join_next`/`try_join_next`, so we first drain
226    /// any ready completions per scope.
227    fn reap_empty_scopes(&mut self) {
228        self.scopes.retain(|_, scope| {
229            scope.drain_completed();
230            !scope.is_empty()
231        });
232    }
233
234    /// Route a single `Cmd` into the appropriate spawn + handler.
235    /// Returns immediately; handlers work asynchronously and emit
236    /// `Msg` back through the sender channel.
237    pub fn dispatch(&mut self, cmd: Cmd) {
238        // F12: reap any drained scopes before touching the map. Keeps
239        // `scope_count()` bounded as the session grows.
240        self.reap_empty_scopes();
241        tracing::trace!(cmd = %cmd.summary(), "effect: dispatch");
242
243        match cmd {
244            Cmd::CallModel { turn, mut request } => {
245                let tx = self.msg_tx.clone();
246                let providers = self.providers.clone();
247                // Enrich `request.tools` with every user-facing
248                // tool in the bound registry. The reducer has
249                // already populated MCP tools from `state.mcp`;
250                // built-ins come from the runner (which holds the
251                // registry). This keeps `ChatRequest.tools` the
252                // single source of truth for what the model sees.
253                if let Some(tools) = &self.tools {
254                    let mut enriched = tools.describe_all();
255                    enriched.append(&mut request.tools);
256                    request.tools = enriched;
257                }
258                let scope = self.scope_mut(turn);
259                let token = scope.token();
260                scope.spawn(async move {
261                    dispatch_call_model(tx, providers, turn, request, token).await;
262                });
263            },
264            Cmd::CompactConversation { turn, mut request } => {
265                let tx = self.msg_tx.clone();
266                let providers = self.providers.clone();
267                if let Some(tools) = &self.tools {
268                    let mut enriched = tools.describe_all();
269                    enriched.append(&mut request.chat.tools);
270                    request.chat.tools = enriched;
271                }
272                let scope = self.scope_mut(turn);
273                let token = scope.token();
274                scope.spawn(async move {
275                    dispatch_compact_conversation(tx, providers, turn, request, token).await;
276                });
277            },
278            Cmd::ExecuteTool {
279                turn,
280                call_id,
281                source,
282                model_id,
283            } => {
284                let tx = self.msg_tx.clone();
285                let tools = self.tools.clone();
286                let workdir = self.workdir.clone();
287                // Pass the shared Config from ProviderFactory so
288                // subagents inherit it (F7). Falls back to
289                // Config::default() when providers aren't bound (unit
290                // tests without real wiring).
291                let config = self
292                    .providers
293                    .as_ref()
294                    .map(|p| Arc::new(p.config().clone()))
295                    .unwrap_or_else(|| Arc::new(crate::app::Config::default()));
296                let scope = self.scope_mut(turn);
297                let token = scope.token();
298                scope.spawn(async move {
299                    dispatch_execute_tool(
300                        tx, tools, workdir, turn, call_id, source, token, config, model_id,
301                    )
302                    .await;
303                });
304            },
305            Cmd::CancelScope(turn) => {
306                self.drop_scope(turn);
307            },
308            Cmd::SaveConversation(history) => {
309                let tx = self.msg_tx.clone();
310                let workdir = self.workdir.clone();
311                self.detached.spawn(async move {
312                    if let Ok(manager) = crate::session::ConversationManager::new(&workdir)
313                        && manager.save_conversation(&history).is_ok()
314                    {
315                        let _ = tx.send(Msg::SessionSaved).await;
316                    } else {
317                        tracing::warn!("SaveConversation: failed to write to disk");
318                    }
319                });
320            },
321            Cmd::SaveCompactionArchive(archive) => {
322                let workdir = self.workdir.clone();
323                self.detached.spawn(async move {
324                    if let Ok(manager) = crate::session::ConversationManager::new(&workdir)
325                        && let Err(err) = manager.save_compaction_archive(&archive)
326                    {
327                        tracing::warn!(error = %err, "SaveCompactionArchive: failed to write archive");
328                    }
329                });
330            },
331            Cmd::PersistLastModel(model) => {
332                self.detached.spawn(async move {
333                    let _ = crate::app::persist_last_model(&model);
334                });
335            },
336            Cmd::PersistReasoningFor { model_id, level } => {
337                self.detached.spawn(async move {
338                    let _ = crate::app::persist_reasoning_for_model(&model_id, level);
339                });
340            },
341            Cmd::RefreshInstructions => {
342                let tx = self.msg_tx.clone();
343                let workdir = self.workdir.clone();
344                self.detached.spawn(async move {
345                    let (loaded, _outcome) = crate::app::instructions::refresh(None, &workdir);
346                    let _ = tx.send(Msg::InstructionsChanged(loaded)).await;
347                });
348            },
349            Cmd::LoadConversation(id) => {
350                let tx = self.msg_tx.clone();
351                let workdir = self.workdir.clone();
352                self.detached.spawn(async move {
353                    match crate::session::ConversationManager::new(&workdir) {
354                        Ok(mgr) => match mgr.load_conversation(&id) {
355                            Ok(history) => {
356                                let _ = tx.send(Msg::ConversationLoaded(history)).await;
357                            },
358                            Err(e) => {
359                                tracing::warn!(id = %id, error = %e, "LoadConversation failed");
360                            },
361                        },
362                        Err(e) => {
363                            tracing::warn!(error = %e, "ConversationManager init failed");
364                        },
365                    }
366                });
367            },
368            Cmd::ListConversations => {
369                let tx = self.msg_tx.clone();
370                let workdir = self.workdir.clone();
371                self.detached.spawn(async move {
372                    let summaries = match crate::session::ConversationManager::new(&workdir) {
373                        Ok(mgr) => mgr
374                            .list_conversations()
375                            .unwrap_or_default()
376                            .into_iter()
377                            .map(|h| crate::domain::ConversationSummary {
378                                id: h.id.clone(),
379                                title: h.title.clone(),
380                                message_count: h.messages.len(),
381                                updated_at: h.updated_at.to_rfc3339(),
382                            })
383                            .collect(),
384                        Err(_) => Vec::new(),
385                    };
386                    let _ = tx.send(Msg::ConversationsListed(summaries)).await;
387                });
388            },
389            Cmd::InitMcpServers(configs) => {
390                let tx = self.msg_tx.clone();
391                self.detached.spawn(async move {
392                    if configs.is_empty() {
393                        return;
394                    }
395                    crate::mcp::manager_ref::mark_init_started();
396                    let manager =
397                        std::sync::Arc::new(crate::mcp::McpServerManager::start(&configs).await);
398                    // Emit a Ready or Errored per server based on
399                    // what came up. Zero-tool servers are still ready
400                    // if the manager has an active client for them.
401                    for (name, _cfg) in configs.iter() {
402                        let server_tools: Vec<crate::domain::McpToolSpec> = manager
403                            .get_all_tools()
404                            .iter()
405                            .filter(|(server, _)| server == name)
406                            .map(|(_, def)| crate::domain::McpToolSpec {
407                                name: def.name.clone(),
408                                description: def.description.clone(),
409                                input_schema: def.input_schema.clone(),
410                            })
411                            .collect();
412                        let msg = mcp_startup_msg(name, manager.has_server(name), server_tools);
413                        let _ = tx.send(msg).await;
414                    }
415                    crate::mcp::manager_ref::set_manager(manager);
416                    crate::mcp::manager_ref::mark_init_complete();
417                });
418            },
419            Cmd::StopMcpServer { name } => {
420                let tx = self.msg_tx.clone();
421                self.detached.spawn(async move {
422                    let _ = tx.send(Msg::McpServerStopped { name }).await;
423                });
424            },
425            Cmd::PullOllamaModel { model } => {
426                let tx = self.msg_tx.clone();
427                self.detached.spawn(async move {
428                    dispatch_pull_ollama_model(tx, model).await;
429                });
430            },
431            Cmd::OpenInSystem(path) => {
432                self.detached.spawn(async move {
433                    let _ = tokio::task::spawn_blocking(move || {
434                        crate::utils::open_file(&path);
435                    })
436                    .await;
437                });
438            },
439            Cmd::DismissStatusAfter { ms } => {
440                let tx = self.msg_tx.clone();
441                self.detached.spawn(async move {
442                    tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
443                    let _ = tx.send(Msg::StatusDismiss).await;
444                });
445            },
446            Cmd::WriteImageToTemp {
447                path,
448                bytes,
449                format: _,
450            } => {
451                self.detached.spawn(async move {
452                    if let Err(e) = tokio::fs::write(&path, &bytes).await {
453                        tracing::warn!(path = %path.display(), error = %e, "WriteImageToTemp failed");
454                    }
455                });
456            },
457            Cmd::ReadClipboard => {
458                let tx = self.msg_tx.clone();
459                self.detached.spawn(async move {
460                    dispatch_read_clipboard(tx).await;
461                });
462            },
463            Cmd::Exit => {
464                // The main loop observes `state.should_exit` after
465                // the reducer returns; the runner doesn't need to
466                // take any special action. Documented here for
467                // exhaustiveness.
468            },
469            Cmd::SetTerminalTitle(title) => {
470                self.detached.spawn(async move {
471                    use std::io::Write;
472                    let seq = format!("\x1b]2;{}\x07", title);
473                    let mut stdout = std::io::stdout();
474                    let _ = stdout.write_all(seq.as_bytes());
475                    let _ = stdout.flush();
476                });
477            },
478        }
479    }
480
481    /// Async shutdown: cancel every scope, then wait for all spawned
482    /// work to drain. Bounded by 5 seconds — a hung task past that
483    /// gets aborted outright by `JoinSet::drop`.
484    pub async fn shutdown(mut self) {
485        for (id, scope) in self.scopes.iter() {
486            tracing::debug!(turn = %id, "shutdown: cancelling scope");
487            scope.cancel();
488        }
489
490        // Drain with a bounded timeout.
491        let shutdown_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
492
493        let drain = async {
494            for (_, mut scope) in self.scopes.drain() {
495                scope.drain().await;
496            }
497            while let Some(result) = self.detached.join_next().await {
498                if let Err(e) = result
499                    && !e.is_cancelled()
500                {
501                    tracing::warn!(error = %e, "shutdown: detached task panic");
502                }
503            }
504        };
505
506        let _ = tokio::time::timeout_at(shutdown_deadline, drain).await;
507    }
508
509    /// Test helper: clone the Msg sender so a test can synthesize a
510    /// message as if it came from an effect handler.
511    #[doc(hidden)]
512    pub fn msg_sender(&self) -> MsgSender {
513        self.msg_tx.clone()
514    }
515}
516
/// Dispatch a `CallModel` command. Resolves the provider (lazy,
/// cached) and streams its events onto the Msg channel. Without a
/// bound `ProviderFactory` (unit tests), emits a single
/// `UpstreamError` so the reducer ends the turn cleanly.
///
/// Sequence, in order:
/// 1. Resolve the provider for `request.model_id`; on failure emit
///    `UpstreamError` and stop.
/// 2. Estimate context usage and emit `ContextUsageEstimated`.
/// 3. If `should_auto_compact` returns `Ok`, run compaction before
///    streaming. Success replaces `request.messages` and emits
///    `CompactionFinished`; failure emits `CompactionFailed` and, when
///    the context exceeds the hard limit, `UpstreamError` and stops.
/// 4. Stream the provider's response, relaying each `StreamEvent` as a
///    `Msg` from a sibling task.
/// 5. If the provider fails with a context-limit error and step 3 did
///    not already compact, compact once and retry through
///    `dispatch_provider_stream`; otherwise emit `UpstreamError`.
///
/// Every `send` result is ignored: a closed Msg channel just means
/// nobody is listening any more.
async fn dispatch_call_model(
    msg_tx: MsgSender,
    providers: Option<Arc<ProviderFactory>>,
    turn: TurnId,
    mut request: crate::domain::ChatRequest,
    token: tokio_util::sync::CancellationToken,
) {
    use crate::models::UserFacingError;

    // No factory bound (unit-test construction): report a
    // non-recoverable internal error and end the turn.
    let Some(factory) = providers else {
        let error = UserFacingError {
            summary: "not wired".to_string(),
            message: "EffectRunner has no ProviderFactory bound".to_string(),
            suggestion: "construct via EffectRunner::pair_with_bindings".to_string(),
            category: crate::models::ErrorCategory::Internal,
            recoverable: false,
        };
        let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
        return;
    };

    // Lazily resolve the provider for this model.
    let provider = match factory.resolve(&request.model_id).await {
        Ok(p) => p,
        Err(e) => {
            let error = classify_error_for_ui(&e);
            let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
            return;
        },
    };

    // Prefer the context window the provider reports; fall back to a
    // static inference from the model id when it reports none.
    let max_context_tokens = provider.capabilities().max_context_tokens.or_else(|| {
        crate::domain::runtime::infer_static_context_window_for_model_id(&request.model_id)
    });
    let context_snapshot =
        crate::domain::estimate_context_usage_for_request(&request, max_context_tokens);
    // Publish the estimate before any compaction or streaming starts.
    let _ = msg_tx
        .send(Msg::ContextUsageEstimated {
            turn,
            snapshot: context_snapshot.clone(),
        })
        .await;

    let policy = CompactionPolicy::default();
    // Tracks whether the pre-stream compaction succeeded, so the
    // context-limit retry below is attempted at most once per call.
    let mut compacted_before_stream = false;
    if crate::domain::should_auto_compact(&context_snapshot, &request, policy).is_ok() {
        let compaction = CompactionRequest::auto(request.clone(), CompactionTrigger::AutoThreshold);
        match run_compaction(
            Arc::clone(&provider),
            turn,
            compaction,
            context_snapshot.clone(),
            max_context_tokens,
            token.clone(),
        )
        .await
        {
            Ok(result) => {
                // Stream against the compacted history from here on.
                request.messages = result.replacement_messages.clone();
                compacted_before_stream = true;
                let _ = msg_tx.send(Msg::CompactionFinished { turn, result }).await;
            },
            Err(err) => {
                // A failed auto-compaction is only fatal when the
                // request is over the hard limit; otherwise it is
                // reported as a warning and streaming proceeds with
                // the uncompacted request.
                let hard_limit =
                    crate::domain::context_exceeds_hard_limit(&context_snapshot, &request, policy);
                let _ = msg_tx
                    .send(Msg::CompactionFailed {
                        turn,
                        trigger: CompactionTrigger::AutoThreshold,
                        message: err.to_string(),
                        kind: if hard_limit {
                            crate::domain::StatusKind::Error
                        } else {
                            crate::domain::StatusKind::Warn
                        },
                    })
                    .await;
                if hard_limit {
                    let error = UserFacingError {
                        summary: "Context too large".to_string(),
                        message: format!(
                            "The next request needs {} tokens before response reserve, and automatic compaction failed: {}",
                            context_snapshot.used_tokens, err
                        ),
                        suggestion: "Run /compact with focus instructions, or /clear to start a fresh session.".to_string(),
                        category: crate::models::ErrorCategory::Config,
                        recoverable: true,
                    };
                    let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
                    return;
                }
            },
        }
    }

    // Build a StreamContext — provider writes typed events into the
    // internal sink; we relay each to the reducer as a Msg.
    let (stream_tx, mut stream_rx) = mpsc::channel::<StreamEvent>(256);
    let ctx = StreamContext::new(token.clone(), stream_tx, turn);

    // Drain stream events into Msgs on a sibling task. Drops when
    // the sink closes (provider's final `Done` or cancel).
    // The loop also stops early if the Msg channel itself is closed.
    let relay_tx = msg_tx.clone();
    let relay = tokio::spawn(async move {
        while let Some(event) = stream_rx.recv().await {
            let msg = match event {
                StreamEvent::Text(chunk) => Msg::StreamText { turn, chunk },
                StreamEvent::Reasoning(chunk) => Msg::StreamReasoning { turn, chunk },
                StreamEvent::ToolCall(call) => Msg::StreamToolCall { turn, call },
                StreamEvent::ThinkingSignature(_) => continue, // folded into Done below
                StreamEvent::Done {
                    usage,
                    thinking_signature,
                } => Msg::StreamDone {
                    turn,
                    usage,
                    thinking_signature,
                },
            };
            if relay_tx.send(msg).await.is_err() {
                break;
            }
        }
    });

    // Run the actual provider. On error, the relay will have
    // already emitted partial events; we follow with a single
    // UpstreamError to terminate the turn cleanly.
    //
    // `ModelError::Cancelled` is swallowed — the terminal
    // `Msg::TurnCancelled` is emitted from `drop_scope` after the
    // turn's `TurnScope` drains. Emitting `UpstreamError` here would
    // commit a "cancelled" message the user didn't ask to see.
    //
    // The request is cloned because the error path below may still
    // need it to build a compaction + retry.
    match provider.chat(request.clone(), ctx).await {
        Ok(_final_response) => {
            // Success — the final `Done` flowed through the sink.
        },
        Err(crate::models::ModelError::Cancelled) => {
            // Silent: `drop_scope` will emit `Msg::TurnCancelled`.
        },
        Err(e) => {
            // Only retry via compaction when the failure is a
            // context-limit error and no compaction ran up front.
            let retry_context_limit = !compacted_before_stream && is_context_limit_error(&e);
            if retry_context_limit {
                let latest_snapshot =
                    crate::domain::estimate_context_usage_for_request(&request, max_context_tokens);
                let compaction =
                    CompactionRequest::auto(request.clone(), CompactionTrigger::ContextLimitRetry);
                match run_compaction(
                    Arc::clone(&provider),
                    turn,
                    compaction,
                    latest_snapshot,
                    max_context_tokens,
                    token.clone(),
                )
                .await
                {
                    Ok(result) => {
                        let mut retry_request = request;
                        retry_request.messages = result.replacement_messages.clone();
                        let _ = msg_tx.send(Msg::CompactionFinished { turn, result }).await;
                        // Let the first attempt's relay finish before
                        // the retry starts emitting its own events.
                        let _ = relay.await;
                        dispatch_provider_stream(msg_tx, provider, turn, retry_request, token)
                            .await;
                        return;
                    },
                    Err(compact_err) => {
                        // Report the failed retry compaction, then fall
                        // through to surface the original provider error.
                        let _ = msg_tx
                            .send(Msg::CompactionFailed {
                                turn,
                                trigger: CompactionTrigger::ContextLimitRetry,
                                message: compact_err.to_string(),
                                kind: crate::domain::StatusKind::Error,
                            })
                            .await;
                    },
                }
            }
            let error = classify_error_for_ui(&e);
            let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
        },
    }

    // Wait for the relay to finish forwarding whatever is still
    // queued before this task completes.
    let _ = relay.await;
}
706
707async fn dispatch_provider_stream(
708    msg_tx: MsgSender,
709    provider: Arc<dyn ModelProvider>,
710    turn: TurnId,
711    request: crate::domain::ChatRequest,
712    token: tokio_util::sync::CancellationToken,
713) {
714    let (stream_tx, mut stream_rx) = mpsc::channel::<StreamEvent>(256);
715    let ctx = StreamContext::new(token.clone(), stream_tx, turn);
716    let relay_tx = msg_tx.clone();
717    let relay = tokio::spawn(async move {
718        while let Some(event) = stream_rx.recv().await {
719            let msg = match event {
720                StreamEvent::Text(chunk) => Msg::StreamText { turn, chunk },
721                StreamEvent::Reasoning(chunk) => Msg::StreamReasoning { turn, chunk },
722                StreamEvent::ToolCall(call) => Msg::StreamToolCall { turn, call },
723                StreamEvent::ThinkingSignature(_) => continue,
724                StreamEvent::Done {
725                    usage,
726                    thinking_signature,
727                } => Msg::StreamDone {
728                    turn,
729                    usage,
730                    thinking_signature,
731                },
732            };
733            if relay_tx.send(msg).await.is_err() {
734                break;
735            }
736        }
737    });
738
739    match provider.chat(request, ctx).await {
740        Ok(_) | Err(ModelError::Cancelled) => {},
741        Err(e) => {
742            let error = classify_error_for_ui(&e);
743            let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
744        },
745    }
746
747    let _ = relay.await;
748}
749
750async fn dispatch_compact_conversation(
751    msg_tx: MsgSender,
752    providers: Option<Arc<ProviderFactory>>,
753    turn: TurnId,
754    request: CompactionRequest,
755    token: tokio_util::sync::CancellationToken,
756) {
757    let Some(factory) = providers else {
758        let _ = msg_tx
759            .send(Msg::CompactionFailed {
760                turn,
761                trigger: request.trigger,
762                message: "EffectRunner has no ProviderFactory bound".to_string(),
763                kind: crate::domain::StatusKind::Error,
764            })
765            .await;
766        return;
767    };
768
769    let provider = match factory.resolve(&request.chat.model_id).await {
770        Ok(provider) => provider,
771        Err(err) => {
772            let _ = msg_tx
773                .send(Msg::CompactionFailed {
774                    turn,
775                    trigger: request.trigger,
776                    message: err.to_string(),
777                    kind: crate::domain::StatusKind::Error,
778                })
779                .await;
780            return;
781        },
782    };
783
784    let max_context_tokens = provider.capabilities().max_context_tokens.or_else(|| {
785        crate::domain::runtime::infer_static_context_window_for_model_id(&request.chat.model_id)
786    });
787    let before_snapshot =
788        crate::domain::estimate_context_usage_for_request(&request.chat, max_context_tokens);
789
790    let trigger = request.trigger;
791    match run_compaction(
792        provider,
793        turn,
794        request,
795        before_snapshot,
796        max_context_tokens,
797        token,
798    )
799    .await
800    {
801        Ok(result) => {
802            let _ = msg_tx.send(Msg::CompactionFinished { turn, result }).await;
803        },
804        Err(err) => {
805            let _ = msg_tx
806                .send(Msg::CompactionFailed {
807                    turn,
808                    trigger,
809                    message: err.to_string(),
810                    kind: crate::domain::StatusKind::Error,
811                })
812                .await;
813        },
814    }
815}
816
817async fn run_compaction(
818    provider: Arc<dyn ModelProvider>,
819    turn: TurnId,
820    request: CompactionRequest,
821    before_snapshot: crate::domain::ContextUsageSnapshot,
822    max_context_tokens: Option<usize>,
823    token: tokio_util::sync::CancellationToken,
824) -> Result<CompactionResult, ModelError> {
825    let started = Instant::now();
826    let prepared = crate::domain::prepare_compaction(&request, max_context_tokens)
827        .map_err(|skip| ModelError::InvalidRequest(skip.to_string()))?;
828
829    let summary_request = crate::domain::build_summary_request(
830        &request.chat,
831        &prepared,
832        request.instructions.as_deref(),
833        request.policy,
834    );
835    let (draft, draft_usage) =
836        collect_compaction_text(Arc::clone(&provider), turn, summary_request, token.clone())
837            .await?;
838    let draft_summary = crate::domain::normalize_summary(&draft);
839    if draft_summary.trim().is_empty() {
840        return Err(ModelError::InvalidRequest(
841            "compaction produced an empty summary".to_string(),
842        ));
843    }
844
845    let verify_request = crate::domain::build_verification_request(
846        &request.chat,
847        &prepared,
848        &draft_summary,
849        request.instructions.as_deref(),
850        request.policy,
851    );
852    let (verified, verify_usage) =
853        collect_compaction_text(Arc::clone(&provider), turn, verify_request, token).await?;
854    let verified_summary = crate::domain::normalize_summary(&verified);
855    let final_summary = if verified_summary.trim().is_empty() {
856        draft_summary
857    } else {
858        verified_summary
859    };
860
861    let id = format!(
862        "compact_{}",
863        chrono::Local::now().format("%Y%m%d_%H%M%S_%3f")
864    );
865    let mut record = crate::domain::CompactionRecord {
866        id,
867        trigger: request.trigger,
868        created_at: chrono::Local::now(),
869        before_tokens: before_snapshot.used_tokens,
870        after_tokens: 0,
871        archived_message_count: prepared.archived_messages.len(),
872        preserved_message_count: prepared.preserved_messages.len(),
873        summary_tokens: final_summary.len().div_ceil(4),
874        duration_secs: started.elapsed().as_secs_f64(),
875        focus: request.instructions.clone(),
876        archive_path: None,
877    };
878
879    let mut replacement =
880        crate::domain::build_replacement_messages(&final_summary, &prepared, &record);
881    let mut compacted_request = request.chat.clone();
882    compacted_request.messages = replacement.clone();
883    let mut after_snapshot =
884        crate::domain::estimate_context_usage_for_request(&compacted_request, max_context_tokens);
885    record.after_tokens = after_snapshot.used_tokens;
886    record.duration_secs = started.elapsed().as_secs_f64();
887    replacement = crate::domain::build_replacement_messages(&final_summary, &prepared, &record);
888    compacted_request.messages = replacement.clone();
889    after_snapshot =
890        crate::domain::estimate_context_usage_for_request(&compacted_request, max_context_tokens);
891    record.after_tokens = after_snapshot.used_tokens;
892
893    if after_snapshot.used_tokens >= before_snapshot.used_tokens {
894        return Err(ModelError::InvalidRequest(format!(
895            "compaction did not reduce context ({} -> {} tokens)",
896            before_snapshot.used_tokens, after_snapshot.used_tokens
897        )));
898    }
899
900    if crate::domain::context_exceeds_hard_limit(
901        &after_snapshot,
902        &compacted_request,
903        request.policy,
904    ) {
905        return Err(ModelError::InvalidRequest(format!(
906            "compacted context still exceeds response reserve ({} tokens used)",
907            after_snapshot.used_tokens
908        )));
909    }
910
911    Ok(CompactionResult {
912        record,
913        replacement_messages: replacement,
914        archived_messages: prepared.archived_messages,
915        before_snapshot,
916        after_snapshot,
917        usage: crate::domain::combine_usage(draft_usage, verify_usage),
918    })
919}
920
921async fn collect_compaction_text(
922    provider: Arc<dyn ModelProvider>,
923    turn: TurnId,
924    request: crate::domain::ChatRequest,
925    token: tokio_util::sync::CancellationToken,
926) -> Result<(String, Option<TokenUsage>), ModelError> {
927    let (stream_tx, mut stream_rx) = mpsc::channel::<StreamEvent>(128);
928    let ctx = StreamContext::new(token, stream_tx, turn);
929    let collector = tokio::spawn(async move {
930        let mut text = String::new();
931        let mut usage = None;
932        while let Some(event) = stream_rx.recv().await {
933            match event {
934                StreamEvent::Text(chunk) => text.push_str(&chunk),
935                StreamEvent::Done {
936                    usage: done_usage, ..
937                } => usage = done_usage,
938                StreamEvent::Reasoning(_)
939                | StreamEvent::ToolCall(_)
940                | StreamEvent::ThinkingSignature(_) => {},
941            }
942        }
943        (text, usage)
944    });
945
946    let response = provider.chat(request, ctx).await;
947    let (text, stream_usage) = collector
948        .await
949        .map_err(|err| ModelError::StreamError(format!("compaction collector failed: {}", err)))?;
950    match response {
951        Ok(final_response) => Ok((text, final_response.usage.or(stream_usage))),
952        Err(err) => Err(err),
953    }
954}
955
956fn is_context_limit_error(error: &ModelError) -> bool {
957    let text = error.to_string().to_lowercase();
958    text.contains("context")
959        && (text.contains("too large")
960            || text.contains("exceed")
961            || text.contains("maximum")
962            || text.contains("token"))
963}
964
965/// Dispatch an `ExecuteTool` command.
966#[allow(clippy::too_many_arguments)]
967async fn dispatch_execute_tool(
968    msg_tx: MsgSender,
969    tools: Option<Arc<ToolRegistry>>,
970    workdir: PathBuf,
971    turn: TurnId,
972    call_id: crate::domain::ToolCallId,
973    source: crate::models::tool_call::ToolCall,
974    token: tokio_util::sync::CancellationToken,
975    config: Arc<crate::app::Config>,
976    model_id: String,
977) {
978    let _ = msg_tx.send(Msg::ToolStarted { turn, call_id }).await;
979
980    let Some(registry) = tools else {
981        let _ = msg_tx
982            .send(Msg::ToolFinished {
983                turn,
984                call_id,
985                outcome: crate::domain::ToolOutcome::error(
986                    "EffectRunner has no ToolRegistry bound",
987                    0.0,
988                ),
989            })
990            .await;
991        return;
992    };
993
994    // Route MCP-prefixed calls to the mcp proxy, which takes
995    // {server_name, tool_name, arguments}. The raw model call has
996    // those embedded in the function name and arguments respectively.
997    let (tool_key, args) = if source.function.name.starts_with("mcp__") {
998        let rest = &source.function.name[5..];
999        if let Some((server, tool)) = rest.split_once("__") {
1000            (
1001                "mcp_proxy",
1002                serde_json::json!({
1003                    "server_name": server,
1004                    "tool_name": tool,
1005                    "arguments": source.function.arguments.clone(),
1006                }),
1007            )
1008        } else {
1009            let _ = msg_tx
1010                .send(Msg::ToolFinished {
1011                    turn,
1012                    call_id,
1013                    outcome: crate::domain::ToolOutcome::error(
1014                        format!("invalid MCP tool name: {}", source.function.name),
1015                        0.0,
1016                    ),
1017                })
1018                .await;
1019            return;
1020        }
1021    } else {
1022        (
1023            source.function.name.as_str(),
1024            source.function.arguments.clone(),
1025        )
1026    };
1027
1028    let Some(tool) = registry.get(tool_key) else {
1029        let _ = msg_tx
1030            .send(Msg::ToolFinished {
1031                turn,
1032                call_id,
1033                outcome: crate::domain::ToolOutcome::error(
1034                    format!("unknown tool: {}", tool_key),
1035                    0.0,
1036                ),
1037            })
1038            .await;
1039        return;
1040    };
1041
1042    // Bridge the tool's progress channel to `Msg::ToolProgress`.
1043    // A sibling task drains progress events while the tool runs.
1044    // The channel closes when `progress_tx` drops (when `ctx`
1045    // drops at the end of `tool.execute`), which terminates the
1046    // relay loop cleanly.
1047    let (progress_tx, mut progress_rx) = mpsc::channel(16);
1048    let relay_tx = msg_tx.clone();
1049    let progress_relay = tokio::spawn(async move {
1050        while let Some(event) = progress_rx.recv().await {
1051            if relay_tx
1052                .send(Msg::ToolProgress {
1053                    turn,
1054                    call_id,
1055                    event,
1056                })
1057                .await
1058                .is_err()
1059            {
1060                break;
1061            }
1062        }
1063    });
1064
1065    let ctx = ExecContext::new(token, progress_tx, call_id, turn, workdir, config, model_id);
1066    let outcome = tool.execute(args, ctx).await;
1067    let _ = progress_relay.await;
1068    let _ = msg_tx
1069        .send(Msg::ToolFinished {
1070            turn,
1071            call_id,
1072            outcome,
1073        })
1074        .await;
1075}
1076
1077/// Spawn `ollama pull <model>` and stream its stdout lines as
1078/// `Msg::ModelPullProgress` status updates. Emits a final
1079/// `Msg::ModelPullFinished` on successful exit; on failure, emits a
1080/// single `ModelPullProgress` with the error text.
1081async fn dispatch_pull_ollama_model(tx: MsgSender, model: String) {
1082    use tokio::io::{AsyncBufReadExt, BufReader};
1083    use tokio::process::Command;
1084
1085    let mut cmd = Command::new("ollama");
1086    cmd.arg("pull")
1087        .arg(&model)
1088        .stdin(std::process::Stdio::null())
1089        .stdout(std::process::Stdio::piped())
1090        .stderr(std::process::Stdio::piped())
1091        .kill_on_drop(true);
1092
1093    let mut child = match cmd.spawn() {
1094        Ok(c) => c,
1095        Err(e) => {
1096            let _ = tx
1097                .send(Msg::ModelPullProgress(format!(
1098                    "ollama pull failed to start: {}",
1099                    e
1100                )))
1101                .await;
1102            return;
1103        },
1104    };
1105
1106    if let Some(stdout) = child.stdout.take() {
1107        let tx_inner = tx.clone();
1108        tokio::spawn(async move {
1109            let mut reader = BufReader::new(stdout).lines();
1110            while let Ok(Some(line)) = reader.next_line().await {
1111                let _ = tx_inner.send(Msg::ModelPullProgress(line)).await;
1112            }
1113        });
1114    }
1115
1116    match child.wait().await {
1117        Ok(status) if status.success() => {
1118            let _ = tx.send(Msg::ModelPullFinished { model }).await;
1119        },
1120        Ok(status) => {
1121            let _ = tx
1122                .send(Msg::ModelPullProgress(format!(
1123                    "ollama pull exited with status {}",
1124                    status.code().unwrap_or(-1)
1125                )))
1126                .await;
1127        },
1128        Err(e) => {
1129            let _ = tx
1130                .send(Msg::ModelPullProgress(format!(
1131                    "ollama pull wait error: {}",
1132                    e
1133                )))
1134                .await;
1135        },
1136    }
1137}
1138
1139fn mcp_startup_msg(name: &str, started: bool, tools: Vec<crate::domain::McpToolSpec>) -> Msg {
1140    if started {
1141        Msg::McpServerReady {
1142            name: name.to_string(),
1143            tools,
1144        }
1145    } else {
1146        Msg::McpServerErrored {
1147            name: name.to_string(),
1148            reason: "server failed to start or initialize".to_string(),
1149        }
1150    }
1151}
1152
1153/// Read the system clipboard on a blocking thread and emit a `Msg`
1154/// back into the main loop. Image content wins when present; falls
1155/// back to text; empty or error surface as `Msg::TransientStatus` so
1156/// the user gets visible feedback (a silent no-op on Ctrl+V would be
1157/// confusing, especially on macOS where `osascript` can take ~300ms).
1158///
1159/// `tokio::task::spawn_blocking` is the right primitive: `clipboard::
1160/// has_image` / `read_image_bytes` / `read_text` shell out to xclip /
1161/// wl-paste / pngpaste / PowerShell, all of which block synchronously.
1162async fn dispatch_read_clipboard(tx: MsgSender) {
1163    use crate::domain::{Paste, StatusKind};
1164
1165    enum Outcome {
1166        Image { bytes: Vec<u8>, format: String },
1167        Text(String),
1168        Empty,
1169        Error(String),
1170    }
1171
1172    let outcome = tokio::task::spawn_blocking(|| {
1173        if crate::clipboard::has_image() {
1174            match crate::clipboard::read_image_bytes() {
1175                Ok((bytes, format)) => Outcome::Image { bytes, format },
1176                Err(e) => Outcome::Error(format!("Clipboard image read failed: {}", e)),
1177            }
1178        } else {
1179            match crate::clipboard::read_text() {
1180                Ok(t) if !t.is_empty() => Outcome::Text(t),
1181                Ok(_) => Outcome::Empty,
1182                Err(e) => Outcome::Error(format!("Clipboard empty / read failed: {}", e)),
1183            }
1184        }
1185    })
1186    .await
1187    .unwrap_or_else(|e| Outcome::Error(format!("clipboard spawn_blocking: {}", e)));
1188
1189    let msg = match outcome {
1190        Outcome::Image { bytes, format } => Msg::Paste(Paste::Image { bytes, format }),
1191        Outcome::Text(text) => Msg::Paste(Paste::Text(text)),
1192        Outcome::Empty => Msg::TransientStatus {
1193            text: "Clipboard is empty".to_string(),
1194            kind: StatusKind::Info,
1195            dismiss_ms: 2_000,
1196        },
1197        Outcome::Error(text) => Msg::TransientStatus {
1198            text,
1199            kind: StatusKind::Warn,
1200            dismiss_ms: 4_000,
1201        },
1202    };
1203    let _ = tx.send(msg).await;
1204}
1205
1206fn classify_error_for_ui(e: &crate::models::ModelError) -> crate::models::UserFacingError {
1207    use crate::models::{ErrorCategory, ModelError, UserFacingError};
1208    match e {
1209        ModelError::Backend(b) => UserFacingError {
1210            summary: "Backend error".to_string(),
1211            message: b.to_string(),
1212            suggestion: "Check the provider endpoint / API key.".to_string(),
1213            category: ErrorCategory::Connection,
1214            recoverable: true,
1215        },
1216        ModelError::Authentication(msg) => UserFacingError {
1217            summary: "Auth error".to_string(),
1218            message: msg.clone(),
1219            suggestion: "Set the env var the provider expects.".to_string(),
1220            category: ErrorCategory::Auth,
1221            recoverable: false,
1222        },
1223        ModelError::RateLimit { retry_after } => UserFacingError {
1224            summary: "Rate limit".to_string(),
1225            message: format!("retry after {:?}", retry_after),
1226            suggestion: "Wait and try again.".to_string(),
1227            category: ErrorCategory::Temporary,
1228            recoverable: true,
1229        },
1230        ModelError::StreamError(msg) => UserFacingError {
1231            summary: "Stream error".to_string(),
1232            message: msg.clone(),
1233            suggestion: "Retry the request.".to_string(),
1234            category: ErrorCategory::Connection,
1235            recoverable: true,
1236        },
1237        other => UserFacingError {
1238            summary: "Model error".to_string(),
1239            message: other.to_string(),
1240            suggestion: String::new(),
1241            category: ErrorCategory::Internal,
1242            recoverable: false,
1243        },
1244    }
1245}
1246
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::ToolCallId;
    use std::time::Duration;

    /// Fresh runner rooted at `/tmp`, plus the receiving end of its
    /// message channel.
    fn runner() -> (EffectRunner, mpsc::Receiver<Msg>) {
        EffectRunner::pair(PathBuf::from("/tmp"))
    }

    /// Minimal chat request for `model_id`: no messages, no tools.
    fn chat_request(model_id: &str) -> crate::domain::ChatRequest {
        crate::domain::ChatRequest {
            model_id: model_id.to_string(),
            messages: vec![],
            system_prompt: String::new(),
            instructions: None,
            reasoning: crate::models::ReasoningLevel::Medium,
            temperature: 0.7,
            max_tokens: 4096,
            tools: vec![],
        }
    }

    /// A conversation history that is cheap to save.
    fn sample_history() -> crate::session::ConversationHistory {
        crate::session::ConversationHistory::new("/p".to_string(), "m".to_string())
    }

    /// Wait up to `ms` milliseconds for the next message. Panics with
    /// `on_timeout` if none arrives in time.
    async fn recv_within(rx: &mut mpsc::Receiver<Msg>, ms: u64, on_timeout: &str) -> Msg {
        tokio::time::timeout(Duration::from_millis(ms), rx.recv())
            .await
            .expect(on_timeout)
            .expect("channel alive")
    }

    #[tokio::test]
    async fn dispatch_exit_is_noop_on_runner_state() {
        let (mut effects, _rx) = runner();
        effects.dispatch(Cmd::Exit);
        assert_eq!(effects.scope_count(), 0);
    }

    #[tokio::test]
    async fn dispatch_save_emits_session_saved() {
        let (mut effects, mut rx) = runner();
        effects.dispatch(Cmd::SaveConversation(sample_history()));
        let msg = recv_within(&mut rx, 200, "sender emits").await;
        assert!(matches!(msg, Msg::SessionSaved));
    }

    #[tokio::test]
    async fn dispatch_dismiss_after_delay_emits_status_dismiss() {
        let (mut effects, mut rx) = runner();
        let t0 = std::time::Instant::now();
        effects.dispatch(Cmd::DismissStatusAfter { ms: 30 });
        let msg = recv_within(&mut rx, 300, "sender emits").await;
        assert!(matches!(msg, Msg::StatusDismiss));
        // Slightly under the requested 30 ms to allow for timer slack.
        assert!(t0.elapsed() >= Duration::from_millis(25));
    }

    #[test]
    fn mcp_startup_msg_treats_zero_tool_started_server_as_ready() {
        let msg = mcp_startup_msg("empty", true, Vec::new());
        assert!(matches!(
            msg,
            Msg::McpServerReady { name, tools } if name == "empty" && tools.is_empty()
        ));
    }

    #[test]
    fn mcp_startup_msg_reports_unstarted_server_as_error() {
        let msg = mcp_startup_msg("bad", false, Vec::new());
        assert!(matches!(
            msg,
            Msg::McpServerErrored { name, reason }
                if name == "bad" && reason.contains("failed to start")
        ));
    }

    #[tokio::test]
    async fn cancel_scope_emits_turn_cancelled_after_bounded_timeout() {
        let (mut effects, mut rx) = runner();
        let turn = TurnId(77);
        // Park a task that never completes inside the turn's scope.
        effects.scope_mut(turn).spawn(async {
            std::future::pending::<()>().await;
        });
        assert_eq!(effects.scope_count(), 1);

        let start = std::time::Instant::now();
        effects.dispatch(Cmd::CancelScope(turn));
        assert_eq!(effects.scope_count(), 0);
        let msg = recv_within(
            &mut rx,
            500,
            "bounded cancel should emit terminal message",
        )
        .await;
        assert!(matches!(msg, Msg::TurnCancelled(t) if t == turn));
        assert!(
            start.elapsed() < Duration::from_millis(500),
            "cancel terminal message took {:?}",
            start.elapsed()
        );
    }

    #[tokio::test]
    async fn dispatch_call_model_creates_scope() {
        let (mut effects, _rx) = runner();
        let turn = TurnId(7);
        effects.dispatch(Cmd::CallModel {
            turn,
            request: chat_request("test/m"),
        });
        assert_eq!(effects.scope_count(), 1);
    }

    /// F12: after a spawned task completes (here via the
    /// no-ProviderFactory error path), the next `dispatch` call reaps
    /// the empty scope instead of leaving an orphan entry in the map.
    #[tokio::test]
    async fn empty_scopes_are_reaped_on_next_dispatch() {
        let (mut effects, mut rx) = runner();
        let turn = TurnId(42);
        effects.dispatch(Cmd::CallModel {
            turn,
            request: chat_request("test/m"),
        });
        assert_eq!(effects.scope_count(), 1);

        // Runner has no provider bindings → dispatch_call_model hits
        // the "not wired" error path and emits UpstreamError, then the
        // spawned task returns. Drain that message so we know the task
        // ran to completion.
        let msg = recv_within(&mut rx, 200, "upstream error arrived").await;
        assert!(matches!(msg, Msg::UpstreamError { .. }));

        // Give the JoinSet a tick to notice the task finished.
        tokio::task::yield_now().await;

        // Any subsequent dispatch reaps the now-empty scope.
        effects.dispatch(Cmd::DismissStatusAfter { ms: 10 });
        assert_eq!(
            effects.scope_count(),
            0,
            "completed scope must be reaped on next dispatch"
        );
    }

    #[tokio::test]
    async fn dispatch_execute_tool_under_turn_emits_tool_started() {
        let (mut effects, mut rx) = runner();
        let turn = TurnId(7);
        let call_id = ToolCallId(1);
        let function = crate::models::tool_call::FunctionCall {
            name: "read_file".to_string(),
            arguments: serde_json::json!({"path": "x"}),
        };
        let source = crate::models::tool_call::ToolCall {
            id: Some("c1".to_string()),
            function,
        };
        effects.dispatch(Cmd::ExecuteTool {
            turn,
            call_id,
            source,
            model_id: "ollama/test".to_string(),
        });
        let first = recv_within(&mut rx, 200, "some msg").await;
        assert!(matches!(
            first,
            Msg::ToolStarted {
                turn: t,
                call_id: c,
            } if t == turn && c == call_id
        ));
    }

    #[tokio::test]
    async fn cancel_scope_before_execute_tool_drops_pending_work() {
        let (mut effects, _rx) = runner();
        let turn = TurnId(9);
        effects.dispatch(Cmd::CallModel {
            turn,
            request: chat_request("m"),
        });
        assert_eq!(effects.scope_count(), 1);

        effects.dispatch(Cmd::CancelScope(turn));
        assert_eq!(effects.scope_count(), 0);
    }

    #[tokio::test]
    async fn shutdown_drains_pending_saves() {
        let (mut effects, _rx) = runner();
        for _ in 0..5 {
            effects.dispatch(Cmd::SaveConversation(sample_history()));
        }
        // Shutdown waits for all five to complete (should be instant).
        let start = std::time::Instant::now();
        effects.shutdown().await;
        assert!(start.elapsed() < Duration::from_secs(2));
    }
}