Skip to main content

smol_workflow_engine/
workflow.rs

1use crate::agent_providers::{
2    create_agent_provider, AgentProvider, AgentProviderContext, AgentProviderResult,
3    AgentProviderRunInput, AgentRunIsolation, AgentUsage, AgentUsageCost,
4};
5use crate::environment::{
6    AgentExecutionEnvironment, LocalExecutionEnvironment, SandboxExecutionEnvironment,
7};
8use crate::js_runtime::rquickjs::RQuickJSWorkflowRuntime;
9use crate::js_runtime::{
10    WorkflowBudgetSnapshot, WorkflowJSRuntime, WorkflowModuleInput, WorkflowModuleOutput,
11    WorkflowRef, WorkflowRuntimeCall, WorkflowRuntimeExecution, WorkflowRuntimePoll,
12    WorkflowRuntimeRequest, WorkflowRuntimeRequestResolution,
13};
14use crate::metadata::{read_workflow_metadata, WorkflowMetadata};
15use anyhow::{anyhow, bail, Context};
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use smol_workflow_sandbox::{
19    Metadata as SandboxMetadata, OpenSandboxRequest, ProfileRef, WorkspaceSync,
20};
21use std::collections::{BTreeMap, BTreeSet, VecDeque};
22use std::fs;
23use std::path::{Path, PathBuf};
24use std::process::Command as StdCommand;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{mpsc, watch};
28use tokio::task::{JoinSet, LocalSet};
29
30pub use crate::events::{
31    WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink, WorkflowEventType,
32};
33
/// Receives the result of an agent provider run so it can be logged.
///
/// The runtime stores implementations as `Arc<dyn AgentSessionLogSink>`
/// (see `RunWorkflowOptions::session_log_sink`), hence the `Send + Sync` bound.
#[async_trait::async_trait]
pub trait AgentSessionLogSink: Send + Sync {
    /// Records `result` for the provider identified by `provider`.
    ///
    /// NOTE(review): `provider` is presumably the provider's name — confirm
    /// against the call site, which is not visible in this part of the file.
    async fn write_agent_result(
        &self,
        provider: &str,
        result: &AgentProviderResult,
    ) -> anyhow::Result<()>;
}
42
/// Strategy used by the workflow runtime to execute agent runs and sleeps.
///
/// When `RunWorkflowOptions::agent_runner` is `None`, the runtime falls back
/// to `DirectWorkflowAgentRunner`.
#[async_trait::async_trait]
pub trait WorkflowAgentRunner: Send + Sync {
    /// Executes one agent request and returns the provider's result.
    ///
    /// NOTE(review): `provider_override` presumably names a provider to use
    /// instead of `default_provider` for this one request — confirm against
    /// `run_agent_provider`, which is defined outside this part of the file.
    async fn run_agent(
        &self,
        default_provider: Arc<dyn AgentProvider>,
        provider_override: Option<String>,
        input: AgentProviderRunInput,
    ) -> anyhow::Result<AgentProviderResult>;

    /// Whether the workflow scheduler should wrap this runner's `run_agent` call
    /// in the per-agent retry loop.
    ///
    /// Most runners should use the default `true`: their `run_agent` method is a
    /// single agent/provider boundary, so retrying it is safe and keeps retry
    /// behavior centralized in the runtime scheduler.
    ///
    /// Runners that perform their own checkpointing or replay should return
    /// `false` and apply retry internally around the nondeterministic provider
    /// call. For example, the SQLite durable runner must not have the scheduler
    /// retry its whole `run_agent` method, because each call advances durable
    /// occurrence counters and may create a distinct checkpoint such as
    /// `step:sig_x#2` instead of retrying the original durable step.
    fn retry_in_runtime(&self) -> bool {
        true
    }

