1mod error;
16mod execution;
17mod model;
18mod settings;
19
20pub use error::RunnerError;
21pub(crate) use error::{
22 RetryableReason, RunnerFailureClass, runner_execution_error, runner_execution_error_with_source,
23};
24
25pub(crate) use execution::{
26 BuiltInRunnerPlugin, ResolvedRunnerCliOptions, RunnerPlugin, ctrlc_state,
27};
28
29pub(crate) use model::{
30 default_model_for_runner, parse_model, parse_reasoning_effort, resolve_model_for_runner,
31 validate_model_for_runner,
32};
33
34pub(crate) use settings::{
35 AgentSettings, PhaseSettingsMatrix, ResolvedPhaseSettings, resolve_agent_settings,
36 resolve_phase_settings_matrix,
37};
38
39#[allow(unused)]
41const _: () = {
42 fn _use_resolved_phase_settings(_: ResolvedPhaseSettings) {}
43};
44
45use crate::commands::run::PhaseType;
46use crate::contracts::{ClaudePermissionMode, Model, ReasoningEffort, Runner};
47use crate::plugins::registry::PluginRegistry;
48use crate::redaction::redact_text;
49use anyhow::{Result, anyhow};
50use std::fmt;
51use std::path::Path;
52use std::process::ExitStatus;
53use std::sync::Arc;
54use std::time::Duration;
55
56pub type OutputHandler = Arc<Box<dyn Fn(&str) + Send + Sync>>;
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum OutputStream {
63 Terminal,
65 HandlerOnly,
67}
68
69impl OutputStream {
70 pub fn streams_to_terminal(self) -> bool {
72 matches!(self, OutputStream::Terminal)
73 }
74}
75
76pub(crate) struct RunnerOutput {
77 pub status: ExitStatus,
78 pub stdout: String,
79 pub stderr: String,
80 pub session_id: Option<String>,
81}
82
83impl fmt::Display for RunnerOutput {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 write!(
86 f,
87 "status: {}\nstdout: {}\nstderr: {}",
88 self.status,
89 redact_text(&self.stdout),
90 redact_text(&self.stderr)
91 )
92 }
93}
94
95impl fmt::Debug for RunnerOutput {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 f.debug_struct("RunnerOutput")
98 .field("status", &self.status)
99 .field("stdout", &redact_text(&self.stdout))
100 .field("stderr", &redact_text(&self.stderr))
101 .field("session_id", &self.session_id.as_deref())
102 .finish()
103 }
104}
105
106#[derive(Clone, Copy)]
107pub struct RunnerBinaries<'a> {
108 pub codex: &'a str,
109 pub opencode: &'a str,
110 pub gemini: &'a str,
111 pub claude: &'a str,
112 pub cursor: &'a str,
113 pub kimi: &'a str,
114 pub pi: &'a str,
115}
116
117pub(crate) fn resolve_binaries(agent: &crate::contracts::AgentConfig) -> RunnerBinaries<'_> {
118 let codex = agent.codex_bin.as_deref().unwrap_or("codex");
119 let opencode = agent.opencode_bin.as_deref().unwrap_or("opencode");
120 let gemini = agent.gemini_bin.as_deref().unwrap_or("gemini");
121 let claude = agent.claude_bin.as_deref().unwrap_or("claude");
122 let cursor = agent.cursor_bin.as_deref().unwrap_or("agent");
123 let kimi = agent.kimi_bin.as_deref().unwrap_or("kimi");
124 let pi = agent.pi_bin.as_deref().unwrap_or("pi");
125 RunnerBinaries {
126 codex,
127 opencode,
128 gemini,
129 claude,
130 cursor,
131 kimi,
132 pi,
133 }
134}
135
136pub(crate) fn extract_final_assistant_response(stdout: &str) -> Option<String> {
137 execution::extract_final_assistant_response(stdout)
138}
139
140fn runner_label(runner: Runner) -> String {
141 runner.id().to_string()
142}
143
144fn runner_requires_session_id(_runner: &Runner) -> bool {
145 true
149}
150
151fn semantic_failure_reason(runner: &Runner, stderr: &str) -> Option<&'static str> {
152 match runner {
153 Runner::Opencode => {
154 let stderr_lower = stderr.to_ascii_lowercase();
155 let has_zod_session_validation_error = stderr_lower.contains("zoderror")
156 && stderr_lower.contains("sessionid")
157 && ((stderr_lower.contains("must start with") && stderr_lower.contains("ses"))
158 || stderr_lower.contains("invalid_format"));
159
160 if has_zod_session_validation_error {
161 Some("opencode rejected session_id during resume validation")
162 } else {
163 None
164 }
165 }
166 _ => None,
167 }
168}
169
170fn semantic_failure_error(
171 operation: &str,
172 runner: &Runner,
173 bin: &str,
174 reason: &str,
175) -> RunnerError {
176 RunnerError::Other(anyhow!(
177 "Runner execution failed (operation={}, runner={}, bin={}): semantic failure with zero exit status: {}.",
178 operation,
179 runner_label(runner.clone()),
180 bin,
181 reason
182 ))
183}
184
185#[cfg(unix)]
186fn exit_status_signal(status: &ExitStatus) -> Option<i32> {
187 use std::os::unix::process::ExitStatusExt;
188 status.signal()
189}
190
191#[cfg(not(unix))]
192fn exit_status_signal(_status: &ExitStatus) -> Option<i32> {
193 None
194}
195
196#[allow(clippy::too_many_arguments)]
197pub(crate) fn run_prompt(
198 runner: Runner,
199 work_dir: &Path,
200 bins: RunnerBinaries<'_>,
201 model: Model,
202 reasoning_effort: Option<ReasoningEffort>,
203 runner_cli: execution::ResolvedRunnerCliOptions,
204 prompt: &str,
205 timeout: Option<Duration>,
206 permission_mode: Option<ClaudePermissionMode>,
207 output_handler: Option<OutputHandler>,
208 output_stream: OutputStream,
209 phase_type: PhaseType,
210 session_id: Option<String>,
211 plugins: Option<&PluginRegistry>,
212) -> Result<RunnerOutput, RunnerError> {
213 if let Runner::Plugin(plugin_id) = &runner {
215 return run_plugin_prompt(
216 plugin_id,
217 work_dir,
218 runner_cli,
219 model,
220 prompt,
221 timeout,
222 output_handler,
223 output_stream,
224 session_id,
225 plugins,
226 );
227 }
228
229 let bin = match runner {
230 Runner::Codex => bins.codex,
231 Runner::Opencode => bins.opencode,
232 Runner::Gemini => bins.gemini,
233 Runner::Cursor => bins.cursor,
234 Runner::Claude => bins.claude,
235 Runner::Kimi => bins.kimi,
236 Runner::Pi => bins.pi,
237 Runner::Plugin(_) => unreachable!(),
238 };
239 validate_model_for_runner(&runner, &model).map_err(|err| {
240 RunnerError::Other(anyhow!(
241 "Runner configuration error (operation=run_prompt, runner={}, bin={}): {}",
242 runner_label(runner.clone()),
243 bin,
244 err
245 ))
246 })?;
247 let output = match runner {
248 Runner::Codex => {
249 let executor = execution::PluginExecutor::new();
250 executor.run(
251 Runner::Codex,
252 work_dir,
253 bins.codex,
254 model,
255 reasoning_effort,
256 runner_cli,
257 prompt,
258 timeout,
259 None, output_handler.clone(),
261 output_stream,
262 phase_type,
263 session_id.clone(),
264 plugins,
265 )?
266 }
267 Runner::Opencode => {
268 let executor = execution::PluginExecutor::new();
269 executor.run(
270 Runner::Opencode,
271 work_dir,
272 bins.opencode,
273 model,
274 None, runner_cli,
276 prompt,
277 timeout,
278 None, output_handler.clone(),
280 output_stream,
281 phase_type,
282 session_id.clone(),
283 plugins,
284 )?
285 }
286 Runner::Gemini => {
287 let executor = execution::PluginExecutor::new();
288 executor.run(
289 Runner::Gemini,
290 work_dir,
291 bins.gemini,
292 model,
293 None, runner_cli,
295 prompt,
296 timeout,
297 None, output_handler.clone(),
299 output_stream,
300 phase_type,
301 session_id.clone(),
302 plugins,
303 )?
304 }
305 Runner::Cursor => {
306 let executor = execution::PluginExecutor::new();
307 executor.run(
308 Runner::Cursor,
309 work_dir,
310 bins.cursor,
311 model,
312 None, runner_cli,
314 prompt,
315 timeout,
316 None, output_handler.clone(),
318 output_stream,
319 phase_type,
320 session_id.clone(),
321 plugins,
322 )?
323 }
324 Runner::Claude => {
325 let executor = execution::PluginExecutor::new();
326 executor.run(
327 Runner::Claude,
328 work_dir,
329 bins.claude,
330 model,
331 reasoning_effort,
332 runner_cli,
333 prompt,
334 timeout,
335 runner_cli.effective_claude_permission_mode(permission_mode),
336 output_handler.clone(),
337 output_stream,
338 phase_type,
339 session_id.clone(),
340 plugins,
341 )?
342 }
343 Runner::Kimi => {
344 let executor = execution::PluginExecutor::new();
346 executor.run(
347 Runner::Kimi,
348 work_dir,
349 bins.kimi,
350 model,
351 None, runner_cli,
353 prompt,
354 timeout,
355 None, output_handler.clone(),
357 output_stream,
358 phase_type,
359 session_id.clone(),
360 plugins,
361 )?
362 }
363 Runner::Pi => execution::run_builtin_prompt(
364 execution::BuiltInRunnerPlugin::Pi,
365 work_dir,
366 bins.pi,
367 runner_cli,
368 model,
369 prompt,
370 timeout,
371 output_handler.clone(),
372 output_stream,
373 )?,
374 Runner::Plugin(_) => unreachable!(),
375 };
376
377 if !output.status.success() {
378 if let Some(code) = output.status.code() {
379 return Err(RunnerError::NonZeroExit {
380 code,
381 stdout: output.stdout.into(),
382 stderr: output.stderr.into(),
383 session_id: output.session_id.clone(),
384 });
385 } else {
386 return Err(RunnerError::TerminatedBySignal {
387 signal: exit_status_signal(&output.status),
388 stdout: output.stdout.into(),
389 stderr: output.stderr.into(),
390 session_id: output.session_id.clone(),
391 });
392 }
393 }
394
395 if let Some(reason) = semantic_failure_reason(&runner, &output.stderr) {
396 return Err(semantic_failure_error("run_prompt", &runner, bin, reason));
397 }
398
399 Ok(output)
400}
401
402#[allow(clippy::too_many_arguments)]
403pub(crate) fn resume_session(
404 runner: Runner,
405 work_dir: &Path,
406 bins: RunnerBinaries<'_>,
407 model: Model,
408 reasoning_effort: Option<ReasoningEffort>,
409 runner_cli: execution::ResolvedRunnerCliOptions,
410 session_id: &str,
411 message: &str,
412 permission_mode: Option<ClaudePermissionMode>,
413 timeout: Option<Duration>,
414 output_handler: Option<OutputHandler>,
415 output_stream: OutputStream,
416 phase_type: PhaseType,
417 plugins: Option<&PluginRegistry>,
418) -> Result<RunnerOutput, RunnerError> {
419 if let Runner::Plugin(plugin_id) = &runner {
421 return run_plugin_resume(
422 plugin_id,
423 work_dir,
424 runner_cli,
425 model,
426 session_id,
427 message,
428 timeout,
429 output_handler,
430 output_stream,
431 plugins,
432 );
433 }
434
435 let bin = match runner {
436 Runner::Codex => bins.codex,
437 Runner::Opencode => bins.opencode,
438 Runner::Gemini => bins.gemini,
439 Runner::Cursor => bins.cursor,
440 Runner::Claude => bins.claude,
441 Runner::Kimi => bins.kimi,
442 Runner::Pi => bins.pi,
443 Runner::Plugin(_) => unreachable!(),
444 };
445 validate_model_for_runner(&runner, &model).map_err(|err| {
446 RunnerError::Other(anyhow!(
447 "Runner configuration error (operation=resume_session, runner={}, bin={}): {}",
448 runner_label(runner.clone()),
449 bin,
450 err
451 ))
452 })?;
453 let session_id = session_id.trim();
454 if runner_requires_session_id(&runner) && session_id.is_empty() {
455 return Err(RunnerError::Other(anyhow!(
456 "Runner input error (operation=resume_session, runner={}, bin={}): session_id is required (non-empty). Example: --resume <SESSION_ID>.",
457 runner_label(runner.clone()),
458 bin
459 )));
460 }
461 let message = message.trim();
462 if message.is_empty() {
463 return Err(RunnerError::Other(anyhow!(
464 "Runner input error (operation=resume_session, runner={}, bin={}): message is required (non-empty).",
465 runner_label(runner.clone()),
466 bin
467 )));
468 }
469
470 let output = match runner {
471 Runner::Codex => {
472 let executor = execution::PluginExecutor::new();
473 executor.resume(
474 Runner::Codex,
475 work_dir,
476 bins.codex,
477 model,
478 reasoning_effort,
479 runner_cli,
480 session_id,
481 message,
482 timeout,
483 None, output_handler,
485 output_stream,
486 phase_type,
487 plugins,
488 )
489 }
490 Runner::Opencode => {
491 let executor = execution::PluginExecutor::new();
492 executor.resume(
493 Runner::Opencode,
494 work_dir,
495 bins.opencode,
496 model,
497 None, runner_cli,
499 session_id,
500 message,
501 timeout,
502 None, output_handler,
504 output_stream,
505 phase_type,
506 plugins,
507 )
508 }
509 Runner::Gemini => {
510 let executor = execution::PluginExecutor::new();
511 executor.resume(
512 Runner::Gemini,
513 work_dir,
514 bins.gemini,
515 model,
516 None, runner_cli,
518 session_id,
519 message,
520 timeout,
521 None, output_handler,
523 output_stream,
524 phase_type,
525 plugins,
526 )
527 }
528 Runner::Cursor => {
529 let executor = execution::PluginExecutor::new();
530 executor.resume(
531 Runner::Cursor,
532 work_dir,
533 bins.cursor,
534 model,
535 None, runner_cli,
537 session_id,
538 message,
539 timeout,
540 None, output_handler,
542 output_stream,
543 phase_type,
544 plugins,
545 )
546 }
547 Runner::Claude => {
548 let executor = execution::PluginExecutor::new();
549 executor.resume(
550 Runner::Claude,
551 work_dir,
552 bins.claude,
553 model,
554 reasoning_effort,
555 runner_cli,
556 session_id,
557 message,
558 timeout,
559 runner_cli.effective_claude_permission_mode(permission_mode),
560 output_handler,
561 output_stream,
562 phase_type,
563 plugins,
564 )
565 }
566 Runner::Kimi => {
567 let executor = execution::PluginExecutor::new();
569 executor.resume(
570 Runner::Kimi,
571 work_dir,
572 bins.kimi,
573 model,
574 None, runner_cli,
576 session_id,
577 message,
578 timeout,
579 None, output_handler,
581 output_stream,
582 phase_type,
583 plugins,
584 )
585 }
586 Runner::Pi => execution::run_builtin_resume(
587 execution::BuiltInRunnerPlugin::Pi,
588 work_dir,
589 bins.pi,
590 runner_cli,
591 model,
592 session_id,
593 message,
594 timeout,
595 output_handler,
596 output_stream,
597 ),
598 Runner::Plugin(_) => unreachable!(),
599 }?;
600
601 if !output.status.success() {
602 if let Some(code) = output.status.code() {
603 return Err(RunnerError::NonZeroExit {
604 code,
605 stdout: output.stdout.into(),
606 stderr: output.stderr.into(),
607 session_id: output.session_id.clone(),
608 });
609 } else {
610 return Err(RunnerError::TerminatedBySignal {
611 signal: exit_status_signal(&output.status),
612 stdout: output.stdout.into(),
613 stderr: output.stderr.into(),
614 session_id: output.session_id.clone(),
615 });
616 }
617 }
618
619 if let Some(reason) = semantic_failure_reason(&runner, &output.stderr) {
620 return Err(semantic_failure_error(
621 "resume_session",
622 &runner,
623 bin,
624 reason,
625 ));
626 }
627
628 Ok(output)
629}
630
631#[allow(clippy::too_many_arguments)]
633fn run_plugin_prompt(
634 plugin_id: &str,
635 work_dir: &Path,
636 runner_cli: execution::ResolvedRunnerCliOptions,
637 model: Model,
638 prompt: &str,
639 timeout: Option<Duration>,
640 output_handler: Option<OutputHandler>,
641 output_stream: OutputStream,
642 session_id: Option<String>,
643 plugins: Option<&PluginRegistry>,
644) -> Result<RunnerOutput, RunnerError> {
645 let registry = plugins.ok_or_else(|| {
646 RunnerError::Other(anyhow!(
647 "Plugin registry unavailable for plugin runner: {}",
648 plugin_id
649 ))
650 })?;
651
652 if !registry.is_enabled(plugin_id) {
653 return Err(RunnerError::Other(anyhow!(
654 "Plugin runner is disabled: {}. Enable it under config.plugins.plugins.{}.enabled=true",
655 plugin_id,
656 plugin_id
657 )));
658 }
659
660 let bin_path = registry
661 .resolve_runner_bin(plugin_id)
662 .map_err(RunnerError::Other)?;
663 let bin = bin_path.to_string_lossy().to_string();
664
665 let plugin_cfg = registry
666 .plugin_config_blob(plugin_id)
667 .map(|v| execution::serialize_plugin_env_json(plugin_id, &bin, "plugin config", &v))
668 .transpose()?;
669
670 execution::run_plugin_runner(
671 work_dir,
672 &bin,
673 plugin_id,
674 runner_cli,
675 model,
676 prompt,
677 timeout,
678 output_handler,
679 output_stream,
680 session_id.as_deref(),
681 plugin_cfg,
682 )
683}
684
685#[allow(clippy::too_many_arguments)]
687fn run_plugin_resume(
688 plugin_id: &str,
689 work_dir: &Path,
690 runner_cli: execution::ResolvedRunnerCliOptions,
691 model: Model,
692 session_id: &str,
693 message: &str,
694 timeout: Option<Duration>,
695 output_handler: Option<OutputHandler>,
696 output_stream: OutputStream,
697 plugins: Option<&PluginRegistry>,
698) -> Result<RunnerOutput, RunnerError> {
699 let registry = plugins.ok_or_else(|| {
700 RunnerError::Other(anyhow!(
701 "Plugin registry unavailable for plugin runner: {}",
702 plugin_id
703 ))
704 })?;
705
706 if !registry.is_enabled(plugin_id) {
707 return Err(RunnerError::Other(anyhow!(
708 "Plugin runner is disabled: {}. Enable it under config.plugins.plugins.{}.enabled=true",
709 plugin_id,
710 plugin_id
711 )));
712 }
713
714 let bin_path = registry
715 .resolve_runner_bin(plugin_id)
716 .map_err(RunnerError::Other)?;
717 let bin = bin_path.to_string_lossy().to_string();
718
719 let plugin_cfg = registry
720 .plugin_config_blob(plugin_id)
721 .map(|v| execution::serialize_plugin_env_json(plugin_id, &bin, "plugin config", &v))
722 .transpose()?;
723
724 execution::run_plugin_runner_resume(
725 work_dir,
726 &bin,
727 plugin_id,
728 runner_cli,
729 model,
730 session_id,
731 message,
732 timeout,
733 output_handler,
734 output_stream,
735 plugin_cfg,
736 )
737}
738
739#[cfg(test)]
740mod tests {
741 use super::*;
742 use std::process::ExitStatus;
743 use tempfile::tempdir;
744
745 #[test]
746 fn runner_output_display_redacts_output() {
747 let output = RunnerOutput {
748 status: ExitStatus::default(), stdout: "out: API_KEY=secret123".to_string(),
750 stderr: "err: bearer abc123def456".to_string(),
751 session_id: None,
752 };
753 let msg = format!("{}", output);
754 assert!(msg.contains("API_KEY=[REDACTED]"));
755 assert!(msg.contains("bearer [REDACTED]"));
756 assert!(!msg.contains("secret123"));
757 assert!(!msg.contains("abc123def456"));
758 }
759
760 #[test]
761 fn output_stream_terminal_allows_terminal_output() {
762 assert!(OutputStream::Terminal.streams_to_terminal());
763 }
764
765 #[test]
766 fn output_stream_handler_only_suppresses_terminal_output() {
767 assert!(!OutputStream::HandlerOnly.streams_to_terminal());
768 }
769
770 #[test]
771 fn resume_session_missing_session_id_includes_runner_and_bin() {
772 let dir = tempdir().expect("tempdir");
773 let bins = RunnerBinaries {
774 codex: "codex",
775 opencode: "opencode",
776 gemini: "gemini",
777 claude: "claude",
778 cursor: "agent",
779 kimi: "kimi",
780 pi: "pi",
781 };
782
783 let err = resume_session(
784 Runner::Opencode,
785 dir.path(),
786 bins,
787 Model::Glm47,
788 None,
789 execution::ResolvedRunnerCliOptions::default(),
790 " ",
791 "hello",
792 None,
793 None,
794 None,
795 OutputStream::HandlerOnly,
796 PhaseType::Implementation,
797 None,
798 )
799 .unwrap_err();
800
801 let msg = format!("{err}");
802 assert!(msg.contains("operation=resume_session"));
803 assert!(msg.contains("runner=opencode"));
804 assert!(msg.contains("bin=opencode"));
805 assert!(msg.to_lowercase().contains("session_id"));
806 }
807
808 #[test]
809 fn runner_requires_session_id_requires_for_all_runners() {
810 assert!(runner_requires_session_id(&Runner::Kimi));
812 assert!(runner_requires_session_id(&Runner::Codex));
813 assert!(runner_requires_session_id(&Runner::Opencode));
814 assert!(runner_requires_session_id(&Runner::Gemini));
815 assert!(runner_requires_session_id(&Runner::Cursor));
816 assert!(runner_requires_session_id(&Runner::Claude));
817 assert!(runner_requires_session_id(&Runner::Pi));
818 }
819
820 #[test]
821 fn run_prompt_invalid_model_includes_operation_and_bin() {
822 let dir = tempdir().expect("tempdir");
823 let bins = RunnerBinaries {
824 codex: "codex",
825 opencode: "opencode",
826 gemini: "gemini",
827 claude: "claude",
828 cursor: "agent",
829 kimi: "kimi",
830 pi: "pi",
831 };
832
833 let err = run_prompt(
834 Runner::Codex,
835 dir.path(),
836 bins,
837 Model::Glm47,
838 Some(ReasoningEffort::Low),
839 execution::ResolvedRunnerCliOptions::default(),
840 "prompt",
841 None,
842 None,
843 None,
844 OutputStream::HandlerOnly,
845 PhaseType::Implementation,
846 None,
847 None,
848 )
849 .unwrap_err();
850
851 let msg = format!("{err}");
852 assert!(msg.contains("operation=run_prompt"));
853 assert!(msg.contains("runner=codex"));
854 assert!(msg.contains("bin=codex"));
855 }
856
857 #[test]
858 fn semantic_failure_reason_detects_opencode_session_validation_error() {
859 let stderr = r#"ZodError: [{"path":["sessionID"],"message":"Invalid string: must start with \"ses\""}]"#;
860 let reason = semantic_failure_reason(&Runner::Opencode, stderr);
861 assert_eq!(
862 reason,
863 Some("opencode rejected session_id during resume validation")
864 );
865 }
866
867 #[test]
868 fn semantic_failure_reason_ignores_non_opencode_runners() {
869 let stderr = "ZodError sessionID invalid_format must start with \"ses\"";
870 let reason = semantic_failure_reason(&Runner::Gemini, stderr);
871 assert_eq!(reason, None);
872 }
873}