1mod middleware;
35mod turn_scope;
36
37use std::collections::HashMap;
38use std::path::PathBuf;
39use std::sync::Arc;
40use std::time::Instant;
41
42use tokio::sync::mpsc;
43
44use crate::app::Config;
45use crate::domain::{
46 Cmd, CompactionPolicy, CompactionRequest, CompactionResult, CompactionTrigger, Msg, TurnId,
47};
48use crate::models::{ModelError, TokenUsage};
49use crate::providers::ctx::{ExecContext, StreamContext};
50use crate::providers::model::ModelProvider;
51use crate::providers::{ProviderFactory, StreamEvent, ToolRegistry};
52
53pub use middleware::{DEFAULT_MAX_ATTEMPTS, retry_transient_http};
54pub use turn_scope::TurnScope;
55
/// Longest time a cancelled turn's tasks get to wind down before
/// `Msg::TurnCancelled` is sent regardless.
#[cfg(not(test))]
const CANCEL_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2_000);
/// Much shorter drain bound under test so cancellation tests stay fast.
#[cfg(test)]
const CANCEL_DRAIN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
60
61pub type MsgSender = mpsc::Sender<Msg>;
67
68pub const MSG_CHANNEL_CAPACITY: usize = 512;
74
/// Executes `Cmd`s as async tasks and reports their results as `Msg`s over
/// `msg_tx`.
///
/// Turn-bound work (model calls, compaction, tool execution) lives in a
/// per-turn [`TurnScope`] so it can be cancelled as a unit; all other work is
/// spawned into `detached`.
pub struct EffectRunner {
    // Channel back to the consumer; cloned into every spawned task.
    msg_tx: MsgSender,
    // One scope per turn; removed and cancelled by `Cmd::CancelScope`, and
    // reaped at the start of each dispatch once all of its tasks finished.
    scopes: HashMap<TurnId, TurnScope>,
    // Tasks not tied to a turn (persistence, clipboard, MCP init, ...);
    // joined during `shutdown`.
    detached: tokio::task::JoinSet<()>,
    // Directory handed to tools, session storage and instruction refresh.
    workdir: PathBuf,
    // `None` until bound via `with_bindings`; model calls and compaction then
    // report a "no ProviderFactory bound" error instead of doing work.
    providers: Option<Arc<ProviderFactory>>,
    // `None` until bound via `with_bindings`; tool calls then finish with an
    // error outcome.
    tools: Option<Arc<ToolRegistry>>,
}
102
103impl EffectRunner {
104 pub fn new(msg_tx: MsgSender, workdir: PathBuf) -> Self {
106 Self {
107 msg_tx,
108 scopes: HashMap::new(),
109 detached: tokio::task::JoinSet::new(),
110 workdir,
111 providers: None,
112 tools: None,
113 }
114 }
115
116 pub fn with_bindings(
121 mut self,
122 providers: Arc<ProviderFactory>,
123 tools: Arc<ToolRegistry>,
124 ) -> Self {
125 self.providers = Some(providers);
126 self.tools = Some(tools);
127 self
128 }
129
130 pub fn pair(workdir: PathBuf) -> (Self, mpsc::Receiver<Msg>) {
134 let (tx, rx) = mpsc::channel(MSG_CHANNEL_CAPACITY);
135 (Self::new(tx, workdir), rx)
136 }
137
138 pub fn pair_with_bindings(
141 workdir: PathBuf,
142 config: Config,
143 tools: Arc<ToolRegistry>,
144 ) -> (Self, mpsc::Receiver<Msg>) {
145 let providers = Arc::new(ProviderFactory::new(config));
146 Self::pair_from(workdir, providers, tools)
147 }
148
149 pub fn pair_from(
154 workdir: PathBuf,
155 providers: Arc<ProviderFactory>,
156 tools: Arc<ToolRegistry>,
157 ) -> (Self, mpsc::Receiver<Msg>) {
158 let (tx, rx) = mpsc::channel(MSG_CHANNEL_CAPACITY);
159 (Self::new(tx, workdir).with_bindings(providers, tools), rx)
160 }
161
162 pub fn new_child(
167 msg_tx: MsgSender,
168 workdir: PathBuf,
169 providers: Arc<ProviderFactory>,
170 tools: Arc<ToolRegistry>,
171 ) -> Self {
172 Self::new(msg_tx, workdir).with_bindings(providers, tools)
173 }
174
175 fn scope_mut(&mut self, turn: TurnId) -> &mut TurnScope {
179 self.scopes
180 .entry(turn)
181 .or_insert_with(|| TurnScope::new(turn))
182 }
183
184 fn drop_scope(&mut self, turn: TurnId) {
194 if let Some(mut scope) = self.scopes.remove(&turn) {
195 scope.cancel();
196 let tx = self.msg_tx.clone();
197 self.detached.spawn(async move {
198 if tokio::time::timeout(CANCEL_DRAIN_TIMEOUT, scope.drain())
199 .await
200 .is_err()
201 {
202 tracing::warn!(
203 turn = %turn,
204 timeout_ms = CANCEL_DRAIN_TIMEOUT.as_millis(),
205 "cancel drain timed out; aborting remaining scoped tasks"
206 );
207 }
208 let _ = tx.send(Msg::TurnCancelled(turn)).await;
209 });
210 }
211 }
212
213 pub fn scope_count(&self) -> usize {
216 self.scopes.len()
217 }
218
219 fn reap_empty_scopes(&mut self) {
228 self.scopes.retain(|_, scope| {
229 scope.drain_completed();
230 !scope.is_empty()
231 });
232 }
233
234 pub fn dispatch(&mut self, cmd: Cmd) {
238 self.reap_empty_scopes();
241 tracing::trace!(cmd = %cmd.summary(), "effect: dispatch");
242
243 match cmd {
244 Cmd::CallModel { turn, mut request } => {
245 let tx = self.msg_tx.clone();
246 let providers = self.providers.clone();
247 if let Some(tools) = &self.tools {
254 let mut enriched = tools.describe_all();
255 enriched.append(&mut request.tools);
256 request.tools = enriched;
257 }
258 let scope = self.scope_mut(turn);
259 let token = scope.token();
260 scope.spawn(async move {
261 dispatch_call_model(tx, providers, turn, request, token).await;
262 });
263 },
264 Cmd::CompactConversation { turn, mut request } => {
265 let tx = self.msg_tx.clone();
266 let providers = self.providers.clone();
267 if let Some(tools) = &self.tools {
268 let mut enriched = tools.describe_all();
269 enriched.append(&mut request.chat.tools);
270 request.chat.tools = enriched;
271 }
272 let scope = self.scope_mut(turn);
273 let token = scope.token();
274 scope.spawn(async move {
275 dispatch_compact_conversation(tx, providers, turn, request, token).await;
276 });
277 },
278 Cmd::ExecuteTool {
279 turn,
280 call_id,
281 source,
282 model_id,
283 } => {
284 let tx = self.msg_tx.clone();
285 let tools = self.tools.clone();
286 let workdir = self.workdir.clone();
287 let config = self
292 .providers
293 .as_ref()
294 .map(|p| Arc::new(p.config().clone()))
295 .unwrap_or_else(|| Arc::new(crate::app::Config::default()));
296 let scope = self.scope_mut(turn);
297 let token = scope.token();
298 scope.spawn(async move {
299 dispatch_execute_tool(
300 tx, tools, workdir, turn, call_id, source, token, config, model_id,
301 )
302 .await;
303 });
304 },
305 Cmd::CancelScope(turn) => {
306 self.drop_scope(turn);
307 },
308 Cmd::SaveConversation(history) => {
309 let tx = self.msg_tx.clone();
310 let workdir = self.workdir.clone();
311 self.detached.spawn(async move {
312 if let Ok(manager) = crate::session::ConversationManager::new(&workdir)
313 && manager.save_conversation(&history).is_ok()
314 {
315 let _ = tx.send(Msg::SessionSaved).await;
316 } else {
317 tracing::warn!("SaveConversation: failed to write to disk");
318 }
319 });
320 },
321 Cmd::SaveCompactionArchive(archive) => {
322 let workdir = self.workdir.clone();
323 self.detached.spawn(async move {
324 if let Ok(manager) = crate::session::ConversationManager::new(&workdir)
325 && let Err(err) = manager.save_compaction_archive(&archive)
326 {
327 tracing::warn!(error = %err, "SaveCompactionArchive: failed to write archive");
328 }
329 });
330 },
331 Cmd::PersistLastModel(model) => {
332 self.detached.spawn(async move {
333 let _ = crate::app::persist_last_model(&model);
334 });
335 },
336 Cmd::PersistReasoningFor { model_id, level } => {
337 self.detached.spawn(async move {
338 let _ = crate::app::persist_reasoning_for_model(&model_id, level);
339 });
340 },
341 Cmd::RefreshInstructions => {
342 let tx = self.msg_tx.clone();
343 let workdir = self.workdir.clone();
344 self.detached.spawn(async move {
345 let (loaded, _outcome) = crate::app::instructions::refresh(None, &workdir);
346 let _ = tx.send(Msg::InstructionsChanged(loaded)).await;
347 });
348 },
349 Cmd::LoadConversation(id) => {
350 let tx = self.msg_tx.clone();
351 let workdir = self.workdir.clone();
352 self.detached.spawn(async move {
353 match crate::session::ConversationManager::new(&workdir) {
354 Ok(mgr) => match mgr.load_conversation(&id) {
355 Ok(history) => {
356 let _ = tx.send(Msg::ConversationLoaded(history)).await;
357 },
358 Err(e) => {
359 tracing::warn!(id = %id, error = %e, "LoadConversation failed");
360 },
361 },
362 Err(e) => {
363 tracing::warn!(error = %e, "ConversationManager init failed");
364 },
365 }
366 });
367 },
368 Cmd::ListConversations => {
369 let tx = self.msg_tx.clone();
370 let workdir = self.workdir.clone();
371 self.detached.spawn(async move {
372 let summaries = match crate::session::ConversationManager::new(&workdir) {
373 Ok(mgr) => mgr
374 .list_conversations()
375 .unwrap_or_default()
376 .into_iter()
377 .map(|h| crate::domain::ConversationSummary {
378 id: h.id.clone(),
379 title: h.title.clone(),
380 message_count: h.messages.len(),
381 updated_at: h.updated_at.to_rfc3339(),
382 })
383 .collect(),
384 Err(_) => Vec::new(),
385 };
386 let _ = tx.send(Msg::ConversationsListed(summaries)).await;
387 });
388 },
389 Cmd::InitMcpServers(configs) => {
390 let tx = self.msg_tx.clone();
391 self.detached.spawn(async move {
392 if configs.is_empty() {
393 return;
394 }
395 crate::mcp::manager_ref::mark_init_started();
396 let manager =
397 std::sync::Arc::new(crate::mcp::McpServerManager::start(&configs).await);
398 for (name, _cfg) in configs.iter() {
402 let server_tools: Vec<crate::domain::McpToolSpec> = manager
403 .get_all_tools()
404 .iter()
405 .filter(|(server, _)| server == name)
406 .map(|(_, def)| crate::domain::McpToolSpec {
407 name: def.name.clone(),
408 description: def.description.clone(),
409 input_schema: def.input_schema.clone(),
410 })
411 .collect();
412 let msg = mcp_startup_msg(name, manager.has_server(name), server_tools);
413 let _ = tx.send(msg).await;
414 }
415 crate::mcp::manager_ref::set_manager(manager);
416 crate::mcp::manager_ref::mark_init_complete();
417 });
418 },
419 Cmd::StopMcpServer { name } => {
420 let tx = self.msg_tx.clone();
421 self.detached.spawn(async move {
422 let _ = tx.send(Msg::McpServerStopped { name }).await;
423 });
424 },
425 Cmd::PullOllamaModel { model } => {
426 let tx = self.msg_tx.clone();
427 self.detached.spawn(async move {
428 dispatch_pull_ollama_model(tx, model).await;
429 });
430 },
431 Cmd::OpenInSystem(path) => {
432 self.detached.spawn(async move {
433 let _ = tokio::task::spawn_blocking(move || {
434 crate::utils::open_file(&path);
435 })
436 .await;
437 });
438 },
439 Cmd::DismissStatusAfter { ms } => {
440 let tx = self.msg_tx.clone();
441 self.detached.spawn(async move {
442 tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
443 let _ = tx.send(Msg::StatusDismiss).await;
444 });
445 },
446 Cmd::WriteImageToTemp {
447 path,
448 bytes,
449 format: _,
450 } => {
451 self.detached.spawn(async move {
452 if let Err(e) = tokio::fs::write(&path, &bytes).await {
453 tracing::warn!(path = %path.display(), error = %e, "WriteImageToTemp failed");
454 }
455 });
456 },
457 Cmd::ReadClipboard => {
458 let tx = self.msg_tx.clone();
459 self.detached.spawn(async move {
460 dispatch_read_clipboard(tx).await;
461 });
462 },
463 Cmd::Exit => {
464 },
469 Cmd::SetTerminalTitle(title) => {
470 self.detached.spawn(async move {
471 use std::io::Write;
472 let seq = format!("\x1b]2;{}\x07", title);
473 let mut stdout = std::io::stdout();
474 let _ = stdout.write_all(seq.as_bytes());
475 let _ = stdout.flush();
476 });
477 },
478 }
479 }
480
481 pub async fn shutdown(mut self) {
485 for (id, scope) in self.scopes.iter() {
486 tracing::debug!(turn = %id, "shutdown: cancelling scope");
487 scope.cancel();
488 }
489
490 let shutdown_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
492
493 let drain = async {
494 for (_, mut scope) in self.scopes.drain() {
495 scope.drain().await;
496 }
497 while let Some(result) = self.detached.join_next().await {
498 if let Err(e) = result
499 && !e.is_cancelled()
500 {
501 tracing::warn!(error = %e, "shutdown: detached task panic");
502 }
503 }
504 };
505
506 let _ = tokio::time::timeout_at(shutdown_deadline, drain).await;
507 }
508
509 #[doc(hidden)]
512 pub fn msg_sender(&self) -> MsgSender {
513 self.msg_tx.clone()
514 }
515}
516
517async fn dispatch_call_model(
522 msg_tx: MsgSender,
523 providers: Option<Arc<ProviderFactory>>,
524 turn: TurnId,
525 mut request: crate::domain::ChatRequest,
526 token: tokio_util::sync::CancellationToken,
527) {
528 use crate::models::UserFacingError;
529
530 let Some(factory) = providers else {
531 let error = UserFacingError {
532 summary: "not wired".to_string(),
533 message: "EffectRunner has no ProviderFactory bound".to_string(),
534 suggestion: "construct via EffectRunner::pair_with_bindings".to_string(),
535 category: crate::models::ErrorCategory::Internal,
536 recoverable: false,
537 };
538 let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
539 return;
540 };
541
542 let provider = match factory.resolve(&request.model_id).await {
544 Ok(p) => p,
545 Err(e) => {
546 let error = classify_error_for_ui(&e);
547 let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
548 return;
549 },
550 };
551
552 let max_context_tokens = provider.capabilities().max_context_tokens.or_else(|| {
553 crate::domain::runtime::infer_static_context_window_for_model_id(&request.model_id)
554 });
555 let context_snapshot =
556 crate::domain::estimate_context_usage_for_request(&request, max_context_tokens);
557 let _ = msg_tx
558 .send(Msg::ContextUsageEstimated {
559 turn,
560 snapshot: context_snapshot.clone(),
561 })
562 .await;
563
564 let policy = CompactionPolicy::default();
565 let mut compacted_before_stream = false;
566 if crate::domain::should_auto_compact(&context_snapshot, &request, policy).is_ok() {
567 let compaction = CompactionRequest::auto(request.clone(), CompactionTrigger::AutoThreshold);
568 match run_compaction(
569 Arc::clone(&provider),
570 turn,
571 compaction,
572 context_snapshot.clone(),
573 max_context_tokens,
574 token.clone(),
575 )
576 .await
577 {
578 Ok(result) => {
579 request.messages = result.replacement_messages.clone();
580 compacted_before_stream = true;
581 let _ = msg_tx.send(Msg::CompactionFinished { turn, result }).await;
582 },
583 Err(err) => {
584 let hard_limit =
585 crate::domain::context_exceeds_hard_limit(&context_snapshot, &request, policy);
586 let _ = msg_tx
587 .send(Msg::CompactionFailed {
588 turn,
589 trigger: CompactionTrigger::AutoThreshold,
590 message: err.to_string(),
591 kind: if hard_limit {
592 crate::domain::StatusKind::Error
593 } else {
594 crate::domain::StatusKind::Warn
595 },
596 })
597 .await;
598 if hard_limit {
599 let error = UserFacingError {
600 summary: "Context too large".to_string(),
601 message: format!(
602 "The next request needs {} tokens before response reserve, and automatic compaction failed: {}",
603 context_snapshot.used_tokens, err
604 ),
605 suggestion: "Run /compact with focus instructions, or /clear to start a fresh session.".to_string(),
606 category: crate::models::ErrorCategory::Config,
607 recoverable: true,
608 };
609 let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
610 return;
611 }
612 },
613 }
614 }
615
616 let (stream_tx, mut stream_rx) = mpsc::channel::<StreamEvent>(256);
619 let ctx = StreamContext::new(token.clone(), stream_tx, turn);
620
621 let relay_tx = msg_tx.clone();
624 let relay = tokio::spawn(async move {
625 while let Some(event) = stream_rx.recv().await {
626 let msg = match event {
627 StreamEvent::Text(chunk) => Msg::StreamText { turn, chunk },
628 StreamEvent::Reasoning(chunk) => Msg::StreamReasoning { turn, chunk },
629 StreamEvent::ToolCall(call) => Msg::StreamToolCall { turn, call },
630 StreamEvent::ThinkingSignature(_) => continue, StreamEvent::Done {
632 usage,
633 thinking_signature,
634 } => Msg::StreamDone {
635 turn,
636 usage,
637 thinking_signature,
638 },
639 };
640 if relay_tx.send(msg).await.is_err() {
641 break;
642 }
643 }
644 });
645
646 match provider.chat(request.clone(), ctx).await {
655 Ok(_final_response) => {
656 },
658 Err(crate::models::ModelError::Cancelled) => {
659 },
661 Err(e) => {
662 let retry_context_limit = !compacted_before_stream && is_context_limit_error(&e);
663 if retry_context_limit {
664 let latest_snapshot =
665 crate::domain::estimate_context_usage_for_request(&request, max_context_tokens);
666 let compaction =
667 CompactionRequest::auto(request.clone(), CompactionTrigger::ContextLimitRetry);
668 match run_compaction(
669 Arc::clone(&provider),
670 turn,
671 compaction,
672 latest_snapshot,
673 max_context_tokens,
674 token.clone(),
675 )
676 .await
677 {
678 Ok(result) => {
679 let mut retry_request = request;
680 retry_request.messages = result.replacement_messages.clone();
681 let _ = msg_tx.send(Msg::CompactionFinished { turn, result }).await;
682 let _ = relay.await;
683 dispatch_provider_stream(msg_tx, provider, turn, retry_request, token)
684 .await;
685 return;
686 },
687 Err(compact_err) => {
688 let _ = msg_tx
689 .send(Msg::CompactionFailed {
690 turn,
691 trigger: CompactionTrigger::ContextLimitRetry,
692 message: compact_err.to_string(),
693 kind: crate::domain::StatusKind::Error,
694 })
695 .await;
696 },
697 }
698 }
699 let error = classify_error_for_ui(&e);
700 let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
701 },
702 }
703
704 let _ = relay.await;
705}
706
707async fn dispatch_provider_stream(
708 msg_tx: MsgSender,
709 provider: Arc<dyn ModelProvider>,
710 turn: TurnId,
711 request: crate::domain::ChatRequest,
712 token: tokio_util::sync::CancellationToken,
713) {
714 let (stream_tx, mut stream_rx) = mpsc::channel::<StreamEvent>(256);
715 let ctx = StreamContext::new(token.clone(), stream_tx, turn);
716 let relay_tx = msg_tx.clone();
717 let relay = tokio::spawn(async move {
718 while let Some(event) = stream_rx.recv().await {
719 let msg = match event {
720 StreamEvent::Text(chunk) => Msg::StreamText { turn, chunk },
721 StreamEvent::Reasoning(chunk) => Msg::StreamReasoning { turn, chunk },
722 StreamEvent::ToolCall(call) => Msg::StreamToolCall { turn, call },
723 StreamEvent::ThinkingSignature(_) => continue,
724 StreamEvent::Done {
725 usage,
726 thinking_signature,
727 } => Msg::StreamDone {
728 turn,
729 usage,
730 thinking_signature,
731 },
732 };
733 if relay_tx.send(msg).await.is_err() {
734 break;
735 }
736 }
737 });
738
739 match provider.chat(request, ctx).await {
740 Ok(_) | Err(ModelError::Cancelled) => {},
741 Err(e) => {
742 let error = classify_error_for_ui(&e);
743 let _ = msg_tx.send(Msg::UpstreamError { turn, error }).await;
744 },
745 }
746
747 let _ = relay.await;
748}
749
750async fn dispatch_compact_conversation(
751 msg_tx: MsgSender,
752 providers: Option<Arc<ProviderFactory>>,
753 turn: TurnId,
754 request: CompactionRequest,
755 token: tokio_util::sync::CancellationToken,
756) {
757 let Some(factory) = providers else {
758 let _ = msg_tx
759 .send(Msg::CompactionFailed {
760 turn,
761 trigger: request.trigger,
762 message: "EffectRunner has no ProviderFactory bound".to_string(),
763 kind: crate::domain::StatusKind::Error,
764 })
765 .await;
766 return;
767 };
768
769 let provider = match factory.resolve(&request.chat.model_id).await {
770 Ok(provider) => provider,
771 Err(err) => {
772 let _ = msg_tx
773 .send(Msg::CompactionFailed {
774 turn,
775 trigger: request.trigger,
776 message: err.to_string(),
777 kind: crate::domain::StatusKind::Error,
778 })
779 .await;
780 return;
781 },
782 };
783
784 let max_context_tokens = provider.capabilities().max_context_tokens.or_else(|| {
785 crate::domain::runtime::infer_static_context_window_for_model_id(&request.chat.model_id)
786 });
787 let before_snapshot =
788 crate::domain::estimate_context_usage_for_request(&request.chat, max_context_tokens);
789
790 let trigger = request.trigger;
791 match run_compaction(
792 provider,
793 turn,
794 request,
795 before_snapshot,
796 max_context_tokens,
797 token,
798 )
799 .await
800 {
801 Ok(result) => {
802 let _ = msg_tx.send(Msg::CompactionFinished { turn, result }).await;
803 },
804 Err(err) => {
805 let _ = msg_tx
806 .send(Msg::CompactionFailed {
807 turn,
808 trigger,
809 message: err.to_string(),
810 kind: crate::domain::StatusKind::Error,
811 })
812 .await;
813 },
814 }
815}
816
817async fn run_compaction(
818 provider: Arc<dyn ModelProvider>,
819 turn: TurnId,
820 request: CompactionRequest,
821 before_snapshot: crate::domain::ContextUsageSnapshot,
822 max_context_tokens: Option<usize>,
823 token: tokio_util::sync::CancellationToken,
824) -> Result<CompactionResult, ModelError> {
825 let started = Instant::now();
826 let prepared = crate::domain::prepare_compaction(&request, max_context_tokens)
827 .map_err(|skip| ModelError::InvalidRequest(skip.to_string()))?;
828
829 let summary_request = crate::domain::build_summary_request(
830 &request.chat,
831 &prepared,
832 request.instructions.as_deref(),
833 request.policy,
834 );
835 let (draft, draft_usage) =
836 collect_compaction_text(Arc::clone(&provider), turn, summary_request, token.clone())
837 .await?;
838 let draft_summary = crate::domain::normalize_summary(&draft);
839 if draft_summary.trim().is_empty() {
840 return Err(ModelError::InvalidRequest(
841 "compaction produced an empty summary".to_string(),
842 ));
843 }
844
845 let verify_request = crate::domain::build_verification_request(
846 &request.chat,
847 &prepared,
848 &draft_summary,
849 request.instructions.as_deref(),
850 request.policy,
851 );
852 let (verified, verify_usage) =
853 collect_compaction_text(Arc::clone(&provider), turn, verify_request, token).await?;
854 let verified_summary = crate::domain::normalize_summary(&verified);
855 let final_summary = if verified_summary.trim().is_empty() {
856 draft_summary
857 } else {
858 verified_summary
859 };
860
861 let id = format!(
862 "compact_{}",
863 chrono::Local::now().format("%Y%m%d_%H%M%S_%3f")
864 );
865 let mut record = crate::domain::CompactionRecord {
866 id,
867 trigger: request.trigger,
868 created_at: chrono::Local::now(),
869 before_tokens: before_snapshot.used_tokens,
870 after_tokens: 0,
871 archived_message_count: prepared.archived_messages.len(),
872 preserved_message_count: prepared.preserved_messages.len(),
873 summary_tokens: final_summary.len().div_ceil(4),
874 duration_secs: started.elapsed().as_secs_f64(),
875 focus: request.instructions.clone(),
876 archive_path: None,
877 };
878
879 let mut replacement =
880 crate::domain::build_replacement_messages(&final_summary, &prepared, &record);
881 let mut compacted_request = request.chat.clone();
882 compacted_request.messages = replacement.clone();
883 let mut after_snapshot =
884 crate::domain::estimate_context_usage_for_request(&compacted_request, max_context_tokens);
885 record.after_tokens = after_snapshot.used_tokens;
886 record.duration_secs = started.elapsed().as_secs_f64();
887 replacement = crate::domain::build_replacement_messages(&final_summary, &prepared, &record);
888 compacted_request.messages = replacement.clone();
889 after_snapshot =
890 crate::domain::estimate_context_usage_for_request(&compacted_request, max_context_tokens);
891 record.after_tokens = after_snapshot.used_tokens;
892
893 if after_snapshot.used_tokens >= before_snapshot.used_tokens {
894 return Err(ModelError::InvalidRequest(format!(
895 "compaction did not reduce context ({} -> {} tokens)",
896 before_snapshot.used_tokens, after_snapshot.used_tokens
897 )));
898 }
899
900 if crate::domain::context_exceeds_hard_limit(
901 &after_snapshot,
902 &compacted_request,
903 request.policy,
904 ) {
905 return Err(ModelError::InvalidRequest(format!(
906 "compacted context still exceeds response reserve ({} tokens used)",
907 after_snapshot.used_tokens
908 )));
909 }
910
911 Ok(CompactionResult {
912 record,
913 replacement_messages: replacement,
914 archived_messages: prepared.archived_messages,
915 before_snapshot,
916 after_snapshot,
917 usage: crate::domain::combine_usage(draft_usage, verify_usage),
918 })
919}
920
921async fn collect_compaction_text(
922 provider: Arc<dyn ModelProvider>,
923 turn: TurnId,
924 request: crate::domain::ChatRequest,
925 token: tokio_util::sync::CancellationToken,
926) -> Result<(String, Option<TokenUsage>), ModelError> {
927 let (stream_tx, mut stream_rx) = mpsc::channel::<StreamEvent>(128);
928 let ctx = StreamContext::new(token, stream_tx, turn);
929 let collector = tokio::spawn(async move {
930 let mut text = String::new();
931 let mut usage = None;
932 while let Some(event) = stream_rx.recv().await {
933 match event {
934 StreamEvent::Text(chunk) => text.push_str(&chunk),
935 StreamEvent::Done {
936 usage: done_usage, ..
937 } => usage = done_usage,
938 StreamEvent::Reasoning(_)
939 | StreamEvent::ToolCall(_)
940 | StreamEvent::ThinkingSignature(_) => {},
941 }
942 }
943 (text, usage)
944 });
945
946 let response = provider.chat(request, ctx).await;
947 let (text, stream_usage) = collector
948 .await
949 .map_err(|err| ModelError::StreamError(format!("compaction collector failed: {}", err)))?;
950 match response {
951 Ok(final_response) => Ok((text, final_response.usage.or(stream_usage))),
952 Err(err) => Err(err),
953 }
954}
955
956fn is_context_limit_error(error: &ModelError) -> bool {
957 let text = error.to_string().to_lowercase();
958 text.contains("context")
959 && (text.contains("too large")
960 || text.contains("exceed")
961 || text.contains("maximum")
962 || text.contains("token"))
963}
964
965#[allow(clippy::too_many_arguments)]
967async fn dispatch_execute_tool(
968 msg_tx: MsgSender,
969 tools: Option<Arc<ToolRegistry>>,
970 workdir: PathBuf,
971 turn: TurnId,
972 call_id: crate::domain::ToolCallId,
973 source: crate::models::tool_call::ToolCall,
974 token: tokio_util::sync::CancellationToken,
975 config: Arc<crate::app::Config>,
976 model_id: String,
977) {
978 let _ = msg_tx.send(Msg::ToolStarted { turn, call_id }).await;
979
980 let Some(registry) = tools else {
981 let _ = msg_tx
982 .send(Msg::ToolFinished {
983 turn,
984 call_id,
985 outcome: crate::domain::ToolOutcome::error(
986 "EffectRunner has no ToolRegistry bound",
987 0.0,
988 ),
989 })
990 .await;
991 return;
992 };
993
994 let (tool_key, args) = if source.function.name.starts_with("mcp__") {
998 let rest = &source.function.name[5..];
999 if let Some((server, tool)) = rest.split_once("__") {
1000 (
1001 "mcp_proxy",
1002 serde_json::json!({
1003 "server_name": server,
1004 "tool_name": tool,
1005 "arguments": source.function.arguments.clone(),
1006 }),
1007 )
1008 } else {
1009 let _ = msg_tx
1010 .send(Msg::ToolFinished {
1011 turn,
1012 call_id,
1013 outcome: crate::domain::ToolOutcome::error(
1014 format!("invalid MCP tool name: {}", source.function.name),
1015 0.0,
1016 ),
1017 })
1018 .await;
1019 return;
1020 }
1021 } else {
1022 (
1023 source.function.name.as_str(),
1024 source.function.arguments.clone(),
1025 )
1026 };
1027
1028 let Some(tool) = registry.get(tool_key) else {
1029 let _ = msg_tx
1030 .send(Msg::ToolFinished {
1031 turn,
1032 call_id,
1033 outcome: crate::domain::ToolOutcome::error(
1034 format!("unknown tool: {}", tool_key),
1035 0.0,
1036 ),
1037 })
1038 .await;
1039 return;
1040 };
1041
1042 let (progress_tx, mut progress_rx) = mpsc::channel(16);
1048 let relay_tx = msg_tx.clone();
1049 let progress_relay = tokio::spawn(async move {
1050 while let Some(event) = progress_rx.recv().await {
1051 if relay_tx
1052 .send(Msg::ToolProgress {
1053 turn,
1054 call_id,
1055 event,
1056 })
1057 .await
1058 .is_err()
1059 {
1060 break;
1061 }
1062 }
1063 });
1064
1065 let ctx = ExecContext::new(token, progress_tx, call_id, turn, workdir, config, model_id);
1066 let outcome = tool.execute(args, ctx).await;
1067 let _ = progress_relay.await;
1068 let _ = msg_tx
1069 .send(Msg::ToolFinished {
1070 turn,
1071 call_id,
1072 outcome,
1073 })
1074 .await;
1075}
1076
1077async fn dispatch_pull_ollama_model(tx: MsgSender, model: String) {
1082 use tokio::io::{AsyncBufReadExt, BufReader};
1083 use tokio::process::Command;
1084
1085 let mut cmd = Command::new("ollama");
1086 cmd.arg("pull")
1087 .arg(&model)
1088 .stdin(std::process::Stdio::null())
1089 .stdout(std::process::Stdio::piped())
1090 .stderr(std::process::Stdio::piped())
1091 .kill_on_drop(true);
1092
1093 let mut child = match cmd.spawn() {
1094 Ok(c) => c,
1095 Err(e) => {
1096 let _ = tx
1097 .send(Msg::ModelPullProgress(format!(
1098 "ollama pull failed to start: {}",
1099 e
1100 )))
1101 .await;
1102 return;
1103 },
1104 };
1105
1106 if let Some(stdout) = child.stdout.take() {
1107 let tx_inner = tx.clone();
1108 tokio::spawn(async move {
1109 let mut reader = BufReader::new(stdout).lines();
1110 while let Ok(Some(line)) = reader.next_line().await {
1111 let _ = tx_inner.send(Msg::ModelPullProgress(line)).await;
1112 }
1113 });
1114 }
1115
1116 match child.wait().await {
1117 Ok(status) if status.success() => {
1118 let _ = tx.send(Msg::ModelPullFinished { model }).await;
1119 },
1120 Ok(status) => {
1121 let _ = tx
1122 .send(Msg::ModelPullProgress(format!(
1123 "ollama pull exited with status {}",
1124 status.code().unwrap_or(-1)
1125 )))
1126 .await;
1127 },
1128 Err(e) => {
1129 let _ = tx
1130 .send(Msg::ModelPullProgress(format!(
1131 "ollama pull wait error: {}",
1132 e
1133 )))
1134 .await;
1135 },
1136 }
1137}
1138
1139fn mcp_startup_msg(name: &str, started: bool, tools: Vec<crate::domain::McpToolSpec>) -> Msg {
1140 if started {
1141 Msg::McpServerReady {
1142 name: name.to_string(),
1143 tools,
1144 }
1145 } else {
1146 Msg::McpServerErrored {
1147 name: name.to_string(),
1148 reason: "server failed to start or initialize".to_string(),
1149 }
1150 }
1151}
1152
1153async fn dispatch_read_clipboard(tx: MsgSender) {
1163 use crate::domain::{Paste, StatusKind};
1164
1165 enum Outcome {
1166 Image { bytes: Vec<u8>, format: String },
1167 Text(String),
1168 Empty,
1169 Error(String),
1170 }
1171
1172 let outcome = tokio::task::spawn_blocking(|| {
1173 if crate::clipboard::has_image() {
1174 match crate::clipboard::read_image_bytes() {
1175 Ok((bytes, format)) => Outcome::Image { bytes, format },
1176 Err(e) => Outcome::Error(format!("Clipboard image read failed: {}", e)),
1177 }
1178 } else {
1179 match crate::clipboard::read_text() {
1180 Ok(t) if !t.is_empty() => Outcome::Text(t),
1181 Ok(_) => Outcome::Empty,
1182 Err(e) => Outcome::Error(format!("Clipboard empty / read failed: {}", e)),
1183 }
1184 }
1185 })
1186 .await
1187 .unwrap_or_else(|e| Outcome::Error(format!("clipboard spawn_blocking: {}", e)));
1188
1189 let msg = match outcome {
1190 Outcome::Image { bytes, format } => Msg::Paste(Paste::Image { bytes, format }),
1191 Outcome::Text(text) => Msg::Paste(Paste::Text(text)),
1192 Outcome::Empty => Msg::TransientStatus {
1193 text: "Clipboard is empty".to_string(),
1194 kind: StatusKind::Info,
1195 dismiss_ms: 2_000,
1196 },
1197 Outcome::Error(text) => Msg::TransientStatus {
1198 text,
1199 kind: StatusKind::Warn,
1200 dismiss_ms: 4_000,
1201 },
1202 };
1203 let _ = tx.send(msg).await;
1204}
1205
1206fn classify_error_for_ui(e: &crate::models::ModelError) -> crate::models::UserFacingError {
1207 use crate::models::{ErrorCategory, ModelError, UserFacingError};
1208 match e {
1209 ModelError::Backend(b) => UserFacingError {
1210 summary: "Backend error".to_string(),
1211 message: b.to_string(),
1212 suggestion: "Check the provider endpoint / API key.".to_string(),
1213 category: ErrorCategory::Connection,
1214 recoverable: true,
1215 },
1216 ModelError::Authentication(msg) => UserFacingError {
1217 summary: "Auth error".to_string(),
1218 message: msg.clone(),
1219 suggestion: "Set the env var the provider expects.".to_string(),
1220 category: ErrorCategory::Auth,
1221 recoverable: false,
1222 },
1223 ModelError::RateLimit { retry_after } => UserFacingError {
1224 summary: "Rate limit".to_string(),
1225 message: format!("retry after {:?}", retry_after),
1226 suggestion: "Wait and try again.".to_string(),
1227 category: ErrorCategory::Temporary,
1228 recoverable: true,
1229 },
1230 ModelError::StreamError(msg) => UserFacingError {
1231 summary: "Stream error".to_string(),
1232 message: msg.clone(),
1233 suggestion: "Retry the request.".to_string(),
1234 category: ErrorCategory::Connection,
1235 recoverable: true,
1236 },
1237 other => UserFacingError {
1238 summary: "Model error".to_string(),
1239 message: other.to_string(),
1240 suggestion: String::new(),
1241 category: ErrorCategory::Internal,
1242 recoverable: false,
1243 },
1244 }
1245}
1246
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::ToolCallId;
    use std::time::Duration;

    /// Builds a runner via `EffectRunner::pair` (no provider/tool bindings) together with the
    /// receiving end of its message channel.
    fn runner() -> (EffectRunner, mpsc::Receiver<Msg>) {
        EffectRunner::pair(PathBuf::from("/tmp"))
    }

    /// Minimal `ChatRequest` shared by the `CallModel` tests; only `model_id` varies between
    /// callers, so a change to the request shape only has to be made here.
    fn chat_request(model_id: &str) -> crate::domain::ChatRequest {
        crate::domain::ChatRequest {
            model_id: model_id.to_string(),
            messages: vec![],
            system_prompt: String::new(),
            instructions: None,
            reasoning: crate::models::ReasoningLevel::Medium,
            temperature: 0.7,
            max_tokens: 4096,
            tools: vec![],
        }
    }

    /// Awaits the next message on `rx`.
    ///
    /// # Panics
    /// Panics with `what` if nothing arrives within `within`, and with `"channel alive"` if the
    /// channel yields `None` (all senders dropped).
    async fn recv_within(rx: &mut mpsc::Receiver<Msg>, within: Duration, what: &str) -> Msg {
        tokio::time::timeout(within, rx.recv())
            .await
            .expect(what)
            .expect("channel alive")
    }

    #[tokio::test]
    async fn dispatch_exit_is_noop_on_runner_state() {
        let (mut r, _rx) = runner();
        r.dispatch(Cmd::Exit);
        assert_eq!(r.scope_count(), 0);
    }

    #[tokio::test]
    async fn dispatch_save_emits_session_saved() {
        let (mut r, mut rx) = runner();
        r.dispatch(Cmd::SaveConversation(
            crate::session::ConversationHistory::new("/p".to_string(), "m".to_string()),
        ));
        let msg = recv_within(&mut rx, Duration::from_millis(200), "sender emits").await;
        assert!(matches!(msg, Msg::SessionSaved));
    }

    #[tokio::test]
    async fn dispatch_dismiss_after_delay_emits_status_dismiss() {
        let (mut r, mut rx) = runner();
        let t0 = std::time::Instant::now();
        r.dispatch(Cmd::DismissStatusAfter { ms: 30 });
        let msg = recv_within(&mut rx, Duration::from_millis(300), "sender emits").await;
        assert!(matches!(msg, Msg::StatusDismiss));
        // Lower bound is a little under the requested 30 ms to leave slack for timer granularity.
        assert!(t0.elapsed() >= Duration::from_millis(25));
    }

    #[test]
    fn mcp_startup_msg_treats_zero_tool_started_server_as_ready() {
        let msg = mcp_startup_msg("empty", true, Vec::new());
        assert!(matches!(
            msg,
            Msg::McpServerReady { name, tools } if name == "empty" && tools.is_empty()
        ));
    }

    #[test]
    fn mcp_startup_msg_reports_unstarted_server_as_error() {
        let msg = mcp_startup_msg("bad", false, Vec::new());
        assert!(matches!(
            msg,
            Msg::McpServerErrored { name, reason }
                if name == "bad" && reason.contains("failed to start")
        ));
    }

    #[tokio::test]
    async fn cancel_scope_emits_turn_cancelled_after_bounded_timeout() {
        let (mut r, mut rx) = runner();
        let turn = TurnId(77);
        {
            // A task that never completes: the terminal message cannot come from the task
            // finishing on its own, only from the cancel path (CANCEL_DRAIN_TIMEOUT is 50 ms
            // under cfg(test)).
            let scope = r.scope_mut(turn);
            scope.spawn(async {
                std::future::pending::<()>().await;
            });
        }
        assert_eq!(r.scope_count(), 1);

        let start = std::time::Instant::now();
        r.dispatch(Cmd::CancelScope(turn));
        assert_eq!(r.scope_count(), 0);
        let msg = recv_within(
            &mut rx,
            Duration::from_millis(500),
            "bounded cancel should emit terminal message",
        )
        .await;
        assert!(matches!(msg, Msg::TurnCancelled(t) if t == turn));
        assert!(
            start.elapsed() < Duration::from_millis(500),
            "cancel terminal message took {:?}",
            start.elapsed()
        );
    }

    #[tokio::test]
    async fn dispatch_call_model_creates_scope() {
        let (mut r, _rx) = runner();
        let turn = TurnId(7);
        r.dispatch(Cmd::CallModel {
            turn,
            request: chat_request("test/m"),
        });
        assert_eq!(r.scope_count(), 1);
    }

    #[tokio::test]
    async fn empty_scopes_are_reaped_on_next_dispatch() {
        let (mut r, mut rx) = runner();
        let turn = TurnId(42);
        r.dispatch(Cmd::CallModel {
            turn,
            request: chat_request("test/m"),
        });
        assert_eq!(r.scope_count(), 1);

        // The unbound runner answers CallModel with an UpstreamError (presumably because
        // `pair` leaves `providers` unset), which also ends the scope's only task.
        let msg = recv_within(&mut rx, Duration::from_millis(200), "upstream error arrived").await;
        assert!(matches!(msg, Msg::UpstreamError { .. }));

        // Give the task that sent the error a chance to run to completion before the next
        // dispatch inspects the scope.
        tokio::task::yield_now().await;

        // Any dispatch should trigger reaping; DismissStatusAfter is used because it opens no
        // turn scope of its own.
        r.dispatch(Cmd::DismissStatusAfter { ms: 10 });
        assert_eq!(
            r.scope_count(),
            0,
            "completed scope must be reaped on next dispatch"
        );
    }

    #[tokio::test]
    async fn dispatch_execute_tool_under_turn_emits_tool_started() {
        let (mut r, mut rx) = runner();
        let turn = TurnId(7);
        let call_id = ToolCallId(1);
        let source = crate::models::tool_call::ToolCall {
            id: Some("c1".to_string()),
            function: crate::models::tool_call::FunctionCall {
                name: "read_file".to_string(),
                arguments: serde_json::json!({"path": "x"}),
            },
        };
        r.dispatch(Cmd::ExecuteTool {
            turn,
            call_id,
            source,
            model_id: "ollama/test".to_string(),
        });
        let first = recv_within(&mut rx, Duration::from_millis(200), "some msg").await;
        assert!(matches!(
            first,
            Msg::ToolStarted {
                turn: t,
                call_id: c,
            } if t == turn && c == call_id
        ));
    }

    // NOTE(review): despite its name, this test cancels a turn whose pending work is a
    // CallModel request, not an ExecuteTool; it checks that the cancelled scope is removed.
    #[tokio::test]
    async fn cancel_scope_before_execute_tool_drops_pending_work() {
        let (mut r, _rx) = runner();
        let turn = TurnId(9);
        r.dispatch(Cmd::CallModel {
            turn,
            request: chat_request("m"),
        });
        assert_eq!(r.scope_count(), 1);

        r.dispatch(Cmd::CancelScope(turn));
        assert_eq!(r.scope_count(), 0);
    }

    // NOTE(review): this only checks that shutdown completes within 2 s while five saves are
    // outstanding; it does not assert that the five SessionSaved messages were delivered.
    #[tokio::test]
    async fn shutdown_drains_pending_saves() {
        let (mut r, _rx) = runner();
        for _ in 0..5 {
            r.dispatch(Cmd::SaveConversation(
                crate::session::ConversationHistory::new("/p".to_string(), "m".to_string()),
            ));
        }
        let start = std::time::Instant::now();
        r.shutdown().await;
        assert!(start.elapsed() < Duration::from_secs(2));
    }
}