Skip to main content

smol_workflow_engine/
workflow.rs

1use crate::agent_providers::{
2    create_agent_provider, AgentProvider, AgentProviderContext, AgentProviderResult,
3    AgentProviderRunInput, AgentRunIsolation, AgentUsage, AgentUsageCost,
4};
5use crate::environment::{
6    AgentExecutionEnvironment, EnvironmentPath, ExecRequest, LocalExecutionEnvironment,
7    NullExecEventSink, SandboxExecutionEnvironment,
8};
9use crate::js_runtime::rquickjs::RQuickJSWorkflowRuntime;
10use crate::js_runtime::{
11    WorkflowBudgetSnapshot, WorkflowJSRuntime, WorkflowModuleInput, WorkflowModuleOutput,
12    WorkflowRef, WorkflowRuntimeCall, WorkflowRuntimeExecution, WorkflowRuntimePoll,
13    WorkflowRuntimeRequest, WorkflowRuntimeRequestResolution, WorkflowSandboxExecOutput,
14    WorkflowSandboxExecRequest,
15};
16use crate::metadata::{read_workflow_metadata, WorkflowMetadata};
17use anyhow::{anyhow, bail, Context};
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use smol_workflow_sandbox::{
21    Metadata as SandboxMetadata, OpenSandboxRequest, ProfileRef, WorkspaceSync,
22};
23use std::collections::{BTreeMap, BTreeSet, VecDeque};
24use std::fs;
25use std::path::{Path, PathBuf};
26use std::process::Command as StdCommand;
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29use tokio::sync::{mpsc, watch};
30use tokio::task::{JoinSet, LocalSet};
31
32pub use crate::events::{
33    WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink, WorkflowEventType,
34};
35
/// Sink that receives agent run results for session logging.
///
/// When a run has a sink configured, workflow cancellation drains the
/// in-flight agent tasks (emitting their events) before it finishes.
#[async_trait::async_trait]
pub trait AgentSessionLogSink: Send + Sync {
    /// Records one agent `result` produced by the provider named `provider`.
    ///
    /// # Errors
    ///
    /// Implementation-defined; callers are not visible from this trait.
    async fn write_agent_result(
        &self,
        provider: &str,
        result: &AgentProviderResult,
    ) -> anyhow::Result<()>;
}
44
/// Executes agent requests (and, by default, sleeps) on behalf of the
/// workflow scheduler.
///
/// When no runner is supplied in [`RunWorkflowOptions`], the scheduler uses
/// [`DirectWorkflowAgentRunner`].
#[async_trait::async_trait]
pub trait WorkflowAgentRunner: Send + Sync {
    /// Runs a single agent request.
    ///
    /// `default_provider` is the run's configured provider. `provider_override`,
    /// when set, names another provider. NOTE(review): how the override is
    /// resolved is up to the implementation (the direct runner delegates to
    /// `run_agent_provider`), so confirm there.
    async fn run_agent(
        &self,
        default_provider: Arc<dyn AgentProvider>,
        provider_override: Option<String>,
        input: AgentProviderRunInput,
    ) -> anyhow::Result<AgentProviderResult>;

    /// Whether the workflow scheduler should wrap this runner's `run_agent` call
    /// in the per-agent retry loop.
    ///
    /// Most runners should use the default `true`: their `run_agent` method is a
    /// single agent/provider boundary, so retrying it is safe and keeps retry
    /// behavior centralized in the runtime scheduler.
    ///
    /// Runners that perform their own checkpointing or replay should return
    /// `false` and apply retry internally around the nondeterministic provider
    /// call. For example, the SQLite durable runner must not have the scheduler
    /// retry its whole `run_agent` method, because each call advances durable
    /// occurrence counters and may create a distinct checkpoint such as
    /// `step:sig_x#2` instead of retrying the original durable step.
    fn retry_in_runtime(&self) -> bool {
        true
    }

    /// Waits for `duration_ms` milliseconds.
    ///
    /// The default implementation is a plain `tokio::time::sleep` and never
    /// fails. Durable runners may override it (presumably to checkpoint the
    /// sleep; confirm against implementors).
    async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
        tokio::time::sleep(std::time::Duration::from_millis(duration_ms)).await;
        Ok(())
    }
}
76
/// Default [`WorkflowAgentRunner`]: forwards each agent request straight to
/// `run_agent_provider`, with no checkpointing or replay.
///
/// It keeps the default `retry_in_runtime() == true`, so the scheduler owns
/// retries for it.
#[derive(Debug, Default)]
pub struct DirectWorkflowAgentRunner;