    /// Waits for `duration_ms` milliseconds.
    ///
    /// The default implementation awaits `tokio::time::sleep` and always
    /// returns `Ok(())`.
    async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
        tokio::time::sleep(std::time::Duration::from_millis(duration_ms)).await;
        Ok(())
    }
}
74
/// Stateless runner that forwards every agent run to `run_agent_provider`.
///
/// This is the runner the workflow runtime uses when
/// `RunWorkflowOptions::agent_runner` is `None`.
#[derive(Debug, Default)]
pub struct DirectWorkflowAgentRunner;
77
// Only `run_agent` is overridden; `retry_in_runtime` and `sleep` keep the
// trait's default implementations.
#[async_trait::async_trait]
impl WorkflowAgentRunner for DirectWorkflowAgentRunner {
    /// Delegates to `run_agent_provider` with the arguments unchanged.
    async fn run_agent(
        &self,
        default_provider: Arc<dyn AgentProvider>,
        provider_override: Option<String>,
        input: AgentProviderRunInput,
    ) -> anyhow::Result<AgentProviderResult> {
        run_agent_provider(default_provider, provider_override, input).await
    }
}
89
/// Inputs for `run_workflow`.
pub struct RunWorkflowOptions {
    /// Path of the workflow script; it is canonicalized and then read as the
    /// module source.
    pub script_path: PathBuf,
    /// Host working directory used for agent provider cwd, worktree isolation,
    /// and sandbox workspace sync. Defaults to the workflow script directory.
    pub workflow_cwd: Option<PathBuf>,
    /// Arguments handed to the JavaScript module as its `args`.
    pub args: Value,
    /// Agent provider stored in the run state; its name is logged at start.
    pub agent_provider: Arc<dyn AgentProvider>,
    /// NOTE(review): presumably maps model aliases to concrete model names;
    /// its use is outside this part of the file — confirm.
    pub model_map: BTreeMap<String, String>,
    /// Total budget reported to the script, if any.
    pub budget_total: Option<u64>,
    /// Budget already spent before this run starts.
    pub budget_spent: u64,
    /// NOTE(review): presumably the depth of nested workflow calls — confirm.
    pub nesting_depth: usize,
    /// NOTE(review): presumably caps concurrently running agent tasks; the
    /// check lives in `agent_capacity_available`, not visible here — confirm.
    pub max_parallel_agent_requests: Option<usize>,
    /// Runner for agent requests; `DirectWorkflowAgentRunner` when `None`.
    pub agent_runner: Option<Arc<dyn WorkflowAgentRunner>>,
    /// Cancellation signal. The workflow is cancelled once the watched value
    /// is `true`, and also when the sending side is dropped.
    pub cancel_rx: Option<watch::Receiver<bool>>,
    /// Event sink; when present, started/result/error lifecycle events are
    /// emitted.
    pub event_sink: Option<Arc<dyn WorkflowEventSink>>,
    /// NOTE(review): presumably the step id of the parent workflow call used
    /// to nest emitted events — confirm.
    pub event_parent_step_id: Option<String>,
    /// Reference instant for the event stream; `Instant::now()` when `None`.
    pub event_stream_start: Option<Instant>,
    /// Optional sink for agent results. When set, cancellation waits for
    /// running agent tasks to finish instead of aborting them.
    pub session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
}
109
/// Everything collected over a workflow run, returned once the script
/// completes.
#[derive(Debug)]
pub struct RunWorkflowResult {
    /// Output produced by the JavaScript module.
    pub output: WorkflowModuleOutput,
    /// Accumulated log entries taken from the run state.
    pub logs: Vec<Vec<Value>>,
    /// Accumulated phase calls taken from the run state.
    pub phases: Vec<WorkflowPhaseCall>,
    /// Accumulated agent requests taken from the run state.
    pub agent_calls: Vec<WorkflowRuntimeRequest>,
    /// Nested workflow requests, recorded when each one is started.
    pub workflow_calls: Vec<WorkflowRuntimeRequest>,
    /// Budget snapshot at completion.
    pub budget: WorkflowBudgetSnapshot,
    /// Token usage accumulated across the run.
    pub token_usage: WorkflowTokenUsage,
    /// Token usage keyed by a string; NOTE(review): presumably the phase name
    /// — confirm.
    pub token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
    /// Per-agent-run summaries.
    pub agent_runs: Vec<WorkflowAgentRunSummary>,
}
122
/// Accumulated token counters, serialized with camelCase keys.
///
/// Counters are combined with saturating addition by `add_usage` and
/// `merge_token_usage`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowTokenUsage {
    /// Accumulated input tokens.
    pub input_tokens: u64,
    /// Accumulated output tokens.
    pub output_tokens: u64,
    /// Accumulated cache-read tokens.
    pub cache_read_tokens: u64,
    /// Accumulated cache-write tokens.
    pub cache_write_tokens: u64,
    /// Accumulated total tokens; summed from the reported totals, not
    /// recomputed from the other counters.
    pub total_tokens: u64,
    /// Accumulated cost, present once any merged usage carried a cost;
    /// omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost: Option<AgentUsageCost>,
}
134
/// Summary of one agent run, serialized with camelCase keys.
///
/// Every optional field is omitted from serialized output when `None`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowAgentRunSummary {
    /// Identifier of the run; NOTE(review): presumably the runtime request id
    /// — confirm against `record_agent_run`.
    pub id: String,
    /// Phase the run was attributed to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub phase: Option<String>,
    /// Provider that handled the run, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider: Option<String>,
    /// Model used for the run, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Session id reported by the provider, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_session_id: Option<String>,
    /// Usage reported for the run, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<AgentUsage>,
    /// Isolation the run executed under, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub isolation: Option<AgentRunIsolation>,
}
152
/// A phase call recorded during a workflow run.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowPhaseCall {
    /// Name of the phase.
    pub name: String,
    /// Optional JSON options supplied with the call.
    pub options: Option<Value>,
}
158
159pub async fn run_workflow(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
160    LocalSet::new().run_until(run_workflow_inner(options)).await
161}
162
163async fn run_workflow_inner(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
164    log::debug!(
165        "run_workflow start script={} provider={} nesting_depth={} budget_total={:?} budget_spent={}",
166        options.script_path.display(),
167        options.agent_provider.name(),
168        options.nesting_depth,
169        options.budget_total,
170        options.budget_spent
171    );
172    let script_path = fs::canonicalize(&options.script_path).with_context(|| {
173        format!(
174            "failed to resolve workflow script {}",
175            options.script_path.display()
176        )
177    })?;
178    let metadata = read_workflow_metadata(&script_path)?.ok_or_else(|| {
179        anyhow!("Workflow script must export valid literal metadata as `export const meta = {{ name, description, ... }}`")
180    })?;
181    log::debug!(
182        "workflow metadata loaded name={} phases={}",
183        metadata.name,
184        metadata.phases.len()
185    );
186    let workflow_cwd = match options.workflow_cwd {
187        Some(cwd) => Some(
188            fs::canonicalize(&cwd)
189                .with_context(|| format!("failed to resolve workflow cwd {}", cwd.display()))?,
190        ),
191        None => script_path.parent().map(Path::to_path_buf),
192    };
193    let source = fs::read_to_string(&script_path)
194        .with_context(|| format!("failed to read workflow script {}", script_path.display()))?;
195    let runtime = RQuickJSWorkflowRuntime::new();
196    let execution = runtime.start_module(WorkflowModuleInput {
197        source,
198        source_name: script_path.to_string_lossy().into_owned(),
199        args: options.args,
200        budget: WorkflowBudgetSnapshot {
201            total: options.budget_total,
202            spent: options.budget_spent,
203        },
204        sandbox: Default::default(),
205    })?;
206
207    let (js_commands, js_command_rx) = mpsc::channel::<JsCommand>(64);
208    let (js_event_tx, mut js_events) = mpsc::channel::<JsEvent>(64);
209    let js_task = tokio::task::spawn_local(js_runtime_actor(execution, js_command_rx, js_event_tx));
210
211    let emit_lifecycle_events = options.event_sink.is_some();
212    let event_start = options.event_stream_start.unwrap_or_else(Instant::now);
213
214    let mut state = RunState {
215        script_path,
216        workflow_cwd,
217        metadata,
218        event_start,
219        agent_provider: options.agent_provider,
220        model_map: options.model_map,
221        logs: Vec::new(),
222        phases: Vec::new(),
223        agent_calls: Vec::new(),
224        workflow_calls: Vec::new(),
225        budget: WorkflowBudgetSnapshot {
226            total: options.budget_total,
227            spent: options.budget_spent,
228        },
229        token_usage: WorkflowTokenUsage::default(),
230        token_usage_by_phase: Default::default(),
231        agent_runs: Vec::new(),
232        active_request_ids: BTreeSet::new(),
233        nesting_depth: options.nesting_depth,
234        max_parallel_agent_requests: options.max_parallel_agent_requests,
235        agent_runner: options
236            .agent_runner
237            .unwrap_or_else(|| Arc::new(DirectWorkflowAgentRunner)),
238        cancel_rx: options.cancel_rx,
239        event_sink: options.event_sink,
240        event_parent_step_id: options.event_parent_step_id,
241        session_log_sink: options.session_log_sink,
242    };
243
244    let mut pending_requests = VecDeque::<WorkflowRuntimeRequest>::new();
245    let mut agent_tasks = JoinSet::<AgentTaskCompletion>::new();
246    let mut sleep_tasks = JoinSet::<SleepTaskCompletion>::new();
247
248    if emit_lifecycle_events {
249        if let Err(error) = state
250            .emit_event(WorkflowEvent::started(rfc3339_now()?))
251            .await
252        {
253            let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
254            let _ = js_task.await;
255            return Err(error);
256        }
257    }
258
259    let workflow_result: anyhow::Result<RunWorkflowResult> = loop {
260        if let Err(error) = state
261            .start_pending_requests(
262                &mut pending_requests,
263                &mut agent_tasks,
264                &mut sleep_tasks,
265                &js_commands,
266            )
267            .await
268        {
269            break Err(error);
270        }
271
272        tokio::select! {
273            biased;
274            () = wait_for_cancellation(&mut state.cancel_rx) => {
275                break state.cancel_workflow(
276                    &mut pending_requests,
277                    &mut agent_tasks,
278                    &mut sleep_tasks,
279                    &js_commands,
280                    &mut js_events,
281                ).await;
282            }
283            event = js_events.recv() => {
284                let event = match event {
285                    Some(event) => event,
286                    None => break Err(anyhow!("JavaScript runtime actor stopped unexpectedly")),
287                };
288                match state.handle_js_event(event, &mut pending_requests).await {
289                    Ok(Some(result)) => break Ok(result),
290                    Ok(None) => {}
291                    Err(error) => break Err(error),
292                }
293            }
294            completion = agent_tasks.join_next(), if !agent_tasks.is_empty() => {
295                let completion = match completion {
296                    Some(Ok(completion)) => completion,
297                    Some(Err(error)) => break Err(anyhow!("agent provider task failed: {error}")),
298                    None => break Err(anyhow!("agent task set ended unexpectedly")),
299                };
300                let AgentTaskCompletion { id, input, provider, result } = completion;
301                state.active_request_ids.remove(&id);
302                let resolution = match result {
303                    Ok(result) => match state.apply_agent_result(&id, &input, provider, result).await {
304                        Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
305                            value,
306                            budget: state.budget.clone(),
307                        },
308                        Err(error) => WorkflowRuntimeRequestResolution::Err {
309                            message: error.to_string(),
310                        },
311                    },
312                    Err(error) => {
313                        let message = error.to_string();
314                        if let Err(emit_error) = state.emit_agent_failed_event(&id, provider.as_deref(), &message).await {
315                            log::debug!("failed to emit agent failure event: {emit_error:#}");
316                        }
317                        WorkflowRuntimeRequestResolution::Err { message }
318                    },
319                };
320                if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
321                    break Err(error);
322                }
323            }
324            completion = sleep_tasks.join_next(), if !sleep_tasks.is_empty() => {
325                let completion = match completion {
326                    Some(Ok(completion)) => completion,
327                    Some(Err(error)) => break Err(anyhow!("sleep task failed: {error}")),
328                    None => break Err(anyhow!("sleep task set ended unexpectedly")),
329                };
330                let SleepTaskCompletion { id, result } = completion;
331                state.active_request_ids.remove(&id);
332                let resolution = match result {
333                    Ok(()) => WorkflowRuntimeRequestResolution::OkUndefined,
334                    Err(error) => WorkflowRuntimeRequestResolution::Err {
335                        message: error.to_string(),
336                    },
337                };
338                if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
339                    break Err(error);
340                }
341            }
342        }
343    };
344
345    let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
346    let _ = js_task.await;
347
348    if emit_lifecycle_events {
349        match &workflow_result {
350            Ok(result) => {
351                state
352                    .emit_event(WorkflowEvent::result(
353                        result.token_usage.input_tokens,
354                        result.token_usage.output_tokens,
355                        result.token_usage.total_tokens,
356                        result.output.result.clone(),
357                    ))
358                    .await?
359            }
360            Err(error) => {
361                state
362                    .emit_event(WorkflowEvent::error(error.to_string(), None))
363                    .await?;
364            }
365        }
366    }
367
368    workflow_result
369}
370
/// Commands sent from the workflow driver to the JavaScript runtime actor.
enum JsCommand {
    /// Settle the runtime request `id` with `resolution`.
    ResolveRequest {
        id: String,
        resolution: WorkflowRuntimeRequestResolution,
    },
    /// Ask the actor to stop.
    Shutdown,
}
378
/// Events sent from the JavaScript runtime actor to the workflow driver.
enum JsEvent {
    /// The runtime produced a call for the host to handle.
    Call(WorkflowRuntimeCall),
    /// The runtime issued a request that the host must later resolve.
    Request(WorkflowRuntimeRequest),
    /// The module finished with this output; the actor exits afterwards.
    Complete(WorkflowModuleOutput),
    /// Polling or resolving failed with this message; the actor exits
    /// afterwards.
    Error(String),
}
385
386async fn js_runtime_actor(
387    mut execution: Box<dyn WorkflowRuntimeExecution>,
388    mut commands: mpsc::Receiver<JsCommand>,
389    events: mpsc::Sender<JsEvent>,
390) {
391    let mut outstanding_requests = 0usize;
392    loop {
393        match execution.poll() {
394            Ok(WorkflowRuntimePoll::Call(call)) => {
395                if events.send(JsEvent::Call(call)).await.is_err() {
396                    return;
397                }
398            }
399            Ok(WorkflowRuntimePoll::Request(request)) => {
400                let requests = match execution.take_pending_requests() {
401                    Ok(requests) if requests.is_empty() => vec![request],
402                    Ok(requests) => requests,
403                    Err(error) => {
404                        let _ = events.send(JsEvent::Error(error.to_string())).await;
405                        return;
406                    }
407                };
408                outstanding_requests = outstanding_requests.saturating_add(requests.len());
409                for request in requests {
410                    if events.send(JsEvent::Request(request)).await.is_err() {
411                        return;
412                    }
413                }
414            }
415            Ok(WorkflowRuntimePoll::Complete(output)) => {
416                let _ = events.send(JsEvent::Complete(output)).await;
417                return;
418            }
419            Ok(WorkflowRuntimePoll::Pending) => {
420                if outstanding_requests == 0 {
421                    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
422                    continue;
423                }
424                match commands.recv().await {
425                    Some(JsCommand::ResolveRequest { id, resolution }) => {
426                        outstanding_requests = outstanding_requests.saturating_sub(1);
427                        if let Err(error) = execution.resolve_request(&id, resolution) {
428                            let _ = events.send(JsEvent::Error(error.to_string())).await;
429                            return;
430                        }
431                    }
432                    Some(JsCommand::Shutdown) | None => return,
433                }
434            }
435            Err(error) => {
436                let _ = events.send(JsEvent::Error(error.to_string())).await;
437                return;
438            }
439        }
440    }
441}
442
443async fn send_js_command(
444    commands: &mpsc::Sender<JsCommand>,
445    command: JsCommand,
446) -> anyhow::Result<()> {
447    commands
448        .send(command)
449        .await
450        .map_err(|_| anyhow!("JavaScript runtime actor stopped unexpectedly"))
451}
452
/// Mutable state of one workflow run, built by `run_workflow_inner` from
/// `RunWorkflowOptions`.
struct RunState {
    /// Canonicalized path of the workflow script.
    script_path: PathBuf,
    /// Resolved working directory (explicit cwd or the script's directory).
    workflow_cwd: Option<PathBuf>,
    /// Metadata read from the script.
    metadata: WorkflowMetadata,
    /// Reference instant for the event stream.
    event_start: Instant,
    /// Agent provider taken from the options.
    agent_provider: Arc<dyn AgentProvider>,
    /// Model map taken from the options.
    model_map: BTreeMap<String, String>,
    /// Log entries collected so far.
    logs: Vec<Vec<Value>>,
    /// Phase calls collected so far.
    phases: Vec<WorkflowPhaseCall>,
    /// Agent requests collected so far.
    agent_calls: Vec<WorkflowRuntimeRequest>,
    /// Nested workflow requests collected so far.
    workflow_calls: Vec<WorkflowRuntimeRequest>,
    /// Current budget snapshot, seeded from the options.
    budget: WorkflowBudgetSnapshot,
    /// Token usage accumulated across the run.
    token_usage: WorkflowTokenUsage,
    /// Token usage accumulated per key; NOTE(review): presumably keyed by
    /// phase name — confirm.
    token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
    /// Per-agent-run summaries collected so far.
    agent_runs: Vec<WorkflowAgentRunSummary>,
    /// Ids of requests currently in flight; an id is removed when its task
    /// completes or is cancelled.
    active_request_ids: BTreeSet<String>,
    /// Nesting depth taken from the options.
    nesting_depth: usize,
    /// Optional parallelism limit taken from the options.
    max_parallel_agent_requests: Option<usize>,
    /// Runner for agent requests (`DirectWorkflowAgentRunner` by default).
    agent_runner: Arc<dyn WorkflowAgentRunner>,
    /// Optional cancellation signal.
    cancel_rx: Option<watch::Receiver<bool>>,
    /// Optional sink for workflow events.
    event_sink: Option<Arc<dyn WorkflowEventSink>>,
    /// Optional parent step id taken from the options.
    event_parent_step_id: Option<String>,
    /// Optional sink for agent results; also selects drain-vs-abort behavior
    /// for running agent tasks on cancellation.
    session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
}
477
/// An agent request that has been prepared and is ready to be spawned.
struct PreparedAgentRun {
    /// NOTE(review): presumably the provider to use instead of the default
    /// one for this run — confirm against `prepare_agent_request`.
    provider_override: Option<String>,
    /// Input handed to the agent provider.
    input: AgentProviderRunInput,
}
482
/// Value returned by a finished agent task.
struct AgentTaskCompletion {
    /// Id of the runtime request the task was serving.
    id: String,
    /// Input the agent was run with.
    input: AgentProviderRunInput,
    /// Provider associated with the run, if known.
    provider: Option<String>,
    /// Outcome of the agent run.
    result: anyhow::Result<AgentProviderResult>,
}
489
/// Value returned by a finished sleep task.
struct SleepTaskCompletion {
    /// Id of the runtime request the task was serving.
    id: String,
    /// Outcome of the sleep.
    result: anyhow::Result<()>,
}
494
495fn add_usage(total: &mut WorkflowTokenUsage, usage: Option<&AgentUsage>) {
496    let Some(usage) = usage else {
497        return;
498    };
499
500    total.input_tokens = total
501        .input_tokens
502        .saturating_add(usage.input_tokens.unwrap_or_default());
503    total.output_tokens = total
504        .output_tokens
505        .saturating_add(usage.output_tokens.unwrap_or_default());
506    total.cache_read_tokens = total
507        .cache_read_tokens
508        .saturating_add(usage.cache_read_tokens.unwrap_or_default());
509    total.cache_write_tokens = total
510        .cache_write_tokens
511        .saturating_add(usage.cache_write_tokens.unwrap_or_default());
512    total.total_tokens = total
513        .total_tokens
514        .saturating_add(usage.total_tokens.unwrap_or_default());
515
516    if let Some(cost) = usage.cost.as_ref() {
517        total.cost = Some(merge_cost(total.cost.as_ref(), cost));
518    }
519}
520
521fn merge_token_usage(total: &mut WorkflowTokenUsage, usage: &WorkflowTokenUsage) {
522    total.input_tokens = total.input_tokens.saturating_add(usage.input_tokens);
523    total.output_tokens = total.output_tokens.saturating_add(usage.output_tokens);
524    total.cache_read_tokens = total
525        .cache_read_tokens
526        .saturating_add(usage.cache_read_tokens);
527    total.cache_write_tokens = total
528        .cache_write_tokens
529        .saturating_add(usage.cache_write_tokens);
530    total.total_tokens = total.total_tokens.saturating_add(usage.total_tokens);
531    if let Some(cost) = usage.cost.as_ref() {
532        total.cost = Some(merge_cost(total.cost.as_ref(), cost));
533    }
534}
535
536fn merge_cost(left: Option<&AgentUsageCost>, right: &AgentUsageCost) -> AgentUsageCost {
537    AgentUsageCost {
538        input: sum_f64(left.and_then(|cost| cost.input), right.input),
539        output: sum_f64(left.and_then(|cost| cost.output), right.output),
540        cache_read: sum_f64(left.and_then(|cost| cost.cache_read), right.cache_read),
541        cache_write: sum_f64(left.and_then(|cost| cost.cache_write), right.cache_write),
542        total: sum_f64(left.and_then(|cost| cost.total), right.total),
543        currency: right
544            .currency
545            .clone()
546            .or_else(|| left.and_then(|cost| cost.currency.clone())),
547    }
548}
549
/// Nanoseconds elapsed since `start`, clamped to `u64::MAX` if the value does
/// not fit in a `u64`.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos: u128 = start.elapsed().as_nanos();
    match u64::try_from(nanos) {
        Ok(fits) => fits,
        Err(_) => u64::MAX,
    }
}
553
554fn rfc3339_now() -> anyhow::Result<String> {
555    Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
556}
557
558fn raw_agent_event_payloads(raw: &Value) -> Vec<Value> {
559    if let Some(events) = raw.get("events").and_then(Value::as_array) {
560        events.clone()
561    } else if let Some(items) = raw.as_array() {
562        items.clone()
563    } else {
564        vec![raw.clone()]
565    }
566}
567
568fn agent_session_event_payload(provider_event: Value, metadata: &WorkflowEventMetadata) -> Value {
569    let mut payload = serde_json::Map::new();
570    if let Some(provider) = metadata.provider.as_ref() {
571        payload.insert("provider".to_string(), Value::String(provider.clone()));
572    }
573    if let Some(session_id) = metadata.session_id.as_ref() {
574        payload.insert("sessionId".to_string(), Value::String(session_id.clone()));
575    }
576    if let Some(run_id) = metadata.run_id.as_ref() {
577        payload.insert("runId".to_string(), Value::String(run_id.clone()));
578    }
579    if let Some(step_id) = metadata.step_id.as_ref() {
580        payload.insert("stepId".to_string(), Value::String(step_id.clone()));
581    }
582    payload.insert("providerEvent".to_string(), provider_event);
583    Value::Object(payload)
584}
585
/// Limits `value` to at most `max_chars` characters.
///
/// When anything is cut off, an ellipsis (`…`) is appended after the kept
/// characters; otherwise the text is returned unchanged. Counting is per
/// `char`, so multi-byte characters are never split.
fn truncate_for_event(value: &str, max_chars: usize) -> String {
    // A character at index `max_chars` exists only if the text is too long;
    // its byte offset is where the kept prefix ends.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}…", &value[..cut]),
        None => value.to_string(),
    }
}
595
596fn format_log_message(values: &[Value]) -> String {
597    values
598        .iter()
599        .map(|value| match value {
600            Value::String(value) => value.clone(),
601            value => serde_json::to_string(value).unwrap_or_else(|_| String::from("<unprintable>")),
602        })
603        .collect::<Vec<_>>()
604        .join(" ")
605}
606
/// Adds two optional amounts.
///
/// Returns `None` only when both inputs are `None`; otherwise a missing side
/// counts as zero.
fn sum_f64(left: Option<f64>, right: Option<f64>) -> Option<f64> {
    if left.is_none() && right.is_none() {
        return None;
    }
    Some(left.unwrap_or(0.0) + right.unwrap_or(0.0))
}
613
614async fn wait_for_cancellation(cancel_rx: &mut Option<watch::Receiver<bool>>) {
615    let Some(cancel_rx) = cancel_rx else {
616        std::future::pending::<()>().await;
617        return;
618    };
619    while !*cancel_rx.borrow() {
620        if cancel_rx.changed().await.is_err() {
621            return;
622        }
623    }
624}
625
626impl RunState {
627    async fn handle_js_event(
628        &mut self,
629        event: JsEvent,
630        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
631    ) -> anyhow::Result<Option<RunWorkflowResult>> {
632        match event {
633            JsEvent::Call(call) => self.handle_call(call).await?,
634            JsEvent::Request(request) => {
635                log::debug!(
636                    "workflow runtime request id={} kind={}",
637                    request.id(),
638                    request.kind()
639                );
640                pending_requests.push_back(request);
641            }
642            JsEvent::Complete(output) => {
643                log::debug!(
644                    "run_workflow complete script={} budget_spent={}",
645                    self.script_path.display(),
646                    self.budget.spent
647                );
648                return Ok(Some(RunWorkflowResult {
649                    output,
650                    logs: std::mem::take(&mut self.logs),
651                    phases: std::mem::take(&mut self.phases),
652                    agent_calls: std::mem::take(&mut self.agent_calls),
653                    workflow_calls: std::mem::take(&mut self.workflow_calls),
654                    budget: self.budget.clone(),
655                    token_usage: std::mem::take(&mut self.token_usage),
656                    token_usage_by_phase: std::mem::take(&mut self.token_usage_by_phase),
657                    agent_runs: std::mem::take(&mut self.agent_runs),
658                }));
659            }
660            JsEvent::Error(message) => bail!(message),
661        }
662        Ok(None)
663    }
664
665    async fn start_pending_requests(
666        &mut self,
667        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
668        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
669        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
670        js_commands: &mpsc::Sender<JsCommand>,
671    ) -> anyhow::Result<()> {
672        loop {
673            let Some(request) = pending_requests.front() else {
674                return Ok(());
675            };
676            if matches!(request, WorkflowRuntimeRequest::Agent { .. })
677                && !self.agent_capacity_available(agent_tasks.len())
678            {
679                return Ok(());
680            }
681
682            let request = pending_requests
683                .pop_front()
684                .expect("pending request should exist");
685            match request {
686                WorkflowRuntimeRequest::Agent { .. } => match self.prepare_agent_request(request) {
687                    Ok((id, prepared)) => {
688                        self.emit_agent_started_event(&id, &prepared).await?;
689                        self.spawn_agent_task(agent_tasks, id, prepared);
690                    }
691                    Err((id, error)) => {
692                        send_js_command(
693                            js_commands,
694                            JsCommand::ResolveRequest {
695                                id,
696                                resolution: WorkflowRuntimeRequestResolution::Err {
697                                    message: error.to_string(),
698                                },
699                            },
700                        )
701                        .await?;
702                    }
703                },
704                WorkflowRuntimeRequest::Sleep { id, duration_ms } => {
705                    self.spawn_sleep_task(sleep_tasks, id, duration_ms);
706                }
707                WorkflowRuntimeRequest::Workflow {
708                    id,
709                    workflow_ref,
710                    args,
711                } => {
712                    self.workflow_calls.push(WorkflowRuntimeRequest::Workflow {
713                        id: id.clone(),
714                        workflow_ref: workflow_ref.clone(),
715                        args: args.clone(),
716                    });
717                    let parent_event_step_id = self.event_step_id(&id);
718                    let resolution = match self
719                        .handle_workflow(parent_event_step_id, workflow_ref, args)
720                        .await
721                    {
722                        Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
723                            value,
724                            budget: self.budget.clone(),
725                        },
726                        Err(error) => WorkflowRuntimeRequestResolution::Err {
727                            message: error.to_string(),
728                        },
729                    };
730                    send_js_command(js_commands, JsCommand::ResolveRequest { id, resolution })
731                        .await?;
732                }
733            }
734        }
735    }
736
737    async fn cancel_workflow(
738        &mut self,
739        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
740        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
741        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
742        js_commands: &mpsc::Sender<JsCommand>,
743        js_events: &mut mpsc::Receiver<JsEvent>,
744    ) -> anyhow::Result<RunWorkflowResult> {
745        log::debug!(
746            "workflow cancellation requested script={}",
747            self.script_path.display()
748        );
749
750        if pending_requests.is_empty()
751            && self.active_request_ids.is_empty()
752            && agent_tasks.is_empty()
753            && sleep_tasks.is_empty()
754            && self
755                .reject_next_runtime_request_for_cancellation(js_commands, js_events)
756                .await
757        {
758            bail!("workflow cancelled");
759        }
760
761        self.reject_pending_requests_for_cancellation(pending_requests, js_commands)
762            .await;
763        sleep_tasks.abort_all();
764        self.reject_active_sleep_requests_for_cancellation(sleep_tasks, js_commands)
765            .await;
766
767        if self.session_log_sink.is_some() {
768            while let Some(completion) = agent_tasks.join_next().await {
769                match completion {
770                    Ok(AgentTaskCompletion {
771                        id,
772                        input,
773                        provider,
774                        result: Ok(result),
775                    }) => {
776                        self.active_request_ids.remove(&id);
777                        if let Err(error) = self
778                            .emit_agent_result_events(&id, provider.as_deref(), &result)
779                            .await
780                        {
781                            log::debug!("failed to emit drained agent events during cancellation: {error:#}");
782                        }
783                        if let Err(error) = self
784                            .emit_agent_completed_event(&id, provider.as_deref(), &result)
785                            .await
786                        {
787                            log::debug!("failed to emit drained agent completion during cancellation: {error:#}");
788                        }
789                        self.record_agent_run(&id, &input, provider, &result);
790                        self.reject_request_for_cancellation(id, js_commands).await;
791                    }
792                    Ok(AgentTaskCompletion {
793                        id,
794                        provider,
795                        result: Err(error),
796                        ..
797                    }) => {
798                        self.active_request_ids.remove(&id);
799                        let message = error.to_string();
800                        if let Err(error) = self
801                            .emit_agent_failed_event(&id, provider.as_deref(), &message)
802                            .await
803                        {
804                            log::debug!("failed to emit drained agent failure during cancellation: {error:#}");
805                        }
806                        log::debug!("agent task failed while draining cancellation: {message}");
807                        self.reject_request_for_cancellation(id, js_commands).await;
808                    }
809                    Err(error) => {
810                        log::debug!("agent task join failed while draining cancellation: {error}");
811                    }
812                }
813            }
814        } else {
815            let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
816            agent_tasks.abort_all();
817            for id in ids {
818                self.active_request_ids.remove(&id);
819                self.reject_request_for_cancellation(id, js_commands).await;
820            }
821        }
822
823        self.reject_remaining_active_requests_for_cancellation(js_commands)
824            .await;
825        self.drain_runtime_after_cancellation(js_events).await;
826        let _ = send_js_command(js_commands, JsCommand::Shutdown).await;
827        bail!("workflow cancelled")
828    }
829
830    async fn reject_next_runtime_request_for_cancellation(
831        &mut self,
832        js_commands: &mpsc::Sender<JsCommand>,
833        js_events: &mut mpsc::Receiver<JsEvent>,
834    ) -> bool {
835        loop {
836            match js_events.recv().await {
837                Some(JsEvent::Call(call)) => {
838                    let _ = self.handle_call(call).await;
839                }
840                Some(JsEvent::Request(request)) => {
841                    self.reject_request_for_cancellation(request.id().to_string(), js_commands)
842                        .await;
843                    return false;
844                }
845                Some(JsEvent::Complete(_)) | Some(JsEvent::Error(_)) | None => return true,
846            }
847        }
848    }
849
850    async fn reject_pending_requests_for_cancellation(
851        &mut self,
852        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
853        js_commands: &mpsc::Sender<JsCommand>,
854    ) {
855        while let Some(request) = pending_requests.pop_front() {
856            self.reject_request_for_cancellation(request.id().to_string(), js_commands)
857                .await;
858        }
859    }
860
861    async fn reject_active_sleep_requests_for_cancellation(
862        &mut self,
863        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
864        js_commands: &mpsc::Sender<JsCommand>,
865    ) {
866        while let Some(completion) = sleep_tasks.join_next().await {
867            if let Ok(SleepTaskCompletion { id, .. }) = completion {
868                self.active_request_ids.remove(&id);
869                self.reject_request_for_cancellation(id, js_commands).await;
870            }
871        }
872    }
873
874    async fn reject_remaining_active_requests_for_cancellation(
875        &mut self,
876        js_commands: &mpsc::Sender<JsCommand>,
877    ) {
878        let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
879        for id in ids {
880            self.active_request_ids.remove(&id);
881            self.reject_request_for_cancellation(id, js_commands).await;
882        }
883    }
884
885    async fn reject_request_for_cancellation(
886        &self,
887        id: String,
888        js_commands: &mpsc::Sender<JsCommand>,
889    ) {
890        let _ = send_js_command(
891            js_commands,
892            JsCommand::ResolveRequest {
893                id,
894                resolution: WorkflowRuntimeRequestResolution::Err {
895                    message: "workflow cancelled".to_string(),
896                },
897            },
898        )
899        .await;
900    }
901
902    async fn drain_runtime_after_cancellation(&mut self, js_events: &mut mpsc::Receiver<JsEvent>) {
903        while let Some(event) = js_events.recv().await {
904            match event {
905                JsEvent::Call(call) => {
906                    let _ = self.handle_call(call).await;
907                }
908                JsEvent::Request(request) => {
909                    log::debug!(
910                        "ignoring request after cancellation id={} kind={}",
911                        request.id(),
912                        request.kind()
913                    );
914                }
915                JsEvent::Complete(_) | JsEvent::Error(_) => break,
916            }
917        }
918    }
919
920    fn event_step_id(&self, runtime_request_id: &str) -> String {
921        let parent = self.event_parent_step_id.as_deref().unwrap_or("");
922        let hash = blake3::hash(
923            format!("{parent}:{}:{runtime_request_id}", self.nesting_depth).as_bytes(),
924        );
925        format!("step_{}", &hash.to_hex()[..16])
926    }
927
928    async fn emit_event(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
929        if (event.event_type.as_str() != "workflow.started" || self.nesting_depth > 0)
930            && event.elapsed_nanos.is_none()
931        {
932            event.elapsed_nanos = Some(elapsed_nanos(self.event_start));
933        }
934        let metadata = event
935            .metadata
936            .get_or_insert_with(WorkflowEventMetadata::default);
937        if metadata.workflow_depth.is_none() {
938            metadata.workflow_depth = Some(u32::try_from(self.nesting_depth).unwrap_or(u32::MAX));
939        }
940        if metadata.parent_step_id.is_none() {
941            metadata.parent_step_id = self.event_parent_step_id.clone();
942        }
943        if let Some(event_sink) = self.event_sink.as_ref() {
944            event_sink.emit(event).await?;
945        }
946        Ok(())
947    }
948
949    async fn handle_call(&mut self, call: WorkflowRuntimeCall) -> anyhow::Result<()> {
950        match call {
951            WorkflowRuntimeCall::Log { values } => {
952                self.emit_event(WorkflowEvent::log(format_log_message(&values)))
953                    .await?;
954                self.logs.push(values);
955            }
956            WorkflowRuntimeCall::Phase { name, options } => {
957                let phase = WorkflowPhaseCall { name, options };
958                self.emit_event(WorkflowEvent::phase(
959                    phase.name.clone(),
960                    phase.options.clone(),
961                ))
962                .await?;
963                self.phases.push(phase);
964            }
965        }
966        Ok(())
967    }
968
969    fn agent_capacity_available(&self, in_flight: usize) -> bool {
970        let max_parallel = self
971            .max_parallel_agent_requests
972            .filter(|value| *value > 0)
973            .unwrap_or(usize::MAX);
974        in_flight < max_parallel
975    }
976
977    fn prepare_agent_request(
978        &mut self,
979        request: WorkflowRuntimeRequest,
980    ) -> Result<(String, PreparedAgentRun), (String, anyhow::Error)> {
981        match request {
982            WorkflowRuntimeRequest::Agent {
983                id,
984                prompt,
985                options,
986            } => {
987                self.agent_calls.push(WorkflowRuntimeRequest::Agent {
988                    id: id.clone(),
989                    prompt: prompt.clone(),
990                    options: options.clone(),
991                });
992                match self.prepare_agent_run(prompt, options) {
993                    Ok(prepared) => Ok((id, prepared)),
994                    Err(error) => Err((id, error)),
995                }
996            }
997            WorkflowRuntimeRequest::Workflow { .. } | WorkflowRuntimeRequest::Sleep { .. } => {
998                unreachable!("prepare_agent_request only accepts agent requests")
999            }
1000        }
1001    }
1002
1003    fn spawn_agent_task(
1004        &mut self,
1005        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
1006        id: String,
1007        prepared: PreparedAgentRun,
1008    ) {
1009        let default_provider_name = self.agent_provider.name().to_string();
1010        let default_provider = Arc::clone(&self.agent_provider);
1011        let agent_runner = Arc::clone(&self.agent_runner);
1012        let retry_in_runtime = agent_runner.retry_in_runtime();
1013        let cancel_rx = self.cancel_rx.clone();
1014        let completion_input = prepared.input.clone();
1015        let completion_provider = prepared
1016            .provider_override
1017            .clone()
1018            .or(Some(default_provider_name));
1019        let session_log_sink = self.session_log_sink.clone();
1020        let max_parallel = self
1021            .max_parallel_agent_requests
1022            .filter(|value| *value > 0)
1023            .unwrap_or(usize::MAX);
1024        log::debug!(
1025            "starting agent request id={} in_flight_after_start={} max_parallel={}",
1026            id,
1027            agent_tasks.len() + 1,
1028            max_parallel
1029        );
1030        self.active_request_ids.insert(id.clone());
1031        agent_tasks.spawn(async move {
1032            let result = if retry_in_runtime {
1033                run_agent_runner_with_retry(
1034                    Arc::clone(&agent_runner),
1035                    default_provider,
1036                    prepared.provider_override,
1037                    prepared.input,
1038                    cancel_rx,
1039                )
1040                .await
1041            } else {
1042                agent_runner
1043                    .run_agent(default_provider, prepared.provider_override, prepared.input)
1044                    .await
1045            };
1046            let result = match result {
1047                Ok(result) => {
1048                    if let Some(session_log_sink) = session_log_sink.as_ref() {
1049                        let provider_name = completion_provider
1050                            .as_deref()
1051                            .expect("completion provider should always be set");
1052                        match session_log_sink
1053                            .write_agent_result(provider_name, &result)
1054                            .await
1055                        {
1056                            Ok(()) => Ok(result),
1057                            Err(error) => Err(error),
1058                        }
1059                    } else {
1060                        Ok(result)
1061                    }
1062                }
1063                Err(error) => Err(error),
1064            };
1065            AgentTaskCompletion {
1066                id,
1067                input: completion_input,
1068                provider: completion_provider,
1069                result,
1070            }
1071        });
1072    }
1073
1074    fn spawn_sleep_task(
1075        &mut self,
1076        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
1077        id: String,
1078        duration_ms: u64,
1079    ) {
1080        let agent_runner = Arc::clone(&self.agent_runner);
1081        log::debug!(
1082            "starting sleep request id={} duration_ms={}",
1083            id,
1084            duration_ms
1085        );
1086        self.active_request_ids.insert(id.clone());
1087        sleep_tasks.spawn(async move {
1088            SleepTaskCompletion {
1089                id,
1090                result: agent_runner.sleep(duration_ms).await,
1091            }
1092        });
1093    }
1094
1095    fn prepare_agent_run(
1096        &self,
1097        prompt: String,
1098        options: Option<Value>,
1099    ) -> anyhow::Result<PreparedAgentRun> {
1100        let options = apply_phase_defaults(options, &self.metadata);
1101        let context = AgentProviderContext {
1102            phase: options
1103                .as_ref()
1104                .and_then(|options| options.get("phase"))
1105                .and_then(Value::as_str)
1106                .map(ToString::to_string),
1107            cwd: self.workflow_cwd.clone(),
1108        };
1109        let provider_override = options
1110            .as_ref()
1111            .and_then(|options| options.get("provider"))
1112            .and_then(Value::as_str)
1113            .map(ToString::to_string);
1114        let provider_name = provider_override
1115            .as_deref()
1116            .unwrap_or_else(|| self.agent_provider.name());
1117        let options = resolve_model_options(options, provider_name, &self.model_map)?;
1118        agent_retry_policy(&options)?;
1119        log::debug!(
1120            "agent call provider={} phase={:?} model={:?} prompt_len={}",
1121            provider_name,
1122            context.phase.as_deref(),
1123            options
1124                .as_ref()
1125                .and_then(|options| options.get("model"))
1126                .and_then(Value::as_str),
1127            prompt.len()
1128        );
1129        Ok(PreparedAgentRun {
1130            provider_override,
1131            input: AgentProviderRunInput {
1132                prompt,
1133                options,
1134                environment: Arc::new(LocalExecutionEnvironment::new(context.cwd.clone())?),
1135                context,
1136            },
1137        })
1138    }
1139
1140    async fn emit_agent_started_event(
1141        &self,
1142        id: &str,
1143        prepared: &PreparedAgentRun,
1144    ) -> anyhow::Result<()> {
1145        let provider = prepared
1146            .provider_override
1147            .as_deref()
1148            .unwrap_or_else(|| self.agent_provider.name());
1149        let metadata = self.agent_event_metadata(id, Some(provider), None);
1150        self.emit_event(WorkflowEvent::agent_started(
1151            serde_json::json!({
1152                "phase": prepared.input.context.phase,
1153                "promptPreview": truncate_for_event(&prepared.input.prompt, 200),
1154            }),
1155            metadata,
1156        ))
1157        .await
1158    }
1159
1160    async fn apply_agent_result(
1161        &mut self,
1162        id: &str,
1163        input: &AgentProviderRunInput,
1164        provider: Option<String>,
1165        result: AgentProviderResult,
1166    ) -> anyhow::Result<Value> {
1167        if let Some(output_tokens) = result.usage.as_ref().and_then(|usage| usage.output_tokens) {
1168            self.budget.spent = self.budget.spent.saturating_add(output_tokens);
1169        }
1170        self.emit_agent_result_events(id, provider.as_deref(), &result)
1171            .await?;
1172        self.emit_agent_completed_event(id, provider.as_deref(), &result)
1173            .await?;
1174        self.record_agent_run(id, input, provider, &result);
1175        log::debug!(
1176            "agent call complete session_id={:?} output_tokens={:?} budget_spent={}",
1177            result.session_id,
1178            result.usage.as_ref().and_then(|usage| usage.output_tokens),
1179            self.budget.spent
1180        );
1181        Ok(result.output)
1182    }
1183
1184    async fn emit_agent_result_events(
1185        &self,
1186        id: &str,
1187        provider: Option<&str>,
1188        result: &AgentProviderResult,
1189    ) -> anyhow::Result<()> {
1190        let Some(raw) = result.raw.as_ref() else {
1191            return Ok(());
1192        };
1193        let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1194        for provider_event in raw_agent_event_payloads(raw) {
1195            let event_data = agent_session_event_payload(provider_event, &metadata);
1196            self.emit_event(WorkflowEvent::agent_event(event_data, metadata.clone()))
1197                .await?;
1198        }
1199        Ok(())
1200    }
1201
1202    async fn emit_agent_completed_event(
1203        &self,
1204        id: &str,
1205        provider: Option<&str>,
1206        result: &AgentProviderResult,
1207    ) -> anyhow::Result<()> {
1208        let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1209        self.emit_event(WorkflowEvent::agent_completed(
1210            serde_json::json!({
1211                "sessionId": result.session_id,
1212                "model": result.model,
1213                "usage": result.usage,
1214            }),
1215            metadata,
1216        ))
1217        .await
1218    }
1219
1220    async fn emit_agent_failed_event(
1221        &self,
1222        id: &str,
1223        provider: Option<&str>,
1224        message: &str,
1225    ) -> anyhow::Result<()> {
1226        let metadata = self.agent_event_metadata(id, provider, None);
1227        self.emit_event(WorkflowEvent::agent_failed(
1228            serde_json::json!({ "message": message }),
1229            metadata,
1230        ))
1231        .await
1232    }
1233
1234    fn agent_event_metadata(
1235        &self,
1236        id: &str,
1237        provider: Option<&str>,
1238        session_id: Option<String>,
1239    ) -> WorkflowEventMetadata {
1240        WorkflowEventMetadata {
1241            run_id: None,
1242            step_id: Some(self.event_step_id(id)),
1243            provider: Some(
1244                provider
1245                    .unwrap_or_else(|| self.agent_provider.name())
1246                    .to_string(),
1247            ),
1248            session_id,
1249            workflow_depth: None,
1250            parent_step_id: None,
1251        }
1252    }
1253
1254    fn record_agent_run(
1255        &mut self,
1256        id: &str,
1257        input: &AgentProviderRunInput,
1258        provider: Option<String>,
1259        result: &AgentProviderResult,
1260    ) {
1261        add_usage(&mut self.token_usage, result.usage.as_ref());
1262        if let Some(phase) = input.context.phase.as_ref() {
1263            let phase_usage = self.token_usage_by_phase.entry(phase.clone()).or_default();
1264            add_usage(phase_usage, result.usage.as_ref());
1265        }
1266        let model = result.model.clone().or_else(|| {
1267            input
1268                .options
1269                .as_ref()
1270                .and_then(|options| options.get("model"))
1271                .and_then(Value::as_str)
1272                .map(ToString::to_string)
1273        });
1274        self.agent_runs.push(WorkflowAgentRunSummary {
1275            id: id.to_string(),
1276            phase: input.context.phase.clone(),
1277            provider,
1278            model,
1279            provider_session_id: result.session_id.clone(),
1280            usage: result.usage.clone(),
1281            isolation: result.isolation.clone(),
1282        });
1283    }
1284
1285    async fn handle_workflow(
1286        &mut self,
1287        parent_step_id: String,
1288        workflow_ref: WorkflowRef,
1289        args: Option<Value>,
1290    ) -> anyhow::Result<Value> {
1291        if self.nesting_depth >= 1 {
1292            bail!("Nested workflow() calls are limited to one level");
1293        }
1294        let script_path = match workflow_ref {
1295            WorkflowRef::ScriptPath { script_path } => {
1296                resolve_relative_script(&self.script_path, &script_path)
1297            }
1298            WorkflowRef::Name(name) => resolve_named_workflow(&name)?,
1299        };
1300        log::debug!("child workflow call script={}", script_path.display());
1301        let child = Box::pin(run_workflow_inner(RunWorkflowOptions {
1302            script_path,
1303            workflow_cwd: self.workflow_cwd.clone(),
1304            args: args.unwrap_or(Value::Null),
1305            agent_provider: Arc::clone(&self.agent_provider),
1306            model_map: self.model_map.clone(),
1307            budget_total: self.budget.total,
1308            budget_spent: self.budget.spent,
1309            nesting_depth: self.nesting_depth + 1,
1310            max_parallel_agent_requests: self.max_parallel_agent_requests,
1311            agent_runner: Some(Arc::clone(&self.agent_runner)),
1312            cancel_rx: self.cancel_rx.clone(),
1313            event_sink: self.event_sink.clone(),
1314            event_parent_step_id: Some(parent_step_id),
1315            event_stream_start: Some(self.event_start),
1316            session_log_sink: self.session_log_sink.clone(),
1317        }))
1318        .await?;
1319        self.budget = child.budget;
1320        self.logs.extend(child.logs);
1321        self.phases.extend(child.phases);
1322        self.agent_calls.extend(child.agent_calls);
1323        self.workflow_calls.extend(child.workflow_calls);
1324        merge_token_usage(&mut self.token_usage, &child.token_usage);
1325        for (phase, usage) in child.token_usage_by_phase {
1326            merge_token_usage(self.token_usage_by_phase.entry(phase).or_default(), &usage);
1327        }
1328        self.agent_runs.extend(child.agent_runs);
1329        Ok(child.output.result)
1330    }
1331}
1332
1333async fn run_agent_runner_with_retry(
1334    agent_runner: Arc<dyn WorkflowAgentRunner>,
1335    default_provider: Arc<dyn AgentProvider>,
1336    provider_override: Option<String>,
1337    input: AgentProviderRunInput,
1338    mut cancel_rx: Option<watch::Receiver<bool>>,
1339) -> anyhow::Result<AgentProviderResult> {
1340    let retry = agent_retry_policy(&input.options)?;
1341    let mut final_result = None;
1342    for attempt in 1..=retry.max_attempts {
1343        let attempt_result = agent_runner
1344            .run_agent(
1345                Arc::clone(&default_provider),
1346                provider_override.clone(),
1347                input.clone(),
1348            )
1349            .await;
1350        match attempt_result {
1351            Ok(result) => {
1352                final_result = Some(Ok(result));
1353                break;
1354            }
1355            Err(error) if attempt < retry.max_attempts => {
1356                log::debug!(
1357                    "agent call failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1358                    retry.max_attempts,
1359                    retry.backoff_ms
1360                );
1361                sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1362            }
1363            Err(error) => {
1364                final_result = Some(Err(error));
1365                break;
1366            }
1367        }
1368    }
1369    final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1370}
1371
1372async fn sleep_retry_backoff(
1373    backoff_ms: u64,
1374    cancel_rx: &mut Option<watch::Receiver<bool>>,
1375) -> anyhow::Result<()> {
1376    if backoff_ms == 0 {
1377        return Ok(());
1378    }
1379    let Some(cancel_rx) = cancel_rx.as_mut() else {
1380        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1381        return Ok(());
1382    };
1383    if *cancel_rx.borrow() {
1384        bail!("workflow cancelled");
1385    }
1386    let sleep = tokio::time::sleep(Duration::from_millis(backoff_ms));
1387    tokio::pin!(sleep);
1388    loop {
1389        tokio::select! {
1390            _ = &mut sleep => return Ok(()),
1391            changed = cancel_rx.changed() => {
1392                match changed {
1393                    Ok(()) if *cancel_rx.borrow() => bail!("workflow cancelled"),
1394                    Ok(()) => continue,
1395                    Err(_) => {
1396                        sleep.await;
1397                        return Ok(());
1398                    }
1399                }
1400            }
1401        }
1402    }
1403}
1404
1405pub(crate) async fn run_agent_provider_with_retry(
1406    default_provider: Arc<dyn AgentProvider>,
1407    provider_override: Option<String>,
1408    input: AgentProviderRunInput,
1409    mut cancel_rx: Option<watch::Receiver<bool>>,
1410) -> anyhow::Result<AgentProviderResult> {
1411    let retry = agent_retry_policy(&input.options)?;
1412    let provider = resolve_agent_provider(default_provider, provider_override)?;
1413    let mut final_result = None;
1414    for attempt in 1..=retry.max_attempts {
1415        let attempt_result =
1416            run_agent_with_optional_isolation(Arc::clone(&provider), input.clone()).await;
1417        match attempt_result {
1418            Ok(result) => {
1419                final_result = Some(Ok(result));
1420                break;
1421            }
1422            Err(error) if attempt < retry.max_attempts => {
1423                log::debug!(
1424                    "agent provider failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1425                    retry.max_attempts,
1426                    retry.backoff_ms
1427                );
1428                sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1429            }
1430            Err(error) => {
1431                final_result = Some(Err(error));
1432                break;
1433            }
1434        }
1435    }
1436    final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1437}
1438
1439pub(crate) async fn run_agent_provider(
1440    default_provider: Arc<dyn AgentProvider>,
1441    provider_override: Option<String>,
1442    input: AgentProviderRunInput,
1443) -> anyhow::Result<AgentProviderResult> {
1444    let provider = resolve_agent_provider(default_provider, provider_override)?;
1445    run_agent_with_optional_isolation(provider, input).await
1446}
1447
1448fn resolve_agent_provider(
1449    default_provider: Arc<dyn AgentProvider>,
1450    provider_override: Option<String>,
1451) -> anyhow::Result<Arc<dyn AgentProvider>> {
1452    if let Some(provider_override) = provider_override {
1453        Ok(Arc::from(create_agent_provider(&provider_override)?))
1454    } else {
1455        Ok(default_provider)
1456    }
1457}
1458
/// Retry settings for a single agent call.
#[derive(Clone, Copy, Debug)]
pub(crate) struct AgentRetryPolicy {
    /// Total number of attempts, including the first one.
    pub max_attempts: u32,
    /// Wait between two attempts, in milliseconds.
    pub backoff_ms: u64,
}
1464
1465pub(crate) fn agent_retry_policy(options: &Option<Value>) -> anyhow::Result<AgentRetryPolicy> {
1466    let default = AgentRetryPolicy {
1467        max_attempts: 1,
1468        backoff_ms: 0,
1469    };
1470    let Some(retry) = options.as_ref().and_then(|options| options.get("retry")) else {
1471        return Ok(default);
1472    };
1473    if retry.is_null() {
1474        return Ok(default);
1475    }
1476    let object = retry
1477        .as_object()
1478        .ok_or_else(|| anyhow!("agent retry option must be an object"))?;
1479    let max_attempts = match object.get("maxAttempts") {
1480        Some(value) => {
1481            let value = value
1482                .as_u64()
1483                .ok_or_else(|| anyhow!("agent retry.maxAttempts must be a positive integer"))?;
1484            if value == 0 || value > u32::MAX as u64 {
1485                bail!("agent retry.maxAttempts must be between 1 and {}", u32::MAX);
1486            }
1487            value as u32
1488        }
1489        None => default.max_attempts,
1490    };
1491    let backoff_ms = match object.get("backoffMs") {
1492        Some(value) => value
1493            .as_u64()
1494            .ok_or_else(|| anyhow!("agent retry.backoffMs must be a non-negative integer"))?,
1495        None => default.backoff_ms,
1496    };
1497    Ok(AgentRetryPolicy {
1498        max_attempts,
1499        backoff_ms,
1500    })
1501}
1502
1503async fn run_agent_with_optional_isolation(
1504    provider: Arc<dyn AgentProvider>,
1505    input: AgentProviderRunInput,
1506) -> anyhow::Result<AgentProviderResult> {
1507    if let Some(request) = sandbox_isolation_request(&input.options)? {
1508        return run_agent_with_sandbox_isolation(provider, input, request).await;
1509    }
1510
1511    if requests_worktree_isolation(&input.options) {
1512        return run_agent_with_worktree_isolation(provider, input).await;
1513    }
1514
1515    run_agent_with_schema_validation(provider, input).await
1516}
1517
1518async fn run_agent_with_worktree_isolation(
1519    provider: Arc<dyn AgentProvider>,
1520    input: AgentProviderRunInput,
1521) -> anyhow::Result<AgentProviderResult> {
1522    let isolation = WorktreeIsolation::create(input.context.cwd.as_deref())?;
1523    let isolation_info = isolation.info();
1524    let mut isolated_input = input;
1525    isolated_input.context.cwd = Some(isolation.cwd.clone());
1526    isolated_input.environment =
1527        Arc::new(LocalExecutionEnvironment::new(Some(isolation.cwd.clone()))?);
1528    let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1529    if let Ok(result) = &mut result {
1530        result.isolation = Some(isolation_info);
1531    }
1532    if let Err(error) = isolation.cleanup() {
1533        log::warn!("failed to cleanup isolated agent worktree: {error:#}");
1534    }
1535    result
1536}
1537
1538async fn run_agent_with_sandbox_isolation(
1539    provider: Arc<dyn AgentProvider>,
1540    input: AgentProviderRunInput,
1541    request: SandboxIsolationRequest,
1542) -> anyhow::Result<AgentProviderResult> {
1543    let program = sandbox_provider_program(&request);
1544    let sandbox_group_id = format!("sbxgrp_{}", ulid::Ulid::new());
1545    let cwd = input.context.cwd.clone().ok_or_else(|| {
1546        anyhow!("agent sandbox isolation requires the workflow cwd to be available")
1547    })?;
1548    let sandbox_env = SandboxExecutionEnvironment::open(
1549        program,
1550        OpenSandboxRequest {
1551            metadata: SandboxMetadata::new(
1552                format!("req_{}", ulid::Ulid::new()),
1553                sandbox_group_id.clone(),
1554            ),
1555            profile: ProfileRef {
1556                provider: request.provider.clone(),
1557                name: request.profile.clone(),
1558            },
1559            workspace_sync: WorkspaceSync { host_path: cwd },
1560            cwd: request.cwd.clone(),
1561        },
1562    )
1563    .await
1564    .with_context(|| {
1565        format!(
1566            "failed to open sandbox profile `{}` with provider `{}`",
1567            request.profile, request.provider
1568        )
1569    })?;
1570
1571    let session = sandbox_env.session().clone();
1572    let mut isolated_input = input;
1573    isolated_input.context.cwd = sandbox_env.cwd().map(|path| PathBuf::from(path.as_str()));
1574    isolated_input.environment = Arc::new(sandbox_env.clone());
1575    let isolation_info = AgentRunIsolation {
1576        kind: "sandbox".to_string(),
1577        branch: None,
1578        worktree_path: None,
1579        cwd: session.cwd.clone(),
1580        profile: Some(request.profile.clone()),
1581        provider: Some(request.provider.clone()),
1582        session_id: Some(session.id.clone()),
1583    };
1584
1585    let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1586    if let Ok(result) = &mut result {
1587        result.isolation = Some(isolation_info);
1588    }
1589
1590    if let Err(error) = sandbox_env.close().await {
1591        log::warn!("failed to close sandbox-isolated agent session: {error:#}");
1592    }
1593
1594    result
1595}
1596
1597fn requests_worktree_isolation(options: &Option<Value>) -> bool {
1598    options
1599        .as_ref()
1600        .and_then(|options| options.get("isolation"))
1601        .and_then(Value::as_str)
1602        == Some("worktree")
1603}
1604
/// Parsed form of an agent's `isolation: { type: "sandbox", ... }` option.
#[derive(Debug, Clone)]
struct SandboxIsolationRequest {
    /// Sandbox provider name: the part before `/` in `isolation.profile`.
    provider: String,
    /// Provider-local profile name: the part after the first `/` in
    /// `isolation.profile`.
    profile: String,
    /// Optional `isolation.cwd` string, forwarded as-is when opening the sandbox.
    cwd: Option<String>,
}
1611
1612fn sandbox_isolation_request(
1613    options: &Option<Value>,
1614) -> anyhow::Result<Option<SandboxIsolationRequest>> {
1615    let Some(isolation) = options
1616        .as_ref()
1617        .and_then(|options| options.get("isolation"))
1618    else {
1619        return Ok(None);
1620    };
1621    let Some(object) = isolation.as_object() else {
1622        return Ok(None);
1623    };
1624    if object.get("type").and_then(Value::as_str) != Some("sandbox") {
1625        return Ok(None);
1626    }
1627    let profile = object
1628        .get("profile")
1629        .and_then(Value::as_str)
1630        .ok_or_else(|| anyhow!("agent sandbox isolation requires isolation.profile"))?
1631        .to_string();
1632    let cwd = object
1633        .get("cwd")
1634        .and_then(Value::as_str)
1635        .map(ToString::to_string);
1636    let (provider, profile) = parse_sandbox_profile_ref(&profile)?;
1637    Ok(Some(SandboxIsolationRequest {
1638        provider,
1639        profile,
1640        cwd,
1641    }))
1642}
1643
1644fn parse_sandbox_profile_ref(value: &str) -> anyhow::Result<(String, String)> {
1645    let (provider, profile) = value.split_once('/').ok_or_else(|| {
1646        anyhow!("agent sandbox isolation.profile must use <provider>/<profile>, got `{value}`")
1647    })?;
1648    validate_sandbox_provider_name(provider)?;
1649    if profile.is_empty() {
1650        bail!(
1651            "agent sandbox isolation.profile must include a non-empty provider-local profile name"
1652        );
1653    }
1654    Ok((provider.to_string(), profile.to_string()))
1655}
1656
1657fn validate_sandbox_provider_name(provider: &str) -> anyhow::Result<()> {
1658    let valid = !provider.is_empty()
1659        && provider
1660            .bytes()
1661            .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'-')
1662        && provider
1663            .as_bytes()
1664            .first()
1665            .is_some_and(u8::is_ascii_alphanumeric)
1666        && provider
1667            .as_bytes()
1668            .last()
1669            .is_some_and(u8::is_ascii_alphanumeric);
1670    if !valid {
1671        bail!("invalid sandbox provider name `{provider}`; expected lowercase letters, digits, and hyphens, starting and ending with an alphanumeric character");
1672    }
1673    Ok(())
1674}
1675
1676fn sandbox_provider_program(request: &SandboxIsolationRequest) -> String {
1677    format!("smol-sandbox-{}", request.provider)
1678}
1679
/// A temporary git worktree (on its own branch) used to isolate an agent run.
struct WorktreeIsolation {
    /// Canonicalized top-level directory of the git repository; git commands
    /// that manage the worktree are run from here.
    repo_root: PathBuf,
    /// Location of the worktree checkout (the `worktree` directory inside the
    /// temp directory).
    worktree_root: PathBuf,
    /// Directory inside the worktree mirroring the workflow cwd's position
    /// relative to the repository root.
    cwd: PathBuf,
    /// Name of the branch created for the worktree.
    branch_name: String,
    /// Set once explicit cleanup has run; checked by `Drop` to decide whether
    /// cleanup is still needed.
    cleaned: bool,
    /// Owns the temp directory that contains the worktree.
    _temp_dir: tempfile::TempDir,
}
1688
1689impl WorktreeIsolation {
1690    fn create(cwd: Option<&Path>) -> anyhow::Result<Self> {
1691        let cwd = cwd
1692            .map(Path::to_path_buf)
1693            .unwrap_or(std::env::current_dir()?)
1694            .canonicalize()
1695            .context("failed to canonicalize workflow cwd for worktree isolation")?;
1696        let repo_root = git_output(&cwd, &["rev-parse", "--show-toplevel"]).context(
1697            "agent isolation='worktree' requires the workflow cwd to be inside a git repository",
1698        )?;
1699        let repo_root = PathBuf::from(repo_root.trim())
1700            .canonicalize()
1701            .context("failed to canonicalize git repository root for worktree isolation")?;
1702        let relative_cwd = cwd.strip_prefix(&repo_root).with_context(|| {
1703            format!(
1704                "workflow cwd {} is not under git repository root {}",
1705                cwd.display(),
1706                repo_root.display()
1707            )
1708        })?;
1709
1710        let temp_dir = tempfile::Builder::new()
1711            .prefix("smol-wf-agent-worktree-")
1712            .tempdir()
1713            .context("failed to create temp directory for agent worktree isolation")?;
1714        let worktree_root = temp_dir.path().join("worktree");
1715        let worktree_arg = path_arg(&worktree_root);
1716        let branch_name = format!(
1717            "smol-wf/agent-run/{}",
1718            ulid::Ulid::new().to_string().to_ascii_lowercase()
1719        );
1720        git_status(
1721            &repo_root,
1722            &[
1723                "worktree",
1724                "add",
1725                "--quiet",
1726                "-b",
1727                &branch_name,
1728                &worktree_arg,
1729                "HEAD",
1730            ],
1731        )
1732        .context("failed to create isolated git worktree for agent run")?;
1733        let isolated_cwd = if relative_cwd.as_os_str().is_empty() {
1734            worktree_root.clone()
1735        } else {
1736            worktree_root.join(relative_cwd)
1737        };
1738        Ok(Self {
1739            repo_root,
1740            worktree_root,
1741            cwd: isolated_cwd,
1742            branch_name,
1743            cleaned: false,
1744            _temp_dir: temp_dir,
1745        })
1746    }
1747
1748    fn info(&self) -> AgentRunIsolation {
1749        AgentRunIsolation {
1750            kind: "worktree".to_string(),
1751            branch: Some(self.branch_name.clone()),
1752            worktree_path: Some(path_arg(&self.worktree_root)),
1753            cwd: Some(path_arg(&self.cwd)),
1754            profile: None,
1755            provider: None,
1756            session_id: None,
1757        }
1758    }
1759
1760    fn cleanup(mut self) -> anyhow::Result<()> {
1761        self.remove_worktree()?;
1762        self.delete_branch()?;
1763        self.cleaned = true;
1764        Ok(())
1765    }
1766
1767    fn remove_worktree(&self) -> anyhow::Result<()> {
1768        let worktree_arg = path_arg(&self.worktree_root);
1769        git_status(
1770            &self.repo_root,
1771            &["worktree", "remove", "--force", &worktree_arg],
1772        )
1773        .context("failed to remove isolated git worktree")
1774    }
1775
1776    fn delete_branch(&self) -> anyhow::Result<()> {
1777        git_status(&self.repo_root, &["branch", "-D", &self.branch_name])
1778            .context("failed to delete isolated agent worktree branch")
1779    }
1780}
1781
1782impl Drop for WorktreeIsolation {
1783    fn drop(&mut self) {
1784        if !self.cleaned {
1785            if let Err(error) = self.remove_worktree() {
1786                log::warn!("failed to cleanup isolated agent worktree during drop: {error:#}");
1787            }
1788            if let Err(error) = self.delete_branch() {
1789                log::warn!(
1790                    "failed to delete isolated agent worktree branch during drop: {error:#}"
1791                );
1792            }
1793        }
1794    }
1795}
1796
/// Converts a path into an owned string (lossily, if it is not valid UTF-8)
/// for use as a command argument or in metadata.
fn path_arg(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1800
1801fn git_output(cwd: &Path, args: &[&str]) -> anyhow::Result<String> {
1802    let output = StdCommand::new("git")
1803        .args(args)
1804        .current_dir(cwd)
1805        .output()
1806        .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1807    if output.status.success() {
1808        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
1809    } else {
1810        bail!(
1811            "git {} failed with {}{}",
1812            args.join(" "),
1813            status_text(output.status.code()),
1814            command_stderr(&output.stderr)
1815        )
1816    }
1817}
1818
1819fn git_status(cwd: &Path, args: &[&str]) -> anyhow::Result<()> {
1820    let output = StdCommand::new("git")
1821        .args(args)
1822        .current_dir(cwd)
1823        .output()
1824        .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1825    if output.status.success() {
1826        Ok(())
1827    } else {
1828        bail!(
1829            "git {} failed with {}{}",
1830            args.join(" "),
1831            status_text(output.status.code()),
1832            command_stderr(&output.stderr)
1833        )
1834    }
1835}
1836
/// Renders an optional exit code as `code <n>`, or `signal` when no code is
/// available.
fn status_text(code: Option<i32>) -> String {
    match code {
        Some(code) => format!("code {code}"),
        None => "signal".to_string(),
    }
}
1841
/// Formats captured stderr as a `": <text>"` suffix for error messages.
///
/// The bytes are decoded lossily as UTF-8 and trimmed; blank output yields an
/// empty string so nothing is appended.
fn command_stderr(stderr: &[u8]) -> String {
    let decoded = String::from_utf8_lossy(stderr);
    match decoded.trim() {
        "" => String::new(),
        stderr => format!(": {stderr}"),
    }
}
1851
1852async fn run_agent_with_schema_validation(
1853    provider: Arc<dyn AgentProvider>,
1854    input: AgentProviderRunInput,
1855) -> anyhow::Result<AgentProviderResult> {
1856    let Some(schema) = input
1857        .options
1858        .as_ref()
1859        .and_then(|options| options.get("schema"))
1860        .cloned()
1861    else {
1862        return provider.run(input).await;
1863    };
1864
1865    let max_attempts = 2;
1866    let original_prompt = input.prompt.clone();
1867    let mut attempt_input = input;
1868    let mut last_errors = Vec::new();
1869
1870    for attempt in 1..=max_attempts {
1871        let result = provider.run(attempt_input.clone()).await?;
1872        match validate_structured_output(&schema, &result.output) {
1873            Ok(()) => return Ok(result),
1874            Err(errors) => {
1875                last_errors = errors;
1876                if attempt < max_attempts {
1877                    attempt_input.prompt =
1878                        with_structured_output_retry_prompt(&original_prompt, &last_errors);
1879                }
1880            }
1881        }
1882    }
1883
1884    bail!(
1885        "{}",
1886        format_structured_output_validation_error(&last_errors)
1887    )
1888}
1889
1890fn validate_structured_output(schema: &Value, output: &Value) -> Result<(), Vec<String>> {
1891    let validator = jsonschema::validator_for(schema)
1892        .map_err(|error| vec![format!("/ schema is invalid: {}", error)])?;
1893    let errors = validator
1894        .iter_errors(output)
1895        .map(|error| {
1896            let path = error.instance_path().to_string();
1897            let path = if path.is_empty() {
1898                "/".to_string()
1899            } else {
1900                path
1901            };
1902            format!("{path} {error}")
1903        })
1904        .collect::<Vec<_>>();
1905
1906    if errors.is_empty() {
1907        Ok(())
1908    } else {
1909        Err(errors)
1910    }
1911}
1912
/// Builds the final error message for structured output that failed schema
/// validation, listing the individual errors separated by `; `.
fn format_structured_output_validation_error(errors: &[String]) -> String {
    let mut message = String::from("Structured output did not match JSON Schema: ");
    message.push_str(&errors.join("; "));
    message
}
1919
/// Builds the retry prompt: the original prompt, a blank line, fixed
/// instructions to correct the output, and a bulleted list of the validation
/// errors.
fn with_structured_output_retry_prompt(prompt: &str, errors: &[String]) -> String {
    const INSTRUCTIONS: [&str; 3] = [
        "Previous structured output failed JSON Schema validation.",
        "Return a corrected structured output that satisfies the original JSON Schema.",
        "Validation errors:",
    ];

    let mut text = String::from(prompt);
    // Blank separator line between the prompt and the retry instructions.
    text.push('\n');
    for line in INSTRUCTIONS.iter() {
        text.push('\n');
        text.push_str(line);
    }
    for error in errors {
        text.push_str("\n- ");
        text.push_str(error);
    }
    text
}
1931
/// A model selector after parsing.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResolvedModelSelector {
    /// The model string as originally requested.
    requested: String,
    /// The selector string that was actually parsed.
    selector: String,
    /// The model id portion of the selector.
    model_id: String,
    /// Optional model provider qualifier.
    model_provider: Option<String>,
    /// Optional `thinking` setting.
    thinking: Option<String>,
}

