Skip to main content

zag_agent/
builder.rs

1//! High-level builder API for driving agents programmatically.
2//!
3//! Instead of shelling out to the `agent` CLI binary, Rust programs can
4//! use `AgentBuilder` to configure and execute agent sessions directly.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use zag_agent::builder::AgentBuilder;
10//!
11//! # async fn example() -> anyhow::Result<()> {
12//! // Non-interactive exec — returns structured output
13//! let output = AgentBuilder::new()
14//!     .provider("claude")
15//!     .model("sonnet")
16//!     .auto_approve(true)
17//!     .exec("write a hello world program")
18//!     .await?;
19//!
20//! println!("{}", output.result.unwrap_or_default());
21//!
22//! // Interactive session
23//! AgentBuilder::new()
24//!     .provider("claude")
25//!     .run(Some("initial prompt"))
26//!     .await?;
27//! # Ok(())
28//! # }
29//! ```
30
31use crate::agent::Agent;
32use crate::attachment::{self, Attachment};
33use crate::config::Config;
34use crate::factory::AgentFactory;
35use crate::json_validation;
36use crate::listen::{self, ListenFormat};
37use crate::output::AgentOutput;
38use crate::progress::{ProgressHandler, SilentProgress};
39use crate::providers::claude::Claude;
40use crate::providers::ollama::Ollama;
41use crate::sandbox::SandboxConfig;
42use crate::session::{SessionEntry, SessionStore};
43use crate::session_log::{
44    AgentLogEvent, LiveLogContext, LogEventCallback, SessionLogCoordinator, SessionLogMetadata,
45    live_adapter_for_provider, logs_dir,
46};
47use crate::streaming::StreamingSession;
48use crate::worktree;
49use anyhow::{Result, bail};
50use log::{debug, warn};
51use std::sync::Arc;
52use std::time::Duration;
53
/// Render a `Duration` compactly as hours/minutes/seconds, e.g. `"5m"`,
/// `"1h30m"`, `"2h5s"`.
///
/// Sub-second precision is discarded and zero-valued units are omitted;
/// anything shorter than one second renders as `"0s"`.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    let (hours, minutes, seconds) = (secs / 3600, secs % 3600 / 60, secs % 60);

    let mut out = String::new();
    if hours > 0 {
        out += &format!("{hours}h");
    }
    if minutes > 0 {
        out += &format!("{minutes}m");
    }
    // Always emit at least one unit so the result is never empty.
    if seconds > 0 || out.is_empty() {
        out += &format!("{seconds}s");
    }
    out
}
72
/// Session discovery metadata — mirrors the `--name`, `--description`, and
/// `--tag` flags on the `run`/`exec`/`spawn` CLI commands. Attached to a
/// builder via [`AgentBuilder::name`], [`AgentBuilder::description`], and
/// [`AgentBuilder::tag`], or all at once via [`AgentBuilder::metadata`].
///
/// Derives `PartialEq`/`Eq` so callers can compare metadata values
/// directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SessionMetadata {
    /// Human-readable session name (`--name`).
    pub name: Option<String>,
    /// Short free-form description (`--description`).
    pub description: Option<String>,
    /// Discovery tags (`--tag`, repeatable), in the order they were added.
    pub tags: Vec<String>,
}
83
/// Private guard returned by `AgentBuilder::start_session_log` — owns
/// the coordinator (when `Auto`) or holds the caller-supplied one (when
/// `External`). Call `finish` on the normal exit path. If the guard is
/// dropped instead, its `Drop` impl detaches the event callbacks, and the
/// owned coordinator's own `Drop` impl handles teardown.
///
/// NOTE: fields are dropped in declaration order after `Drop::drop` runs;
/// keep that in mind before reordering them.
struct SessionLogGuard {
    /// Set when the builder started its own coordinator (`Auto`). Taken and
    /// finalised by `finish`; otherwise dropped together with the guard.
    coordinator: Option<SessionLogCoordinator>,
    /// Wrapper session ID. In `Auto` mode this is the builder's `session_id`
    /// or a fresh UUID. In `External` mode it is the log file's stem, or
    /// empty if the path is unavailable.
    wrapper_session_id: String,
    /// On-disk log location when known; rendered by `log_path_string`.
    log_path: Option<std::path::PathBuf>,
    /// When the caller supplied an `External` coordinator, we keep a
    /// writer clone so that `clear_event_callback` still works on exit.
    external_writer: Option<crate::session_log::SessionLogWriter>,
    /// Holds the externally-owned coordinator until the guard drops so
    /// callers who pass `SessionLogMode::External` don't have to keep
    /// their own handle alive. (They can; this is just a convenience.)
    _owned_external: Option<SessionLogCoordinator>,
}
102
103impl SessionLogGuard {
104    fn log_path_string(&self) -> Option<String> {
105        self.log_path
106            .as_ref()
107            .map(|p| p.to_string_lossy().to_string())
108    }
109
110    /// Flush the coordinator (emit `SessionEnded`, tear down the heartbeat /
111    /// live-adapter task). For `External` mode the caller retains ownership
112    /// — we merely detach our event callback so it stops firing after the
113    /// terminal method returns.
114    async fn finish(mut self, success: bool, error: Option<String>) {
115        // Run the finalization *before* detaching the callback so that the
116        // closing `SessionEnded` event still fires through the user's hook.
117        if let Some(coord) = self.coordinator.take() {
118            if let Err(e) = coord.finish(success, error).await {
119                warn!("Failed to finalize session log: {e}");
120            }
121        }
122        if let Some(w) = self.external_writer.take() {
123            let _ = w.clear_event_callback();
124        }
125    }
126}
127
128impl Drop for SessionLogGuard {
129    fn drop(&mut self) {
130        // Drop-path fallback: if a terminal method panicked or returned
131        // early without calling `finish`, still detach callbacks so user
132        // code stops receiving events. The owned coordinator's own `Drop`
133        // will kill the background task even without an explicit finish.
134        if let Some(ref w) = self.external_writer {
135            let _ = w.clear_event_callback();
136        }
137        if let Some(ref c) = self.coordinator {
138            let _ = c.writer().clear_event_callback();
139        }
140    }
141}
142
/// Controls whether the builder manages a [`SessionLogCoordinator`] for the
/// session it launches.
///
/// Default for [`AgentBuilder`] is [`SessionLogMode::Disabled`] so that
/// existing Rust library callers see no side effects. The CLI and any
/// caller that wants live event streaming should select
/// [`SessionLogMode::Auto`]. Registering a callback through
/// [`AgentBuilder::on_log_event`] or [`AgentBuilder::stream_events_to_stderr`]
/// promotes a `Disabled` mode to `Auto` automatically.
///
/// [`SessionLogMode::External`] lets an advanced caller (e.g. the CLI's
/// plan/review handlers, `zag-serve`) start and bookkeep its own
/// coordinator — the builder will write through it without double-starting.
#[derive(Default)]
pub enum SessionLogMode {
    /// No session log is started by the builder. No on-disk JSONL and no
    /// live event callbacks.
    #[default]
    Disabled,
    /// The builder starts its own [`SessionLogCoordinator`], tears it down
    /// when the terminal method returns, and populates
    /// [`AgentOutput::log_path`].
    Auto,
    /// The caller provides a pre-started [`SessionLogCoordinator`]; the
    /// builder uses it verbatim and does not stop it at exit.
    // NOTE(review): the builder's internal guard takes ownership of this
    // coordinator and drops it when the guard drops; the guard's comments
    // say the coordinator's `Drop` kills its background task. Confirm this
    // is compatible with "does not stop it at exit" (e.g. the caller keeps
    // another handle).
    External(SessionLogCoordinator),
}
168
/// Builder for configuring and running agent sessions.
///
/// Use the builder pattern to set options, then call a terminal method
/// (`exec`, `run`, `resume`, `continue_last`) to execute.
pub struct AgentBuilder {
    /// Provider requested via [`AgentBuilder::provider`]; when unset, the
    /// config file's provider (or `"claude"`) is used.
    provider: Option<String>,
    /// Set to true when the caller explicitly pinned a provider via
    /// `.provider()`. When false (default), the fallback tier list is
    /// allowed to downgrade to the next provider on binary/probe failure.
    provider_explicit: bool,
    /// Model name or alias handed to the agent factory.
    model: Option<String>,
    /// Explicit system prompt; when unset, the config's system prompt is used.
    system_prompt: Option<String>,
    /// Directory the agent runs in; also used to locate the config and the
    /// session store.
    root: Option<String>,
    /// Skip permission prompts.
    auto_approve: bool,
    /// Additional directories handed to the agent factory.
    add_dirs: Vec<String>,
    /// File paths attached to the prompt.
    files: Vec<String>,
    /// Extra environment variables for the agent subprocess, in insertion order.
    env_vars: Vec<(String, String)>,
    /// `None`: worktree mode off. `Some(None)`: on without an explicit name.
    /// `Some(Some(n))`: on with name `n`.
    worktree: Option<Option<String>>,
    /// Same shape as `worktree`; when no name is given, one is generated
    /// while the agent is created.
    sandbox: Option<Option<String>>,
    /// Ollama parameter-size alias, resolved through the config.
    size: Option<String>,
    /// Request JSON output.
    json_mode: bool,
    /// Schema for structured output; setting it also sets `json_mode`.
    json_schema: Option<serde_json::Value>,
    /// Caller-supplied session ID (UUID).
    session_id: Option<String>,
    /// Name/description/tags used for session discovery.
    metadata: SessionMetadata,
    /// Output format override (e.g. "text", "json", "stream-json").
    output_format: Option<String>,
    /// Input format (Claude only).
    input_format: Option<String>,
    /// Re-emit user messages on stdout (Claude only).
    replay_user_messages: bool,
    /// Emit token-level partial messages when streaming (Claude only).
    include_partial_messages: bool,
    /// Verbose output.
    verbose: bool,
    /// Suppress non-essential output.
    quiet: bool,
    /// Show token usage statistics.
    show_usage: bool,
    /// Explicit turn limit; when unset, the config's value (if any) is used.
    max_turns: Option<u32>,
    /// Exec timeout.
    timeout: Option<std::time::Duration>,
    /// MCP server config as inline JSON or a file path (Claude only).
    mcp_config: Option<String>,
    /// Status reporter; defaults to `SilentProgress`.
    progress: Box<dyn ProgressHandler>,
    /// How the session log is managed; see [`SessionLogMode`].
    session_log_mode: SessionLogMode,
    /// Registered via [`AgentBuilder::on_log_event`] — fired for each
    /// `AgentLogEvent` written to the session log while the terminal method
    /// runs. Requires `session_log_mode != Disabled`.
    log_event_callback: Option<LogEventCallback>,
    /// Set via [`AgentBuilder::stream_events_to_stderr`] — overrides
    /// `log_event_callback` to format and print each event to `stderr`.
    stream_events_format: Option<ListenFormat>,
    /// Whether the built-in stderr streamer should include reasoning events
    /// (set via [`AgentBuilder::stream_show_thinking`]).
    stream_show_thinking: bool,
    /// Registered via [`AgentBuilder::on_spawn`] — invoked once with the
    /// OS pid of the spawned agent subprocess right after spawn, before
    /// the terminal wait. Useful for registering the child pid with an
    /// external process store.
    on_spawn_hook: Option<crate::agent::OnSpawnHook>,
}
221
222impl Default for AgentBuilder {
223    fn default() -> Self {
224        Self::new()
225    }
226}
227
228impl AgentBuilder {
229    /// Create a new builder with default settings.
230    pub fn new() -> Self {
231        Self {
232            provider: None,
233            provider_explicit: false,
234            model: None,
235            system_prompt: None,
236            root: None,
237            auto_approve: false,
238            add_dirs: Vec::new(),
239            files: Vec::new(),
240            env_vars: Vec::new(),
241            worktree: None,
242            sandbox: None,
243            size: None,
244            json_mode: false,
245            json_schema: None,
246            session_id: None,
247            metadata: SessionMetadata::default(),
248            output_format: None,
249            input_format: None,
250            replay_user_messages: false,
251            include_partial_messages: false,
252            verbose: false,
253            quiet: false,
254            show_usage: false,
255            max_turns: None,
256            timeout: None,
257            mcp_config: None,
258            progress: Box::new(SilentProgress),
259            session_log_mode: SessionLogMode::Disabled,
260            log_event_callback: None,
261            stream_events_format: None,
262            stream_show_thinking: false,
263            on_spawn_hook: None,
264        }
265    }
266
267    /// Set the provider (e.g., "claude", "codex", "gemini", "copilot", "ollama").
268    ///
269    /// Calling this method pins the provider — it will NOT be downgraded to
270    /// another provider in the tier list if its binary is missing or the
271    /// startup probe fails. Omit this call (or set `provider` via the config
272    /// file) to allow automatic downgrading.
273    pub fn provider(mut self, provider: &str) -> Self {
274        self.provider = Some(provider.to_string());
275        self.provider_explicit = true;
276        self
277    }
278
279    /// Set the model (e.g., "sonnet", "opus", "small", "large").
280    pub fn model(mut self, model: &str) -> Self {
281        self.model = Some(model.to_string());
282        self
283    }
284
285    /// Set a system prompt to configure agent behavior.
286    pub fn system_prompt(mut self, prompt: &str) -> Self {
287        self.system_prompt = Some(prompt.to_string());
288        self
289    }
290
291    /// Set the root directory for the agent to operate in.
292    pub fn root(mut self, root: &str) -> Self {
293        self.root = Some(root.to_string());
294        self
295    }
296
297    /// Enable auto-approve mode (skip permission prompts).
298    pub fn auto_approve(mut self, approve: bool) -> Self {
299        self.auto_approve = approve;
300        self
301    }
302
303    /// Add an additional directory for the agent to include.
304    pub fn add_dir(mut self, dir: &str) -> Self {
305        self.add_dirs.push(dir.to_string());
306        self
307    }
308
309    /// Attach a file to the prompt (text files ≤50 KB inlined, others referenced).
310    pub fn file(mut self, path: &str) -> Self {
311        self.files.push(path.to_string());
312        self
313    }
314
315    /// Add an environment variable for the agent subprocess.
316    pub fn env(mut self, key: &str, value: &str) -> Self {
317        self.env_vars.push((key.to_string(), value.to_string()));
318        self
319    }
320
321    /// Enable worktree mode with an optional name.
322    pub fn worktree(mut self, name: Option<&str>) -> Self {
323        self.worktree = Some(name.map(String::from));
324        self
325    }
326
327    /// Enable sandbox mode with an optional name.
328    pub fn sandbox(mut self, name: Option<&str>) -> Self {
329        self.sandbox = Some(name.map(String::from));
330        self
331    }
332
333    /// Set the Ollama parameter size (e.g., "2b", "9b", "35b").
334    pub fn size(mut self, size: &str) -> Self {
335        self.size = Some(size.to_string());
336        self
337    }
338
339    /// Request JSON output from the agent.
340    pub fn json(mut self) -> Self {
341        self.json_mode = true;
342        self
343    }
344
345    /// Set a JSON schema for structured output validation.
346    /// Implies `json()`.
347    pub fn json_schema(mut self, schema: serde_json::Value) -> Self {
348        self.json_schema = Some(schema);
349        self.json_mode = true;
350        self
351    }
352
353    /// Set a specific session ID (UUID).
354    pub fn session_id(mut self, id: &str) -> Self {
355        self.session_id = Some(id.to_string());
356        self
357    }
358
359    /// Set a human-readable session name (mirrors the CLI's `--name`).
360    ///
361    /// Names are used by `zag input --name <n>`, `zag session list --name
362    /// <n>`, and for session discovery across the store. When the session
363    /// has a generated wrapper ID, the builder will persist this name to
364    /// the session store so CLI tools can find it later.
365    pub fn name(mut self, name: &str) -> Self {
366        self.metadata.name = Some(name.to_string());
367        self
368    }
369
370    /// Set a short description for the session (mirrors the CLI's
371    /// `--description`).
372    pub fn description(mut self, description: &str) -> Self {
373        self.metadata.description = Some(description.to_string());
374        self
375    }
376
377    /// Add a discovery tag for the session (mirrors the CLI's `--tag`,
378    /// repeatable).
379    pub fn tag(mut self, tag: &str) -> Self {
380        self.metadata.tags.push(tag.to_string());
381        self
382    }
383
384    /// Replace the full session metadata in one call.
385    pub fn metadata(mut self, metadata: SessionMetadata) -> Self {
386        self.metadata = metadata;
387        self
388    }
389
390    /// Set the output format (e.g., "text", "json", "json-pretty", "stream-json").
391    pub fn output_format(mut self, format: &str) -> Self {
392        self.output_format = Some(format.to_string());
393        self
394    }
395
396    /// Set the input format (Claude only, e.g., "text", "stream-json").
397    ///
398    /// No-op for Codex, Gemini, Copilot, and Ollama. See `docs/providers.md`
399    /// for the full per-provider support matrix.
400    pub fn input_format(mut self, format: &str) -> Self {
401        self.input_format = Some(format.to_string());
402        self
403    }
404
405    /// Re-emit user messages from stdin on stdout (Claude only).
406    ///
407    /// Only works with `--input-format stream-json` and `--output-format stream-json`.
408    /// [`exec_streaming`](Self::exec_streaming) auto-enables this flag, so most
409    /// callers never need to set it manually. No-op for non-Claude providers.
410    pub fn replay_user_messages(mut self, replay: bool) -> Self {
411        self.replay_user_messages = replay;
412        self
413    }
414
415    /// Include partial message chunks in streaming output (Claude only).
416    ///
417    /// Only works with `--output-format stream-json`. Defaults to `false`.
418    ///
419    /// When `false` (the default), streaming surfaces one `assistant_message`
420    /// event per complete assistant turn. When `true`, the agent instead emits
421    /// a stream of token-level partial `assistant_message` chunks as the model
422    /// generates them — use this for responsive, token-by-token UIs over
423    /// [`exec_streaming`](Self::exec_streaming). No-op for non-Claude providers.
424    pub fn include_partial_messages(mut self, include: bool) -> Self {
425        self.include_partial_messages = include;
426        self
427    }
428
429    /// Enable verbose output.
430    pub fn verbose(mut self, v: bool) -> Self {
431        self.verbose = v;
432        self
433    }
434
435    /// Enable quiet mode (suppress all non-essential output).
436    pub fn quiet(mut self, q: bool) -> Self {
437        self.quiet = q;
438        self
439    }
440
441    /// Show token usage statistics.
442    pub fn show_usage(mut self, show: bool) -> Self {
443        self.show_usage = show;
444        self
445    }
446
447    /// Set the maximum number of agentic turns.
448    pub fn max_turns(mut self, turns: u32) -> Self {
449        self.max_turns = Some(turns);
450        self
451    }
452
453    /// Set a timeout for exec. If the agent doesn't complete within this
454    /// duration, it will be killed and an error returned.
455    pub fn timeout(mut self, duration: std::time::Duration) -> Self {
456        self.timeout = Some(duration);
457        self
458    }
459
460    /// Set MCP server config for this invocation (Claude only).
461    ///
462    /// Accepts either a JSON string (`{"mcpServers": {...}}`) or a path to a JSON file.
463    /// No-op for Codex, Gemini, Copilot, and Ollama — those providers manage
464    /// MCP configuration through their own CLIs or do not support it. See
465    /// `docs/providers.md` for the full per-provider support matrix.
466    pub fn mcp_config(mut self, config: &str) -> Self {
467        self.mcp_config = Some(config.to_string());
468        self
469    }
470
471    /// Set a custom progress handler for status reporting.
472    pub fn on_progress(mut self, handler: Box<dyn ProgressHandler>) -> Self {
473        self.progress = handler;
474        self
475    }
476
477    /// Select how the builder manages the session log. See [`SessionLogMode`].
478    pub fn session_log(mut self, mode: SessionLogMode) -> Self {
479        self.session_log_mode = mode;
480        self
481    }
482
483    /// Shortcut for `.session_log(SessionLogMode::Auto)` when `true`, or
484    /// `.session_log(SessionLogMode::Disabled)` when `false`.
485    pub fn enable_session_log(mut self, enable: bool) -> Self {
486        self.session_log_mode = if enable {
487            SessionLogMode::Auto
488        } else {
489            SessionLogMode::Disabled
490        };
491        self
492    }
493
494    /// Register a callback fired for each `AgentLogEvent` written to the
495    /// session log during the terminal method. Implicitly switches
496    /// `session_log_mode` to [`SessionLogMode::Auto`] if it is currently
497    /// [`SessionLogMode::Disabled`].
498    pub fn on_log_event<F>(mut self, f: F) -> Self
499    where
500        F: Fn(&AgentLogEvent) + Send + Sync + 'static,
501    {
502        self.log_event_callback = Some(Arc::new(f));
503        if matches!(self.session_log_mode, SessionLogMode::Disabled) {
504            self.session_log_mode = SessionLogMode::Auto;
505        }
506        self
507    }
508
509    /// Convenience: tail the session log to stderr during the terminal
510    /// method, using the same formatters as the `zag listen` command.
511    ///
512    /// This is the drop-in replacement for the live stderr tail that a
513    /// previous shell-out-to-`zag` wrapper produced. Implicitly enables
514    /// session logging.
515    pub fn stream_events_to_stderr(mut self, format: ListenFormat) -> Self {
516        self.stream_events_format = Some(format);
517        if matches!(self.session_log_mode, SessionLogMode::Disabled) {
518            self.session_log_mode = SessionLogMode::Auto;
519        }
520        self
521    }
522
523    /// Include `Reasoning` events in the stderr stream when
524    /// [`stream_events_to_stderr`] is active. Off by default.
525    pub fn stream_show_thinking(mut self, show: bool) -> Self {
526        self.stream_show_thinking = show;
527        self
528    }
529
530    /// Register a callback invoked once with the OS pid of the spawned
531    /// agent subprocess, right after spawn and before the terminal
532    /// wait. Useful for registering the child pid with an external
533    /// process store so `zag ps kill self` (or equivalent) can SIGTERM
534    /// the agent child rather than the parent zag process.
535    ///
536    /// The callback fires once per spawn — on retries or resumes it
537    /// fires again for each new child. See [`crate::agent::OnSpawnHook`]
538    /// for the full semantics.
539    pub fn on_spawn<F>(mut self, f: F) -> Self
540    where
541        F: Fn(u32) + Send + Sync + 'static,
542    {
543        self.on_spawn_hook = Some(Arc::new(f));
544        self
545    }
546
547    /// Persist a `SessionEntry` to the session store so `zag session list`
548    /// and `zag input --name <n>` can discover this builder-spawned session.
549    ///
550    /// No-op when no metadata is set — callers who don't name their sessions
551    /// still get the old behavior of not leaving a trail in the session store.
552    ///
553    /// Returns the session ID that was persisted (either the caller-provided
554    /// one or a freshly generated UUID), so downstream logging can reference
555    /// the same ID.
556    fn persist_session_metadata_with_id(
557        &self,
558        provider: &str,
559        model: &str,
560        effective_root: Option<&str>,
561        explicit_session_id: Option<&str>,
562    ) -> Option<String> {
563        let has_metadata = self.metadata.name.is_some()
564            || self.metadata.description.is_some()
565            || !self.metadata.tags.is_empty();
566        if !has_metadata {
567            return None;
568        }
569
570        let session_id = explicit_session_id
571            .map(String::from)
572            .or_else(|| self.session_id.clone())
573            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
574        let workspace_path = effective_root
575            .map(String::from)
576            .or_else(|| self.root.clone())
577            .unwrap_or_else(|| {
578                std::env::current_dir()
579                    .map(|p| p.to_string_lossy().to_string())
580                    .unwrap_or_default()
581            });
582
583        let entry = SessionEntry {
584            session_id: session_id.clone(),
585            provider: provider.to_string(),
586            model: model.to_string(),
587            worktree_path: workspace_path,
588            worktree_name: String::new(),
589            created_at: chrono::Utc::now().to_rfc3339(),
590            provider_session_id: None,
591            sandbox_name: None,
592            is_worktree: self.worktree.is_some(),
593            discovered: false,
594            discovery_source: None,
595            log_path: None,
596            log_completeness: "partial".to_string(),
597            name: self.metadata.name.clone(),
598            description: self.metadata.description.clone(),
599            tags: self.metadata.tags.clone(),
600            dependencies: Vec::new(),
601            retried_from: None,
602            interactive: false,
603        };
604
605        let mut store = SessionStore::load(self.root.as_deref()).unwrap_or_default();
606        store.add(entry);
607        if let Err(e) = store.save(self.root.as_deref()) {
608            warn!("Failed to persist session metadata: {e}");
609        }
610
611        Some(session_id)
612    }
613
614    /// Resolve file attachments and prepend them to a prompt.
615    fn prepend_files(&self, prompt: &str) -> Result<String> {
616        if self.files.is_empty() {
617            return Ok(prompt.to_string());
618        }
619        let attachments: Vec<Attachment> = self
620            .files
621            .iter()
622            .map(|f| Attachment::from_path(std::path::Path::new(f)))
623            .collect::<Result<Vec<_>>>()?;
624        let prefix = attachment::format_attachments_prefix(&attachments);
625        Ok(format!("{prefix}{prompt}"))
626    }
627
628    /// Resolve the effective provider name.
629    fn resolve_provider(&self) -> Result<String> {
630        if let Some(ref p) = self.provider {
631            let p = p.to_lowercase();
632            if !Config::VALID_PROVIDERS.contains(&p.as_str()) {
633                bail!(
634                    "Invalid provider '{}'. Available: {}",
635                    p,
636                    Config::VALID_PROVIDERS.join(", ")
637                );
638            }
639            return Ok(p);
640        }
641        let config = Config::load(self.root.as_deref()).unwrap_or_default();
642        if let Some(p) = config.provider() {
643            return Ok(p.to_string());
644        }
645        Ok("claude".to_string())
646    }
647
648    /// Create and configure the agent.
649    ///
650    /// Returns the constructed agent along with the provider name that
651    /// actually succeeded. When `provider_explicit` is false, the factory
652    /// may downgrade to another provider in the tier list, so the returned
653    /// provider can differ from the one passed in.
654    async fn create_agent(&self, provider: &str) -> Result<(Box<dyn Agent + Send + Sync>, String)> {
655        // Apply system_prompt config fallback
656        let base_system_prompt = self.system_prompt.clone().or_else(|| {
657            Config::load(self.root.as_deref())
658                .unwrap_or_default()
659                .system_prompt()
660                .map(String::from)
661        });
662
663        // Augment system prompt with JSON instructions for non-Claude agents
664        let system_prompt = if self.json_mode && provider != "claude" {
665            let mut prompt = base_system_prompt.unwrap_or_default();
666            if let Some(ref schema) = self.json_schema {
667                let schema_str = serde_json::to_string_pretty(schema).unwrap_or_default();
668                prompt.push_str(&format!(
669                    "\n\nYou MUST respond with valid JSON only. No markdown fences, no explanations. \
670                     Your response must conform to this JSON schema:\n{schema_str}"
671                ));
672            } else {
673                prompt.push_str(
674                    "\n\nYou MUST respond with valid JSON only. No markdown fences, no explanations.",
675                );
676            }
677            Some(prompt)
678        } else {
679            base_system_prompt
680        };
681
682        self.progress
683            .on_spinner_start(&format!("Initializing {provider} agent"));
684
685        let progress = &*self.progress;
686        let mut on_downgrade = |from: &str, to: &str, reason: &str| {
687            progress.on_warning(&format!("Downgrading provider: {from} → {to} ({reason})"));
688        };
689        let (mut agent, effective_provider) = AgentFactory::create_with_fallback(
690            provider,
691            self.provider_explicit,
692            system_prompt,
693            self.model.clone(),
694            self.root.clone(),
695            self.auto_approve,
696            self.add_dirs.clone(),
697            &mut on_downgrade,
698        )
699        .await?;
700        let provider = effective_provider.as_str();
701
702        // Apply max_turns: explicit > config > none
703        let effective_max_turns = self.max_turns.or_else(|| {
704            Config::load(self.root.as_deref())
705                .unwrap_or_default()
706                .max_turns()
707        });
708        if let Some(turns) = effective_max_turns {
709            agent.set_max_turns(turns);
710        }
711
712        // Set output format
713        let mut output_format = self.output_format.clone();
714        if self.json_mode && output_format.is_none() {
715            output_format = Some("json".to_string());
716            if provider != "claude" {
717                agent.set_capture_output(true);
718            }
719        }
720        agent.set_output_format(output_format);
721
722        // Configure Claude-specific options
723        if provider == "claude"
724            && let Some(claude_agent) = agent.as_any_mut().downcast_mut::<Claude>()
725        {
726            claude_agent.set_verbose(self.verbose);
727            if let Some(ref session_id) = self.session_id {
728                claude_agent.set_session_id(session_id.clone());
729            }
730            if let Some(ref input_fmt) = self.input_format {
731                claude_agent.set_input_format(Some(input_fmt.clone()));
732            }
733            if self.replay_user_messages {
734                claude_agent.set_replay_user_messages(true);
735            }
736            if self.include_partial_messages {
737                claude_agent.set_include_partial_messages(true);
738            }
739            if self.json_mode
740                && let Some(ref schema) = self.json_schema
741            {
742                let schema_str = serde_json::to_string(schema).unwrap_or_default();
743                claude_agent.set_json_schema(Some(schema_str));
744            }
745            if self.mcp_config.is_some() {
746                claude_agent.set_mcp_config(self.mcp_config.clone());
747            }
748        }
749
750        // Configure Ollama-specific options
751        if provider == "ollama"
752            && let Some(ollama_agent) = agent.as_any_mut().downcast_mut::<Ollama>()
753        {
754            let config = Config::load(self.root.as_deref()).unwrap_or_default();
755            if let Some(ref size) = self.size {
756                let resolved = config.ollama_size_for(size);
757                ollama_agent.set_size(resolved.to_string());
758            }
759        }
760
761        // Configure sandbox
762        if let Some(ref sandbox_opt) = self.sandbox {
763            let sandbox_name = sandbox_opt
764                .as_deref()
765                .map(String::from)
766                .unwrap_or_else(crate::sandbox::generate_name);
767            let template = crate::sandbox::template_for_provider(provider);
768            let workspace = self.root.clone().unwrap_or_else(|| ".".to_string());
769            agent.set_sandbox(SandboxConfig {
770                name: sandbox_name,
771                template: template.to_string(),
772                workspace,
773            });
774        }
775
776        if !self.env_vars.is_empty() {
777            agent.set_env_vars(self.env_vars.clone());
778        }
779
780        if let Some(ref hook) = self.on_spawn_hook {
781            agent.set_on_spawn_hook(hook.clone());
782        }
783
784        self.progress.on_spinner_finish();
785        self.progress.on_success(&format!(
786            "{} initialized with model {}",
787            provider,
788            agent.get_model()
789        ));
790
791        Ok((agent, effective_provider))
792    }
793
794    /// Start (or adopt) a [`SessionLogCoordinator`] for the session about
795    /// to run, honouring the builder's [`SessionLogMode`] and wiring up any
796    /// registered `on_log_event` / `stream_events_to_stderr` callback.
797    ///
798    /// Returns a guard that owns the coordinator (where applicable) and the
799    /// resolved `wrapper_session_id` + log path, or `None` when logging is
800    /// disabled.
801    fn start_session_log(
802        &mut self,
803        command: &str,
804        resumed: bool,
805        provider: &str,
806        model: &str,
807    ) -> Option<SessionLogGuard> {
808        let mode = std::mem::replace(&mut self.session_log_mode, SessionLogMode::Disabled);
809        match mode {
810            SessionLogMode::Disabled => None,
811            SessionLogMode::External(c) => {
812                let wrapper_session_id = c
813                    .writer()
814                    .log_path()
815                    .ok()
816                    .and_then(|p| p.file_stem().map(|s| s.to_string_lossy().to_string()))
817                    .unwrap_or_default();
818                let log_path = c.writer().log_path().ok();
819                self.apply_event_callback(c.writer());
820                Some(SessionLogGuard {
821                    coordinator: None, // externally owned
822                    wrapper_session_id,
823                    log_path,
824                    external_writer: Some(c.writer().clone()),
825                    _owned_external: Some(c),
826                })
827            }
828            SessionLogMode::Auto => {
829                let wrapper_session_id = self
830                    .session_id
831                    .clone()
832                    .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
833                let metadata = SessionLogMetadata {
834                    provider: provider.to_string(),
835                    wrapper_session_id: wrapper_session_id.clone(),
836                    provider_session_id: None,
837                    workspace_path: self.root.clone().or_else(|| {
838                        std::env::current_dir()
839                            .ok()
840                            .map(|p| p.to_string_lossy().to_string())
841                    }),
842                    command: command.to_string(),
843                    model: Some(model.to_string()),
844                    resumed,
845                    backfilled: false,
846                };
847                let live_ctx = LiveLogContext {
848                    root: self.root.clone(),
849                    provider_session_id: metadata.provider_session_id.clone(),
850                    workspace_path: metadata.workspace_path.clone(),
851                    started_at: chrono::Utc::now(),
852                    is_worktree: self.worktree.is_some(),
853                };
854                let adapter = live_adapter_for_provider(provider, live_ctx, true);
855                let callback = self.build_event_callback();
856                match SessionLogCoordinator::start_with_callback(
857                    &logs_dir(self.root.as_deref()),
858                    metadata,
859                    adapter,
860                    callback,
861                ) {
862                    Ok(c) => {
863                        let _ = c.writer().set_global_index_dir(Config::global_base_dir());
864                        let log_path = c.writer().log_path().ok();
865                        Some(SessionLogGuard {
866                            coordinator: Some(c),
867                            wrapper_session_id,
868                            log_path,
869                            external_writer: None,
870                            _owned_external: None,
871                        })
872                    }
873                    Err(e) => {
874                        warn!("Failed to start session log coordinator: {e}");
875                        None
876                    }
877                }
878            }
879        }
880    }
881
882    /// Build a combined event callback from any registered `on_log_event`
883    /// and `stream_events_to_stderr` setters. Returns `None` when neither
884    /// is set so the writer doesn't pay any per-event cost.
885    fn build_event_callback(&self) -> Option<LogEventCallback> {
886        let user_cb = self.log_event_callback.clone();
887        let stream_fmt = self.stream_events_format;
888        let show_thinking = self.stream_show_thinking;
889
890        if user_cb.is_none() && stream_fmt.is_none() {
891            return None;
892        }
893
894        Some(Arc::new(move |event: &AgentLogEvent| {
895            if let Some(ref user) = user_cb {
896                user(event);
897            }
898            if let Some(fmt) = stream_fmt
899                && let Some(text) = listen::format_event(event, fmt, show_thinking)
900            {
901                eprintln!("{text}");
902            }
903        }))
904    }
905
906    /// Register the builder's callback on an externally-owned writer (used
907    /// by [`SessionLogMode::External`] — the coordinator is already running
908    /// so we can't register the callback before `SessionStarted`, but any
909    /// post-adoption event will still fire).
910    fn apply_event_callback(&self, writer: &crate::session_log::SessionLogWriter) {
911        if let Some(cb) = self.build_event_callback() {
912            if let Err(e) = writer.set_event_callback(cb) {
913                warn!("Failed to register session log event callback: {e}");
914            }
915        }
916    }
917
918    /// Run the agent non-interactively and return structured output.
919    ///
920    /// This is the primary entry point for programmatic use.
921    pub async fn exec(self, prompt: &str) -> Result<AgentOutput> {
922        let provider = self.resolve_provider()?;
923        debug!("exec: provider={provider}");
924
925        // Set up worktree if requested
926        let effective_root = if let Some(ref wt_opt) = self.worktree {
927            let wt_name = wt_opt
928                .as_deref()
929                .map(String::from)
930                .unwrap_or_else(worktree::generate_name);
931            let repo_root = worktree::git_repo_root(self.root.as_deref())?;
932            let wt_path = worktree::create_worktree(&repo_root, &wt_name)?;
933            self.progress
934                .on_success(&format!("Worktree created at {}", wt_path.display()));
935            Some(wt_path.to_string_lossy().to_string())
936        } else {
937            self.root.clone()
938        };
939
940        let mut builder = self;
941        if effective_root.is_some() {
942            builder.root = effective_root;
943        }
944
945        let (agent, provider) = builder.create_agent(&provider).await?;
946
947        // Start (or adopt) the session log coordinator. Held for the whole
948        // terminal method; dropped after cleanup so the log file is
949        // finalised exactly once.
950        let log_guard = builder.start_session_log("exec", false, &provider, agent.get_model());
951
952        // Persist the session entry so discovery (session list --name, input
953        // --name) works for builder-spawned sessions. No-op when no metadata
954        // is set. When session logging is active, share its wrapper_session_id
955        // so the store entry and the JSONL log agree.
956        let _ = builder.persist_session_metadata_with_id(
957            &provider,
958            agent.get_model(),
959            builder.root.as_deref(),
960            log_guard.as_ref().map(|g| g.wrapper_session_id.as_str()),
961        );
962
963        // Prepend file attachments
964        let prompt_with_files = builder.prepend_files(prompt)?;
965
966        // Handle JSON mode with prompt wrapping for non-Claude agents
967        let effective_prompt = if builder.json_mode && provider != "claude" {
968            format!(
969                "IMPORTANT: You MUST respond with valid JSON only. No markdown, no explanation.\n\n{prompt_with_files}"
970            )
971        } else {
972            prompt_with_files
973        };
974
975        let result = if let Some(timeout_dur) = builder.timeout {
976            match tokio::time::timeout(timeout_dur, agent.run(Some(&effective_prompt))).await {
977                Ok(r) => r?,
978                Err(_) => {
979                    agent.cleanup().await.ok();
980                    bail!("Agent timed out after {}", format_duration(timeout_dur));
981                }
982            }
983        } else {
984            agent.run(Some(&effective_prompt)).await?
985        };
986
987        // Clean up
988        agent.cleanup().await?;
989
990        let log_path_string = log_guard.as_ref().and_then(|g| g.log_path_string());
991
992        if let Some(mut output) = result {
993            // Validate JSON output if schema is provided
994            if let Some(ref schema) = builder.json_schema {
995                if !builder.json_mode {
996                    warn!(
997                        "json_schema is set but json_mode is false — \
998                         schema will not be sent to the agent, only used for output validation"
999                    );
1000                }
1001                if let Some(ref result_text) = output.result {
1002                    debug!(
1003                        "exec: validating result ({} bytes): {:.300}",
1004                        result_text.len(),
1005                        result_text
1006                    );
1007                    if let Err(errors) = json_validation::validate_json_schema(result_text, schema)
1008                    {
1009                        let preview = if result_text.len() > 500 {
1010                            &result_text[..500]
1011                        } else {
1012                            result_text.as_str()
1013                        };
1014                        bail!(
1015                            "JSON schema validation failed: {}\nRaw agent output ({} bytes):\n{}",
1016                            errors.join("; "),
1017                            result_text.len(),
1018                            preview
1019                        );
1020                    }
1021                }
1022            }
1023            output.log_path = log_path_string;
1024            let success = !output.is_error;
1025            let err_msg = output.error_message.clone();
1026            if let Some(g) = log_guard {
1027                g.finish(success, err_msg).await;
1028            }
1029            Ok(output)
1030        } else {
1031            // Agent returned no structured output — create a minimal one
1032            let mut output = AgentOutput::from_text(&provider, "");
1033            output.log_path = log_path_string;
1034            if let Some(g) = log_guard {
1035                g.finish(true, None).await;
1036            }
1037            Ok(output)
1038        }
1039    }
1040
1041    /// Run the agent with streaming input and output (Claude only).
1042    ///
1043    /// Returns a [`StreamingSession`] that allows sending NDJSON messages to
1044    /// the agent's stdin and reading events from stdout. Automatically
1045    /// configures `--input-format stream-json`, `--output-format stream-json`,
1046    /// and `--replay-user-messages`.
1047    ///
1048    /// # Default emission granularity
1049    ///
1050    /// By default `assistant_message` events are emitted **once per complete
1051    /// assistant turn** — you get one event when the model finishes speaking,
1052    /// not a stream of token chunks. For responsive, token-level UIs call
1053    /// [`include_partial_messages(true)`](Self::include_partial_messages)
1054    /// on the builder before `exec_streaming`; the session will then emit
1055    /// partial `assistant_message` chunks as the model generates them.
1056    ///
1057    /// The default is kept `false` so existing callers that render whole-turn
1058    /// bubbles are not broken. See `docs/providers.md` for the full
1059    /// per-provider flag support matrix.
1060    ///
1061    /// # Event lifecycle
1062    ///
1063    /// The session emits a unified
1064    /// [`Event::Result`](crate::output::Event::Result) at the **end of every
1065    /// agent turn** — not only at final session end. Use that event as the
1066    /// authoritative turn-boundary signal. After a `Result`, the session
1067    /// remains open and accepts another
1068    /// [`send_user_message`](StreamingSession::send_user_message) for the next
1069    /// turn. Call
1070    /// [`close_input`](StreamingSession::close_input) followed by
1071    /// [`wait`](StreamingSession::wait) to terminate the session cleanly.
1072    ///
1073    /// Do not depend on replayed `user_message` events to detect turn
1074    /// boundaries; those only appear while `--replay-user-messages` is set.
1075    ///
1076    /// # Examples
1077    ///
1078    /// ```no_run
1079    /// use zag_agent::builder::AgentBuilder;
1080    /// use zag_agent::output::Event;
1081    ///
1082    /// # async fn example() -> anyhow::Result<()> {
1083    /// let mut session = AgentBuilder::new()
1084    ///     .provider("claude")
1085    ///     .exec_streaming("initial prompt")
1086    ///     .await?;
1087    ///
1088    /// // Drain the first turn until Result.
1089    /// while let Some(event) = session.next_event().await? {
1090    ///     println!("{:?}", event);
1091    ///     if matches!(event, Event::Result { .. }) {
1092    ///         break;
1093    ///     }
1094    /// }
1095    ///
1096    /// // Follow-up turn.
1097    /// session.send_user_message("do something else").await?;
1098    /// while let Some(event) = session.next_event().await? {
1099    ///     if matches!(event, Event::Result { .. }) {
1100    ///         break;
1101    ///     }
1102    /// }
1103    ///
1104    /// session.close_input();
1105    /// session.wait().await?;
1106    /// # Ok(())
1107    /// # }
1108    /// ```
1109    pub async fn exec_streaming(self, prompt: &str) -> Result<StreamingSession> {
1110        let provider = self.resolve_provider()?;
1111        debug!("exec_streaming: provider={provider}");
1112
1113        if provider != "claude" {
1114            bail!("Streaming input is only supported by the Claude provider");
1115        }
1116
1117        // Prepend file attachments
1118        let prompt_with_files = self.prepend_files(prompt)?;
1119
1120        // Streaming only works on Claude — do not allow the fallback loop
1121        // to downgrade to a provider that can't stream.
1122        let mut builder = self;
1123        builder.provider_explicit = true;
1124        let (agent, _provider) = builder.create_agent(&provider).await?;
1125
1126        // Downcast to Claude to call execute_streaming
1127        let claude_agent = agent
1128            .as_any_ref()
1129            .downcast_ref::<Claude>()
1130            .ok_or_else(|| anyhow::anyhow!("Failed to downcast agent to Claude"))?;
1131
1132        claude_agent.execute_streaming(Some(&prompt_with_files))
1133    }
1134
1135    /// Start an interactive agent session.
1136    ///
1137    /// This takes over stdin/stdout for the duration of the session.
1138    pub async fn run(self, prompt: Option<&str>) -> Result<()> {
1139        let provider = self.resolve_provider()?;
1140        debug!("run: provider={provider}");
1141
1142        // Prepend file attachments
1143        let prompt_with_files = match prompt {
1144            Some(p) => Some(self.prepend_files(p)?),
1145            None if !self.files.is_empty() => {
1146                let attachments: Vec<Attachment> = self
1147                    .files
1148                    .iter()
1149                    .map(|f| Attachment::from_path(std::path::Path::new(f)))
1150                    .collect::<Result<Vec<_>>>()?;
1151                Some(attachment::format_attachments_prefix(&attachments))
1152            }
1153            None => None,
1154        };
1155
1156        let mut builder = self;
1157        let (agent, effective_provider) = builder.create_agent(&provider).await?;
1158        let log_guard =
1159            builder.start_session_log("run", false, &effective_provider, agent.get_model());
1160        let _ = builder.persist_session_metadata_with_id(
1161            &effective_provider,
1162            agent.get_model(),
1163            builder.root.as_deref(),
1164            log_guard.as_ref().map(|g| g.wrapper_session_id.as_str()),
1165        );
1166        agent.run_interactive(prompt_with_files.as_deref()).await?;
1167        agent.cleanup().await?;
1168        if let Some(g) = log_guard {
1169            g.finish(true, None).await;
1170        }
1171        Ok(())
1172    }
1173
1174    /// Resume a previous session by ID.
1175    pub async fn resume(self, session_id: &str) -> Result<()> {
1176        let provider = self.resolve_provider()?;
1177        debug!("resume: provider={provider}, session={session_id}");
1178
1179        // Resuming must stick with the recorded provider — no downgrade.
1180        let mut builder = self;
1181        builder.provider_explicit = true;
1182        let (agent, effective_provider) = builder.create_agent(&provider).await?;
1183        let log_guard =
1184            builder.start_session_log("resume", true, &effective_provider, agent.get_model());
1185        agent.run_resume(Some(session_id), false).await?;
1186        agent.cleanup().await?;
1187        if let Some(g) = log_guard {
1188            g.finish(true, None).await;
1189        }
1190        Ok(())
1191    }
1192
1193    /// Resume the most recent session.
1194    pub async fn continue_last(self) -> Result<()> {
1195        let provider = self.resolve_provider()?;
1196        debug!("continue_last: provider={provider}");
1197
1198        // Resuming must stick with the recorded provider — no downgrade.
1199        let mut builder = self;
1200        builder.provider_explicit = true;
1201        let (agent, effective_provider) = builder.create_agent(&provider).await?;
1202        let log_guard =
1203            builder.start_session_log("resume", true, &effective_provider, agent.get_model());
1204        agent.run_resume(None, true).await?;
1205        agent.cleanup().await?;
1206        if let Some(g) = log_guard {
1207            g.finish(true, None).await;
1208        }
1209        Ok(())
1210    }
1211}
1212
1213#[cfg(test)]
1214#[path = "builder_tests.rs"]
1215mod tests;