#[async_trait::async_trait]
impl WorkflowAgentRunner for DirectWorkflowAgentRunner {
    async fn run_agent(
        &self,
        default_provider: Arc<dyn AgentProvider>,
        provider_override: Option<String>,
        input: AgentProviderRunInput,
    ) -> anyhow::Result<AgentProviderResult> {
        run_agent_provider(default_provider, provider_override, input).await
    }
}
91
/// Inputs for a single [`run_workflow`] invocation.
pub struct RunWorkflowOptions {
    /// Path to the workflow script; canonicalized before it is read.
    pub script_path: PathBuf,
    /// Host working directory used for agent provider cwd, worktree isolation,
    /// and sandbox workspace sync. Defaults to the workflow script directory.
    pub workflow_cwd: Option<PathBuf>,
    /// Arguments handed to the workflow module as its `args`.
    pub args: Value,
    /// Default agent provider for agent requests made by the workflow.
    pub agent_provider: Arc<dyn AgentProvider>,
    /// Model name mapping carried into the run state.
    /// NOTE(review): presumably workflow alias -> provider model id; confirm.
    pub model_map: BTreeMap<String, String>,
    /// Optional total budget; seeds the runtime's budget snapshot.
    pub budget_total: Option<u64>,
    /// Budget already consumed when this run starts; seeds the budget snapshot.
    pub budget_spent: u64,
    /// Nesting level of this run within nested workflow calls (logged and kept
    /// in run state; presumably 0 for a top-level run).
    pub nesting_depth: usize,
    /// Optional cap on concurrently running agent tasks.
    /// NOTE(review): `None` presumably means unlimited; confirm in
    /// `agent_capacity_available`.
    pub max_parallel_agent_requests: Option<usize>,
    /// Runner used for agent requests; defaults to [`DirectWorkflowAgentRunner`].
    pub agent_runner: Option<Arc<dyn WorkflowAgentRunner>>,
    /// Cancellation signal: the run is cancelled once it observes `true`, or
    /// when the sender is dropped. `None` disables cancellation.
    pub cancel_rx: Option<watch::Receiver<bool>>,
    /// Event sink; when set, `started` and `result`/`error` lifecycle events are
    /// emitted in addition to per-agent events.
    pub event_sink: Option<Arc<dyn WorkflowEventSink>>,
    /// Parent step id carried into run state for emitted events (presumably set
    /// when this run is nested inside another workflow).
    pub event_parent_step_id: Option<String>,
    /// Reference instant for event timing; defaults to the moment the run starts.
    pub event_stream_start: Option<Instant>,
    /// Optional sink for agent session results.
    pub session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
}
111
/// Everything a completed workflow run produced.
#[derive(Debug)]
pub struct RunWorkflowResult {
    /// Final module output reported by the JavaScript runtime.
    pub output: WorkflowModuleOutput,
    /// Log entries collected during the run, one argument list per entry.
    pub logs: Vec<Vec<Value>>,
    /// Phase markers recorded by the workflow, in call order.
    pub phases: Vec<WorkflowPhaseCall>,
    /// Agent requests issued by the workflow.
    pub agent_calls: Vec<WorkflowRuntimeRequest>,
    /// Nested workflow requests issued by the workflow.
    pub workflow_calls: Vec<WorkflowRuntimeRequest>,
    /// Budget snapshot at completion.
    pub budget: WorkflowBudgetSnapshot,
    /// Token usage summed over all agent runs.
    pub token_usage: WorkflowTokenUsage,
    /// Token usage grouped per phase (presumably keyed by phase name; confirm).
    pub token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
    /// Summary of every recorded agent run.
    pub agent_runs: Vec<WorkflowAgentRunSummary>,
}
124
/// Aggregated token counts, plus optional cost, across one or more agent runs.
///
/// Serialized with camelCase keys. `cost` is omitted when no run reported one.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowTokenUsage {
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub cache_read_tokens: u64,
    pub cache_write_tokens: u64,
    pub total_tokens: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost: Option<AgentUsageCost>,
}
136
/// Per-agent-run record returned in [`RunWorkflowResult::agent_runs`].
///
/// Serialized with camelCase keys. Every optional field is omitted when `None`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowAgentRunSummary {
    /// Identifier of the run (presumably the runtime request id; confirm in
    /// `record_agent_run`).
    pub id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub phase: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<AgentUsage>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub isolation: Option<AgentRunIsolation>,
}
154
/// A phase marker recorded by the workflow script: the phase name plus any
/// options object passed along with it.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowPhaseCall {
    pub name: String,
    pub options: Option<Value>,
}
160
161pub async fn run_workflow(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
162    LocalSet::new().run_until(run_workflow_inner(options)).await
163}
164
165async fn run_workflow_inner(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
166    log::debug!(
167        "run_workflow start script={} provider={} nesting_depth={} budget_total={:?} budget_spent={}",
168        options.script_path.display(),
169        options.agent_provider.name(),
170        options.nesting_depth,
171        options.budget_total,
172        options.budget_spent
173    );
174    let script_path = fs::canonicalize(&options.script_path).with_context(|| {
175        format!(
176            "failed to resolve workflow script {}",
177            options.script_path.display()
178        )
179    })?;
180    let metadata = read_workflow_metadata(&script_path)?.ok_or_else(|| {
181        anyhow!("Workflow script must export valid literal metadata as `export const meta = {{ name, description, ... }}`")
182    })?;
183    log::debug!(
184        "workflow metadata loaded name={} phases={}",
185        metadata.name,
186        metadata.phases.len()
187    );
188    let workflow_cwd = match options.workflow_cwd {
189        Some(cwd) => Some(
190            fs::canonicalize(&cwd)
191                .with_context(|| format!("failed to resolve workflow cwd {}", cwd.display()))?,
192        ),
193        None => script_path.parent().map(Path::to_path_buf),
194    };
195    let source = fs::read_to_string(&script_path)
196        .with_context(|| format!("failed to read workflow script {}", script_path.display()))?;
197    let runtime = RQuickJSWorkflowRuntime::new();
198    let execution = runtime.start_module(WorkflowModuleInput {
199        source,
200        source_name: script_path.to_string_lossy().into_owned(),
201        args: options.args,
202        budget: WorkflowBudgetSnapshot {
203            total: options.budget_total,
204            spent: options.budget_spent,
205        },
206        sandbox: Default::default(),
207    })?;
208
209    let (js_commands, js_command_rx) = mpsc::channel::<JsCommand>(64);
210    let (js_event_tx, mut js_events) = mpsc::channel::<JsEvent>(64);
211    let js_task = tokio::task::spawn_local(js_runtime_actor(execution, js_command_rx, js_event_tx));
212
213    let emit_lifecycle_events = options.event_sink.is_some();
214    let event_start = options.event_stream_start.unwrap_or_else(Instant::now);
215
216    let mut state = RunState {
217        script_path,
218        workflow_cwd,
219        metadata,
220        event_start,
221        agent_provider: options.agent_provider,
222        model_map: options.model_map,
223        logs: Vec::new(),
224        phases: Vec::new(),
225        agent_calls: Vec::new(),
226        workflow_calls: Vec::new(),
227        budget: WorkflowBudgetSnapshot {
228            total: options.budget_total,
229            spent: options.budget_spent,
230        },
231        token_usage: WorkflowTokenUsage::default(),
232        token_usage_by_phase: Default::default(),
233        agent_runs: Vec::new(),
234        active_request_ids: BTreeSet::new(),
235        nesting_depth: options.nesting_depth,
236        max_parallel_agent_requests: options.max_parallel_agent_requests,
237        agent_runner: options
238            .agent_runner
239            .unwrap_or_else(|| Arc::new(DirectWorkflowAgentRunner)),
240        cancel_rx: options.cancel_rx,
241        event_sink: options.event_sink,
242        event_parent_step_id: options.event_parent_step_id,
243        session_log_sink: options.session_log_sink,
244    };
245
246    let mut pending_requests = VecDeque::<WorkflowRuntimeRequest>::new();
247    let mut agent_tasks = JoinSet::<AgentTaskCompletion>::new();
248    let mut sleep_tasks = JoinSet::<SleepTaskCompletion>::new();
249
250    if emit_lifecycle_events {
251        if let Err(error) = state
252            .emit_event(WorkflowEvent::started(rfc3339_now()?))
253            .await
254        {
255            let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
256            let _ = js_task.await;
257            return Err(error);
258        }
259    }
260
261    let workflow_result: anyhow::Result<RunWorkflowResult> = loop {
262        if let Err(error) = state
263            .start_pending_requests(
264                &mut pending_requests,
265                &mut agent_tasks,
266                &mut sleep_tasks,
267                &js_commands,
268            )
269            .await
270        {
271            break Err(error);
272        }
273
274        tokio::select! {
275            biased;
276            () = wait_for_cancellation(&mut state.cancel_rx) => {
277                break state.cancel_workflow(
278                    &mut pending_requests,
279                    &mut agent_tasks,
280                    &mut sleep_tasks,
281                    &js_commands,
282                    &mut js_events,
283                ).await;
284            }
285            event = js_events.recv() => {
286                let event = match event {
287                    Some(event) => event,
288                    None => break Err(anyhow!("JavaScript runtime actor stopped unexpectedly")),
289                };
290                match state.handle_js_event(event, &mut pending_requests).await {
291                    Ok(Some(result)) => break Ok(result),
292                    Ok(None) => {}
293                    Err(error) => break Err(error),
294                }
295            }
296            completion = agent_tasks.join_next(), if !agent_tasks.is_empty() => {
297                let completion = match completion {
298                    Some(Ok(completion)) => completion,
299                    Some(Err(error)) => break Err(anyhow!("agent provider task failed: {error}")),
300                    None => break Err(anyhow!("agent task set ended unexpectedly")),
301                };
302                let AgentTaskCompletion { id, input, provider, result } = completion;
303                state.active_request_ids.remove(&id);
304                let resolution = match result {
305                    Ok(result) => match state.apply_agent_result(&id, &input, provider, result).await {
306                        Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
307                            value,
308                            budget: state.budget.clone(),
309                        },
310                        Err(error) => WorkflowRuntimeRequestResolution::Err {
311                            message: error.to_string(),
312                        },
313                    },
314                    Err(error) => {
315                        let message = error.to_string();
316                        if let Err(emit_error) = state.emit_agent_failed_event(&id, provider.as_deref(), &message).await {
317                            log::debug!("failed to emit agent failure event: {emit_error:#}");
318                        }
319                        WorkflowRuntimeRequestResolution::Err { message }
320                    },
321                };
322                if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
323                    break Err(error);
324                }
325            }
326            completion = sleep_tasks.join_next(), if !sleep_tasks.is_empty() => {
327                let completion = match completion {
328                    Some(Ok(completion)) => completion,
329                    Some(Err(error)) => break Err(anyhow!("sleep task failed: {error}")),
330                    None => break Err(anyhow!("sleep task set ended unexpectedly")),
331                };
332                let SleepTaskCompletion { id, result } = completion;
333                state.active_request_ids.remove(&id);
334                let resolution = match result {
335                    Ok(()) => WorkflowRuntimeRequestResolution::OkUndefined,
336                    Err(error) => WorkflowRuntimeRequestResolution::Err {
337                        message: error.to_string(),
338                    },
339                };
340                if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
341                    break Err(error);
342                }
343            }
344        }
345    };
346
347    let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
348    let _ = js_task.await;
349
350    if emit_lifecycle_events {
351        match &workflow_result {
352            Ok(result) => {
353                state
354                    .emit_event(WorkflowEvent::result(
355                        result.token_usage.input_tokens,
356                        result.token_usage.output_tokens,
357                        result.token_usage.total_tokens,
358                        result.output.result.clone(),
359                    ))
360                    .await?
361            }
362            Err(error) => {
363                state
364                    .emit_event(WorkflowEvent::error(error.to_string(), None))
365                    .await?;
366            }
367        }
368    }
369
370    workflow_result
371}
372
/// Commands sent from the scheduler to the JavaScript runtime actor.
///
/// The actor reads commands only while the runtime reports `Pending`.
enum JsCommand {
    /// Deliver the outcome of a request previously forwarded as `JsEvent::Request`.
    ResolveRequest {
        id: String,
        resolution: WorkflowRuntimeRequestResolution,
    },
    /// Stop the actor.
    Shutdown,
}
380
/// Events forwarded from the JavaScript runtime actor to the scheduler.
enum JsEvent {
    /// A runtime call handled synchronously by `RunState::handle_call`.
    Call(WorkflowRuntimeCall),
    /// A host request (agent, sleep, nested workflow, or sandbox exec) that
    /// must later be answered with `JsCommand::ResolveRequest`.
    Request(WorkflowRuntimeRequest),
    /// The module finished; the actor stops after sending this.
    Complete(WorkflowModuleOutput),
    /// A runtime failure message; the actor stops after sending this.
    Error(String),
}
387
388async fn js_runtime_actor(
389    mut execution: Box<dyn WorkflowRuntimeExecution>,
390    mut commands: mpsc::Receiver<JsCommand>,
391    events: mpsc::Sender<JsEvent>,
392) {
393    let mut outstanding_requests = 0usize;
394    loop {
395        match execution.poll() {
396            Ok(WorkflowRuntimePoll::Call(call)) => {
397                if events.send(JsEvent::Call(call)).await.is_err() {
398                    return;
399                }
400            }
401            Ok(WorkflowRuntimePoll::Request(request)) => {
402                let requests = match execution.take_pending_requests() {
403                    Ok(requests) if requests.is_empty() => vec![request],
404                    Ok(requests) => requests,
405                    Err(error) => {
406                        let _ = events.send(JsEvent::Error(error.to_string())).await;
407                        return;
408                    }
409                };
410                outstanding_requests = outstanding_requests.saturating_add(requests.len());
411                for request in requests {
412                    if events.send(JsEvent::Request(request)).await.is_err() {
413                        return;
414                    }
415                }
416            }
417            Ok(WorkflowRuntimePoll::Complete(output)) => {
418                let _ = events.send(JsEvent::Complete(output)).await;
419                return;
420            }
421            Ok(WorkflowRuntimePoll::Pending) => {
422                if outstanding_requests == 0 {
423                    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
424                    continue;
425                }
426                match commands.recv().await {
427                    Some(JsCommand::ResolveRequest { id, resolution }) => {
428                        outstanding_requests = outstanding_requests.saturating_sub(1);
429                        if let Err(error) = execution.resolve_request(&id, resolution) {
430                            let _ = events.send(JsEvent::Error(error.to_string())).await;
431                            return;
432                        }
433                    }
434                    Some(JsCommand::Shutdown) | None => return,
435                }
436            }
437            Err(error) => {
438                let _ = events.send(JsEvent::Error(error.to_string())).await;
439                return;
440            }
441        }
442    }
443}
444
445async fn send_js_command(
446    commands: &mpsc::Sender<JsCommand>,
447    command: JsCommand,
448) -> anyhow::Result<()> {
449    commands
450        .send(command)
451        .await
452        .map_err(|_| anyhow!("JavaScript runtime actor stopped unexpectedly"))
453}
454
/// Mutable bookkeeping for one workflow run, owned by the scheduler loop in
/// `run_workflow_inner`.
struct RunState {
    /// Canonicalized workflow script path.
    script_path: PathBuf,
    /// Canonicalized host cwd, or the script's directory when none was given.
    workflow_cwd: Option<PathBuf>,
    /// Literal `meta` exported by the workflow script.
    metadata: WorkflowMetadata,
    /// Reference instant for event timing (presumably fed to `elapsed_nanos`).
    event_start: Instant,
    /// Default agent provider for agent requests.
    agent_provider: Arc<dyn AgentProvider>,
    model_map: BTreeMap<String, String>,
    /// Accumulated run outputs; moved into the result on completion.
    logs: Vec<Vec<Value>>,
    phases: Vec<WorkflowPhaseCall>,
    agent_calls: Vec<WorkflowRuntimeRequest>,
    workflow_calls: Vec<WorkflowRuntimeRequest>,
    /// Current budget snapshot, sent back with successful agent/workflow results.
    budget: WorkflowBudgetSnapshot,
    token_usage: WorkflowTokenUsage,
    token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
    agent_runs: Vec<WorkflowAgentRunSummary>,
    /// Ids of in-flight task requests; entries are removed as agent and sleep
    /// tasks complete.
    active_request_ids: BTreeSet<String>,
    nesting_depth: usize,
    max_parallel_agent_requests: Option<usize>,
    agent_runner: Arc<dyn WorkflowAgentRunner>,
    /// Cancellation signal polled by `wait_for_cancellation`.
    cancel_rx: Option<watch::Receiver<bool>>,
    event_sink: Option<Arc<dyn WorkflowEventSink>>,
    event_parent_step_id: Option<String>,
    /// When set, cancellation drains in-flight agent tasks before finishing.
    session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
}
479
/// An agent request validated by `prepare_agent_request` and ready to spawn.
struct PreparedAgentRun {
    /// Provider name requested by the workflow instead of the default, if any.
    provider_override: Option<String>,
    input: AgentProviderRunInput,
}
484
/// Value yielded by a finished agent task on the scheduler's `JoinSet`.
struct AgentTaskCompletion {
    /// Runtime request id to resolve.
    id: String,
    /// The input the agent ran with, kept for recording the run.
    input: AgentProviderRunInput,
    /// Provider name associated with the run, if known.
    provider: Option<String>,
    result: anyhow::Result<AgentProviderResult>,
}
491
/// Value yielded by a finished sleep task on the scheduler's `JoinSet`.
struct SleepTaskCompletion {
    /// Runtime request id to resolve.
    id: String,
    result: anyhow::Result<()>,
}
496
497fn add_usage(total: &mut WorkflowTokenUsage, usage: Option<&AgentUsage>) {
498    let Some(usage) = usage else {
499        return;
500    };
501
502    total.input_tokens = total
503        .input_tokens
504        .saturating_add(usage.input_tokens.unwrap_or_default());
505    total.output_tokens = total
506        .output_tokens
507        .saturating_add(usage.output_tokens.unwrap_or_default());
508    total.cache_read_tokens = total
509        .cache_read_tokens
510        .saturating_add(usage.cache_read_tokens.unwrap_or_default());
511    total.cache_write_tokens = total
512        .cache_write_tokens
513        .saturating_add(usage.cache_write_tokens.unwrap_or_default());
514    total.total_tokens = total
515        .total_tokens
516        .saturating_add(usage.total_tokens.unwrap_or_default());
517
518    if let Some(cost) = usage.cost.as_ref() {
519        total.cost = Some(merge_cost(total.cost.as_ref(), cost));
520    }
521}
522
523fn merge_token_usage(total: &mut WorkflowTokenUsage, usage: &WorkflowTokenUsage) {
524    total.input_tokens = total.input_tokens.saturating_add(usage.input_tokens);
525    total.output_tokens = total.output_tokens.saturating_add(usage.output_tokens);
526    total.cache_read_tokens = total
527        .cache_read_tokens
528        .saturating_add(usage.cache_read_tokens);
529    total.cache_write_tokens = total
530        .cache_write_tokens
531        .saturating_add(usage.cache_write_tokens);
532    total.total_tokens = total.total_tokens.saturating_add(usage.total_tokens);
533    if let Some(cost) = usage.cost.as_ref() {
534        total.cost = Some(merge_cost(total.cost.as_ref(), cost));
535    }
536}
537
538fn merge_cost(left: Option<&AgentUsageCost>, right: &AgentUsageCost) -> AgentUsageCost {
539    AgentUsageCost {
540        input: sum_f64(left.and_then(|cost| cost.input), right.input),
541        output: sum_f64(left.and_then(|cost| cost.output), right.output),
542        cache_read: sum_f64(left.and_then(|cost| cost.cache_read), right.cache_read),
543        cache_write: sum_f64(left.and_then(|cost| cost.cache_write), right.cache_write),
544        total: sum_f64(left.and_then(|cost| cost.total), right.total),
545        currency: right
546            .currency
547            .clone()
548            .or_else(|| left.and_then(|cost| cost.currency.clone())),
549    }
550}
551
/// Nanoseconds elapsed since `start`, clamped to `u64::MAX` on overflow.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos: u128 = start.elapsed().as_nanos();
    nanos.try_into().unwrap_or(u64::MAX)
}
555
556fn rfc3339_now() -> anyhow::Result<String> {
557    Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
558}
559
560fn raw_agent_event_payloads(raw: &Value) -> Vec<Value> {
561    if let Some(events) = raw.get("events").and_then(Value::as_array) {
562        events.clone()
563    } else if let Some(items) = raw.as_array() {
564        items.clone()
565    } else {
566        vec![raw.clone()]
567    }
568}
569
570fn agent_session_event_payload(provider_event: Value, metadata: &WorkflowEventMetadata) -> Value {
571    let mut payload = serde_json::Map::new();
572    if let Some(provider) = metadata.provider.as_ref() {
573        payload.insert("provider".to_string(), Value::String(provider.clone()));
574    }
575    if let Some(session_id) = metadata.session_id.as_ref() {
576        payload.insert("sessionId".to_string(), Value::String(session_id.clone()));
577    }
578    if let Some(run_id) = metadata.run_id.as_ref() {
579        payload.insert("runId".to_string(), Value::String(run_id.clone()));
580    }
581    if let Some(step_id) = metadata.step_id.as_ref() {
582        payload.insert("stepId".to_string(), Value::String(step_id.clone()));
583    }
584    payload.insert("providerEvent".to_string(), provider_event);
585    Value::Object(payload)
586}
587
/// Shortens `value` to at most `max_chars` characters, appending `…` when
/// anything was cut off.
///
/// Counting is by `char`, not bytes, so multi-byte text is never split in
/// the middle of a character.
fn truncate_for_event(value: &str, max_chars: usize) -> String {
    match value.char_indices().nth(max_chars) {
        // A char exists past the limit; its byte offset ends the kept prefix.
        Some((cut, _)) => format!("{}…", &value[..cut]),
        None => value.to_string(),
    }
}
597
598fn format_log_message(values: &[Value]) -> String {
599    values
600        .iter()
601        .map(|value| match value {
602            Value::String(value) => value.clone(),
603            value => serde_json::to_string(value).unwrap_or_else(|_| String::from("<unprintable>")),
604        })
605        .collect::<Vec<_>>()
606        .join(" ")
607}
608
/// Adds two optional amounts, treating a missing side as zero.
///
/// Returns `None` only when both sides are missing, so "not reported" stays
/// distinguishable from "reported as zero".
fn sum_f64(left: Option<f64>, right: Option<f64>) -> Option<f64> {
    if left.is_none() && right.is_none() {
        return None;
    }
    Some(left.unwrap_or(0.0) + right.unwrap_or(0.0))
}
615
616async fn wait_for_cancellation(cancel_rx: &mut Option<watch::Receiver<bool>>) {
617    let Some(cancel_rx) = cancel_rx else {
618        std::future::pending::<()>().await;
619        return;
620    };
621    while !*cancel_rx.borrow() {
622        if cancel_rx.changed().await.is_err() {
623            return;
624        }
625    }
626}
627
628impl RunState {
629    async fn handle_js_event(
630        &mut self,
631        event: JsEvent,
632        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
633    ) -> anyhow::Result<Option<RunWorkflowResult>> {
634        match event {
635            JsEvent::Call(call) => self.handle_call(call).await?,
636            JsEvent::Request(request) => {
637                log::debug!(
638                    "workflow runtime request id={} kind={}",
639                    request.id(),
640                    request.kind()
641                );
642                pending_requests.push_back(request);
643            }
644            JsEvent::Complete(output) => {
645                log::debug!(
646                    "run_workflow complete script={} budget_spent={}",
647                    self.script_path.display(),
648                    self.budget.spent
649                );
650                return Ok(Some(RunWorkflowResult {
651                    output,
652                    logs: std::mem::take(&mut self.logs),
653                    phases: std::mem::take(&mut self.phases),
654                    agent_calls: std::mem::take(&mut self.agent_calls),
655                    workflow_calls: std::mem::take(&mut self.workflow_calls),
656                    budget: self.budget.clone(),
657                    token_usage: std::mem::take(&mut self.token_usage),
658                    token_usage_by_phase: std::mem::take(&mut self.token_usage_by_phase),
659                    agent_runs: std::mem::take(&mut self.agent_runs),
660                }));
661            }
662            JsEvent::Error(message) => bail!(message),
663        }
664        Ok(None)
665    }
666
667    async fn start_pending_requests(
668        &mut self,
669        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
670        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
671        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
672        js_commands: &mpsc::Sender<JsCommand>,
673    ) -> anyhow::Result<()> {
674        loop {
675            let Some(request) = pending_requests.front() else {
676                return Ok(());
677            };
678            if matches!(request, WorkflowRuntimeRequest::Agent { .. })
679                && !self.agent_capacity_available(agent_tasks.len())
680            {
681                return Ok(());
682            }
683
684            let request = pending_requests
685                .pop_front()
686                .expect("pending request should exist");
687            match request {
688                WorkflowRuntimeRequest::Agent { .. } => match self.prepare_agent_request(request) {
689                    Ok((id, prepared)) => {
690                        self.emit_agent_started_event(&id, &prepared).await?;
691                        self.spawn_agent_task(agent_tasks, id, prepared);
692                    }
693                    Err((id, error)) => {
694                        send_js_command(
695                            js_commands,
696                            JsCommand::ResolveRequest {
697                                id,
698                                resolution: WorkflowRuntimeRequestResolution::Err {
699                                    message: error.to_string(),
700                                },
701                            },
702                        )
703                        .await?;
704                    }
705                },
706                WorkflowRuntimeRequest::Sleep { id, duration_ms } => {
707                    self.spawn_sleep_task(sleep_tasks, id, duration_ms);
708                }
709                WorkflowRuntimeRequest::Workflow {
710                    id,
711                    workflow_ref,
712                    args,
713                } => {
714                    self.workflow_calls.push(WorkflowRuntimeRequest::Workflow {
715                        id: id.clone(),
716                        workflow_ref: workflow_ref.clone(),
717                        args: args.clone(),
718                    });
719                    let parent_event_step_id = self.event_step_id(&id);
720                    let resolution = match self
721                        .handle_workflow(parent_event_step_id, workflow_ref, args)
722                        .await
723                    {
724                        Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
725                            value,
726                            budget: self.budget.clone(),
727                        },
728                        Err(error) => WorkflowRuntimeRequestResolution::Err {
729                            message: error.to_string(),
730                        },
731                    };
732                    send_js_command(js_commands, JsCommand::ResolveRequest { id, resolution })
733                        .await?;
734                }
735                WorkflowRuntimeRequest::SandboxExec {
736                    id,
737                    profile,
738                    request,
739                } => {
740                    let resolution = match self.handle_sandbox_exec(&profile, request).await {
741                        Ok(value) => WorkflowRuntimeRequestResolution::Ok(value),
742                        Err(error) => WorkflowRuntimeRequestResolution::Err {
743                            message: error.to_string(),
744                        },
745                    };
746                    send_js_command(js_commands, JsCommand::ResolveRequest { id, resolution })
747                        .await?;
748                }
749            }
750        }
751    }
752
    /// Tears down a running workflow after cancellation has been requested.
    ///
    /// Every path through this function ends in `bail!("workflow cancelled")`; a
    /// `RunWorkflowResult` is never produced here. Along the way it:
    /// 1. rejects queued `pending_requests` with a "workflow cancelled" resolution,
    /// 2. aborts sleep tasks and rejects the sleeps that had already finished,
    /// 3. either drains agent tasks (when a session log sink is configured) or aborts them,
    /// 4. rejects any request ids still marked active,
    /// 5. keeps pumping runtime events until the script completes or errors, then sends
    ///    `JsCommand::Shutdown` (best-effort, send errors ignored).
    async fn cancel_workflow(
        &mut self,
        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
        js_commands: &mpsc::Sender<JsCommand>,
        js_events: &mut mpsc::Receiver<JsEvent>,
    ) -> anyhow::Result<RunWorkflowResult> {
        log::debug!(
            "workflow cancellation requested script={}",
            self.script_path.display()
        );

        // Fast path: nothing is outstanding, so wait for the runtime's next event. If the
        // runtime stops (complete / error / closed channel) there is nothing left to tear
        // down. If it issues a request instead, that request has already been rejected and
        // the full teardown below runs.
        // NOTE(review): unlike the final path, this early return does not send
        // `JsCommand::Shutdown` — confirm the runtime does not need it once it has stopped.
        if pending_requests.is_empty()
            && self.active_request_ids.is_empty()
            && agent_tasks.is_empty()
            && sleep_tasks.is_empty()
            && self
                .reject_next_runtime_request_for_cancellation(js_commands, js_events)
                .await
        {
            bail!("workflow cancelled");
        }

        self.reject_pending_requests_for_cancellation(pending_requests, js_commands)
            .await;
        // Aborted sleep tasks come back from `join_next` as join errors and are skipped by the
        // helper; their ids stay in `active_request_ids` and are rejected further down.
        sleep_tasks.abort_all();
        self.reject_active_sleep_requests_for_cancellation(sleep_tasks, js_commands)
            .await;

        if self.session_log_sink.is_some() {
            // With a session log sink, agent tasks are awaited rather than aborted —
            // presumably so the session-log write done inside each task (see
            // `spawn_agent_task`) is not cut short. Each result is still emitted/recorded,
            // but the runtime request is rejected regardless.
            while let Some(completion) = agent_tasks.join_next().await {
                match completion {
                    Ok(AgentTaskCompletion {
                        id,
                        input,
                        provider,
                        result: Ok(result),
                    }) => {
                        self.active_request_ids.remove(&id);
                        // Emission failures are only logged so teardown can continue.
                        if let Err(error) = self
                            .emit_agent_result_events(&id, provider.as_deref(), &result)
                            .await
                        {
                            log::debug!("failed to emit drained agent events during cancellation: {error:#}");
                        }
                        if let Err(error) = self
                            .emit_agent_completed_event(&id, provider.as_deref(), &result)
                            .await
                        {
                            log::debug!("failed to emit drained agent completion during cancellation: {error:#}");
                        }
                        // NOTE(review): unlike `apply_agent_result`, output tokens are not added
                        // to `self.budget.spent` here; harmless only if the budget is not read
                        // after cancellation — confirm.
                        self.record_agent_run(&id, &input, provider, &result);
                        self.reject_request_for_cancellation(id, js_commands).await;
                    }
                    Ok(AgentTaskCompletion {
                        id,
                        provider,
                        result: Err(error),
                        ..
                    }) => {
                        self.active_request_ids.remove(&id);
                        let message = error.to_string();
                        if let Err(error) = self
                            .emit_agent_failed_event(&id, provider.as_deref(), &message)
                            .await
                        {
                            log::debug!("failed to emit drained agent failure during cancellation: {error:#}");
                        }
                        log::debug!("agent task failed while draining cancellation: {message}");
                        self.reject_request_for_cancellation(id, js_commands).await;
                    }
                    Err(error) => {
                        // A join error carries no request id; that id stays active and is
                        // rejected by `reject_remaining_active_requests_for_cancellation` below.
                        log::debug!("agent task join failed while draining cancellation: {error}");
                    }
                }
            }
        } else {
            // No sink: abort agent tasks outright and reject every id still marked active
            // (this covers any remaining non-agent ids too, not only agent ones).
            let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
            agent_tasks.abort_all();
            for id in ids {
                self.active_request_ids.remove(&id);
                self.reject_request_for_cancellation(id, js_commands).await;
            }
        }

        self.reject_remaining_active_requests_for_cancellation(js_commands)
            .await;
        // Keep servicing the runtime until the script completes or errors.
        self.drain_runtime_after_cancellation(js_events).await;
        let _ = send_js_command(js_commands, JsCommand::Shutdown).await;
        bail!("workflow cancelled")
    }
