1use crate::agent::Agent;
32use crate::attachment::{self, Attachment};
33use crate::config::Config;
34use crate::factory::AgentFactory;
35use crate::json_validation;
36use crate::listen::{self, ListenFormat};
37use crate::output::AgentOutput;
38use crate::process_registration::{self, ProcessRegistration, RegisterOptionsOwned};
39use crate::progress::{ProgressHandler, SilentProgress};
40use crate::providers::claude::Claude;
41use crate::providers::ollama::Ollama;
42use crate::sandbox::SandboxConfig;
43use crate::session::{SessionEntry, SessionStore};
44use crate::session_log::{
45 AgentLogEvent, LiveLogContext, LogEventCallback, SessionLogCoordinator, SessionLogMetadata,
46 live_adapter_for_provider, logs_dir,
47};
48use crate::streaming::StreamingSession;
49use crate::worktree;
50use anyhow::{Result, bail};
51use log::{debug, warn};
52use std::sync::Arc;
53use std::time::Duration;
54
/// Renders a duration as a compact `1h2m3s`-style string at whole-second resolution.
///
/// Components that are zero are left out. The seconds component is also emitted
/// when both hours and minutes are zero, so sub-second durations render as `0s`.
fn format_duration(d: Duration) -> String {
    use std::fmt::Write as _;

    let secs = d.as_secs();
    let (hours, minutes, seconds) = (secs / 3600, (secs % 3600) / 60, secs % 60);

    // Writing into a `String` cannot fail, so the `fmt::Result` is discarded.
    let mut rendered = String::new();
    if hours > 0 {
        let _ = write!(rendered, "{hours}h");
    }
    if minutes > 0 {
        let _ = write!(rendered, "{minutes}m");
    }
    if seconds > 0 || rendered.is_empty() {
        let _ = write!(rendered, "{seconds}s");
    }
    rendered
}
73
/// Optional, user-facing labels attached to a session.
///
/// The builder persists a session entry carrying these values only when at
/// least one of them is set (a name, a description, or any tag).
///
/// Derives `PartialEq`/`Eq` so metadata values can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SessionMetadata {
    /// Human-readable session name.
    pub name: Option<String>,
    /// Free-form description of the session.
    pub description: Option<String>,
    /// Tags attached to the session, in insertion order.
    pub tags: Vec<String>,
}
84
85struct SessionLogGuard {
90 coordinator: Option<SessionLogCoordinator>,
93 wrapper_session_id: String,
94 log_path: Option<std::path::PathBuf>,
95 external_writer: Option<crate::session_log::SessionLogWriter>,
98 _owned_external: Option<SessionLogCoordinator>,
102}
103
104impl SessionLogGuard {
105 fn log_path_string(&self) -> Option<String> {
106 self.log_path
107 .as_ref()
108 .map(|p| p.to_string_lossy().to_string())
109 }
110
111 async fn finish(mut self, success: bool, error: Option<String>) {
116 if let Some(coord) = self.coordinator.take() {
119 if let Err(e) = coord.finish(success, error).await {
120 warn!("Failed to finalize session log: {e}");
121 }
122 }
123 if let Some(w) = self.external_writer.take() {
124 let _ = w.clear_event_callback();
125 }
126 }
127}
128
129impl Drop for SessionLogGuard {
130 fn drop(&mut self) {
131 if let Some(ref w) = self.external_writer {
136 let _ = w.clear_event_callback();
137 }
138 if let Some(ref c) = self.coordinator {
139 let _ = c.writer().clear_event_callback();
140 }
141 }
142}
143
/// Selects how the builder handles session logging for an invocation.
#[derive(Default)]
pub enum SessionLogMode {
    /// No session log is started (the default).
    #[default]
    Disabled,
    /// The builder starts its own `SessionLogCoordinator` and finalizes it when
    /// the invocation completes.
    Auto,
    /// Use a coordinator supplied by the caller. The builder registers its event
    /// callback on the coordinator's writer and keeps the coordinator alive for
    /// the invocation; it does not call `finish` on it.
    External(SessionLogCoordinator),
}
169
170pub struct AgentBuilder {
175 provider: Option<String>,
176 provider_explicit: bool,
180 model: Option<String>,
181 system_prompt: Option<String>,
182 root: Option<String>,
183 auto_approve: bool,
184 add_dirs: Vec<String>,
185 files: Vec<String>,
186 env_vars: Vec<(String, String)>,
187 worktree: Option<Option<String>>,
188 sandbox: Option<Option<String>>,
189 size: Option<String>,
190 json_mode: bool,
191 json_schema: Option<serde_json::Value>,
192 session_id: Option<String>,
193 metadata: SessionMetadata,
194 output_format: Option<String>,
195 input_format: Option<String>,
196 replay_user_messages: bool,
197 include_partial_messages: bool,
198 verbose: bool,
199 quiet: bool,
200 show_usage: bool,
201 max_turns: Option<u32>,
202 timeout: Option<std::time::Duration>,
203 mcp_config: Option<String>,
204 progress: Box<dyn ProgressHandler>,
205 session_log_mode: SessionLogMode,
206 log_event_callback: Option<LogEventCallback>,
210 stream_events_format: Option<ListenFormat>,
213 stream_show_thinking: bool,
216 on_spawn_hook: Option<crate::agent::OnSpawnHook>,
221 register_process_opts: Option<RegisterOptionsOwned>,
228}
229
230impl Default for AgentBuilder {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236impl AgentBuilder {
237 pub fn new() -> Self {
239 Self {
240 provider: None,
241 provider_explicit: false,
242 model: None,
243 system_prompt: None,
244 root: None,
245 auto_approve: false,
246 add_dirs: Vec::new(),
247 files: Vec::new(),
248 env_vars: Vec::new(),
249 worktree: None,
250 sandbox: None,
251 size: None,
252 json_mode: false,
253 json_schema: None,
254 session_id: None,
255 metadata: SessionMetadata::default(),
256 output_format: None,
257 input_format: None,
258 replay_user_messages: false,
259 include_partial_messages: false,
260 verbose: false,
261 quiet: false,
262 show_usage: false,
263 max_turns: None,
264 timeout: None,
265 mcp_config: None,
266 progress: Box::new(SilentProgress),
267 session_log_mode: SessionLogMode::Disabled,
268 log_event_callback: None,
269 stream_events_format: None,
270 stream_show_thinking: false,
271 on_spawn_hook: None,
272 register_process_opts: None,
273 }
274 }
275
276 pub fn provider(mut self, provider: &str) -> Self {
283 self.provider = Some(provider.to_string());
284 self.provider_explicit = true;
285 self
286 }
287
288 pub fn model(mut self, model: &str) -> Self {
290 self.model = Some(model.to_string());
291 self
292 }
293
294 pub fn system_prompt(mut self, prompt: &str) -> Self {
296 self.system_prompt = Some(prompt.to_string());
297 self
298 }
299
300 pub fn root(mut self, root: &str) -> Self {
302 self.root = Some(root.to_string());
303 self
304 }
305
306 pub fn auto_approve(mut self, approve: bool) -> Self {
308 self.auto_approve = approve;
309 self
310 }
311
312 pub fn add_dir(mut self, dir: &str) -> Self {
314 self.add_dirs.push(dir.to_string());
315 self
316 }
317
318 pub fn file(mut self, path: &str) -> Self {
320 self.files.push(path.to_string());
321 self
322 }
323
324 pub fn env(mut self, key: &str, value: &str) -> Self {
326 self.env_vars.push((key.to_string(), value.to_string()));
327 self
328 }
329
330 pub fn worktree(mut self, name: Option<&str>) -> Self {
332 self.worktree = Some(name.map(String::from));
333 self
334 }
335
336 pub fn sandbox(mut self, name: Option<&str>) -> Self {
338 self.sandbox = Some(name.map(String::from));
339 self
340 }
341
342 pub fn size(mut self, size: &str) -> Self {
344 self.size = Some(size.to_string());
345 self
346 }
347
348 pub fn json(mut self) -> Self {
350 self.json_mode = true;
351 self
352 }
353
354 pub fn json_schema(mut self, schema: serde_json::Value) -> Self {
357 self.json_schema = Some(schema);
358 self.json_mode = true;
359 self
360 }
361
362 pub fn session_id(mut self, id: &str) -> Self {
364 self.session_id = Some(id.to_string());
365 self
366 }
367
368 pub fn name(mut self, name: &str) -> Self {
375 self.metadata.name = Some(name.to_string());
376 self
377 }
378
379 pub fn description(mut self, description: &str) -> Self {
382 self.metadata.description = Some(description.to_string());
383 self
384 }
385
386 pub fn tag(mut self, tag: &str) -> Self {
389 self.metadata.tags.push(tag.to_string());
390 self
391 }
392
393 pub fn metadata(mut self, metadata: SessionMetadata) -> Self {
395 self.metadata = metadata;
396 self
397 }
398
399 pub fn output_format(mut self, format: &str) -> Self {
401 self.output_format = Some(format.to_string());
402 self
403 }
404
405 pub fn input_format(mut self, format: &str) -> Self {
410 self.input_format = Some(format.to_string());
411 self
412 }
413
414 pub fn replay_user_messages(mut self, replay: bool) -> Self {
420 self.replay_user_messages = replay;
421 self
422 }
423
424 pub fn include_partial_messages(mut self, include: bool) -> Self {
434 self.include_partial_messages = include;
435 self
436 }
437
438 pub fn verbose(mut self, v: bool) -> Self {
440 self.verbose = v;
441 self
442 }
443
444 pub fn quiet(mut self, q: bool) -> Self {
446 self.quiet = q;
447 self
448 }
449
450 pub fn show_usage(mut self, show: bool) -> Self {
452 self.show_usage = show;
453 self
454 }
455
456 pub fn max_turns(mut self, turns: u32) -> Self {
458 self.max_turns = Some(turns);
459 self
460 }
461
462 pub fn timeout(mut self, duration: std::time::Duration) -> Self {
465 self.timeout = Some(duration);
466 self
467 }
468
469 pub fn mcp_config(mut self, config: &str) -> Self {
476 self.mcp_config = Some(config.to_string());
477 self
478 }
479
480 pub fn on_progress(mut self, handler: Box<dyn ProgressHandler>) -> Self {
482 self.progress = handler;
483 self
484 }
485
486 pub fn session_log(mut self, mode: SessionLogMode) -> Self {
488 self.session_log_mode = mode;
489 self
490 }
491
492 pub fn enable_session_log(mut self, enable: bool) -> Self {
495 self.session_log_mode = if enable {
496 SessionLogMode::Auto
497 } else {
498 SessionLogMode::Disabled
499 };
500 self
501 }
502
503 pub fn on_log_event<F>(mut self, f: F) -> Self
508 where
509 F: Fn(&AgentLogEvent) + Send + Sync + 'static,
510 {
511 self.log_event_callback = Some(Arc::new(f));
512 if matches!(self.session_log_mode, SessionLogMode::Disabled) {
513 self.session_log_mode = SessionLogMode::Auto;
514 }
515 self
516 }
517
518 pub fn stream_events_to_stderr(mut self, format: ListenFormat) -> Self {
525 self.stream_events_format = Some(format);
526 if matches!(self.session_log_mode, SessionLogMode::Disabled) {
527 self.session_log_mode = SessionLogMode::Auto;
528 }
529 self
530 }
531
532 pub fn stream_show_thinking(mut self, show: bool) -> Self {
535 self.stream_show_thinking = show;
536 self
537 }
538
539 pub fn on_spawn<F>(mut self, f: F) -> Self
549 where
550 F: Fn(u32) + Send + Sync + 'static,
551 {
552 self.on_spawn_hook = Some(Arc::new(f));
553 self
554 }
555
556 pub fn register_process(mut self, opts: RegisterOptionsOwned) -> Self {
575 self.register_process_opts = Some(opts);
576 self
577 }
578}
579
580fn apply_registration(builder: &mut AgentBuilder, reg: &ProcessRegistration) {
585 for (k, v) in reg.env_vars() {
586 builder.env_vars.push((k.clone(), v.clone()));
587 }
588 let reg_hook = reg.on_spawn_hook();
589 let prev_hook = builder.on_spawn_hook.take();
590 builder.on_spawn_hook = Some(Arc::new(move |pid: u32| {
591 reg_hook(pid);
592 if let Some(ref h) = prev_hook {
593 h(pid);
594 }
595 }));
596}
597
598fn status_for_result<T>(result: &Result<T>) -> (&'static str, Option<i32>) {
603 match result {
604 Ok(_) => ("exited", Some(0)),
605 Err(err) => {
606 let exit_code = err
607 .downcast_ref::<crate::process::ProcessError>()
608 .and_then(|pe| pe.exit_code)
609 .unwrap_or(1);
610 ("killed", Some(exit_code))
611 }
612 }
613}
614
615impl AgentBuilder {
616 fn persist_session_metadata_with_id(
626 &self,
627 provider: &str,
628 model: &str,
629 effective_root: Option<&str>,
630 explicit_session_id: Option<&str>,
631 ) -> Option<String> {
632 let has_metadata = self.metadata.name.is_some()
633 || self.metadata.description.is_some()
634 || !self.metadata.tags.is_empty();
635 if !has_metadata {
636 return None;
637 }
638
639 let session_id = explicit_session_id
640 .map(String::from)
641 .or_else(|| self.session_id.clone())
642 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
643 let workspace_path = effective_root
644 .map(String::from)
645 .or_else(|| self.root.clone())
646 .unwrap_or_else(|| {
647 std::env::current_dir()
648 .map(|p| p.to_string_lossy().to_string())
649 .unwrap_or_default()
650 });
651
652 let entry = SessionEntry {
653 session_id: session_id.clone(),
654 provider: provider.to_string(),
655 model: model.to_string(),
656 worktree_path: workspace_path,
657 worktree_name: String::new(),
658 created_at: chrono::Utc::now().to_rfc3339(),
659 provider_session_id: None,
660 sandbox_name: None,
661 is_worktree: self.worktree.is_some(),
662 discovered: false,
663 discovery_source: None,
664 log_path: None,
665 log_completeness: "partial".to_string(),
666 name: self.metadata.name.clone(),
667 description: self.metadata.description.clone(),
668 tags: self.metadata.tags.clone(),
669 dependencies: Vec::new(),
670 retried_from: None,
671 interactive: false,
672 };
673
674 let mut store = SessionStore::load(self.root.as_deref()).unwrap_or_default();
675 store.add(entry);
676 if let Err(e) = store.save(self.root.as_deref()) {
677 warn!("Failed to persist session metadata: {e}");
678 }
679
680 Some(session_id)
681 }
682
683 fn prepend_files(&self, prompt: &str) -> Result<String> {
685 if self.files.is_empty() {
686 return Ok(prompt.to_string());
687 }
688 let attachments: Vec<Attachment> = self
689 .files
690 .iter()
691 .map(|f| Attachment::from_path(std::path::Path::new(f)))
692 .collect::<Result<Vec<_>>>()?;
693 let prefix = attachment::format_attachments_prefix(&attachments);
694 Ok(format!("{prefix}{prompt}"))
695 }
696
697 fn resolve_provider(&self) -> Result<String> {
699 if let Some(ref p) = self.provider {
700 let p = p.to_lowercase();
701 if !Config::VALID_PROVIDERS.contains(&p.as_str()) {
702 bail!(
703 "Invalid provider '{}'. Available: {}",
704 p,
705 Config::VALID_PROVIDERS.join(", ")
706 );
707 }
708 return Ok(p);
709 }
710 let config = Config::load(self.root.as_deref()).unwrap_or_default();
711 if let Some(p) = config.provider() {
712 return Ok(p.to_string());
713 }
714 Ok("claude".to_string())
715 }
716
717 async fn create_agent(&self, provider: &str) -> Result<(Box<dyn Agent + Send + Sync>, String)> {
724 let base_system_prompt = self.system_prompt.clone().or_else(|| {
726 Config::load(self.root.as_deref())
727 .unwrap_or_default()
728 .system_prompt()
729 .map(String::from)
730 });
731
732 let system_prompt = if self.json_mode && provider != "claude" {
734 let mut prompt = base_system_prompt.unwrap_or_default();
735 if let Some(ref schema) = self.json_schema {
736 let schema_str = serde_json::to_string_pretty(schema).unwrap_or_default();
737 prompt.push_str(&format!(
738 "\n\nYou MUST respond with valid JSON only. No markdown fences, no explanations. \
739 Your response must conform to this JSON schema:\n{schema_str}"
740 ));
741 } else {
742 prompt.push_str(
743 "\n\nYou MUST respond with valid JSON only. No markdown fences, no explanations.",
744 );
745 }
746 Some(prompt)
747 } else {
748 base_system_prompt
749 };
750
751 self.progress
752 .on_spinner_start(&format!("Initializing {provider} agent"));
753
754 let progress = &*self.progress;
755 let mut on_downgrade = |from: &str, to: &str, reason: &str| {
756 progress.on_warning(&format!("Downgrading provider: {from} → {to} ({reason})"));
757 };
758 let (mut agent, effective_provider) = AgentFactory::create_with_fallback(
759 provider,
760 self.provider_explicit,
761 system_prompt,
762 self.model.clone(),
763 self.root.clone(),
764 self.auto_approve,
765 self.add_dirs.clone(),
766 &mut on_downgrade,
767 )
768 .await?;
769 let provider = effective_provider.as_str();
770
771 let effective_max_turns = self.max_turns.or_else(|| {
773 Config::load(self.root.as_deref())
774 .unwrap_or_default()
775 .max_turns()
776 });
777 if let Some(turns) = effective_max_turns {
778 agent.set_max_turns(turns);
779 }
780
781 let mut output_format = self.output_format.clone();
783 if self.json_mode && output_format.is_none() {
784 output_format = Some("json".to_string());
785 if provider != "claude" {
786 agent.set_capture_output(true);
787 }
788 }
789 agent.set_output_format(output_format);
790
791 if provider == "claude"
793 && let Some(claude_agent) = agent.as_any_mut().downcast_mut::<Claude>()
794 {
795 claude_agent.set_verbose(self.verbose);
796 if let Some(ref session_id) = self.session_id {
797 claude_agent.set_session_id(session_id.clone());
798 }
799 if let Some(ref input_fmt) = self.input_format {
800 claude_agent.set_input_format(Some(input_fmt.clone()));
801 }
802 if self.replay_user_messages {
803 claude_agent.set_replay_user_messages(true);
804 }
805 if self.include_partial_messages {
806 claude_agent.set_include_partial_messages(true);
807 }
808 if self.json_mode
809 && let Some(ref schema) = self.json_schema
810 {
811 let schema_str = serde_json::to_string(schema).unwrap_or_default();
812 claude_agent.set_json_schema(Some(schema_str));
813 }
814 if self.mcp_config.is_some() {
815 claude_agent.set_mcp_config(self.mcp_config.clone());
816 }
817 }
818
819 if provider == "ollama"
821 && let Some(ollama_agent) = agent.as_any_mut().downcast_mut::<Ollama>()
822 {
823 let config = Config::load(self.root.as_deref()).unwrap_or_default();
824 if let Some(ref size) = self.size {
825 let resolved = config.ollama_size_for(size);
826 ollama_agent.set_size(resolved.to_string());
827 }
828 }
829
830 if let Some(ref sandbox_opt) = self.sandbox {
832 let sandbox_name = sandbox_opt
833 .as_deref()
834 .map(String::from)
835 .unwrap_or_else(crate::sandbox::generate_name);
836 let template = crate::sandbox::template_for_provider(provider);
837 let workspace = self.root.clone().unwrap_or_else(|| ".".to_string());
838 agent.set_sandbox(SandboxConfig {
839 name: sandbox_name,
840 template: template.to_string(),
841 workspace,
842 });
843 }
844
845 if !self.env_vars.is_empty() {
846 agent.set_env_vars(self.env_vars.clone());
847 }
848
849 if let Some(ref hook) = self.on_spawn_hook {
850 agent.set_on_spawn_hook(hook.clone());
851 }
852
853 self.progress.on_spinner_finish();
854 self.progress.on_success(&format!(
855 "{} initialized with model {}",
856 provider,
857 agent.get_model()
858 ));
859
860 Ok((agent, effective_provider))
861 }
862
863 fn start_session_log(
871 &mut self,
872 command: &str,
873 resumed: bool,
874 provider: &str,
875 model: &str,
876 ) -> Option<SessionLogGuard> {
877 let mode = std::mem::replace(&mut self.session_log_mode, SessionLogMode::Disabled);
878 match mode {
879 SessionLogMode::Disabled => None,
880 SessionLogMode::External(c) => {
881 let wrapper_session_id = c
882 .writer()
883 .log_path()
884 .ok()
885 .and_then(|p| p.file_stem().map(|s| s.to_string_lossy().to_string()))
886 .unwrap_or_default();
887 let log_path = c.writer().log_path().ok();
888 self.apply_event_callback(c.writer());
889 Some(SessionLogGuard {
890 coordinator: None, wrapper_session_id,
892 log_path,
893 external_writer: Some(c.writer().clone()),
894 _owned_external: Some(c),
895 })
896 }
897 SessionLogMode::Auto => {
898 let wrapper_session_id = self
899 .session_id
900 .clone()
901 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
902 let metadata = SessionLogMetadata {
903 provider: provider.to_string(),
904 wrapper_session_id: wrapper_session_id.clone(),
905 provider_session_id: None,
906 workspace_path: self.root.clone().or_else(|| {
907 std::env::current_dir()
908 .ok()
909 .map(|p| p.to_string_lossy().to_string())
910 }),
911 command: command.to_string(),
912 model: Some(model.to_string()),
913 resumed,
914 backfilled: false,
915 };
916 let live_ctx = LiveLogContext {
917 root: self.root.clone(),
918 provider_session_id: metadata.provider_session_id.clone(),
919 workspace_path: metadata.workspace_path.clone(),
920 started_at: chrono::Utc::now(),
921 is_worktree: self.worktree.is_some(),
922 };
923 let adapter = live_adapter_for_provider(provider, live_ctx, true);
924 let callback = self.build_event_callback();
925 match SessionLogCoordinator::start_with_callback(
926 &logs_dir(self.root.as_deref()),
927 metadata,
928 adapter,
929 callback,
930 ) {
931 Ok(c) => {
932 let _ = c.writer().set_global_index_dir(Config::global_base_dir());
933 let log_path = c.writer().log_path().ok();
934 Some(SessionLogGuard {
935 coordinator: Some(c),
936 wrapper_session_id,
937 log_path,
938 external_writer: None,
939 _owned_external: None,
940 })
941 }
942 Err(e) => {
943 warn!("Failed to start session log coordinator: {e}");
944 None
945 }
946 }
947 }
948 }
949 }
950
951 fn build_event_callback(&self) -> Option<LogEventCallback> {
955 let user_cb = self.log_event_callback.clone();
956 let stream_fmt = self.stream_events_format;
957 let show_thinking = self.stream_show_thinking;
958
959 if user_cb.is_none() && stream_fmt.is_none() {
960 return None;
961 }
962
963 Some(Arc::new(move |event: &AgentLogEvent| {
964 if let Some(ref user) = user_cb {
965 user(event);
966 }
967 if let Some(fmt) = stream_fmt
968 && let Some(text) = listen::format_event(event, fmt, show_thinking)
969 {
970 eprintln!("{text}");
971 }
972 }))
973 }
974
975 fn apply_event_callback(&self, writer: &crate::session_log::SessionLogWriter) {
980 if let Some(cb) = self.build_event_callback() {
981 if let Err(e) = writer.set_event_callback(cb) {
982 warn!("Failed to register session log event callback: {e}");
983 }
984 }
985 }
986
987 pub async fn exec(mut self, prompt: &str) -> Result<AgentOutput> {
991 let registration = self
992 .register_process_opts
993 .as_ref()
994 .map(|opts| process_registration::register(opts.as_borrowed()));
995 if let Some(ref reg) = registration {
996 apply_registration(&mut self, reg);
997 }
998 let result = self.exec_inner(prompt).await;
999 if let Some(reg) = registration {
1000 let (status, code) = status_for_result(&result);
1001 reg.update_status(status, code);
1002 }
1003 result
1004 }
1005
1006 async fn exec_inner(self, prompt: &str) -> Result<AgentOutput> {
1007 let provider = self.resolve_provider()?;
1008 debug!("exec: provider={provider}");
1009
1010 let effective_root = if let Some(ref wt_opt) = self.worktree {
1012 let wt_name = wt_opt
1013 .as_deref()
1014 .map(String::from)
1015 .unwrap_or_else(worktree::generate_name);
1016 let repo_root = worktree::git_repo_root(self.root.as_deref())?;
1017 let wt_path = worktree::create_worktree(&repo_root, &wt_name)?;
1018 self.progress
1019 .on_success(&format!("Worktree created at {}", wt_path.display()));
1020 Some(wt_path.to_string_lossy().to_string())
1021 } else {
1022 self.root.clone()
1023 };
1024
1025 let mut builder = self;
1026 if effective_root.is_some() {
1027 builder.root = effective_root;
1028 }
1029
1030 let (agent, provider) = builder.create_agent(&provider).await?;
1031
1032 let log_guard = builder.start_session_log("exec", false, &provider, agent.get_model());
1036
1037 let _ = builder.persist_session_metadata_with_id(
1042 &provider,
1043 agent.get_model(),
1044 builder.root.as_deref(),
1045 log_guard.as_ref().map(|g| g.wrapper_session_id.as_str()),
1046 );
1047
1048 let prompt_with_files = builder.prepend_files(prompt)?;
1050
1051 let effective_prompt = if builder.json_mode && provider != "claude" {
1053 format!(
1054 "IMPORTANT: You MUST respond with valid JSON only. No markdown, no explanation.\n\n{prompt_with_files}"
1055 )
1056 } else {
1057 prompt_with_files
1058 };
1059
1060 let result = if let Some(timeout_dur) = builder.timeout {
1061 match tokio::time::timeout(timeout_dur, agent.run(Some(&effective_prompt))).await {
1062 Ok(r) => r?,
1063 Err(_) => {
1064 agent.cleanup().await.ok();
1065 bail!("Agent timed out after {}", format_duration(timeout_dur));
1066 }
1067 }
1068 } else {
1069 agent.run(Some(&effective_prompt)).await?
1070 };
1071
1072 agent.cleanup().await?;
1074
1075 let log_path_string = log_guard.as_ref().and_then(|g| g.log_path_string());
1076
1077 if let Some(mut output) = result {
1078 if let Some(ref schema) = builder.json_schema {
1080 if !builder.json_mode {
1081 warn!(
1082 "json_schema is set but json_mode is false — \
1083 schema will not be sent to the agent, only used for output validation"
1084 );
1085 }
1086 if let Some(ref result_text) = output.result {
1087 debug!(
1088 "exec: validating result ({} bytes): {:.300}",
1089 result_text.len(),
1090 result_text
1091 );
1092 if let Err(errors) = json_validation::validate_json_schema(result_text, schema)
1093 {
1094 let preview = if result_text.len() > 500 {
1095 &result_text[..500]
1096 } else {
1097 result_text.as_str()
1098 };
1099 bail!(
1100 "JSON schema validation failed: {}\nRaw agent output ({} bytes):\n{}",
1101 errors.join("; "),
1102 result_text.len(),
1103 preview
1104 );
1105 }
1106 }
1107 }
1108 output.log_path = log_path_string;
1109 let success = !output.is_error;
1110 let err_msg = output.error_message.clone();
1111 if let Some(g) = log_guard {
1112 g.finish(success, err_msg).await;
1113 }
1114 Ok(output)
1115 } else {
1116 let mut output = AgentOutput::from_text(&provider, "");
1118 output.log_path = log_path_string;
1119 if let Some(g) = log_guard {
1120 g.finish(true, None).await;
1121 }
1122 Ok(output)
1123 }
1124 }
1125
1126 pub async fn exec_streaming(self, prompt: &str) -> Result<StreamingSession> {
1195 let provider = self.resolve_provider()?;
1196 debug!("exec_streaming: provider={provider}");
1197
1198 if provider != "claude" {
1199 bail!("Streaming input is only supported by the Claude provider");
1200 }
1201
1202 let prompt_with_files = self.prepend_files(prompt)?;
1204
1205 let mut builder = self;
1208 builder.provider_explicit = true;
1209 let (agent, _provider) = builder.create_agent(&provider).await?;
1210
1211 let claude_agent = agent
1213 .as_any_ref()
1214 .downcast_ref::<Claude>()
1215 .ok_or_else(|| anyhow::anyhow!("Failed to downcast agent to Claude"))?;
1216
1217 claude_agent.execute_streaming(Some(&prompt_with_files))
1218 }
1219
1220 pub async fn run(mut self, prompt: Option<&str>) -> Result<()> {
1224 let registration = self
1225 .register_process_opts
1226 .as_ref()
1227 .map(|opts| process_registration::register(opts.as_borrowed()));
1228 if let Some(ref reg) = registration {
1229 apply_registration(&mut self, reg);
1230 }
1231 let result = self.run_inner(prompt).await;
1232 if let Some(reg) = registration {
1233 let (status, code) = status_for_result(&result);
1234 reg.update_status(status, code);
1235 }
1236 result
1237 }
1238
1239 async fn run_inner(self, prompt: Option<&str>) -> Result<()> {
1240 let provider = self.resolve_provider()?;
1241 debug!("run: provider={provider}");
1242
1243 let prompt_with_files = match prompt {
1245 Some(p) => Some(self.prepend_files(p)?),
1246 None if !self.files.is_empty() => {
1247 let attachments: Vec<Attachment> = self
1248 .files
1249 .iter()
1250 .map(|f| Attachment::from_path(std::path::Path::new(f)))
1251 .collect::<Result<Vec<_>>>()?;
1252 Some(attachment::format_attachments_prefix(&attachments))
1253 }
1254 None => None,
1255 };
1256
1257 let mut builder = self;
1258 let (agent, effective_provider) = builder.create_agent(&provider).await?;
1259 let log_guard =
1260 builder.start_session_log("run", false, &effective_provider, agent.get_model());
1261 let _ = builder.persist_session_metadata_with_id(
1262 &effective_provider,
1263 agent.get_model(),
1264 builder.root.as_deref(),
1265 log_guard.as_ref().map(|g| g.wrapper_session_id.as_str()),
1266 );
1267 agent.run_interactive(prompt_with_files.as_deref()).await?;
1268 agent.cleanup().await?;
1269 if let Some(g) = log_guard {
1270 g.finish(true, None).await;
1271 }
1272 Ok(())
1273 }
1274
1275 pub async fn resume(mut self, session_id: &str) -> Result<()> {
1277 let registration = self
1278 .register_process_opts
1279 .as_ref()
1280 .map(|opts| process_registration::register(opts.as_borrowed()));
1281 if let Some(ref reg) = registration {
1282 apply_registration(&mut self, reg);
1283 }
1284 let result = self.resume_inner(session_id).await;
1285 if let Some(reg) = registration {
1286 let (status, code) = status_for_result(&result);
1287 reg.update_status(status, code);
1288 }
1289 result
1290 }
1291
1292 async fn resume_inner(self, session_id: &str) -> Result<()> {
1293 let provider = self.resolve_provider()?;
1294 debug!("resume: provider={provider}, session={session_id}");
1295
1296 let mut builder = self;
1298 builder.provider_explicit = true;
1299 let (agent, effective_provider) = builder.create_agent(&provider).await?;
1300 let log_guard =
1301 builder.start_session_log("resume", true, &effective_provider, agent.get_model());
1302 agent.run_resume(Some(session_id), false).await?;
1303 agent.cleanup().await?;
1304 if let Some(g) = log_guard {
1305 g.finish(true, None).await;
1306 }
1307 Ok(())
1308 }
1309
1310 pub async fn continue_last(mut self) -> Result<()> {
1312 let registration = self
1313 .register_process_opts
1314 .as_ref()
1315 .map(|opts| process_registration::register(opts.as_borrowed()));
1316 if let Some(ref reg) = registration {
1317 apply_registration(&mut self, reg);
1318 }
1319 let result = self.continue_last_inner().await;
1320 if let Some(reg) = registration {
1321 let (status, code) = status_for_result(&result);
1322 reg.update_status(status, code);
1323 }
1324 result
1325 }
1326
1327 async fn continue_last_inner(self) -> Result<()> {
1328 let provider = self.resolve_provider()?;
1329 debug!("continue_last: provider={provider}");
1330
1331 let mut builder = self;
1333 builder.provider_explicit = true;
1334 let (agent, effective_provider) = builder.create_agent(&provider).await?;
1335 let log_guard =
1336 builder.start_session_log("resume", true, &effective_provider, agent.get_model());
1337 agent.run_resume(None, true).await?;
1338 agent.cleanup().await?;
1339 if let Some(g) = log_guard {
1340 g.finish(true, None).await;
1341 }
1342 Ok(())
1343 }
1344}
1345
1346#[cfg(test)]
1347#[path = "builder_tests.rs"]
1348mod tests;