1use crate::agent::Agent;
32use crate::attachment::{self, Attachment};
33use crate::config::Config;
34use crate::factory::AgentFactory;
35use crate::json_validation;
36use crate::listen::{self, ListenFormat};
37use crate::output::AgentOutput;
38use crate::process_registration::{self, ProcessRegistration, RegisterOptionsOwned};
39use crate::progress::{ProgressHandler, SilentProgress};
40use crate::providers::claude::Claude;
41use crate::providers::ollama::Ollama;
42use crate::sandbox::SandboxConfig;
43use crate::session::{SessionEntry, SessionStore};
44use crate::session_log::{
45 AgentLogEvent, LiveLogContext, LogEventCallback, SessionLogCoordinator, SessionLogMetadata,
46 live_adapter_for_provider, logs_dir,
47};
48use crate::streaming::StreamingSession;
49use crate::worktree;
50use anyhow::{Result, bail};
51use log::{debug, warn};
52use std::sync::Arc;
53use std::time::Duration;
54
/// Renders a duration as a compact `XhYmZs` string.
///
/// Only whole seconds are considered. Components equal to zero are omitted,
/// except that a duration shorter than one second renders as `0s`.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    let components = [
        (secs / 3600, 'h'),
        ((secs % 3600) / 60, 'm'),
        (secs % 60, 's'),
    ];

    let mut rendered: String = components
        .iter()
        .filter(|&&(value, _)| value > 0)
        .map(|&(value, unit)| format!("{value}{unit}"))
        .collect();

    // Nothing survived the filter, so the whole duration is under one second.
    if rendered.is_empty() {
        rendered.push_str("0s");
    }
    rendered
}
73
/// Optional descriptive metadata attached to a session.
///
/// When at least one field is populated, the builder writes a `SessionEntry`
/// carrying these values to the session store before running the agent.
#[derive(Debug, Clone, Default)]
pub struct SessionMetadata {
    /// Session name, if one was given.
    pub name: Option<String>,
    /// Session description, if one was given.
    pub description: Option<String>,
    /// Tags associated with the session (may be empty).
    pub tags: Vec<String>,
}
84
/// Handle for the session log that is active during one builder invocation.
///
/// Call `finish` to finalize the log explicitly. If the guard is dropped
/// without `finish`, the `Drop` impl still clears any registered event callback.
struct SessionLogGuard {
    /// Coordinator created by the builder itself (`SessionLogMode::Auto`);
    /// `None` when the caller supplied the coordinator.
    coordinator: Option<SessionLogCoordinator>,
    /// Session id used for the log: the builder's session id / a fresh UUID in
    /// `Auto` mode, or the log file stem in `External` mode.
    wrapper_session_id: String,
    /// Path of the log file, when the writer could report one.
    log_path: Option<std::path::PathBuf>,
    /// Clone of the caller-supplied coordinator's writer, kept so the event
    /// callback can be cleared once the run is over.
    external_writer: Option<crate::session_log::SessionLogWriter>,
    /// Caller-supplied coordinator, held only so it lives as long as the guard;
    /// it is never read.
    _owned_external: Option<SessionLogCoordinator>,
}
103
104impl SessionLogGuard {
105 fn log_path_string(&self) -> Option<String> {
106 self.log_path
107 .as_ref()
108 .map(|p| p.to_string_lossy().to_string())
109 }
110
111 async fn finish(mut self, success: bool, error: Option<String>) {
116 if let Some(coord) = self.coordinator.take() {
119 if let Err(e) = coord.finish(success, error).await {
120 warn!("Failed to finalize session log: {e}");
121 }
122 }
123 if let Some(w) = self.external_writer.take() {
124 let _ = w.clear_event_callback();
125 }
126 }
127}
128
129impl Drop for SessionLogGuard {
130 fn drop(&mut self) {
131 if let Some(ref w) = self.external_writer {
136 let _ = w.clear_event_callback();
137 }
138 if let Some(ref c) = self.coordinator {
139 let _ = c.writer().clear_event_callback();
140 }
141 }
142}
143
/// Selects how (and whether) a session log is produced for a builder run.
#[derive(Default)]
pub enum SessionLogMode {
    /// No session log is started (the default).
    #[default]
    Disabled,
    /// The builder starts its own `SessionLogCoordinator` for the run.
    Auto,
    /// Use a coordinator supplied by the caller; the builder only attaches its
    /// event callback to that coordinator's writer.
    External(SessionLogCoordinator),
}
169
/// Fluent builder that configures and runs an agent.
///
/// Options are collected through the chainable setters and consumed by one of
/// the terminal methods (`exec`, `exec_streaming`, `run`, `resume`,
/// `resume_with_prompt`, `continue_last`).
pub struct AgentBuilder {
    /// Requested provider name; when `None` the configured provider is used,
    /// falling back to `"claude"`.
    provider: Option<String>,
    /// Whether the provider was chosen explicitly; forwarded to
    /// `AgentFactory::create_with_fallback`.
    provider_explicit: bool,
    /// Model override forwarded to the agent factory.
    model: Option<String>,
    /// System prompt override; when `None` the configured system prompt is used.
    system_prompt: Option<String>,
    /// Root directory used for config/session lookups and passed to the agent.
    root: Option<String>,
    /// Auto-approve flag forwarded to the agent factory.
    auto_approve: bool,
    /// Additional directories forwarded to the agent factory.
    add_dirs: Vec<String>,
    /// Paths of files whose attachments are prepended to the prompt.
    files: Vec<String>,
    /// Environment variables handed to the agent.
    env_vars: Vec<(String, String)>,
    /// Outer `Some` = a worktree was requested; inner `None` = generate a name.
    worktree: Option<Option<String>>,
    /// Outer `Some` = a sandbox was requested; inner `None` = generate a name.
    sandbox: Option<Option<String>>,
    /// Size hint, resolved through the config and applied to Ollama agents only.
    size: Option<String>,
    /// Request JSON-only output from the agent.
    json_mode: bool,
    /// JSON schema used to validate the result (and sent to Claude in JSON mode).
    json_schema: Option<serde_json::Value>,
    /// Session id applied to Claude agents and reused as the log session id.
    session_id: Option<String>,
    /// Descriptive metadata persisted to the session store when non-empty.
    metadata: SessionMetadata,
    /// Output format passed to the agent; defaults to `"json"` in JSON mode.
    output_format: Option<String>,
    /// Input format; applied to Claude agents only.
    input_format: Option<String>,
    /// Replay-user-messages flag; applied to Claude agents only.
    replay_user_messages: bool,
    /// Include-partial-messages flag; applied to Claude agents only.
    include_partial_messages: bool,
    /// Verbose flag; applied to Claude agents only.
    verbose: bool,
    /// Quiet flag (stored by the setter; not read in the code visible here).
    quiet: bool,
    /// Show-usage flag (stored by the setter; not read in the code visible here).
    show_usage: bool,
    /// Maximum number of turns; falls back to the configured value when `None`.
    max_turns: Option<u32>,
    /// Time limit applied to the agent run in `exec`.
    timeout: Option<std::time::Duration>,
    /// MCP configuration; applied to Claude agents only.
    mcp_config: Option<String>,
    /// Receiver for spinner / success / warning notifications.
    progress: Box<dyn ProgressHandler>,
    /// How the session log is produced for the run.
    session_log_mode: SessionLogMode,
    /// User callback invoked for every session log event.
    log_event_callback: Option<LogEventCallback>,
    /// When set, log events are formatted this way and printed to stderr.
    stream_events_format: Option<ListenFormat>,
    /// Passed to the event formatter when streaming events to stderr.
    stream_show_thinking: bool,
    /// Hook invoked with the child process id; handed to the agent.
    on_spawn_hook: Option<crate::agent::OnSpawnHook>,
    /// When set, the terminal methods register the process and report its
    /// final status.
    register_process_opts: Option<RegisterOptionsOwned>,
}
229
230impl Default for AgentBuilder {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236impl AgentBuilder {
237 pub fn new() -> Self {
239 Self {
240 provider: None,
241 provider_explicit: false,
242 model: None,
243 system_prompt: None,
244 root: None,
245 auto_approve: false,
246 add_dirs: Vec::new(),
247 files: Vec::new(),
248 env_vars: Vec::new(),
249 worktree: None,
250 sandbox: None,
251 size: None,
252 json_mode: false,
253 json_schema: None,
254 session_id: None,
255 metadata: SessionMetadata::default(),
256 output_format: None,
257 input_format: None,
258 replay_user_messages: false,
259 include_partial_messages: false,
260 verbose: false,
261 quiet: false,
262 show_usage: false,
263 max_turns: None,
264 timeout: None,
265 mcp_config: None,
266 progress: Box::new(SilentProgress),
267 session_log_mode: SessionLogMode::Disabled,
268 log_event_callback: None,
269 stream_events_format: None,
270 stream_show_thinking: false,
271 on_spawn_hook: None,
272 register_process_opts: None,
273 }
274 }
275
276 pub fn provider(mut self, provider: &str) -> Self {
283 self.provider = Some(provider.to_string());
284 self.provider_explicit = true;
285 self
286 }
287
288 pub fn model(mut self, model: &str) -> Self {
290 self.model = Some(model.to_string());
291 self
292 }
293
294 pub fn system_prompt(mut self, prompt: &str) -> Self {
296 self.system_prompt = Some(prompt.to_string());
297 self
298 }
299
300 pub fn root(mut self, root: &str) -> Self {
302 self.root = Some(root.to_string());
303 self
304 }
305
306 pub fn auto_approve(mut self, approve: bool) -> Self {
308 self.auto_approve = approve;
309 self
310 }
311
312 pub fn add_dir(mut self, dir: &str) -> Self {
314 self.add_dirs.push(dir.to_string());
315 self
316 }
317
318 pub fn file(mut self, path: &str) -> Self {
320 self.files.push(path.to_string());
321 self
322 }
323
324 pub fn env(mut self, key: &str, value: &str) -> Self {
326 self.env_vars.push((key.to_string(), value.to_string()));
327 self
328 }
329
330 pub fn worktree(mut self, name: Option<&str>) -> Self {
332 self.worktree = Some(name.map(String::from));
333 self
334 }
335
336 pub fn sandbox(mut self, name: Option<&str>) -> Self {
338 self.sandbox = Some(name.map(String::from));
339 self
340 }
341
342 pub fn size(mut self, size: &str) -> Self {
344 self.size = Some(size.to_string());
345 self
346 }
347
348 pub fn json(mut self) -> Self {
350 self.json_mode = true;
351 self
352 }
353
354 pub fn json_schema(mut self, schema: serde_json::Value) -> Self {
357 self.json_schema = Some(schema);
358 self.json_mode = true;
359 self
360 }
361
362 pub fn session_id(mut self, id: &str) -> Self {
364 self.session_id = Some(id.to_string());
365 self
366 }
367
368 pub fn name(mut self, name: &str) -> Self {
375 self.metadata.name = Some(name.to_string());
376 self
377 }
378
379 pub fn description(mut self, description: &str) -> Self {
382 self.metadata.description = Some(description.to_string());
383 self
384 }
385
386 pub fn tag(mut self, tag: &str) -> Self {
389 self.metadata.tags.push(tag.to_string());
390 self
391 }
392
393 pub fn metadata(mut self, metadata: SessionMetadata) -> Self {
395 self.metadata = metadata;
396 self
397 }
398
399 pub fn output_format(mut self, format: &str) -> Self {
401 self.output_format = Some(format.to_string());
402 self
403 }
404
405 pub fn input_format(mut self, format: &str) -> Self {
410 self.input_format = Some(format.to_string());
411 self
412 }
413
414 pub fn replay_user_messages(mut self, replay: bool) -> Self {
420 self.replay_user_messages = replay;
421 self
422 }
423
424 pub fn include_partial_messages(mut self, include: bool) -> Self {
434 self.include_partial_messages = include;
435 self
436 }
437
438 pub fn verbose(mut self, v: bool) -> Self {
440 self.verbose = v;
441 self
442 }
443
444 pub fn quiet(mut self, q: bool) -> Self {
446 self.quiet = q;
447 self
448 }
449
450 pub fn show_usage(mut self, show: bool) -> Self {
452 self.show_usage = show;
453 self
454 }
455
456 pub fn max_turns(mut self, turns: u32) -> Self {
458 self.max_turns = Some(turns);
459 self
460 }
461
462 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
465 self.timeout = Some(duration);
466 self
467 }
468
469 pub fn mcp_config(mut self, config: &str) -> Self {
476 self.mcp_config = Some(config.to_string());
477 self
478 }
479
480 pub fn on_progress(mut self, handler: Box<dyn ProgressHandler>) -> Self {
482 self.progress = handler;
483 self
484 }
485
486 pub fn session_log(mut self, mode: SessionLogMode) -> Self {
488 self.session_log_mode = mode;
489 self
490 }
491
492 pub fn enable_session_log(mut self, enable: bool) -> Self {
495 self.session_log_mode = if enable {
496 SessionLogMode::Auto
497 } else {
498 SessionLogMode::Disabled
499 };
500 self
501 }
502
503 pub fn on_log_event<F>(mut self, f: F) -> Self
508 where
509 F: Fn(&AgentLogEvent) + Send + Sync + 'static,
510 {
511 self.log_event_callback = Some(Arc::new(f));
512 if matches!(self.session_log_mode, SessionLogMode::Disabled) {
513 self.session_log_mode = SessionLogMode::Auto;
514 }
515 self
516 }
517
518 pub fn stream_events_to_stderr(mut self, format: ListenFormat) -> Self {
525 self.stream_events_format = Some(format);
526 if matches!(self.session_log_mode, SessionLogMode::Disabled) {
527 self.session_log_mode = SessionLogMode::Auto;
528 }
529 self
530 }
531
532 pub fn stream_show_thinking(mut self, show: bool) -> Self {
535 self.stream_show_thinking = show;
536 self
537 }
538
539 pub fn on_spawn<F>(mut self, f: F) -> Self
549 where
550 F: Fn(u32) + Send + Sync + 'static,
551 {
552 self.on_spawn_hook = Some(Arc::new(f));
553 self
554 }
555
556 pub fn register_process(mut self, opts: RegisterOptionsOwned) -> Self {
575 self.register_process_opts = Some(opts);
576 self
577 }
578}
579
580fn apply_registration(builder: &mut AgentBuilder, reg: &ProcessRegistration) {
585 for (k, v) in reg.env_vars() {
586 builder.env_vars.push((k.clone(), v.clone()));
587 }
588 let reg_hook = reg.on_spawn_hook();
589 let prev_hook = builder.on_spawn_hook.take();
590 builder.on_spawn_hook = Some(Arc::new(move |pid: u32| {
591 reg_hook(pid);
592 if let Some(ref h) = prev_hook {
593 h(pid);
594 }
595 }));
596}
597
598fn status_for_result<T>(result: &Result<T>) -> (&'static str, Option<i32>) {
603 match result {
604 Ok(_) => ("exited", Some(0)),
605 Err(err) => {
606 let exit_code = err
607 .downcast_ref::<crate::process::ProcessError>()
608 .and_then(|pe| pe.exit_code)
609 .unwrap_or(1);
610 ("killed", Some(exit_code))
611 }
612 }
613}
614
615impl AgentBuilder {
616 fn persist_session_metadata_with_id(
626 &self,
627 provider: &str,
628 model: &str,
629 effective_root: Option<&str>,
630 explicit_session_id: Option<&str>,
631 ) -> Option<String> {
632 let has_metadata = self.metadata.name.is_some()
633 || self.metadata.description.is_some()
634 || !self.metadata.tags.is_empty();
635 if !has_metadata {
636 return None;
637 }
638
639 let session_id = explicit_session_id
640 .map(String::from)
641 .or_else(|| self.session_id.clone())
642 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
643 let workspace_path = effective_root
644 .map(String::from)
645 .or_else(|| self.root.clone())
646 .unwrap_or_else(|| {
647 std::env::current_dir()
648 .map(|p| p.to_string_lossy().to_string())
649 .unwrap_or_default()
650 });
651
652 let entry = SessionEntry {
653 session_id: session_id.clone(),
654 provider: provider.to_string(),
655 model: model.to_string(),
656 worktree_path: workspace_path,
657 worktree_name: String::new(),
658 created_at: chrono::Utc::now().to_rfc3339(),
659 provider_session_id: None,
660 sandbox_name: None,
661 is_worktree: self.worktree.is_some(),
662 discovered: false,
663 discovery_source: None,
664 log_path: None,
665 log_completeness: "partial".to_string(),
666 name: self.metadata.name.clone(),
667 description: self.metadata.description.clone(),
668 tags: self.metadata.tags.clone(),
669 dependencies: Vec::new(),
670 retried_from: None,
671 interactive: false,
672 };
673
674 let mut store = SessionStore::load(self.root.as_deref()).unwrap_or_default();
675 store.add(entry);
676 if let Err(e) = store.save(self.root.as_deref()) {
677 warn!("Failed to persist session metadata: {e}");
678 }
679
680 Some(session_id)
681 }
682
683 fn prepend_files(&self, prompt: &str) -> Result<String> {
685 if self.files.is_empty() {
686 return Ok(prompt.to_string());
687 }
688 let attachments: Vec<Attachment> = self
689 .files
690 .iter()
691 .map(|f| Attachment::from_path(std::path::Path::new(f)))
692 .collect::<Result<Vec<_>>>()?;
693 let prefix = attachment::format_attachments_prefix(&attachments);
694 Ok(format!("{prefix}{prompt}"))
695 }
696
697 fn resolve_provider(&self) -> Result<String> {
699 if let Some(ref p) = self.provider {
700 let p = p.to_lowercase();
701 if !Config::VALID_PROVIDERS.contains(&p.as_str()) {
702 bail!(
703 "Invalid provider '{}'. Available: {}",
704 p,
705 Config::VALID_PROVIDERS.join(", ")
706 );
707 }
708 return Ok(p);
709 }
710 let config = Config::load(self.root.as_deref()).unwrap_or_default();
711 if let Some(p) = config.provider() {
712 return Ok(p.to_string());
713 }
714 Ok("claude".to_string())
715 }
716
717 async fn create_agent(&self, provider: &str) -> Result<(Box<dyn Agent + Send + Sync>, String)> {
724 let base_system_prompt = self.system_prompt.clone().or_else(|| {
726 Config::load(self.root.as_deref())
727 .unwrap_or_default()
728 .system_prompt()
729 .map(String::from)
730 });
731
732 let system_prompt = if self.json_mode && provider != "claude" {
734 let mut prompt = base_system_prompt.unwrap_or_default();
735 if let Some(ref schema) = self.json_schema {
736 let schema_str = serde_json::to_string_pretty(schema).unwrap_or_default();
737 prompt.push_str(&format!(
738 "\n\nYou MUST respond with valid JSON only. No markdown fences, no explanations. \
739 Your response must conform to this JSON schema:\n{schema_str}"
740 ));
741 } else {
742 prompt.push_str(
743 "\n\nYou MUST respond with valid JSON only. No markdown fences, no explanations.",
744 );
745 }
746 Some(prompt)
747 } else {
748 base_system_prompt
749 };
750
751 self.progress
752 .on_spinner_start(&format!("Initializing {provider} agent"));
753
754 let progress = &*self.progress;
755 let mut on_downgrade = |from: &str, to: &str, reason: &str| {
756 progress.on_warning(&format!("Downgrading provider: {from} → {to} ({reason})"));
757 };
758 let (mut agent, effective_provider) = AgentFactory::create_with_fallback(
759 provider,
760 self.provider_explicit,
761 system_prompt,
762 self.model.clone(),
763 self.root.clone(),
764 self.auto_approve,
765 self.add_dirs.clone(),
766 &mut on_downgrade,
767 )
768 .await?;
769 let provider = effective_provider.as_str();
770
771 let effective_max_turns = self.max_turns.or_else(|| {
773 Config::load(self.root.as_deref())
774 .unwrap_or_default()
775 .max_turns()
776 });
777 if let Some(turns) = effective_max_turns {
778 agent.set_max_turns(turns);
779 }
780
781 let mut output_format = self.output_format.clone();
783 if self.json_mode && output_format.is_none() {
784 output_format = Some("json".to_string());
785 if provider != "claude" {
786 agent.set_capture_output(true);
787 }
788 }
789 agent.set_output_format(output_format);
790
791 if provider == "claude"
793 && let Some(claude_agent) = agent.as_any_mut().downcast_mut::<Claude>()
794 {
795 claude_agent.set_verbose(self.verbose);
796 if let Some(ref session_id) = self.session_id {
797 claude_agent.set_session_id(session_id.clone());
798 }
799 if let Some(ref input_fmt) = self.input_format {
800 claude_agent.set_input_format(Some(input_fmt.clone()));
801 }
802 if self.replay_user_messages {
803 claude_agent.set_replay_user_messages(true);
804 }
805 if self.include_partial_messages {
806 claude_agent.set_include_partial_messages(true);
807 }
808 if self.json_mode
809 && let Some(ref schema) = self.json_schema
810 {
811 let schema_str = serde_json::to_string(schema).unwrap_or_default();
812 claude_agent.set_json_schema(Some(schema_str));
813 }
814 if self.mcp_config.is_some() {
815 claude_agent.set_mcp_config(self.mcp_config.clone());
816 }
817 }
818
819 if provider == "ollama"
821 && let Some(ollama_agent) = agent.as_any_mut().downcast_mut::<Ollama>()
822 {
823 let config = Config::load(self.root.as_deref()).unwrap_or_default();
824 if let Some(ref size) = self.size {
825 let resolved = config.ollama_size_for(size);
826 ollama_agent.set_size(resolved.to_string());
827 }
828 }
829
830 if let Some(ref sandbox_opt) = self.sandbox {
832 let sandbox_name = sandbox_opt
833 .as_deref()
834 .map(String::from)
835 .unwrap_or_else(crate::sandbox::generate_name);
836 let template = crate::sandbox::template_for_provider(provider);
837 let workspace = self.root.clone().unwrap_or_else(|| ".".to_string());
838 agent.set_sandbox(SandboxConfig {
839 name: sandbox_name,
840 template: template.to_string(),
841 workspace,
842 });
843 }
844
845 if !self.env_vars.is_empty() {
846 agent.set_env_vars(self.env_vars.clone());
847 }
848
849 if let Some(ref hook) = self.on_spawn_hook {
850 agent.set_on_spawn_hook(hook.clone());
851 }
852
853 self.progress.on_spinner_finish();
854 self.progress.on_success(&format!(
855 "{} initialized with model {}",
856 provider,
857 agent.get_model()
858 ));
859
860 Ok((agent, effective_provider))
861 }
862
863 fn start_session_log(
871 &mut self,
872 command: &str,
873 resumed: bool,
874 provider: &str,
875 model: &str,
876 provider_session_id: Option<&str>,
877 ) -> Option<SessionLogGuard> {
878 let mode = std::mem::replace(&mut self.session_log_mode, SessionLogMode::Disabled);
879 match mode {
880 SessionLogMode::Disabled => None,
881 SessionLogMode::External(c) => {
882 let wrapper_session_id = c
883 .writer()
884 .log_path()
885 .ok()
886 .and_then(|p| p.file_stem().map(|s| s.to_string_lossy().to_string()))
887 .unwrap_or_default();
888 let log_path = c.writer().log_path().ok();
889 self.apply_event_callback(c.writer());
890 Some(SessionLogGuard {
891 coordinator: None, wrapper_session_id,
893 log_path,
894 external_writer: Some(c.writer().clone()),
895 _owned_external: Some(c),
896 })
897 }
898 SessionLogMode::Auto => {
899 let wrapper_session_id = self
900 .session_id
901 .clone()
902 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
903 let metadata = SessionLogMetadata {
904 provider: provider.to_string(),
905 wrapper_session_id: wrapper_session_id.clone(),
906 provider_session_id: provider_session_id.map(str::to_string),
907 workspace_path: self.root.clone().or_else(|| {
908 std::env::current_dir()
909 .ok()
910 .map(|p| p.to_string_lossy().to_string())
911 }),
912 command: command.to_string(),
913 model: Some(model.to_string()),
914 resumed,
915 backfilled: false,
916 };
917 let live_ctx = LiveLogContext {
918 root: self.root.clone(),
919 provider_session_id: metadata.provider_session_id.clone(),
920 workspace_path: metadata.workspace_path.clone(),
921 started_at: chrono::Utc::now(),
922 is_worktree: self.worktree.is_some(),
923 };
924 let adapter = live_adapter_for_provider(provider, live_ctx, true);
925 let callback = self.build_event_callback();
926 match SessionLogCoordinator::start_with_callback(
927 &logs_dir(self.root.as_deref()),
928 metadata,
929 adapter,
930 callback,
931 ) {
932 Ok(c) => {
933 let _ = c.writer().set_global_index_dir(Config::global_base_dir());
934 let log_path = c.writer().log_path().ok();
935 Some(SessionLogGuard {
936 coordinator: Some(c),
937 wrapper_session_id,
938 log_path,
939 external_writer: None,
940 _owned_external: None,
941 })
942 }
943 Err(e) => {
944 warn!("Failed to start session log coordinator: {e}");
945 None
946 }
947 }
948 }
949 }
950 }
951
952 fn build_event_callback(&self) -> Option<LogEventCallback> {
956 let user_cb = self.log_event_callback.clone();
957 let stream_fmt = self.stream_events_format;
958 let show_thinking = self.stream_show_thinking;
959
960 if user_cb.is_none() && stream_fmt.is_none() {
961 return None;
962 }
963
964 Some(Arc::new(move |event: &AgentLogEvent| {
965 if let Some(ref user) = user_cb {
966 user(event);
967 }
968 if let Some(fmt) = stream_fmt
969 && let Some(text) = listen::format_event(event, fmt, show_thinking)
970 {
971 eprintln!("{text}");
972 }
973 }))
974 }
975
976 fn apply_event_callback(&self, writer: &crate::session_log::SessionLogWriter) {
981 if let Some(cb) = self.build_event_callback() {
982 if let Err(e) = writer.set_event_callback(cb) {
983 warn!("Failed to register session log event callback: {e}");
984 }
985 }
986 }
987
988 pub async fn exec(mut self, prompt: &str) -> Result<AgentOutput> {
992 let registration = self
993 .register_process_opts
994 .as_ref()
995 .map(|opts| process_registration::register(opts.as_borrowed()));
996 if let Some(ref reg) = registration {
997 apply_registration(&mut self, reg);
998 }
999 let result = self.exec_inner(prompt).await;
1000 if let Some(reg) = registration {
1001 let (status, code) = status_for_result(&result);
1002 reg.update_status(status, code);
1003 }
1004 result
1005 }
1006
1007 async fn exec_inner(self, prompt: &str) -> Result<AgentOutput> {
1008 let provider = self.resolve_provider()?;
1009 debug!("exec: provider={provider}");
1010
1011 let effective_root = if let Some(ref wt_opt) = self.worktree {
1013 let wt_name = wt_opt
1014 .as_deref()
1015 .map(String::from)
1016 .unwrap_or_else(worktree::generate_name);
1017 let repo_root = worktree::git_repo_root(self.root.as_deref())?;
1018 let wt_path = worktree::create_worktree(&repo_root, &wt_name)?;
1019 self.progress
1020 .on_success(&format!("Worktree created at {}", wt_path.display()));
1021 Some(wt_path.to_string_lossy().to_string())
1022 } else {
1023 self.root.clone()
1024 };
1025
1026 let mut builder = self;
1027 if effective_root.is_some() {
1028 builder.root = effective_root;
1029 }
1030
1031 let (agent, provider) = builder.create_agent(&provider).await?;
1032
1033 let log_guard =
1037 builder.start_session_log("exec", false, &provider, agent.get_model(), None);
1038
1039 let _ = builder.persist_session_metadata_with_id(
1044 &provider,
1045 agent.get_model(),
1046 builder.root.as_deref(),
1047 log_guard.as_ref().map(|g| g.wrapper_session_id.as_str()),
1048 );
1049
1050 let prompt_with_files = builder.prepend_files(prompt)?;
1052
1053 let effective_prompt = if builder.json_mode && provider != "claude" {
1055 format!(
1056 "IMPORTANT: You MUST respond with valid JSON only. No markdown, no explanation.\n\n{prompt_with_files}"
1057 )
1058 } else {
1059 prompt_with_files
1060 };
1061
1062 let result = if let Some(timeout_dur) = builder.timeout {
1063 match tokio::time::timeout(timeout_dur, agent.run(Some(&effective_prompt))).await {
1064 Ok(r) => r?,
1065 Err(_) => {
1066 agent.cleanup().await.ok();
1067 bail!("Agent timed out after {}", format_duration(timeout_dur));
1068 }
1069 }
1070 } else {
1071 agent.run(Some(&effective_prompt)).await?
1072 };
1073
1074 agent.cleanup().await?;
1076
1077 let log_path_string = log_guard.as_ref().and_then(|g| g.log_path_string());
1078
1079 if let Some(mut output) = result {
1080 if let Some(ref schema) = builder.json_schema {
1082 if !builder.json_mode {
1083 warn!(
1084 "json_schema is set but json_mode is false — \
1085 schema will not be sent to the agent, only used for output validation"
1086 );
1087 }
1088 if let Some(ref result_text) = output.result {
1089 debug!(
1090 "exec: validating result ({} bytes): {:.300}",
1091 result_text.len(),
1092 result_text
1093 );
1094 if let Err(errors) = json_validation::validate_json_schema(result_text, schema)
1095 {
1096 let preview = if result_text.len() > 500 {
1097 &result_text[..500]
1098 } else {
1099 result_text.as_str()
1100 };
1101 bail!(
1102 "JSON schema validation failed: {}\nRaw agent output ({} bytes):\n{}",
1103 errors.join("; "),
1104 result_text.len(),
1105 preview
1106 );
1107 }
1108 }
1109 }
1110 output.log_path = log_path_string;
1111 let success = !output.is_error;
1112 let err_msg = output.error_message.clone();
1113 if let Some(g) = log_guard {
1114 g.finish(success, err_msg).await;
1115 }
1116 Ok(output)
1117 } else {
1118 let mut output = AgentOutput::from_text(&provider, "");
1120 output.log_path = log_path_string;
1121 if let Some(g) = log_guard {
1122 g.finish(true, None).await;
1123 }
1124 Ok(output)
1125 }
1126 }
1127
1128 pub async fn exec_streaming(self, prompt: &str) -> Result<StreamingSession> {
1197 let provider = self.resolve_provider()?;
1198 debug!("exec_streaming: provider={provider}");
1199
1200 if provider != "claude" {
1201 bail!("Streaming input is only supported by the Claude provider");
1202 }
1203
1204 let prompt_with_files = self.prepend_files(prompt)?;
1206
1207 let mut builder = self;
1210 builder.provider_explicit = true;
1211 let (agent, _provider) = builder.create_agent(&provider).await?;
1212
1213 let claude_agent = agent
1215 .as_any_ref()
1216 .downcast_ref::<Claude>()
1217 .ok_or_else(|| anyhow::anyhow!("Failed to downcast agent to Claude"))?;
1218
1219 claude_agent.execute_streaming(Some(&prompt_with_files))
1220 }
1221
1222 pub async fn run(mut self, prompt: Option<&str>) -> Result<()> {
1226 let registration = self
1227 .register_process_opts
1228 .as_ref()
1229 .map(|opts| process_registration::register(opts.as_borrowed()));
1230 if let Some(ref reg) = registration {
1231 apply_registration(&mut self, reg);
1232 }
1233 let result = self.run_inner(prompt).await;
1234 if let Some(reg) = registration {
1235 let (status, code) = status_for_result(&result);
1236 reg.update_status(status, code);
1237 }
1238 result
1239 }
1240
1241 async fn run_inner(self, prompt: Option<&str>) -> Result<()> {
1242 let provider = self.resolve_provider()?;
1243 debug!("run: provider={provider}");
1244
1245 let prompt_with_files = match prompt {
1247 Some(p) => Some(self.prepend_files(p)?),
1248 None if !self.files.is_empty() => {
1249 let attachments: Vec<Attachment> = self
1250 .files
1251 .iter()
1252 .map(|f| Attachment::from_path(std::path::Path::new(f)))
1253 .collect::<Result<Vec<_>>>()?;
1254 Some(attachment::format_attachments_prefix(&attachments))
1255 }
1256 None => None,
1257 };
1258
1259 let mut builder = self;
1260 let (agent, effective_provider) = builder.create_agent(&provider).await?;
1261 let log_guard =
1262 builder.start_session_log("run", false, &effective_provider, agent.get_model(), None);
1263 let _ = builder.persist_session_metadata_with_id(
1264 &effective_provider,
1265 agent.get_model(),
1266 builder.root.as_deref(),
1267 log_guard.as_ref().map(|g| g.wrapper_session_id.as_str()),
1268 );
1269 agent.run_interactive(prompt_with_files.as_deref()).await?;
1270 agent.cleanup().await?;
1271 if let Some(g) = log_guard {
1272 g.finish(true, None).await;
1273 }
1274 Ok(())
1275 }
1276
1277 pub async fn resume(mut self, session_id: &str) -> Result<()> {
1279 let registration = self
1280 .register_process_opts
1281 .as_ref()
1282 .map(|opts| process_registration::register(opts.as_borrowed()));
1283 if let Some(ref reg) = registration {
1284 apply_registration(&mut self, reg);
1285 }
1286 let result = self.resume_inner(session_id).await;
1287 if let Some(reg) = registration {
1288 let (status, code) = status_for_result(&result);
1289 reg.update_status(status, code);
1290 }
1291 result
1292 }
1293
1294 async fn resume_inner(self, session_id: &str) -> Result<()> {
1295 let provider = self.resolve_provider()?;
1296 debug!("resume: provider={provider}, session={session_id}");
1297
1298 let mut builder = self;
1300 builder.provider_explicit = true;
1301 let (agent, effective_provider) = builder.create_agent(&provider).await?;
1302 let log_guard = builder.start_session_log(
1303 "resume",
1304 true,
1305 &effective_provider,
1306 agent.get_model(),
1307 Some(session_id),
1308 );
1309 agent.run_resume(Some(session_id), false).await?;
1310 agent.cleanup().await?;
1311 if let Some(g) = log_guard {
1312 g.finish(true, None).await;
1313 }
1314 Ok(())
1315 }
1316
1317 pub async fn resume_with_prompt(
1330 mut self,
1331 session_id: &str,
1332 prompt: &str,
1333 ) -> Result<Option<AgentOutput>> {
1334 let registration = self
1335 .register_process_opts
1336 .as_ref()
1337 .map(|opts| process_registration::register(opts.as_borrowed()));
1338 if let Some(ref reg) = registration {
1339 apply_registration(&mut self, reg);
1340 }
1341 let result = self.resume_with_prompt_inner(session_id, prompt).await;
1342 if let Some(reg) = registration {
1343 let (status, code) = status_for_result(&result);
1344 reg.update_status(status, code);
1345 }
1346 result
1347 }
1348
1349 async fn resume_with_prompt_inner(
1350 self,
1351 session_id: &str,
1352 prompt: &str,
1353 ) -> Result<Option<AgentOutput>> {
1354 let provider = self.resolve_provider()?;
1355 debug!("resume_with_prompt: provider={provider}, session={session_id}");
1356
1357 let mut builder = self;
1359 builder.provider_explicit = true;
1360 let (agent, effective_provider) = builder.create_agent(&provider).await?;
1361 let log_guard = builder.start_session_log(
1367 "resume",
1368 true,
1369 &effective_provider,
1370 agent.get_model(),
1371 Some(session_id),
1372 );
1373 let output = agent.run_resume_with_prompt(session_id, prompt).await?;
1374 agent.cleanup().await?;
1375 if let Some(g) = log_guard {
1376 g.finish(true, None).await;
1377 }
1378 Ok(output)
1379 }
1380
1381 pub async fn continue_last(mut self) -> Result<()> {
1383 let registration = self
1384 .register_process_opts
1385 .as_ref()
1386 .map(|opts| process_registration::register(opts.as_borrowed()));
1387 if let Some(ref reg) = registration {
1388 apply_registration(&mut self, reg);
1389 }
1390 let result = self.continue_last_inner().await;
1391 if let Some(reg) = registration {
1392 let (status, code) = status_for_result(&result);
1393 reg.update_status(status, code);
1394 }
1395 result
1396 }
1397
1398 async fn continue_last_inner(self) -> Result<()> {
1399 let provider = self.resolve_provider()?;
1400 debug!("continue_last: provider={provider}");
1401
1402 let mut builder = self;
1404 builder.provider_explicit = true;
1405 let (agent, effective_provider) = builder.create_agent(&provider).await?;
1406 let log_guard =
1407 builder.start_session_log("resume", true, &effective_provider, agent.get_model(), None);
1408 agent.run_resume(None, true).await?;
1409 agent.cleanup().await?;
1410 if let Some(g) = log_guard {
1411 g.finish(true, None).await;
1412 }
1413 Ok(())
1414 }
1415}
1416
1417#[cfg(test)]
1418#[path = "builder_tests.rs"]
1419mod tests;