845
846    async fn reject_next_runtime_request_for_cancellation(
847        &mut self,
848        js_commands: &mpsc::Sender<JsCommand>,
849        js_events: &mut mpsc::Receiver<JsEvent>,
850    ) -> bool {
851        loop {
852            match js_events.recv().await {
853                Some(JsEvent::Call(call)) => {
854                    let _ = self.handle_call(call).await;
855                }
856                Some(JsEvent::Request(request)) => {
857                    self.reject_request_for_cancellation(request.id().to_string(), js_commands)
858                        .await;
859                    return false;
860                }
861                Some(JsEvent::Complete(_)) | Some(JsEvent::Error(_)) | None => return true,
862            }
863        }
864    }
865
866    async fn reject_pending_requests_for_cancellation(
867        &mut self,
868        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
869        js_commands: &mpsc::Sender<JsCommand>,
870    ) {
871        while let Some(request) = pending_requests.pop_front() {
872            self.reject_request_for_cancellation(request.id().to_string(), js_commands)
873                .await;
874        }
875    }
876
877    async fn reject_active_sleep_requests_for_cancellation(
878        &mut self,
879        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
880        js_commands: &mpsc::Sender<JsCommand>,
881    ) {
882        while let Some(completion) = sleep_tasks.join_next().await {
883            if let Ok(SleepTaskCompletion { id, .. }) = completion {
884                self.active_request_ids.remove(&id);
885                self.reject_request_for_cancellation(id, js_commands).await;
886            }
887        }
888    }
889
890    async fn reject_remaining_active_requests_for_cancellation(
891        &mut self,
892        js_commands: &mpsc::Sender<JsCommand>,
893    ) {
894        let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
895        for id in ids {
896            self.active_request_ids.remove(&id);
897            self.reject_request_for_cancellation(id, js_commands).await;
898        }
899    }
900
901    async fn reject_request_for_cancellation(
902        &self,
903        id: String,
904        js_commands: &mpsc::Sender<JsCommand>,
905    ) {
906        let _ = send_js_command(
907            js_commands,
908            JsCommand::ResolveRequest {
909                id,
910                resolution: WorkflowRuntimeRequestResolution::Err {
911                    message: "workflow cancelled".to_string(),
912                },
913            },
914        )
915        .await;
916    }
917
918    async fn drain_runtime_after_cancellation(&mut self, js_events: &mut mpsc::Receiver<JsEvent>) {
919        while let Some(event) = js_events.recv().await {
920            match event {
921                JsEvent::Call(call) => {
922                    let _ = self.handle_call(call).await;
923                }
924                JsEvent::Request(request) => {
925                    log::debug!(
926                        "ignoring request after cancellation id={} kind={}",
927                        request.id(),
928                        request.kind()
929                    );
930                }
931                JsEvent::Complete(_) | JsEvent::Error(_) => break,
932            }
933        }
934    }
935
936    fn event_step_id(&self, runtime_request_id: &str) -> String {
937        let parent = self.event_parent_step_id.as_deref().unwrap_or("");
938        let hash = blake3::hash(
939            format!("{parent}:{}:{runtime_request_id}", self.nesting_depth).as_bytes(),
940        );
941        format!("step_{}", &hash.to_hex()[..16])
942    }
943
944    async fn emit_event(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
945        if (event.event_type.as_str() != "workflow.started" || self.nesting_depth > 0)
946            && event.elapsed_nanos.is_none()
947        {
948            event.elapsed_nanos = Some(elapsed_nanos(self.event_start));
949        }
950        let metadata = event
951            .metadata
952            .get_or_insert_with(WorkflowEventMetadata::default);
953        if metadata.workflow_depth.is_none() {
954            metadata.workflow_depth = Some(u32::try_from(self.nesting_depth).unwrap_or(u32::MAX));
955        }
956        if metadata.parent_step_id.is_none() {
957            metadata.parent_step_id = self.event_parent_step_id.clone();
958        }
959        if let Some(event_sink) = self.event_sink.as_ref() {
960            event_sink.emit(event).await?;
961        }
962        Ok(())
963    }
964
965    async fn handle_call(&mut self, call: WorkflowRuntimeCall) -> anyhow::Result<()> {
966        match call {
967            WorkflowRuntimeCall::Log { values } => {
968                self.emit_event(WorkflowEvent::log(format_log_message(&values)))
969                    .await?;
970                self.logs.push(values);
971            }
972            WorkflowRuntimeCall::Phase { name, options } => {
973                let phase = WorkflowPhaseCall { name, options };
974                self.emit_event(WorkflowEvent::phase(
975                    phase.name.clone(),
976                    phase.options.clone(),
977                ))
978                .await?;
979                self.phases.push(phase);
980            }
981        }
982        Ok(())
983    }
984
985    fn agent_capacity_available(&self, in_flight: usize) -> bool {
986        let max_parallel = self
987            .max_parallel_agent_requests
988            .filter(|value| *value > 0)
989            .unwrap_or(usize::MAX);
990        in_flight < max_parallel
991    }
992
993    fn prepare_agent_request(
994        &mut self,
995        request: WorkflowRuntimeRequest,
996    ) -> Result<(String, PreparedAgentRun), (String, anyhow::Error)> {
997        match request {
998            WorkflowRuntimeRequest::Agent {
999                id,
1000                prompt,
1001                options,
1002            } => {
1003                self.agent_calls.push(WorkflowRuntimeRequest::Agent {
1004                    id: id.clone(),
1005                    prompt: prompt.clone(),
1006                    options: options.clone(),
1007                });
1008                match self.prepare_agent_run(prompt, options) {
1009                    Ok(prepared) => Ok((id, prepared)),
1010                    Err(error) => Err((id, error)),
1011                }
1012            }
1013            WorkflowRuntimeRequest::Workflow { .. }
1014            | WorkflowRuntimeRequest::Sleep { .. }
1015            | WorkflowRuntimeRequest::SandboxExec { .. } => {
1016                unreachable!("prepare_agent_request only accepts agent requests")
1017            }
1018        }
1019    }
1020
1021    fn spawn_agent_task(
1022        &mut self,
1023        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
1024        id: String,
1025        prepared: PreparedAgentRun,
1026    ) {
1027        let default_provider_name = self.agent_provider.name().to_string();
1028        let default_provider = Arc::clone(&self.agent_provider);
1029        let agent_runner = Arc::clone(&self.agent_runner);
1030        let retry_in_runtime = agent_runner.retry_in_runtime();
1031        let cancel_rx = self.cancel_rx.clone();
1032        let completion_input = prepared.input.clone();
1033        let completion_provider = prepared
1034            .provider_override
1035            .clone()
1036            .or(Some(default_provider_name));
1037        let session_log_sink = self.session_log_sink.clone();
1038        let max_parallel = self
1039            .max_parallel_agent_requests
1040            .filter(|value| *value > 0)
1041            .unwrap_or(usize::MAX);
1042        log::debug!(
1043            "starting agent request id={} in_flight_after_start={} max_parallel={}",
1044            id,
1045            agent_tasks.len() + 1,
1046            max_parallel
1047        );
1048        self.active_request_ids.insert(id.clone());
1049        agent_tasks.spawn(async move {
1050            let result = if retry_in_runtime {
1051                run_agent_runner_with_retry(
1052                    Arc::clone(&agent_runner),
1053                    default_provider,
1054                    prepared.provider_override,
1055                    prepared.input,
1056                    cancel_rx,
1057                )
1058                .await
1059            } else {
1060                agent_runner
1061                    .run_agent(default_provider, prepared.provider_override, prepared.input)
1062                    .await
1063            };
1064            let result = match result {
1065                Ok(result) => {
1066                    if let Some(session_log_sink) = session_log_sink.as_ref() {
1067                        let provider_name = completion_provider
1068                            .as_deref()
1069                            .expect("completion provider should always be set");
1070                        match session_log_sink
1071                            .write_agent_result(provider_name, &result)
1072                            .await
1073                        {
1074                            Ok(()) => Ok(result),
1075                            Err(error) => Err(error),
1076                        }
1077                    } else {
1078                        Ok(result)
1079                    }
1080                }
1081                Err(error) => Err(error),
1082            };
1083            AgentTaskCompletion {
1084                id,
1085                input: completion_input,
1086                provider: completion_provider,
1087                result,
1088            }
1089        });
1090    }
1091
1092    fn spawn_sleep_task(
1093        &mut self,
1094        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
1095        id: String,
1096        duration_ms: u64,
1097    ) {
1098        let agent_runner = Arc::clone(&self.agent_runner);
1099        log::debug!(
1100            "starting sleep request id={} duration_ms={}",
1101            id,
1102            duration_ms
1103        );
1104        self.active_request_ids.insert(id.clone());
1105        sleep_tasks.spawn(async move {
1106            SleepTaskCompletion {
1107                id,
1108                result: agent_runner.sleep(duration_ms).await,
1109            }
1110        });
1111    }
1112
1113    fn prepare_agent_run(
1114        &self,
1115        prompt: String,
1116        options: Option<Value>,
1117    ) -> anyhow::Result<PreparedAgentRun> {
1118        let options = apply_phase_defaults(options, &self.metadata);
1119        let context = AgentProviderContext {
1120            phase: options
1121                .as_ref()
1122                .and_then(|options| options.get("phase"))
1123                .and_then(Value::as_str)
1124                .map(ToString::to_string),
1125            cwd: self.workflow_cwd.clone(),
1126        };
1127        let provider_override = options
1128            .as_ref()
1129            .and_then(|options| options.get("provider"))
1130            .and_then(Value::as_str)
1131            .map(ToString::to_string);
1132        let provider_name = provider_override
1133            .as_deref()
1134            .unwrap_or_else(|| self.agent_provider.name());
1135        let options = resolve_model_options(options, provider_name, &self.model_map)?;
1136        agent_retry_policy(&options)?;
1137        log::debug!(
1138            "agent call provider={} phase={:?} model={:?} prompt_len={}",
1139            provider_name,
1140            context.phase.as_deref(),
1141            options
1142                .as_ref()
1143                .and_then(|options| options.get("model"))
1144                .and_then(Value::as_str),
1145            prompt.len()
1146        );
1147        Ok(PreparedAgentRun {
1148            provider_override,
1149            input: AgentProviderRunInput {
1150                prompt,
1151                options,
1152                environment: Arc::new(LocalExecutionEnvironment::new(context.cwd.clone())?),
1153                context,
1154            },
1155        })
1156    }
1157
1158    async fn emit_agent_started_event(
1159        &self,
1160        id: &str,
1161        prepared: &PreparedAgentRun,
1162    ) -> anyhow::Result<()> {
1163        let provider = prepared
1164            .provider_override
1165            .as_deref()
1166            .unwrap_or_else(|| self.agent_provider.name());
1167        let metadata = self.agent_event_metadata(id, Some(provider), None);
1168        self.emit_event(WorkflowEvent::agent_started(
1169            serde_json::json!({
1170                "phase": prepared.input.context.phase,
1171                "promptPreview": truncate_for_event(&prepared.input.prompt, 200),
1172            }),
1173            metadata,
1174        ))
1175        .await
1176    }
1177
1178    async fn apply_agent_result(
1179        &mut self,
1180        id: &str,
1181        input: &AgentProviderRunInput,
1182        provider: Option<String>,
1183        result: AgentProviderResult,
1184    ) -> anyhow::Result<Value> {
1185        if let Some(output_tokens) = result.usage.as_ref().and_then(|usage| usage.output_tokens) {
1186            self.budget.spent = self.budget.spent.saturating_add(output_tokens);
1187        }
1188        self.emit_agent_result_events(id, provider.as_deref(), &result)
1189            .await?;
1190        self.emit_agent_completed_event(id, provider.as_deref(), &result)
1191            .await?;
1192        self.record_agent_run(id, input, provider, &result);
1193        log::debug!(
1194            "agent call complete session_id={:?} output_tokens={:?} budget_spent={}",
1195            result.session_id,
1196            result.usage.as_ref().and_then(|usage| usage.output_tokens),
1197            self.budget.spent
1198        );
1199        Ok(result.output)
1200    }
1201
1202    async fn emit_agent_result_events(
1203        &self,
1204        id: &str,
1205        provider: Option<&str>,
1206        result: &AgentProviderResult,
1207    ) -> anyhow::Result<()> {
1208        let Some(raw) = result.raw.as_ref() else {
1209            return Ok(());
1210        };
1211        let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1212        for provider_event in raw_agent_event_payloads(raw) {
1213            let event_data = agent_session_event_payload(provider_event, &metadata);
1214            self.emit_event(WorkflowEvent::agent_event(event_data, metadata.clone()))
1215                .await?;
1216        }
1217        Ok(())
1218    }
1219
1220    async fn emit_agent_completed_event(
1221        &self,
1222        id: &str,
1223        provider: Option<&str>,
1224        result: &AgentProviderResult,
1225    ) -> anyhow::Result<()> {
1226        let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1227        self.emit_event(WorkflowEvent::agent_completed(
1228            serde_json::json!({
1229                "sessionId": result.session_id,
1230                "model": result.model,
1231                "usage": result.usage,
1232            }),
1233            metadata,
1234        ))
1235        .await
1236    }
1237
1238    async fn emit_agent_failed_event(
1239        &self,
1240        id: &str,
1241        provider: Option<&str>,
1242        message: &str,
1243    ) -> anyhow::Result<()> {
1244        let metadata = self.agent_event_metadata(id, provider, None);
1245        self.emit_event(WorkflowEvent::agent_failed(
1246            serde_json::json!({ "message": message }),
1247            metadata,
1248        ))
1249        .await
1250    }
1251
1252    fn agent_event_metadata(
1253        &self,
1254        id: &str,
1255        provider: Option<&str>,
1256        session_id: Option<String>,
1257    ) -> WorkflowEventMetadata {
1258        WorkflowEventMetadata {
1259            run_id: None,
1260            step_id: Some(self.event_step_id(id)),
1261            provider: Some(
1262                provider
1263                    .unwrap_or_else(|| self.agent_provider.name())
1264                    .to_string(),
1265            ),
1266            session_id,
1267            workflow_depth: None,
1268            parent_step_id: None,
1269        }
1270    }
1271
1272    fn record_agent_run(
1273        &mut self,
1274        id: &str,
1275        input: &AgentProviderRunInput,
1276        provider: Option<String>,
1277        result: &AgentProviderResult,
1278    ) {
1279        add_usage(&mut self.token_usage, result.usage.as_ref());
1280        if let Some(phase) = input.context.phase.as_ref() {
1281            let phase_usage = self.token_usage_by_phase.entry(phase.clone()).or_default();
1282            add_usage(phase_usage, result.usage.as_ref());
1283        }
1284        let model = result.model.clone().or_else(|| {
1285            input
1286                .options
1287                .as_ref()
1288                .and_then(|options| options.get("model"))
1289                .and_then(Value::as_str)
1290                .map(ToString::to_string)
1291        });
1292        self.agent_runs.push(WorkflowAgentRunSummary {
1293            id: id.to_string(),
1294            phase: input.context.phase.clone(),
1295            provider,
1296            model,
1297            provider_session_id: result.session_id.clone(),
1298            usage: result.usage.clone(),
1299            isolation: result.isolation.clone(),
1300        });
1301    }
1302
1303    async fn handle_sandbox_exec(
1304        &self,
1305        profile: &str,
1306        request: WorkflowSandboxExecRequest,
1307    ) -> anyhow::Result<Value> {
1308        let (provider, profile_name) = parse_sandbox_profile_ref(profile)?;
1309        let program = format!("smol-sandbox-{provider}");
1310        let sandbox_group_id = format!("sbxgrp_{}", ulid::Ulid::new());
1311        let workspace_path = self
1312            .workflow_cwd
1313            .clone()
1314            .ok_or_else(|| anyhow!("sandbox.exec requires the workflow cwd to be available"))?;
1315        let sandbox_env = SandboxExecutionEnvironment::open(
1316            program,
1317            OpenSandboxRequest {
1318                metadata: SandboxMetadata::new(
1319                    format!("sandbox-exec-{}", ulid::Ulid::new()),
1320                    sandbox_group_id,
1321                ),
1322                profile: ProfileRef {
1323                    provider,
1324                    name: profile_name,
1325                },
1326                workspace_sync: WorkspaceSync {
1327                    host_path: workspace_path,
1328                },
1329                cwd: request.cwd.clone(),
1330            },
1331        )
1332        .await
1333        .with_context(|| format!("failed to open sandbox profile `{profile}`"))?;
1334
1335        let mut argv = Vec::with_capacity(1 + request.args.len());
1336        argv.push(request.command);
1337        argv.extend(request.args);
1338        let exec_request = ExecRequest {
1339            argv,
1340            cwd: request.cwd.map(EnvironmentPath),
1341            env: request.env,
1342            stdin: request.stdin.map(String::into_bytes),
1343        };
1344        let mut sink = NullExecEventSink;
1345        let exec_result = sandbox_env.exec(exec_request, &mut sink).await;
1346        let close_result = sandbox_env.close().await;
1347        let output = exec_result?;
1348        close_result.context("failed to close sandbox after sandbox.exec")?;
1349
1350        let value = serde_json::to_value(WorkflowSandboxExecOutput {
1351            exit_code: output.exit_code,
1352            stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
1353            stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
1354        })?;
1355        Ok(value)
1356    }
1357
1358    async fn handle_workflow(
1359        &mut self,
1360        parent_step_id: String,
1361        workflow_ref: WorkflowRef,
1362        args: Option<Value>,
1363    ) -> anyhow::Result<Value> {
1364        if self.nesting_depth >= 1 {
1365            bail!("Nested workflow() calls are limited to one level");
1366        }
1367        let script_path = match workflow_ref {
1368            WorkflowRef::ScriptPath { script_path } => {
1369                resolve_relative_script(&self.script_path, &script_path)
1370            }
1371            WorkflowRef::Name(name) => resolve_named_workflow(&name)?,
1372        };
1373        log::debug!("child workflow call script={}", script_path.display());
1374        let child = Box::pin(run_workflow_inner(RunWorkflowOptions {
1375            script_path,
1376            workflow_cwd: self.workflow_cwd.clone(),
1377            args: args.unwrap_or(Value::Null),
1378            agent_provider: Arc::clone(&self.agent_provider),
1379            model_map: self.model_map.clone(),
1380            budget_total: self.budget.total,
1381            budget_spent: self.budget.spent,
1382            nesting_depth: self.nesting_depth + 1,
1383            max_parallel_agent_requests: self.max_parallel_agent_requests,
1384            agent_runner: Some(Arc::clone(&self.agent_runner)),
1385            cancel_rx: self.cancel_rx.clone(),
1386            event_sink: self.event_sink.clone(),
1387            event_parent_step_id: Some(parent_step_id),
1388            event_stream_start: Some(self.event_start),
1389            session_log_sink: self.session_log_sink.clone(),
1390        }))
1391        .await?;
1392        self.budget = child.budget;
1393        self.logs.extend(child.logs);
1394        self.phases.extend(child.phases);
1395        self.agent_calls.extend(child.agent_calls);
1396        self.workflow_calls.extend(child.workflow_calls);
1397        merge_token_usage(&mut self.token_usage, &child.token_usage);
1398        for (phase, usage) in child.token_usage_by_phase {
1399            merge_token_usage(self.token_usage_by_phase.entry(phase).or_default(), &usage);
1400        }
1401        self.agent_runs.extend(child.agent_runs);
1402        Ok(child.output.result)
1403    }
1404}
1405
1406async fn run_agent_runner_with_retry(
1407    agent_runner: Arc<dyn WorkflowAgentRunner>,
1408    default_provider: Arc<dyn AgentProvider>,
1409    provider_override: Option<String>,
1410    input: AgentProviderRunInput,
1411    mut cancel_rx: Option<watch::Receiver<bool>>,
1412) -> anyhow::Result<AgentProviderResult> {
1413    let retry = agent_retry_policy(&input.options)?;
1414    let mut final_result = None;
1415    for attempt in 1..=retry.max_attempts {
1416        let attempt_result = agent_runner
1417            .run_agent(
1418                Arc::clone(&default_provider),
1419                provider_override.clone(),
1420                input.clone(),
1421            )
1422            .await;
1423        match attempt_result {
1424            Ok(result) => {
1425                final_result = Some(Ok(result));
1426                break;
1427            }
1428            Err(error) if attempt < retry.max_attempts => {
1429                log::debug!(
1430                    "agent call failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1431                    retry.max_attempts,
1432                    retry.backoff_ms
1433                );
1434                sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1435            }
1436            Err(error) => {
1437                final_result = Some(Err(error));
1438                break;
1439            }
1440        }
1441    }
1442    final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1443}
1444
1445async fn sleep_retry_backoff(
1446    backoff_ms: u64,
1447    cancel_rx: &mut Option<watch::Receiver<bool>>,
1448) -> anyhow::Result<()> {
1449    if backoff_ms == 0 {
1450        return Ok(());
1451    }
1452    let Some(cancel_rx) = cancel_rx.as_mut() else {
1453        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1454        return Ok(());
1455    };
1456    if *cancel_rx.borrow() {
1457        bail!("workflow cancelled");
1458    }
1459    let sleep = tokio::time::sleep(Duration::from_millis(backoff_ms));
1460    tokio::pin!(sleep);
1461    loop {
1462        tokio::select! {
1463            _ = &mut sleep => return Ok(()),
1464            changed = cancel_rx.changed() => {
1465                match changed {
1466                    Ok(()) if *cancel_rx.borrow() => bail!("workflow cancelled"),
1467                    Ok(()) => continue,
1468                    Err(_) => {
1469                        sleep.await;
1470                        return Ok(());
1471                    }
1472                }
1473            }
1474        }
1475    }
1476}
1477
1478pub(crate) async fn run_agent_provider_with_retry(
1479    default_provider: Arc<dyn AgentProvider>,
1480    provider_override: Option<String>,
1481    input: AgentProviderRunInput,
1482    mut cancel_rx: Option<watch::Receiver<bool>>,
1483) -> anyhow::Result<AgentProviderResult> {
1484    let retry = agent_retry_policy(&input.options)?;
1485    let provider = resolve_agent_provider(default_provider, provider_override)?;
1486    let mut final_result = None;
1487    for attempt in 1..=retry.max_attempts {
1488        let attempt_result =
1489            run_agent_with_optional_isolation(Arc::clone(&provider), input.clone()).await;
1490        match attempt_result {
1491            Ok(result) => {
1492                final_result = Some(Ok(result));
1493                break;
1494            }
1495            Err(error) if attempt < retry.max_attempts => {
1496                log::debug!(
1497                    "agent provider failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1498                    retry.max_attempts,
1499                    retry.backoff_ms
1500                );
1501                sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1502            }
1503            Err(error) => {
1504                final_result = Some(Err(error));
1505                break;
1506            }
1507        }
1508    }
1509    final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1510}
1511
1512pub(crate) async fn run_agent_provider(
1513    default_provider: Arc<dyn AgentProvider>,
1514    provider_override: Option<String>,
1515    input: AgentProviderRunInput,
1516) -> anyhow::Result<AgentProviderResult> {
1517    let provider = resolve_agent_provider(default_provider, provider_override)?;
1518    run_agent_with_optional_isolation(provider, input).await
1519}
1520
1521fn resolve_agent_provider(
1522    default_provider: Arc<dyn AgentProvider>,
1523    provider_override: Option<String>,
1524) -> anyhow::Result<Arc<dyn AgentProvider>> {
1525    if let Some(provider_override) = provider_override {
1526        Ok(Arc::from(create_agent_provider(&provider_override)?))
1527    } else {
1528        Ok(default_provider)
1529    }
1530}
1531
/// Parsed form of an agent call's `retry` option.
#[derive(Debug, Clone, Copy)]
pub(crate) struct AgentRetryPolicy {
    /// Total number of attempts, including the first; at least 1 when built by
    /// `agent_retry_policy`.
    pub max_attempts: u32,
    /// Delay in milliseconds between a failed attempt and the next one.
    pub backoff_ms: u64,
}
1537
1538pub(crate) fn agent_retry_policy(options: &Option<Value>) -> anyhow::Result<AgentRetryPolicy> {
1539    let default = AgentRetryPolicy {
1540        max_attempts: 1,
1541        backoff_ms: 0,
1542    };
1543    let Some(retry) = options.as_ref().and_then(|options| options.get("retry")) else {
1544        return Ok(default);
1545    };
1546    if retry.is_null() {
1547        return Ok(default);
1548    }
1549    let object = retry
1550        .as_object()
1551        .ok_or_else(|| anyhow!("agent retry option must be an object"))?;
1552    let max_attempts = match object.get("maxAttempts") {
1553        Some(value) => {
1554            let value = value
1555                .as_u64()
1556                .ok_or_else(|| anyhow!("agent retry.maxAttempts must be a positive integer"))?;
1557            if value == 0 || value > u32::MAX as u64 {
1558                bail!("agent retry.maxAttempts must be between 1 and {}", u32::MAX);
1559            }
1560            value as u32
1561        }
1562        None => default.max_attempts,
1563    };
1564    let backoff_ms = match object.get("backoffMs") {
1565        Some(value) => value
1566            .as_u64()
1567            .ok_or_else(|| anyhow!("agent retry.backoffMs must be a non-negative integer"))?,
1568        None => default.backoff_ms,
1569    };
1570    Ok(AgentRetryPolicy {
1571        max_attempts,
1572        backoff_ms,
1573    })
1574}
1575
1576async fn run_agent_with_optional_isolation(
1577    provider: Arc<dyn AgentProvider>,
1578    input: AgentProviderRunInput,
1579) -> anyhow::Result<AgentProviderResult> {
1580    if let Some(request) = sandbox_isolation_request(&input.options)? {
1581        return run_agent_with_sandbox_isolation(provider, input, request).await;
1582    }
1583
1584    if requests_worktree_isolation(&input.options) {
1585        return run_agent_with_worktree_isolation(provider, input).await;
1586    }
1587
1588    run_agent_with_schema_validation(provider, input).await
1589}
1590
1591async fn run_agent_with_worktree_isolation(
1592    provider: Arc<dyn AgentProvider>,
1593    input: AgentProviderRunInput,
1594) -> anyhow::Result<AgentProviderResult> {
1595    let isolation = WorktreeIsolation::create(input.context.cwd.as_deref())?;
1596    let isolation_info = isolation.info();
1597    let mut isolated_input = input;
1598    isolated_input.context.cwd = Some(isolation.cwd.clone());
1599    isolated_input.environment =
1600        Arc::new(LocalExecutionEnvironment::new(Some(isolation.cwd.clone()))?);
1601    let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1602    if let Ok(result) = &mut result {
1603        result.isolation = Some(isolation_info);
1604    }
1605    if let Err(error) = isolation.cleanup() {
1606        log::warn!("failed to cleanup isolated agent worktree: {error:#}");
1607    }
1608    result
1609}
1610
/// Runs the agent inside a sandbox session opened from `request`'s profile.
///
/// The session is opened through the `smol-sandbox-<provider>` program, with the workflow cwd
/// passed as `WorkspaceSync::host_path`. The agent then runs (with schema validation) against
/// the sandbox execution environment. On success, the sandbox details are recorded in
/// `result.isolation`. The session is closed afterwards whether or not the run succeeded; a
/// failure to close is only logged.
///
/// # Errors
/// Fails if the workflow cwd is unknown, if the sandbox cannot be opened, or if the agent run
/// itself fails.
async fn run_agent_with_sandbox_isolation(
    provider: Arc<dyn AgentProvider>,
    input: AgentProviderRunInput,
    request: SandboxIsolationRequest,
) -> anyhow::Result<AgentProviderResult> {
    let program = sandbox_provider_program(&request);
    // Fresh group id for this isolated run, passed in the sandbox request metadata.
    let sandbox_group_id = format!("sbxgrp_{}", ulid::Ulid::new());
    let cwd = input.context.cwd.clone().ok_or_else(|| {
        anyhow!("agent sandbox isolation requires the workflow cwd to be available")
    })?;
    let sandbox_env = SandboxExecutionEnvironment::open(
        program,
        OpenSandboxRequest {
            metadata: SandboxMetadata::new(
                format!("req_{}", ulid::Ulid::new()),
                sandbox_group_id.clone(),
            ),
            profile: ProfileRef {
                provider: request.provider.clone(),
                name: request.profile.clone(),
            },
            workspace_sync: WorkspaceSync { host_path: cwd },
            cwd: request.cwd.clone(),
        },
    )
    .await
    .with_context(|| {
        format!(
            "failed to open sandbox profile `{}` with provider `{}`",
            request.profile, request.provider
        )
    })?;

    let session = sandbox_env.session().clone();
    let mut isolated_input = input;
    // Point the agent at the sandbox: the cwd the environment reports (if any) and the
    // sandbox itself as the execution environment.
    isolated_input.context.cwd = sandbox_env.cwd().map(|path| PathBuf::from(path.as_str()));
    isolated_input.environment = Arc::new(sandbox_env.clone());
    let isolation_info = AgentRunIsolation {
        kind: "sandbox".to_string(),
        branch: None,
        worktree_path: None,
        cwd: session.cwd.clone(),
        profile: Some(request.profile.clone()),
        provider: Some(request.provider.clone()),
        session_id: Some(session.id.clone()),
    };

    // NOTE(review): if this future is dropped (cancelled) while the agent runs, the `close()`
    // below is never reached. Unlike worktree isolation there is no Drop guard here, so confirm
    // whether SandboxExecutionEnvironment releases the session on drop.
    let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
    if let Ok(result) = &mut result {
        result.isolation = Some(isolation_info);
    }

    // Close regardless of the run outcome; a close failure must not mask the agent's result.
    if let Err(error) = sandbox_env.close().await {
        log::warn!("failed to close sandbox-isolated agent session: {error:#}");
    }

    result
}
1669
1670fn requests_worktree_isolation(options: &Option<Value>) -> bool {
1671    options
1672        .as_ref()
1673        .and_then(|options| options.get("isolation"))
1674        .and_then(Value::as_str)
1675        == Some("worktree")
1676}
1677
/// Parsed form of the agent option `isolation: { type: "sandbox", profile, cwd? }`.
#[derive(Debug, Clone)]
struct SandboxIsolationRequest {
    /// Sandbox provider name: the part of `isolation.profile` before the first `/`. It also
    /// selects the `smol-sandbox-<provider>` program.
    provider: String,
    /// Provider-local profile name: everything after the first `/` (non-empty).
    profile: String,
    /// Optional `isolation.cwd`, forwarded as `OpenSandboxRequest::cwd`.
    cwd: Option<String>,
}
1684
1685fn sandbox_isolation_request(
1686    options: &Option<Value>,
1687) -> anyhow::Result<Option<SandboxIsolationRequest>> {
1688    let Some(isolation) = options
1689        .as_ref()
1690        .and_then(|options| options.get("isolation"))
1691    else {
1692        return Ok(None);
1693    };
1694    let Some(object) = isolation.as_object() else {
1695        return Ok(None);
1696    };
1697    if object.get("type").and_then(Value::as_str) != Some("sandbox") {
1698        return Ok(None);
1699    }
1700    let profile = object
1701        .get("profile")
1702        .and_then(Value::as_str)
1703        .ok_or_else(|| anyhow!("agent sandbox isolation requires isolation.profile"))?
1704        .to_string();
1705    let cwd = object
1706        .get("cwd")
1707        .and_then(Value::as_str)
1708        .map(ToString::to_string);
1709    let (provider, profile) = parse_sandbox_profile_ref(&profile)?;
1710    Ok(Some(SandboxIsolationRequest {
1711        provider,
1712        profile,
1713        cwd,
1714    }))
1715}
1716
1717fn parse_sandbox_profile_ref(value: &str) -> anyhow::Result<(String, String)> {
1718    let (provider, profile) = value.split_once('/').ok_or_else(|| {
1719        anyhow!("agent sandbox isolation.profile must use <provider>/<profile>, got `{value}`")
1720    })?;
1721    validate_sandbox_provider_name(provider)?;
1722    if profile.is_empty() {
1723        bail!(
1724            "agent sandbox isolation.profile must include a non-empty provider-local profile name"
1725        );
1726    }
1727    Ok((provider.to_string(), profile.to_string()))
1728}
1729
1730fn validate_sandbox_provider_name(provider: &str) -> anyhow::Result<()> {
1731    let valid = !provider.is_empty()
1732        && provider
1733            .bytes()
1734            .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'-')
1735        && provider
1736            .as_bytes()
1737            .first()
1738            .is_some_and(u8::is_ascii_alphanumeric)
1739        && provider
1740            .as_bytes()
1741            .last()
1742            .is_some_and(u8::is_ascii_alphanumeric);
1743    if !valid {
1744        bail!("invalid sandbox provider name `{provider}`; expected lowercase letters, digits, and hyphens, starting and ending with an alphanumeric character");
1745    }
1746    Ok(())
1747}
1748
1749fn sandbox_provider_program(request: &SandboxIsolationRequest) -> String {
1750    format!("smol-sandbox-{}", request.provider)
1751}
1752
/// A temporary git worktree, on its own throwaway branch, used to run an agent in isolation.
///
/// Removed explicitly by `cleanup`. While `cleaned` is still false, `Drop` makes a best-effort
/// attempt to remove it.
struct WorktreeIsolation {
    /// Canonicalized root of the repository the worktree was added to; git commands run here.
    repo_root: PathBuf,
    /// Path of the added worktree (a `worktree` directory inside `_temp_dir`).
    worktree_root: PathBuf,
    /// The workflow cwd mapped to the same repo-relative location inside the worktree.
    cwd: PathBuf,
    /// Branch created for the worktree (`smol-wf/agent-run/<lowercase ulid>`).
    branch_name: String,
    /// When true, `Drop` skips its fallback removal; set by `cleanup`.
    cleaned: bool,
    // Owns the temp directory containing the worktree. It is deleted when this field is
    // dropped, which happens after `Drop::drop` has run.
    _temp_dir: tempfile::TempDir,
}
1761
1762impl WorktreeIsolation {
1763    fn create(cwd: Option<&Path>) -> anyhow::Result<Self> {
1764        let cwd = cwd
1765            .map(Path::to_path_buf)
1766            .unwrap_or(std::env::current_dir()?)
1767            .canonicalize()
1768            .context("failed to canonicalize workflow cwd for worktree isolation")?;
1769        let repo_root = git_output(&cwd, &["rev-parse", "--show-toplevel"]).context(
1770            "agent isolation='worktree' requires the workflow cwd to be inside a git repository",
1771        )?;
1772        let repo_root = PathBuf::from(repo_root.trim())
1773            .canonicalize()
1774            .context("failed to canonicalize git repository root for worktree isolation")?;
1775        let relative_cwd = cwd.strip_prefix(&repo_root).with_context(|| {
1776            format!(
1777                "workflow cwd {} is not under git repository root {}",
1778                cwd.display(),
1779                repo_root.display()
1780            )
1781        })?;
1782
1783        let temp_dir = tempfile::Builder::new()
1784            .prefix("smol-wf-agent-worktree-")
1785            .tempdir()
1786            .context("failed to create temp directory for agent worktree isolation")?;
1787        let worktree_root = temp_dir.path().join("worktree");
1788        let worktree_arg = path_arg(&worktree_root);
1789        let branch_name = format!(
1790            "smol-wf/agent-run/{}",
1791            ulid::Ulid::new().to_string().to_ascii_lowercase()
1792        );
1793        git_status(
1794            &repo_root,
1795            &[
1796                "worktree",
1797                "add",
1798                "--quiet",
1799                "-b",
1800                &branch_name,
1801                &worktree_arg,
1802                "HEAD",
1803            ],
1804        )
1805        .context("failed to create isolated git worktree for agent run")?;
1806        let isolated_cwd = if relative_cwd.as_os_str().is_empty() {
1807            worktree_root.clone()
1808        } else {
1809            worktree_root.join(relative_cwd)
1810        };
1811        Ok(Self {
1812            repo_root,
1813            worktree_root,
1814            cwd: isolated_cwd,
1815            branch_name,
1816            cleaned: false,
1817            _temp_dir: temp_dir,
1818        })
1819    }
1820
1821    fn info(&self) -> AgentRunIsolation {
1822        AgentRunIsolation {
1823            kind: "worktree".to_string(),
1824            branch: Some(self.branch_name.clone()),
1825            worktree_path: Some(path_arg(&self.worktree_root)),
1826            cwd: Some(path_arg(&self.cwd)),
1827            profile: None,
1828            provider: None,
1829            session_id: None,
1830        }
1831    }
1832
1833    fn cleanup(mut self) -> anyhow::Result<()> {
1834        self.remove_worktree()?;
1835        self.delete_branch()?;
1836        self.cleaned = true;
1837        Ok(())
1838    }
1839
1840    fn remove_worktree(&self) -> anyhow::Result<()> {
1841        let worktree_arg = path_arg(&self.worktree_root);
1842        git_status(
1843            &self.repo_root,
1844            &["worktree", "remove", "--force", &worktree_arg],
1845        )
1846        .context("failed to remove isolated git worktree")
1847    }
1848
1849    fn delete_branch(&self) -> anyhow::Result<()> {
1850        git_status(&self.repo_root, &["branch", "-D", &self.branch_name])
1851            .context("failed to delete isolated agent worktree branch")
1852    }
1853}
1854
1855impl Drop for WorktreeIsolation {
1856    fn drop(&mut self) {
1857        if !self.cleaned {
1858            if let Err(error) = self.remove_worktree() {
1859                log::warn!("failed to cleanup isolated agent worktree during drop: {error:#}");
1860            }
1861            if let Err(error) = self.delete_branch() {
1862                log::warn!(
1863                    "failed to delete isolated agent worktree branch during drop: {error:#}"
1864                );
1865            }
1866        }
1867    }
1868}
1869
/// Converts a path into an owned `String` for use as a command-line argument.
///
/// Any non-UTF-8 sequences are replaced with U+FFFD.
fn path_arg(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1873
1874fn git_output(cwd: &Path, args: &[&str]) -> anyhow::Result<String> {
1875    let output = StdCommand::new("git")
1876        .args(args)
1877        .current_dir(cwd)
1878        .output()
1879        .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1880    if output.status.success() {
1881        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
1882    } else {
1883        bail!(
1884            "git {} failed with {}{}",
1885            args.join(" "),
1886            status_text(output.status.code()),
1887            command_stderr(&output.stderr)
1888        )
1889    }
1890}
1891
1892fn git_status(cwd: &Path, args: &[&str]) -> anyhow::Result<()> {
1893    let output = StdCommand::new("git")
1894        .args(args)
1895        .current_dir(cwd)
1896        .output()
1897        .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1898    if output.status.success() {
1899        Ok(())
1900    } else {
1901        bail!(
1902            "git {} failed with {}{}",
1903            args.join(" "),
1904            status_text(output.status.code()),
1905            command_stderr(&output.stderr)
1906        )
1907    }
1908}
1909
/// Describes how a child process ended.
///
/// Returns `code N` for an exit code, or `signal` when there is none (on Unix this means the
/// process was terminated by a signal).
fn status_text(code: Option<i32>) -> String {
    match code {
        Some(code) => format!("code {code}"),
        None => String::from("signal"),
    }
}
1914
/// Formats captured stderr as an error-message suffix.
///
/// Returns `": <trimmed stderr>"`, or an empty string when stderr is blank. Invalid UTF-8 is
/// replaced with U+FFFD.
fn command_stderr(stderr: &[u8]) -> String {
    let decoded = String::from_utf8_lossy(stderr);
    match decoded.trim() {
        "" => String::new(),
        trimmed => format!(": {trimmed}"),
    }
}
1924
1925async fn run_agent_with_schema_validation(
1926    provider: Arc<dyn AgentProvider>,
1927    input: AgentProviderRunInput,
1928) -> anyhow::Result<AgentProviderResult> {
1929    let Some(schema) = input
1930        .options
1931        .as_ref()
1932        .and_then(|options| options.get("schema"))
1933        .cloned()
1934    else {
1935        return provider.run(input).await;
1936    };
1937
1938    let max_attempts = 2;
1939    let original_prompt = input.prompt.clone();
1940    let mut attempt_input = input;
1941    let mut last_errors = Vec::new();
1942
1943    for attempt in 1..=max_attempts {
1944        let result = provider.run(attempt_input.clone()).await?;
1945        match validate_structured_output(&schema, &result.output) {
1946            Ok(()) => return Ok(result),
1947            Err(errors) => {
1948                last_errors = errors;
1949                if attempt < max_attempts {
1950                    attempt_input.prompt =
1951                        with_structured_output_retry_prompt(&original_prompt, &last_errors);
1952                }
1953            }
1954        }
1955    }
1956
1957    bail!(
1958        "{}",
1959        format_structured_output_validation_error(&last_errors)
1960    )
1961}
1962
1963fn validate_structured_output(schema: &Value, output: &Value) -> Result<(), Vec<String>> {
1964    let validator = jsonschema::validator_for(schema)
1965        .map_err(|error| vec![format!("/ schema is invalid: {}", error)])?;
1966    let errors = validator
1967        .iter_errors(output)
1968        .map(|error| {
1969            let path = error.instance_path().to_string();
1970            let path = if path.is_empty() {
1971                "/".to_string()
1972            } else {
1973                path
1974            };
1975            format!("{path} {error}")
1976        })
1977        .collect::<Vec<_>>();
1978
1979    if errors.is_empty() {
1980        Ok(())
1981    } else {
1982        Err(errors)
1983    }
1984}
1985
/// Builds the user-facing error for a failed structured-output validation.
///
/// The individual validation messages are joined with `"; "`.
fn format_structured_output_validation_error(errors: &[String]) -> String {
    let details = errors.join("; ");
    format!("Structured output did not match JSON Schema: {details}")
}
1992
/// Builds the prompt for a retry after schema validation failed.
///
/// The result is the original prompt, a blank line, three fixed instruction lines, and one
/// `- <error>` line per validation error, all separated by `\n`.
fn with_structured_output_retry_prompt(prompt: &str, errors: &[String]) -> String {
    const INSTRUCTIONS: [&str; 3] = [
        "Previous structured output failed JSON Schema validation.",
        "Return a corrected structured output that satisfies the original JSON Schema.",
        "Validation errors:",
    ];

    let mut text = String::from(prompt);
    // Blank separator line between the original prompt and the retry instructions.
    text.push('\n');
    for line in INSTRUCTIONS {
        text.push('\n');
        text.push_str(line);
    }
    for error in errors {
        text.push_str("\n- ");
        text.push_str(error);
    }
    text
}
2004
/// A model selector after alias lookup and parsing.
///
/// Selectors have the form `[provider/]model[?provider=...&thinking=...]`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResolvedModelSelector {
    /// The model string the workflow asked for (possibly an alias).
    requested: String,
    /// The selector that was actually parsed: the alias target, or `requested` itself.
    selector: String,
    /// Model id without any provider qualifier.
    model_id: String,
    /// Provider qualifier from either the `provider/model` form or `?provider=`.
    model_provider: Option<String>,
    /// Value of the `?thinking=` query parameter, if any.
    thinking: Option<String>,
}