impl ResolvedModelSelector {
    /// Returns `<provider>/<model_id>` when a model provider is set, otherwise
    /// just the model id.
    fn provider_model(&self) -> String {
        self.model_provider.as_ref().map_or_else(
            || self.model_id.clone(),
            |provider| format!("{provider}/{}", self.model_id),
        )
    }
}
1949
1950fn resolve_model_options(
1951    options: Option<Value>,
1952    agent_provider: &str,
1953    model_map: &BTreeMap<String, String>,
1954) -> anyhow::Result<Option<Value>> {
1955    let Some(model) = options
1956        .as_ref()
1957        .and_then(Value::as_object)
1958        .and_then(|object| object.get("model"))
1959        .and_then(Value::as_str)
1960        .map(ToString::to_string)
1961    else {
1962        return Ok(options);
1963    };
1964
1965    let mapped_selector = model_map.get(&model).cloned();
1966    let alias_matched = mapped_selector.is_some();
1967    let selector = mapped_selector.unwrap_or_else(|| model.clone());
1968    let resolved = parse_model_selector(&model, &selector)?;
1969    validate_model_selector_for_provider(&resolved, agent_provider)?;
1970
1971    let mut object = options
1972        .and_then(|value| value.as_object().cloned())
1973        .unwrap_or_default();
1974    object.insert(
1975        "model".to_string(),
1976        Value::String(resolved.provider_model()),
1977    );
1978
1979    let selector_has_extra_parts = alias_matched
1980        || resolved.selector.contains('?')
1981        || resolved.model_provider.is_some()
1982        || resolved.thinking.is_some();
1983    if selector_has_extra_parts {
1984        object.insert(
1985            "requestedModel".to_string(),
1986            Value::String(resolved.requested.clone()),
1987        );
1988        object.insert(
1989            "modelSelector".to_string(),
1990            Value::String(resolved.selector.clone()),
1991        );
1992    } else {
1993        object.remove("requestedModel");
1994        object.remove("modelSelector");
1995    }
1996
1997    if let Some(provider) = resolved.model_provider {
1998        object.insert("modelProvider".to_string(), Value::String(provider));
1999    } else {
2000        object.remove("modelProvider");
2001    }
2002    if let Some(thinking) = resolved.thinking {
2003        object.insert("thinking".to_string(), Value::String(thinking));
2004    } else {
2005        object.remove("thinking");
2006    }
2007    Ok(Some(Value::Object(object)))
2008}
2009
2010fn parse_model_selector(requested: &str, selector: &str) -> anyhow::Result<ResolvedModelSelector> {
2011    let (model_part, query) = selector.split_once('?').unwrap_or((selector, ""));
2012    if model_part.trim().is_empty() {
2013        bail!("model selector must include a model id: {selector}");
2014    }
2015
2016    let (slash_provider, model_id) = match model_part.split_once('/') {
2017        Some((provider, model_id)) if !provider.is_empty() && !model_id.is_empty() => {
2018            (Some(provider.to_string()), model_id.to_string())
2019        }
2020        Some(_) => bail!("model selector provider/model form is invalid: {selector}"),
2021        None => (None, model_part.to_string()),
2022    };
2023
2024    let mut query_provider = None::<String>;
2025    let mut thinking = None::<String>;
2026    if !query.is_empty() {
2027        for pair in query.split('&') {
2028            if pair.is_empty() {
2029                continue;
2030            }
2031            let (key, value) = pair.split_once('=').ok_or_else(|| {
2032                anyhow!("model selector query parameter must use key=value: {pair}")
2033            })?;
2034            let key = percent_decode(key)?;
2035            let value = percent_decode(value)?;
2036            if value.is_empty() {
2037                bail!("model selector query parameter `{key}` must not be empty");
2038            }
2039            match key.as_str() {
2040                "provider" => set_unique_query_value(&mut query_provider, key, value)?,
2041                "thinking" => set_unique_query_value(&mut thinking, key, value)?,
2042                _ => bail!("unknown model selector query parameter `{key}`"),
2043            }
2044        }
2045    }
2046
2047    let model_provider = match (slash_provider, query_provider) {
2048        (Some(slash), Some(query)) if slash != query => bail!(
2049            "conflicting model provider qualifiers in selector `{selector}`: `{slash}` and `{query}`"
2050        ),
2051        (Some(provider), Some(_)) | (Some(provider), None) | (None, Some(provider)) => {
2052            Some(provider)
2053        }
2054        (None, None) => None,
2055    };
2056
2057    Ok(ResolvedModelSelector {
2058        requested: requested.to_string(),
2059        selector: selector.to_string(),
2060        model_id,
2061        model_provider,
2062        thinking,
2063    })
2064}
2065
2066fn set_unique_query_value(
2067    target: &mut Option<String>,
2068    key: String,
2069    value: String,
2070) -> anyhow::Result<()> {
2071    if target.replace(value).is_some() {
2072        bail!("duplicate model selector query parameter `{key}`");
2073    }
2074    Ok(())
2075}
2076
2077fn percent_decode(value: &str) -> anyhow::Result<String> {
2078    let bytes = value.as_bytes();
2079    let mut output = Vec::with_capacity(bytes.len());
2080    let mut index = 0;
2081    while index < bytes.len() {
2082        match bytes[index] {
2083            b'%' => {
2084                if index + 2 >= bytes.len() {
2085                    bail!("invalid percent escape in model selector query: {value}");
2086                }
2087                let high = hex_value(bytes[index + 1]).ok_or_else(|| {
2088                    anyhow!("invalid percent escape in model selector query: {value}")
2089                })?;
2090                let low = hex_value(bytes[index + 2]).ok_or_else(|| {
2091                    anyhow!("invalid percent escape in model selector query: {value}")
2092                })?;
2093                output.push((high << 4) | low);
2094                index += 3;
2095            }
2096            b'+' => {
2097                output.push(b' ');
2098                index += 1;
2099            }
2100            byte => {
2101                output.push(byte);
2102                index += 1;
2103            }
2104        }
2105    }
2106    String::from_utf8(output).context("model selector query is not valid UTF-8")
2107}
2108
/// Returns the numeric value (0–15) of an ASCII hexadecimal digit, accepting
/// both upper- and lowercase letters, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
2117
2118fn validate_model_selector_for_provider(
2119    resolved: &ResolvedModelSelector,
2120    agent_provider: &str,
2121) -> anyhow::Result<()> {
2122    match agent_provider {
2123        "codex" => {
2124            if resolved.model_provider.is_some() {
2125                bail!("Codex model selectors do not support ?provider=... or provider/model form");
2126            }
2127            if resolved.thinking.is_some() {
2128                bail!("Codex model selectors do not support thinking=...");
2129            }
2130        }
2131        "claude-code" if resolved.model_provider.is_some() => {
2132            bail!(
2133                "Claude Code model selectors do not support ?provider=... or provider/model form"
2134            );
2135        }
2136        "opencode" if resolved.model_provider.is_none() => {
2137            bail!("OpenCode model selectors must use provider/model or ?provider=...");
2138        }
2139        "debug" | "pi" => {}
2140        _ => {}
2141    }
2142    Ok(())
2143}
2144
2145fn apply_phase_defaults(options: Option<Value>, metadata: &WorkflowMetadata) -> Option<Value> {
2146    let phase_name = options
2147        .as_ref()
2148        .and_then(|options| options.get("phase"))
2149        .and_then(Value::as_str)
2150        .map(ToString::to_string);
2151    let phase_metadata = phase_name.as_ref().and_then(|phase_name| {
2152        metadata
2153            .phases
2154            .iter()
2155            .find(|phase| phase.title == *phase_name)
2156    });
2157
2158    if phase_name.is_none() && phase_metadata.is_none() {
2159        return options;
2160    }
2161
2162    let mut object = options
2163        .and_then(|value| value.as_object().cloned())
2164        .unwrap_or_default();
2165
2166    if let Some(phase_name) = phase_name {
2167        object
2168            .entry("phase".to_string())
2169            .or_insert(Value::String(phase_name));
2170    }
2171    if let Some(model) = phase_metadata.and_then(|phase| phase.model.clone()) {
2172        object
2173            .entry("model".to_string())
2174            .or_insert(Value::String(model));
2175    }
2176    if let Some(provider) = phase_metadata.and_then(|phase| phase.provider.clone()) {
2177        object
2178            .entry("provider".to_string())
2179            .or_insert(Value::String(provider));
2180    }
2181
2182    Some(Value::Object(object))
2183}
2184
/// Resolves `script_path` relative to the directory containing
/// `current_script_path`.
///
/// Absolute script paths are returned unchanged. When the current script path
/// has no parent, `.` is used as the base directory.
fn resolve_relative_script(current_script_path: &Path, script_path: &str) -> PathBuf {
    let requested = Path::new(script_path);
    if requested.is_absolute() {
        return requested.to_path_buf();
    }
    match current_script_path.parent() {
        Some(directory) => directory.join(requested),
        None => Path::new(".").join(requested),
    }
}
2196
2197fn resolve_named_workflow(name: &str) -> anyhow::Result<PathBuf> {
2198    let workflows_dir = PathBuf::from(".claude/workflows");
2199    for entry in fs::read_dir(&workflows_dir).unwrap_or_else(|_| fs::read_dir(".").unwrap()) {
2200        let entry = entry?;
2201        let path = entry.path();
2202        if path.extension().and_then(|extension| extension.to_str()) != Some("js") {
2203            continue;
2204        }
2205        if read_workflow_metadata(&path)?.is_some_and(|metadata| metadata.name == name) {
2206            return Ok(path);
2207        }
2208    }
2209    bail!("Unknown workflow: {name}")
2210}