impl ResolvedModelSelector {
    /// Returns `provider/model_id` when a provider qualifier is present, otherwise `model_id`.
    fn provider_model(&self) -> String {
        self.model_provider.as_deref().map_or_else(
            || self.model_id.clone(),
            |provider| format!("{provider}/{}", self.model_id),
        )
    }
}
2022
/// Resolves the `model` agent option through `model_map` aliases and selector parsing.
///
/// If the options have no string `model`, they are returned unchanged. Otherwise:
/// - The `model` is looked up once in `model_map`. The alias target, if any, is not looked up
///   again.
/// - The selector is parsed with `parse_model_selector` and checked against `agent_provider`.
/// - The options object is rewritten so that `model` holds `[provider/]model_id`.
/// - `requestedModel` and `modelSelector` are set when an alias was used or the selector
///   carried extra parts; otherwise they are removed.
/// - `modelProvider` and `thinking` are set from the selector, or removed when it has none.
///
/// # Errors
/// Fails if the selector cannot be parsed or is not supported by `agent_provider`.
fn resolve_model_options(
    options: Option<Value>,
    agent_provider: &str,
    model_map: &BTreeMap<String, String>,
) -> anyhow::Result<Option<Value>> {
    let Some(model) = options
        .as_ref()
        .and_then(Value::as_object)
        .and_then(|object| object.get("model"))
        .and_then(Value::as_str)
        .map(ToString::to_string)
    else {
        return Ok(options);
    };

    // Single-level alias lookup: an alias target is parsed as-is, never re-mapped.
    let mapped_selector = model_map.get(&model).cloned();
    let alias_matched = mapped_selector.is_some();
    let selector = mapped_selector.unwrap_or_else(|| model.clone());
    let resolved = parse_model_selector(&model, &selector)?;
    validate_model_selector_for_provider(&resolved, agent_provider)?;

    let mut object = options
        .and_then(|value| value.as_object().cloned())
        .unwrap_or_default();
    object.insert(
        "model".to_string(),
        Value::String(resolved.provider_model()),
    );

    // Record the original request only when it differs meaningfully from the plain model id.
    let selector_has_extra_parts = alias_matched
        || resolved.selector.contains('?')
        || resolved.model_provider.is_some()
        || resolved.thinking.is_some();
    if selector_has_extra_parts {
        object.insert(
            "requestedModel".to_string(),
            Value::String(resolved.requested.clone()),
        );
        object.insert(
            "modelSelector".to_string(),
            Value::String(resolved.selector.clone()),
        );
    } else {
        object.remove("requestedModel");
        object.remove("modelSelector");
    }

    // NOTE(review): an explicitly passed `modelProvider` / `thinking` option is removed when
    // the selector does not carry one (but kept when there is no `model` at all). Confirm this
    // selector-wins behavior is intended.
    if let Some(provider) = resolved.model_provider {
        object.insert("modelProvider".to_string(), Value::String(provider));
    } else {
        object.remove("modelProvider");
    }
    if let Some(thinking) = resolved.thinking {
        object.insert("thinking".to_string(), Value::String(thinking));
    } else {
        object.remove("thinking");
    }
    Ok(Some(Value::Object(object)))
}
2082
/// Parses a model selector of the form `[provider/]model[?key=value&...]`.
///
/// `requested` is the string the caller originally asked for. `selector` is the string actually
/// parsed, which may be an alias target. Only the first `/` of the model part splits off a
/// provider, so the model id may itself contain `/`. The model id is not whitespace-trimmed.
/// Query keys and values are percent-decoded (with `+` as space). Only `provider` and
/// `thinking` are accepted, each at most once and with a non-empty value. Empty `&&` segments
/// are skipped.
///
/// # Errors
/// Fails on a blank model part, an empty provider or model around `/`, a malformed, empty,
/// duplicate or unknown query parameter, invalid percent escapes, or a `provider/` prefix that
/// conflicts with `?provider=`.
fn parse_model_selector(requested: &str, selector: &str) -> anyhow::Result<ResolvedModelSelector> {
    let (model_part, query) = selector.split_once('?').unwrap_or((selector, ""));
    if model_part.trim().is_empty() {
        bail!("model selector must include a model id: {selector}");
    }

    let (slash_provider, model_id) = match model_part.split_once('/') {
        Some((provider, model_id)) if !provider.is_empty() && !model_id.is_empty() => {
            (Some(provider.to_string()), model_id.to_string())
        }
        Some(_) => bail!("model selector provider/model form is invalid: {selector}"),
        None => (None, model_part.to_string()),
    };

    let mut query_provider = None::<String>;
    let mut thinking = None::<String>;
    if !query.is_empty() {
        // Split on raw `&` before decoding, so an encoded `%26` stays inside its value.
        for pair in query.split('&') {
            if pair.is_empty() {
                continue;
            }
            let (key, value) = pair.split_once('=').ok_or_else(|| {
                anyhow!("model selector query parameter must use key=value: {pair}")
            })?;
            let key = percent_decode(key)?;
            let value = percent_decode(value)?;
            if value.is_empty() {
                bail!("model selector query parameter `{key}` must not be empty");
            }
            match key.as_str() {
                "provider" => set_unique_query_value(&mut query_provider, key, value)?,
                "thinking" => set_unique_query_value(&mut thinking, key, value)?,
                _ => bail!("unknown model selector query parameter `{key}`"),
            }
        }
    }

    // Both provider forms may be given only if they agree.
    let model_provider = match (slash_provider, query_provider) {
        (Some(slash), Some(query)) if slash != query => bail!(
            "conflicting model provider qualifiers in selector `{selector}`: `{slash}` and `{query}`"
        ),
        (Some(provider), Some(_)) | (Some(provider), None) | (None, Some(provider)) => {
            Some(provider)
        }
        (None, None) => None,
    };

    Ok(ResolvedModelSelector {
        requested: requested.to_string(),
        selector: selector.to_string(),
        model_id,
        model_provider,
        thinking,
    })
}
2138
2139fn set_unique_query_value(
2140    target: &mut Option<String>,
2141    key: String,
2142    value: String,
2143) -> anyhow::Result<()> {
2144    if target.replace(value).is_some() {
2145        bail!("duplicate model selector query parameter `{key}`");
2146    }
2147    Ok(())
2148}
2149
2150fn percent_decode(value: &str) -> anyhow::Result<String> {
2151    let bytes = value.as_bytes();
2152    let mut output = Vec::with_capacity(bytes.len());
2153    let mut index = 0;
2154    while index < bytes.len() {
2155        match bytes[index] {
2156            b'%' => {
2157                if index + 2 >= bytes.len() {
2158                    bail!("invalid percent escape in model selector query: {value}");
2159                }
2160                let high = hex_value(bytes[index + 1]).ok_or_else(|| {
2161                    anyhow!("invalid percent escape in model selector query: {value}")
2162                })?;
2163                let low = hex_value(bytes[index + 2]).ok_or_else(|| {
2164                    anyhow!("invalid percent escape in model selector query: {value}")
2165                })?;
2166                output.push((high << 4) | low);
2167                index += 3;
2168            }
2169            b'+' => {
2170                output.push(b' ');
2171                index += 1;
2172            }
2173            byte => {
2174                output.push(byte);
2175                index += 1;
2176            }
2177        }
2178    }
2179    String::from_utf8(output).context("model selector query is not valid UTF-8")
2180}
2181
/// Returns the value (0–15) of an ASCII hex digit byte, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `char::to_digit(16)` accepts exactly the ASCII digits 0-9, a-f and A-F.
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
2190
2191fn validate_model_selector_for_provider(
2192    resolved: &ResolvedModelSelector,
2193    agent_provider: &str,
2194) -> anyhow::Result<()> {
2195    match agent_provider {
2196        "codex" => {
2197            if resolved.model_provider.is_some() {
2198                bail!("Codex model selectors do not support ?provider=... or provider/model form");
2199            }
2200            if resolved.thinking.is_some() {
2201                bail!("Codex model selectors do not support thinking=...");
2202            }
2203        }
2204        "claude-code" if resolved.model_provider.is_some() => {
2205            bail!(
2206                "Claude Code model selectors do not support ?provider=... or provider/model form"
2207            );
2208        }
2209        "opencode" if resolved.model_provider.is_none() => {
2210            bail!("OpenCode model selectors must use provider/model or ?provider=...");
2211        }
2212        "debug" | "pi" => {}
2213        _ => {}
2214    }
2215    Ok(())
2216}
2217
2218fn apply_phase_defaults(options: Option<Value>, metadata: &WorkflowMetadata) -> Option<Value> {
2219    let phase_name = options
2220        .as_ref()
2221        .and_then(|options| options.get("phase"))
2222        .and_then(Value::as_str)
2223        .map(ToString::to_string);
2224    let phase_metadata = phase_name.as_ref().and_then(|phase_name| {
2225        metadata
2226            .phases
2227            .iter()
2228            .find(|phase| phase.title == *phase_name)
2229    });
2230
2231    if phase_name.is_none() && phase_metadata.is_none() {
2232        return options;
2233    }
2234
2235    let mut object = options
2236        .and_then(|value| value.as_object().cloned())
2237        .unwrap_or_default();
2238
2239    if let Some(phase_name) = phase_name {
2240        object
2241            .entry("phase".to_string())
2242            .or_insert(Value::String(phase_name));
2243    }
2244    if let Some(model) = phase_metadata.and_then(|phase| phase.model.clone()) {
2245        object
2246            .entry("model".to_string())
2247            .or_insert(Value::String(model));
2248    }
2249    if let Some(provider) = phase_metadata.and_then(|phase| phase.provider.clone()) {
2250        object
2251            .entry("provider".to_string())
2252            .or_insert(Value::String(provider));
2253    }
2254
2255    Some(Value::Object(object))
2256}
2257
/// Resolves `script_path` relative to the directory of `current_script_path`.
///
/// Absolute paths are returned as-is. If `current_script_path` has no parent (e.g. it is empty
/// or a root), `.` is used as the base.
fn resolve_relative_script(current_script_path: &Path, script_path: &str) -> PathBuf {
    let requested = Path::new(script_path);
    if requested.is_absolute() {
        return requested.to_path_buf();
    }
    let base_dir = current_script_path.parent().unwrap_or(Path::new("."));
    base_dir.join(requested)
}
2269
2270fn resolve_named_workflow(name: &str) -> anyhow::Result<PathBuf> {
2271    let workflows_dir = PathBuf::from(".claude/workflows");
2272    for entry in fs::read_dir(&workflows_dir).unwrap_or_else(|_| fs::read_dir(".").unwrap()) {
2273        let entry = entry?;
2274        let path = entry.path();
2275        if path.extension().and_then(|extension| extension.to_str()) != Some("js") {
2276            continue;
2277        }
2278        if read_workflow_metadata(&path)?.is_some_and(|metadata| metadata.name == name) {
2279            return Ok(path);
2280        }
2281    }
2282    bail!("Unknown workflow: {name}")
2283}