Skip to main content

smol_workflow_engine/
workflow.rs

1use crate::agent_providers::{
2    create_agent_provider, AgentProvider, AgentProviderContext, AgentProviderResult,
3    AgentProviderRunInput, AgentRunIsolation, AgentUsage, AgentUsageCost,
4};
5use crate::environment::{
6    AgentExecutionEnvironment, LocalExecutionEnvironment, SandboxExecutionEnvironment,
7};
8use crate::js_runtime::rquickjs::RQuickJSWorkflowRuntime;
9use crate::js_runtime::{
10    WorkflowBudgetSnapshot, WorkflowJSRuntime, WorkflowModuleInput, WorkflowModuleOutput,
11    WorkflowRef, WorkflowRuntimeCall, WorkflowRuntimeExecution, WorkflowRuntimePoll,
12    WorkflowRuntimeRequest, WorkflowRuntimeRequestResolution,
13};
14use crate::metadata::{read_workflow_metadata, WorkflowMetadata};
15use anyhow::{anyhow, bail, Context};
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use smol_workflow_sandbox::{
19    Metadata as SandboxMetadata, OpenSandboxRequest, ProfileRef, WorkspaceSync,
20};
21use std::collections::{BTreeMap, BTreeSet, VecDeque};
22use std::fs;
23use std::path::{Path, PathBuf};
24use std::process::Command as StdCommand;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{mpsc, watch};
28use tokio::task::{JoinSet, LocalSet};
29
30pub use crate::events::{
31    WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink, WorkflowEventType,
32};
33
34#[async_trait::async_trait]
35pub trait AgentSessionLogSink: Send + Sync {
36    async fn write_agent_result(
37        &self,
38        provider: &str,
39        result: &AgentProviderResult,
40    ) -> anyhow::Result<()>;
41}
42
43#[async_trait::async_trait]
44pub trait WorkflowAgentRunner: Send + Sync {
45    async fn run_agent(
46        &self,
47        default_provider: Arc<dyn AgentProvider>,
48        provider_override: Option<String>,
49        input: AgentProviderRunInput,
50    ) -> anyhow::Result<AgentProviderResult>;
51
52    /// Whether the workflow scheduler should wrap this runner's `run_agent` call
53    /// in the per-agent retry loop.
54    ///
55    /// Most runners should use the default `true`: their `run_agent` method is a
56    /// single agent/provider boundary, so retrying it is safe and keeps retry
57    /// behavior centralized in the runtime scheduler.
58    ///
59    /// Runners that perform their own checkpointing or replay should return
60    /// `false` and apply retry internally around the nondeterministic provider
61    /// call. For example, the SQLite durable runner must not have the scheduler
62    /// retry its whole `run_agent` method, because each call advances durable
63    /// occurrence counters and may create a distinct checkpoint such as
64    /// `step:sig_x#2` instead of retrying the original durable step.
65    fn retry_in_runtime(&self) -> bool {
66        true
67    }
68
69    async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
70        tokio::time::sleep(std::time::Duration::from_millis(duration_ms)).await;
71        Ok(())
72    }
73}
74
75#[derive(Debug, Default)]
76pub struct DirectWorkflowAgentRunner;
77
78#[async_trait::async_trait]
79impl WorkflowAgentRunner for DirectWorkflowAgentRunner {
80    async fn run_agent(
81        &self,
82        default_provider: Arc<dyn AgentProvider>,
83        provider_override: Option<String>,
84        input: AgentProviderRunInput,
85    ) -> anyhow::Result<AgentProviderResult> {
86        run_agent_provider(default_provider, provider_override, input).await
87    }
88}
89
90pub struct RunWorkflowOptions {
91    pub script_path: PathBuf,
92    pub args: Value,
93    pub agent_provider: Arc<dyn AgentProvider>,
94    pub model_map: BTreeMap<String, String>,
95    pub budget_total: Option<u64>,
96    pub budget_spent: u64,
97    pub nesting_depth: usize,
98    pub max_parallel_agent_requests: Option<usize>,
99    pub agent_runner: Option<Arc<dyn WorkflowAgentRunner>>,
100    pub cancel_rx: Option<watch::Receiver<bool>>,
101    pub event_sink: Option<Arc<dyn WorkflowEventSink>>,
102    pub event_parent_step_id: Option<String>,
103    pub event_stream_start: Option<Instant>,
104    pub session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
105}
106
107#[derive(Debug)]
108pub struct RunWorkflowResult {
109    pub output: WorkflowModuleOutput,
110    pub logs: Vec<Vec<Value>>,
111    pub phases: Vec<WorkflowPhaseCall>,
112    pub agent_calls: Vec<WorkflowRuntimeRequest>,
113    pub workflow_calls: Vec<WorkflowRuntimeRequest>,
114    pub budget: WorkflowBudgetSnapshot,
115    pub token_usage: WorkflowTokenUsage,
116    pub token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
117    pub agent_runs: Vec<WorkflowAgentRunSummary>,
118}
119
120#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
121#[serde(rename_all = "camelCase")]
122pub struct WorkflowTokenUsage {
123    pub input_tokens: u64,
124    pub output_tokens: u64,
125    pub cache_read_tokens: u64,
126    pub cache_write_tokens: u64,
127    pub total_tokens: u64,
128    #[serde(skip_serializing_if = "Option::is_none")]
129    pub cost: Option<AgentUsageCost>,
130}
131
132#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
133#[serde(rename_all = "camelCase")]
134pub struct WorkflowAgentRunSummary {
135    pub id: String,
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub phase: Option<String>,
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub provider: Option<String>,
140    #[serde(skip_serializing_if = "Option::is_none")]
141    pub model: Option<String>,
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub provider_session_id: Option<String>,
144    #[serde(skip_serializing_if = "Option::is_none")]
145    pub usage: Option<AgentUsage>,
146    #[serde(skip_serializing_if = "Option::is_none")]
147    pub isolation: Option<AgentRunIsolation>,
148}
149
150#[derive(Debug, Clone, PartialEq)]
151pub struct WorkflowPhaseCall {
152    pub name: String,
153    pub options: Option<Value>,
154}
155
156pub async fn run_workflow(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
157    LocalSet::new().run_until(run_workflow_inner(options)).await
158}
159
160async fn run_workflow_inner(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
161    log::debug!(
162        "run_workflow start script={} provider={} nesting_depth={} budget_total={:?} budget_spent={}",
163        options.script_path.display(),
164        options.agent_provider.name(),
165        options.nesting_depth,
166        options.budget_total,
167        options.budget_spent
168    );
169    let script_path = fs::canonicalize(&options.script_path).with_context(|| {
170        format!(
171            "failed to resolve workflow script {}",
172            options.script_path.display()
173        )
174    })?;
175    let metadata = read_workflow_metadata(&script_path)?.ok_or_else(|| {
176        anyhow!("Workflow script must export valid literal metadata as `export const meta = {{ name, description, ... }}`")
177    })?;
178    log::debug!(
179        "workflow metadata loaded name={} phases={}",
180        metadata.name,
181        metadata.phases.len()
182    );
183    let source = fs::read_to_string(&script_path)
184        .with_context(|| format!("failed to read workflow script {}", script_path.display()))?;
185    let runtime = RQuickJSWorkflowRuntime::new();
186    let execution = runtime.start_module(WorkflowModuleInput {
187        source,
188        source_name: script_path.to_string_lossy().into_owned(),
189        args: options.args,
190        budget: WorkflowBudgetSnapshot {
191            total: options.budget_total,
192            spent: options.budget_spent,
193        },
194        sandbox: Default::default(),
195    })?;
196
197    let (js_commands, js_command_rx) = mpsc::channel::<JsCommand>(64);
198    let (js_event_tx, mut js_events) = mpsc::channel::<JsEvent>(64);
199    let js_task = tokio::task::spawn_local(js_runtime_actor(execution, js_command_rx, js_event_tx));
200
201    let emit_lifecycle_events = options.event_sink.is_some();
202    let event_start = options.event_stream_start.unwrap_or_else(Instant::now);
203
204    let mut state = RunState {
205        script_path,
206        metadata,
207        event_start,
208        agent_provider: options.agent_provider,
209        model_map: options.model_map,
210        logs: Vec::new(),
211        phases: Vec::new(),
212        agent_calls: Vec::new(),
213        workflow_calls: Vec::new(),
214        budget: WorkflowBudgetSnapshot {
215            total: options.budget_total,
216            spent: options.budget_spent,
217        },
218        token_usage: WorkflowTokenUsage::default(),
219        token_usage_by_phase: Default::default(),
220        agent_runs: Vec::new(),
221        active_request_ids: BTreeSet::new(),
222        nesting_depth: options.nesting_depth,
223        max_parallel_agent_requests: options.max_parallel_agent_requests,
224        agent_runner: options
225            .agent_runner
226            .unwrap_or_else(|| Arc::new(DirectWorkflowAgentRunner)),
227        cancel_rx: options.cancel_rx,
228        event_sink: options.event_sink,
229        event_parent_step_id: options.event_parent_step_id,
230        session_log_sink: options.session_log_sink,
231    };
232
233    let mut pending_requests = VecDeque::<WorkflowRuntimeRequest>::new();
234    let mut agent_tasks = JoinSet::<AgentTaskCompletion>::new();
235    let mut sleep_tasks = JoinSet::<SleepTaskCompletion>::new();
236
237    if emit_lifecycle_events {
238        if let Err(error) = state
239            .emit_event(WorkflowEvent::started(rfc3339_now()?))
240            .await
241        {
242            let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
243            let _ = js_task.await;
244            return Err(error);
245        }
246    }
247
248    let workflow_result: anyhow::Result<RunWorkflowResult> = loop {
249        if let Err(error) = state
250            .start_pending_requests(
251                &mut pending_requests,
252                &mut agent_tasks,
253                &mut sleep_tasks,
254                &js_commands,
255            )
256            .await
257        {
258            break Err(error);
259        }
260
261        tokio::select! {
262            biased;
263            () = wait_for_cancellation(&mut state.cancel_rx) => {
264                break state.cancel_workflow(
265                    &mut pending_requests,
266                    &mut agent_tasks,
267                    &mut sleep_tasks,
268                    &js_commands,
269                    &mut js_events,
270                ).await;
271            }
272            event = js_events.recv() => {
273                let event = match event {
274                    Some(event) => event,
275                    None => break Err(anyhow!("JavaScript runtime actor stopped unexpectedly")),
276                };
277                match state.handle_js_event(event, &mut pending_requests).await {
278                    Ok(Some(result)) => break Ok(result),
279                    Ok(None) => {}
280                    Err(error) => break Err(error),
281                }
282            }
283            completion = agent_tasks.join_next(), if !agent_tasks.is_empty() => {
284                let completion = match completion {
285                    Some(Ok(completion)) => completion,
286                    Some(Err(error)) => break Err(anyhow!("agent provider task failed: {error}")),
287                    None => break Err(anyhow!("agent task set ended unexpectedly")),
288                };
289                let AgentTaskCompletion { id, input, provider, result } = completion;
290                state.active_request_ids.remove(&id);
291                let resolution = match result {
292                    Ok(result) => match state.apply_agent_result(&id, &input, provider, result).await {
293                        Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
294                            value,
295                            budget: state.budget.clone(),
296                        },
297                        Err(error) => WorkflowRuntimeRequestResolution::Err {
298                            message: error.to_string(),
299                        },
300                    },
301                    Err(error) => {
302                        let message = error.to_string();
303                        if let Err(emit_error) = state.emit_agent_failed_event(&id, provider.as_deref(), &message).await {
304                            log::debug!("failed to emit agent failure event: {emit_error:#}");
305                        }
306                        WorkflowRuntimeRequestResolution::Err { message }
307                    },
308                };
309                if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
310                    break Err(error);
311                }
312            }
313            completion = sleep_tasks.join_next(), if !sleep_tasks.is_empty() => {
314                let completion = match completion {
315                    Some(Ok(completion)) => completion,
316                    Some(Err(error)) => break Err(anyhow!("sleep task failed: {error}")),
317                    None => break Err(anyhow!("sleep task set ended unexpectedly")),
318                };
319                let SleepTaskCompletion { id, result } = completion;
320                state.active_request_ids.remove(&id);
321                let resolution = match result {
322                    Ok(()) => WorkflowRuntimeRequestResolution::OkUndefined,
323                    Err(error) => WorkflowRuntimeRequestResolution::Err {
324                        message: error.to_string(),
325                    },
326                };
327                if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
328                    break Err(error);
329                }
330            }
331        }
332    };
333
334    let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
335    let _ = js_task.await;
336
337    if emit_lifecycle_events {
338        match &workflow_result {
339            Ok(result) => {
340                state
341                    .emit_event(WorkflowEvent::result(
342                        result.token_usage.input_tokens,
343                        result.token_usage.output_tokens,
344                        result.token_usage.total_tokens,
345                        result.output.result.clone(),
346                    ))
347                    .await?
348            }
349            Err(error) => {
350                state
351                    .emit_event(WorkflowEvent::error(error.to_string(), None))
352                    .await?;
353            }
354        }
355    }
356
357    workflow_result
358}
359
360enum JsCommand {
361    ResolveRequest {
362        id: String,
363        resolution: WorkflowRuntimeRequestResolution,
364    },
365    Shutdown,
366}
367
368enum JsEvent {
369    Call(WorkflowRuntimeCall),
370    Request(WorkflowRuntimeRequest),
371    Complete(WorkflowModuleOutput),
372    Error(String),
373}
374
375async fn js_runtime_actor(
376    mut execution: Box<dyn WorkflowRuntimeExecution>,
377    mut commands: mpsc::Receiver<JsCommand>,
378    events: mpsc::Sender<JsEvent>,
379) {
380    let mut outstanding_requests = 0usize;
381    loop {
382        match execution.poll() {
383            Ok(WorkflowRuntimePoll::Call(call)) => {
384                if events.send(JsEvent::Call(call)).await.is_err() {
385                    return;
386                }
387            }
388            Ok(WorkflowRuntimePoll::Request(request)) => {
389                let requests = match execution.take_pending_requests() {
390                    Ok(requests) if requests.is_empty() => vec![request],
391                    Ok(requests) => requests,
392                    Err(error) => {
393                        let _ = events.send(JsEvent::Error(error.to_string())).await;
394                        return;
395                    }
396                };
397                outstanding_requests = outstanding_requests.saturating_add(requests.len());
398                for request in requests {
399                    if events.send(JsEvent::Request(request)).await.is_err() {
400                        return;
401                    }
402                }
403            }
404            Ok(WorkflowRuntimePoll::Complete(output)) => {
405                let _ = events.send(JsEvent::Complete(output)).await;
406                return;
407            }
408            Ok(WorkflowRuntimePoll::Pending) => {
409                if outstanding_requests == 0 {
410                    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
411                    continue;
412                }
413                match commands.recv().await {
414                    Some(JsCommand::ResolveRequest { id, resolution }) => {
415                        outstanding_requests = outstanding_requests.saturating_sub(1);
416                        if let Err(error) = execution.resolve_request(&id, resolution) {
417                            let _ = events.send(JsEvent::Error(error.to_string())).await;
418                            return;
419                        }
420                    }
421                    Some(JsCommand::Shutdown) | None => return,
422                }
423            }
424            Err(error) => {
425                let _ = events.send(JsEvent::Error(error.to_string())).await;
426                return;
427            }
428        }
429    }
430}
431
432async fn send_js_command(
433    commands: &mpsc::Sender<JsCommand>,
434    command: JsCommand,
435) -> anyhow::Result<()> {
436    commands
437        .send(command)
438        .await
439        .map_err(|_| anyhow!("JavaScript runtime actor stopped unexpectedly"))
440}
441
442struct RunState {
443    script_path: PathBuf,
444    metadata: WorkflowMetadata,
445    event_start: Instant,
446    agent_provider: Arc<dyn AgentProvider>,
447    model_map: BTreeMap<String, String>,
448    logs: Vec<Vec<Value>>,
449    phases: Vec<WorkflowPhaseCall>,
450    agent_calls: Vec<WorkflowRuntimeRequest>,
451    workflow_calls: Vec<WorkflowRuntimeRequest>,
452    budget: WorkflowBudgetSnapshot,
453    token_usage: WorkflowTokenUsage,
454    token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
455    agent_runs: Vec<WorkflowAgentRunSummary>,
456    active_request_ids: BTreeSet<String>,
457    nesting_depth: usize,
458    max_parallel_agent_requests: Option<usize>,
459    agent_runner: Arc<dyn WorkflowAgentRunner>,
460    cancel_rx: Option<watch::Receiver<bool>>,
461    event_sink: Option<Arc<dyn WorkflowEventSink>>,
462    event_parent_step_id: Option<String>,
463    session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
464}
465
466struct PreparedAgentRun {
467    provider_override: Option<String>,
468    input: AgentProviderRunInput,
469}
470
471struct AgentTaskCompletion {
472    id: String,
473    input: AgentProviderRunInput,
474    provider: Option<String>,
475    result: anyhow::Result<AgentProviderResult>,
476}
477
478struct SleepTaskCompletion {
479    id: String,
480    result: anyhow::Result<()>,
481}
482
483fn add_usage(total: &mut WorkflowTokenUsage, usage: Option<&AgentUsage>) {
484    let Some(usage) = usage else {
485        return;
486    };
487
488    total.input_tokens = total
489        .input_tokens
490        .saturating_add(usage.input_tokens.unwrap_or_default());
491    total.output_tokens = total
492        .output_tokens
493        .saturating_add(usage.output_tokens.unwrap_or_default());
494    total.cache_read_tokens = total
495        .cache_read_tokens
496        .saturating_add(usage.cache_read_tokens.unwrap_or_default());
497    total.cache_write_tokens = total
498        .cache_write_tokens
499        .saturating_add(usage.cache_write_tokens.unwrap_or_default());
500    total.total_tokens = total
501        .total_tokens
502        .saturating_add(usage.total_tokens.unwrap_or_default());
503
504    if let Some(cost) = usage.cost.as_ref() {
505        total.cost = Some(merge_cost(total.cost.as_ref(), cost));
506    }
507}
508
509fn merge_token_usage(total: &mut WorkflowTokenUsage, usage: &WorkflowTokenUsage) {
510    total.input_tokens = total.input_tokens.saturating_add(usage.input_tokens);
511    total.output_tokens = total.output_tokens.saturating_add(usage.output_tokens);
512    total.cache_read_tokens = total
513        .cache_read_tokens
514        .saturating_add(usage.cache_read_tokens);
515    total.cache_write_tokens = total
516        .cache_write_tokens
517        .saturating_add(usage.cache_write_tokens);
518    total.total_tokens = total.total_tokens.saturating_add(usage.total_tokens);
519    if let Some(cost) = usage.cost.as_ref() {
520        total.cost = Some(merge_cost(total.cost.as_ref(), cost));
521    }
522}
523
524fn merge_cost(left: Option<&AgentUsageCost>, right: &AgentUsageCost) -> AgentUsageCost {
525    AgentUsageCost {
526        input: sum_f64(left.and_then(|cost| cost.input), right.input),
527        output: sum_f64(left.and_then(|cost| cost.output), right.output),
528        cache_read: sum_f64(left.and_then(|cost| cost.cache_read), right.cache_read),
529        cache_write: sum_f64(left.and_then(|cost| cost.cache_write), right.cache_write),
530        total: sum_f64(left.and_then(|cost| cost.total), right.total),
531        currency: right
532            .currency
533            .clone()
534            .or_else(|| left.and_then(|cost| cost.currency.clone())),
535    }
536}
537
/// Nanoseconds elapsed since `start`, clamped to `u64::MAX` (which is only reached
/// after roughly 584 years).
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos = start.elapsed().as_nanos();
    // After clamping the value fits in u64, so the cast cannot truncate.
    nanos.min(u128::from(u64::MAX)) as u64
}
541
542fn rfc3339_now() -> anyhow::Result<String> {
543    Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
544}
545
546fn raw_agent_event_payloads(raw: &Value) -> Vec<Value> {
547    if let Some(events) = raw.get("events").and_then(Value::as_array) {
548        events.clone()
549    } else if let Some(items) = raw.as_array() {
550        items.clone()
551    } else {
552        vec![raw.clone()]
553    }
554}
555
556fn agent_session_event_payload(provider_event: Value, metadata: &WorkflowEventMetadata) -> Value {
557    let mut payload = serde_json::Map::new();
558    if let Some(provider) = metadata.provider.as_ref() {
559        payload.insert("provider".to_string(), Value::String(provider.clone()));
560    }
561    if let Some(session_id) = metadata.session_id.as_ref() {
562        payload.insert("sessionId".to_string(), Value::String(session_id.clone()));
563    }
564    if let Some(run_id) = metadata.run_id.as_ref() {
565        payload.insert("runId".to_string(), Value::String(run_id.clone()));
566    }
567    if let Some(step_id) = metadata.step_id.as_ref() {
568        payload.insert("stepId".to_string(), Value::String(step_id.clone()));
569    }
570    payload.insert("providerEvent".to_string(), provider_event);
571    Value::Object(payload)
572}
573
/// Truncates `value` to at most `max_chars` Unicode scalar values.
///
/// When anything was cut off, an ellipsis (`…`) is appended. Slicing happens on a
/// character boundary, so multi-byte text is never split.
fn truncate_for_event(value: &str, max_chars: usize) -> String {
    match value.char_indices().nth(max_chars) {
        // A character exists at position `max_chars`, so the input is too long.
        Some((cut_at, _)) => format!("{}…", &value[..cut_at]),
        None => value.to_string(),
    }
}
583
584fn format_log_message(values: &[Value]) -> String {
585    values
586        .iter()
587        .map(|value| match value {
588            Value::String(value) => value.clone(),
589            value => serde_json::to_string(value).unwrap_or_else(|_| String::from("<unprintable>")),
590        })
591        .collect::<Vec<_>>()
592        .join(" ")
593}
594
/// Adds two optional amounts.
///
/// The result is `None` only when both are absent; otherwise a missing side counts
/// as `0.0`.
fn sum_f64(left: Option<f64>, right: Option<f64>) -> Option<f64> {
    if left.is_none() && right.is_none() {
        return None;
    }
    Some(left.unwrap_or(0.0) + right.unwrap_or(0.0))
}
601
602async fn wait_for_cancellation(cancel_rx: &mut Option<watch::Receiver<bool>>) {
603    let Some(cancel_rx) = cancel_rx else {
604        std::future::pending::<()>().await;
605        return;
606    };
607    while !*cancel_rx.borrow() {
608        if cancel_rx.changed().await.is_err() {
609            return;
610        }
611    }
612}
613
614impl RunState {
615    async fn handle_js_event(
616        &mut self,
617        event: JsEvent,
618        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
619    ) -> anyhow::Result<Option<RunWorkflowResult>> {
620        match event {
621            JsEvent::Call(call) => self.handle_call(call).await?,
622            JsEvent::Request(request) => {
623                log::debug!(
624                    "workflow runtime request id={} kind={}",
625                    request.id(),
626                    request.kind()
627                );
628                pending_requests.push_back(request);
629            }
630            JsEvent::Complete(output) => {
631                log::debug!(
632                    "run_workflow complete script={} budget_spent={}",
633                    self.script_path.display(),
634                    self.budget.spent
635                );
636                return Ok(Some(RunWorkflowResult {
637                    output,
638                    logs: std::mem::take(&mut self.logs),
639                    phases: std::mem::take(&mut self.phases),
640                    agent_calls: std::mem::take(&mut self.agent_calls),
641                    workflow_calls: std::mem::take(&mut self.workflow_calls),
642                    budget: self.budget.clone(),
643                    token_usage: std::mem::take(&mut self.token_usage),
644                    token_usage_by_phase: std::mem::take(&mut self.token_usage_by_phase),
645                    agent_runs: std::mem::take(&mut self.agent_runs),
646                }));
647            }
648            JsEvent::Error(message) => bail!(message),
649        }
650        Ok(None)
651    }
652
653    async fn start_pending_requests(
654        &mut self,
655        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
656        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
657        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
658        js_commands: &mpsc::Sender<JsCommand>,
659    ) -> anyhow::Result<()> {
660        loop {
661            let Some(request) = pending_requests.front() else {
662                return Ok(());
663            };
664            if matches!(request, WorkflowRuntimeRequest::Agent { .. })
665                && !self.agent_capacity_available(agent_tasks.len())
666            {
667                return Ok(());
668            }
669
670            let request = pending_requests
671                .pop_front()
672                .expect("pending request should exist");
673            match request {
674                WorkflowRuntimeRequest::Agent { .. } => match self.prepare_agent_request(request) {
675                    Ok((id, prepared)) => {
676                        self.emit_agent_started_event(&id, &prepared).await?;
677                        self.spawn_agent_task(agent_tasks, id, prepared);
678                    }
679                    Err((id, error)) => {
680                        send_js_command(
681                            js_commands,
682                            JsCommand::ResolveRequest {
683                                id,
684                                resolution: WorkflowRuntimeRequestResolution::Err {
685                                    message: error.to_string(),
686                                },
687                            },
688                        )
689                        .await?;
690                    }
691                },
692                WorkflowRuntimeRequest::Sleep { id, duration_ms } => {
693                    self.spawn_sleep_task(sleep_tasks, id, duration_ms);
694                }
695                WorkflowRuntimeRequest::Workflow {
696                    id,
697                    workflow_ref,
698                    args,
699                } => {
700                    self.workflow_calls.push(WorkflowRuntimeRequest::Workflow {
701                        id: id.clone(),
702                        workflow_ref: workflow_ref.clone(),
703                        args: args.clone(),
704                    });
705                    let parent_event_step_id = self.event_step_id(&id);
706                    let resolution = match self
707                        .handle_workflow(parent_event_step_id, workflow_ref, args)
708                        .await
709                    {
710                        Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
711                            value,
712                            budget: self.budget.clone(),
713                        },
714                        Err(error) => WorkflowRuntimeRequestResolution::Err {
715                            message: error.to_string(),
716                        },
717                    };
718                    send_js_command(js_commands, JsCommand::ResolveRequest { id, resolution })
719                        .await?;
720                }
721            }
722        }
723    }
724
    /// Cancels the running workflow. Never returns `Ok`.
    ///
    /// Called from the scheduler loop once `wait_for_cancellation` fires. Sequence:
    /// 1. Fast path: when nothing is queued, active, or running, consult
    ///    `reject_next_runtime_request_for_cancellation`. If it returns `true`, stop
    ///    here.
    /// 2. Reject every queued (not yet started) request.
    /// 3. Abort all sleep tasks, then call
    ///    `reject_active_sleep_requests_for_cancellation`.
    /// 4. Agent tasks, with a session log sink configured:
    ///    - wait for each in-flight task to finish (they are not aborted);
    ///    - emit its result/completed events or its failed event;
    ///    - record successful runs;
    ///    - reject its request.
    /// 5. Agent tasks, without a session log sink: abort them all and reject every id
    ///    still in `active_request_ids`.
    /// 6. Reject any remaining active requests, drain the runtime, and send `Shutdown`
    ///    to the JS actor.
    ///
    /// Event-emission failures during draining are logged at debug level and ignored.
    ///
    /// # Errors
    /// Always returns `Err` with the message `workflow cancelled`.
    async fn cancel_workflow(
        &mut self,
        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
        js_commands: &mpsc::Sender<JsCommand>,
        js_events: &mut mpsc::Receiver<JsEvent>,
    ) -> anyhow::Result<RunWorkflowResult> {
        log::debug!(
            "workflow cancellation requested script={}",
            self.script_path.display()
        );

        // `&&` short-circuits: the runtime is only consulted when nothing else is in
        // flight. A `false` answer falls through to the full cancellation path below.
        if pending_requests.is_empty()
            && self.active_request_ids.is_empty()
            && agent_tasks.is_empty()
            && sleep_tasks.is_empty()
            && self
                .reject_next_runtime_request_for_cancellation(js_commands, js_events)
                .await
        {
            bail!("workflow cancelled");
        }

        self.reject_pending_requests_for_cancellation(pending_requests, js_commands)
            .await;
        sleep_tasks.abort_all();
        self.reject_active_sleep_requests_for_cancellation(sleep_tasks, js_commands)
            .await;

        // With a session log sink, in-flight agents are allowed to finish (presumably
        // so their output still reaches the sinks); otherwise they are aborted.
        if self.session_log_sink.is_some() {
            while let Some(completion) = agent_tasks.join_next().await {
                match completion {
                    Ok(AgentTaskCompletion {
                        id,
                        input,
                        provider,
                        result: Ok(result),
                    }) => {
                        self.active_request_ids.remove(&id);
                        if let Err(error) = self
                            .emit_agent_result_events(&id, provider.as_deref(), &result)
                            .await
                        {
                            log::debug!("failed to emit drained agent events during cancellation: {error:#}");
                        }
                        if let Err(error) = self
                            .emit_agent_completed_event(&id, provider.as_deref(), &result)
                            .await
                        {
                            log::debug!("failed to emit drained agent completion during cancellation: {error:#}");
                        }
                        self.record_agent_run(&id, &input, provider, &result);
                        // JS still sees the request as rejected, even though it succeeded.
                        self.reject_request_for_cancellation(id, js_commands).await;
                    }
                    Ok(AgentTaskCompletion {
                        id,
                        provider,
                        result: Err(error),
                        ..
                    }) => {
                        self.active_request_ids.remove(&id);
                        let message = error.to_string();
                        if let Err(error) = self
                            .emit_agent_failed_event(&id, provider.as_deref(), &message)
                            .await
                        {
                            log::debug!("failed to emit drained agent failure during cancellation: {error:#}");
                        }
                        log::debug!("agent task failed while draining cancellation: {message}");
                        self.reject_request_for_cancellation(id, js_commands).await;
                    }
                    Err(error) => {
                        // The request id is unknown here. Its id stays in
                        // `active_request_ids`, presumably for the "remaining active
                        // requests" step below.
                        log::debug!("agent task join failed while draining cancellation: {error}");
                    }
                }
            }
        } else {
            // Snapshot the ids first: the loop mutates `active_request_ids`.
            let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
            agent_tasks.abort_all();
            for id in ids {
                self.active_request_ids.remove(&id);
                self.reject_request_for_cancellation(id, js_commands).await;
            }
        }

        self.reject_remaining_active_requests_for_cancellation(js_commands)
            .await;
        self.drain_runtime_after_cancellation(js_events).await;
        let _ = send_js_command(js_commands, JsCommand::Shutdown).await;
        bail!("workflow cancelled")
    }
817
818    async fn reject_next_runtime_request_for_cancellation(
819        &mut self,
820        js_commands: &mpsc::Sender<JsCommand>,
821        js_events: &mut mpsc::Receiver<JsEvent>,
822    ) -> bool {
823        loop {
824            match js_events.recv().await {
825                Some(JsEvent::Call(call)) => {
826                    let _ = self.handle_call(call).await;
827                }
828                Some(JsEvent::Request(request)) => {
829                    self.reject_request_for_cancellation(request.id().to_string(), js_commands)
830                        .await;
831                    return false;
832                }
833                Some(JsEvent::Complete(_)) | Some(JsEvent::Error(_)) | None => return true,
834            }
835        }
836    }
837
838    async fn reject_pending_requests_for_cancellation(
839        &mut self,
840        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
841        js_commands: &mpsc::Sender<JsCommand>,
842    ) {
843        while let Some(request) = pending_requests.pop_front() {
844            self.reject_request_for_cancellation(request.id().to_string(), js_commands)
845                .await;
846        }
847    }
848
849    async fn reject_active_sleep_requests_for_cancellation(
850        &mut self,
851        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
852        js_commands: &mpsc::Sender<JsCommand>,
853    ) {
854        while let Some(completion) = sleep_tasks.join_next().await {
855            if let Ok(SleepTaskCompletion { id, .. }) = completion {
856                self.active_request_ids.remove(&id);
857                self.reject_request_for_cancellation(id, js_commands).await;
858            }
859        }
860    }
861
862    async fn reject_remaining_active_requests_for_cancellation(
863        &mut self,
864        js_commands: &mpsc::Sender<JsCommand>,
865    ) {
866        let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
867        for id in ids {
868            self.active_request_ids.remove(&id);
869            self.reject_request_for_cancellation(id, js_commands).await;
870        }
871    }
872
873    async fn reject_request_for_cancellation(
874        &self,
875        id: String,
876        js_commands: &mpsc::Sender<JsCommand>,
877    ) {
878        let _ = send_js_command(
879            js_commands,
880            JsCommand::ResolveRequest {
881                id,
882                resolution: WorkflowRuntimeRequestResolution::Err {
883                    message: "workflow cancelled".to_string(),
884                },
885            },
886        )
887        .await;
888    }
889
890    async fn drain_runtime_after_cancellation(&mut self, js_events: &mut mpsc::Receiver<JsEvent>) {
891        while let Some(event) = js_events.recv().await {
892            match event {
893                JsEvent::Call(call) => {
894                    let _ = self.handle_call(call).await;
895                }
896                JsEvent::Request(request) => {
897                    log::debug!(
898                        "ignoring request after cancellation id={} kind={}",
899                        request.id(),
900                        request.kind()
901                    );
902                }
903                JsEvent::Complete(_) | JsEvent::Error(_) => break,
904            }
905        }
906    }
907
908    fn event_step_id(&self, runtime_request_id: &str) -> String {
909        let parent = self.event_parent_step_id.as_deref().unwrap_or("");
910        let hash = blake3::hash(
911            format!("{parent}:{}:{runtime_request_id}", self.nesting_depth).as_bytes(),
912        );
913        format!("step_{}", &hash.to_hex()[..16])
914    }
915
916    async fn emit_event(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
917        if (event.event_type.as_str() != "workflow.started" || self.nesting_depth > 0)
918            && event.elapsed_nanos.is_none()
919        {
920            event.elapsed_nanos = Some(elapsed_nanos(self.event_start));
921        }
922        let metadata = event
923            .metadata
924            .get_or_insert_with(WorkflowEventMetadata::default);
925        if metadata.workflow_depth.is_none() {
926            metadata.workflow_depth = Some(u32::try_from(self.nesting_depth).unwrap_or(u32::MAX));
927        }
928        if metadata.parent_step_id.is_none() {
929            metadata.parent_step_id = self.event_parent_step_id.clone();
930        }
931        if let Some(event_sink) = self.event_sink.as_ref() {
932            event_sink.emit(event).await?;
933        }
934        Ok(())
935    }
936
937    async fn handle_call(&mut self, call: WorkflowRuntimeCall) -> anyhow::Result<()> {
938        match call {
939            WorkflowRuntimeCall::Log { values } => {
940                self.emit_event(WorkflowEvent::log(format_log_message(&values)))
941                    .await?;
942                self.logs.push(values);
943            }
944            WorkflowRuntimeCall::Phase { name, options } => {
945                let phase = WorkflowPhaseCall { name, options };
946                self.emit_event(WorkflowEvent::phase(
947                    phase.name.clone(),
948                    phase.options.clone(),
949                ))
950                .await?;
951                self.phases.push(phase);
952            }
953        }
954        Ok(())
955    }
956
957    fn agent_capacity_available(&self, in_flight: usize) -> bool {
958        let max_parallel = self
959            .max_parallel_agent_requests
960            .filter(|value| *value > 0)
961            .unwrap_or(usize::MAX);
962        in_flight < max_parallel
963    }
964
965    fn prepare_agent_request(
966        &mut self,
967        request: WorkflowRuntimeRequest,
968    ) -> Result<(String, PreparedAgentRun), (String, anyhow::Error)> {
969        match request {
970            WorkflowRuntimeRequest::Agent {
971                id,
972                prompt,
973                options,
974            } => {
975                self.agent_calls.push(WorkflowRuntimeRequest::Agent {
976                    id: id.clone(),
977                    prompt: prompt.clone(),
978                    options: options.clone(),
979                });
980                match self.prepare_agent_run(prompt, options) {
981                    Ok(prepared) => Ok((id, prepared)),
982                    Err(error) => Err((id, error)),
983                }
984            }
985            WorkflowRuntimeRequest::Workflow { .. } | WorkflowRuntimeRequest::Sleep { .. } => {
986                unreachable!("prepare_agent_request only accepts agent requests")
987            }
988        }
989    }
990
991    fn spawn_agent_task(
992        &mut self,
993        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
994        id: String,
995        prepared: PreparedAgentRun,
996    ) {
997        let default_provider_name = self.agent_provider.name().to_string();
998        let default_provider = Arc::clone(&self.agent_provider);
999        let agent_runner = Arc::clone(&self.agent_runner);
1000        let retry_in_runtime = agent_runner.retry_in_runtime();
1001        let cancel_rx = self.cancel_rx.clone();
1002        let completion_input = prepared.input.clone();
1003        let completion_provider = prepared
1004            .provider_override
1005            .clone()
1006            .or(Some(default_provider_name));
1007        let session_log_sink = self.session_log_sink.clone();
1008        let max_parallel = self
1009            .max_parallel_agent_requests
1010            .filter(|value| *value > 0)
1011            .unwrap_or(usize::MAX);
1012        log::debug!(
1013            "starting agent request id={} in_flight_after_start={} max_parallel={}",
1014            id,
1015            agent_tasks.len() + 1,
1016            max_parallel
1017        );
1018        self.active_request_ids.insert(id.clone());
1019        agent_tasks.spawn(async move {
1020            let result = if retry_in_runtime {
1021                run_agent_runner_with_retry(
1022                    Arc::clone(&agent_runner),
1023                    default_provider,
1024                    prepared.provider_override,
1025                    prepared.input,
1026                    cancel_rx,
1027                )
1028                .await
1029            } else {
1030                agent_runner
1031                    .run_agent(default_provider, prepared.provider_override, prepared.input)
1032                    .await
1033            };
1034            let result = match result {
1035                Ok(result) => {
1036                    if let Some(session_log_sink) = session_log_sink.as_ref() {
1037                        let provider_name = completion_provider
1038                            .as_deref()
1039                            .expect("completion provider should always be set");
1040                        match session_log_sink
1041                            .write_agent_result(provider_name, &result)
1042                            .await
1043                        {
1044                            Ok(()) => Ok(result),
1045                            Err(error) => Err(error),
1046                        }
1047                    } else {
1048                        Ok(result)
1049                    }
1050                }
1051                Err(error) => Err(error),
1052            };
1053            AgentTaskCompletion {
1054                id,
1055                input: completion_input,
1056                provider: completion_provider,
1057                result,
1058            }
1059        });
1060    }
1061
1062    fn spawn_sleep_task(
1063        &mut self,
1064        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
1065        id: String,
1066        duration_ms: u64,
1067    ) {
1068        let agent_runner = Arc::clone(&self.agent_runner);
1069        log::debug!(
1070            "starting sleep request id={} duration_ms={}",
1071            id,
1072            duration_ms
1073        );
1074        self.active_request_ids.insert(id.clone());
1075        sleep_tasks.spawn(async move {
1076            SleepTaskCompletion {
1077                id,
1078                result: agent_runner.sleep(duration_ms).await,
1079            }
1080        });
1081    }
1082
1083    fn prepare_agent_run(
1084        &self,
1085        prompt: String,
1086        options: Option<Value>,
1087    ) -> anyhow::Result<PreparedAgentRun> {
1088        let options = apply_phase_defaults(options, &self.metadata);
1089        let context = AgentProviderContext {
1090            phase: options
1091                .as_ref()
1092                .and_then(|options| options.get("phase"))
1093                .and_then(Value::as_str)
1094                .map(ToString::to_string),
1095            cwd: self.script_path.parent().map(Path::to_path_buf),
1096        };
1097        let provider_override = options
1098            .as_ref()
1099            .and_then(|options| options.get("provider"))
1100            .and_then(Value::as_str)
1101            .map(ToString::to_string);
1102        let provider_name = provider_override
1103            .as_deref()
1104            .unwrap_or_else(|| self.agent_provider.name());
1105        let options = resolve_model_options(options, provider_name, &self.model_map)?;
1106        agent_retry_policy(&options)?;
1107        log::debug!(
1108            "agent call provider={} phase={:?} model={:?} prompt_len={}",
1109            provider_name,
1110            context.phase.as_deref(),
1111            options
1112                .as_ref()
1113                .and_then(|options| options.get("model"))
1114                .and_then(Value::as_str),
1115            prompt.len()
1116        );
1117        Ok(PreparedAgentRun {
1118            provider_override,
1119            input: AgentProviderRunInput {
1120                prompt,
1121                options,
1122                environment: Arc::new(LocalExecutionEnvironment::new(context.cwd.clone())?),
1123                context,
1124            },
1125        })
1126    }
1127
1128    async fn emit_agent_started_event(
1129        &self,
1130        id: &str,
1131        prepared: &PreparedAgentRun,
1132    ) -> anyhow::Result<()> {
1133        let provider = prepared
1134            .provider_override
1135            .as_deref()
1136            .unwrap_or_else(|| self.agent_provider.name());
1137        let metadata = self.agent_event_metadata(id, Some(provider), None);
1138        self.emit_event(WorkflowEvent::agent_started(
1139            serde_json::json!({
1140                "phase": prepared.input.context.phase,
1141                "promptPreview": truncate_for_event(&prepared.input.prompt, 200),
1142            }),
1143            metadata,
1144        ))
1145        .await
1146    }
1147
1148    async fn apply_agent_result(
1149        &mut self,
1150        id: &str,
1151        input: &AgentProviderRunInput,
1152        provider: Option<String>,
1153        result: AgentProviderResult,
1154    ) -> anyhow::Result<Value> {
1155        if let Some(output_tokens) = result.usage.as_ref().and_then(|usage| usage.output_tokens) {
1156            self.budget.spent = self.budget.spent.saturating_add(output_tokens);
1157        }
1158        self.emit_agent_result_events(id, provider.as_deref(), &result)
1159            .await?;
1160        self.emit_agent_completed_event(id, provider.as_deref(), &result)
1161            .await?;
1162        self.record_agent_run(id, input, provider, &result);
1163        log::debug!(
1164            "agent call complete session_id={:?} output_tokens={:?} budget_spent={}",
1165            result.session_id,
1166            result.usage.as_ref().and_then(|usage| usage.output_tokens),
1167            self.budget.spent
1168        );
1169        Ok(result.output)
1170    }
1171
1172    async fn emit_agent_result_events(
1173        &self,
1174        id: &str,
1175        provider: Option<&str>,
1176        result: &AgentProviderResult,
1177    ) -> anyhow::Result<()> {
1178        let Some(raw) = result.raw.as_ref() else {
1179            return Ok(());
1180        };
1181        let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1182        for provider_event in raw_agent_event_payloads(raw) {
1183            let event_data = agent_session_event_payload(provider_event, &metadata);
1184            self.emit_event(WorkflowEvent::agent_event(event_data, metadata.clone()))
1185                .await?;
1186        }
1187        Ok(())
1188    }
1189
1190    async fn emit_agent_completed_event(
1191        &self,
1192        id: &str,
1193        provider: Option<&str>,
1194        result: &AgentProviderResult,
1195    ) -> anyhow::Result<()> {
1196        let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1197        self.emit_event(WorkflowEvent::agent_completed(
1198            serde_json::json!({
1199                "sessionId": result.session_id,
1200                "model": result.model,
1201                "usage": result.usage,
1202            }),
1203            metadata,
1204        ))
1205        .await
1206    }
1207
1208    async fn emit_agent_failed_event(
1209        &self,
1210        id: &str,
1211        provider: Option<&str>,
1212        message: &str,
1213    ) -> anyhow::Result<()> {
1214        let metadata = self.agent_event_metadata(id, provider, None);
1215        self.emit_event(WorkflowEvent::agent_failed(
1216            serde_json::json!({ "message": message }),
1217            metadata,
1218        ))
1219        .await
1220    }
1221
1222    fn agent_event_metadata(
1223        &self,
1224        id: &str,
1225        provider: Option<&str>,
1226        session_id: Option<String>,
1227    ) -> WorkflowEventMetadata {
1228        WorkflowEventMetadata {
1229            run_id: None,
1230            step_id: Some(self.event_step_id(id)),
1231            provider: Some(
1232                provider
1233                    .unwrap_or_else(|| self.agent_provider.name())
1234                    .to_string(),
1235            ),
1236            session_id,
1237            workflow_depth: None,
1238            parent_step_id: None,
1239        }
1240    }
1241
1242    fn record_agent_run(
1243        &mut self,
1244        id: &str,
1245        input: &AgentProviderRunInput,
1246        provider: Option<String>,
1247        result: &AgentProviderResult,
1248    ) {
1249        add_usage(&mut self.token_usage, result.usage.as_ref());
1250        if let Some(phase) = input.context.phase.as_ref() {
1251            let phase_usage = self.token_usage_by_phase.entry(phase.clone()).or_default();
1252            add_usage(phase_usage, result.usage.as_ref());
1253        }
1254        let model = result.model.clone().or_else(|| {
1255            input
1256                .options
1257                .as_ref()
1258                .and_then(|options| options.get("model"))
1259                .and_then(Value::as_str)
1260                .map(ToString::to_string)
1261        });
1262        self.agent_runs.push(WorkflowAgentRunSummary {
1263            id: id.to_string(),
1264            phase: input.context.phase.clone(),
1265            provider,
1266            model,
1267            provider_session_id: result.session_id.clone(),
1268            usage: result.usage.clone(),
1269            isolation: result.isolation.clone(),
1270        });
1271    }
1272
1273    async fn handle_workflow(
1274        &mut self,
1275        parent_step_id: String,
1276        workflow_ref: WorkflowRef,
1277        args: Option<Value>,
1278    ) -> anyhow::Result<Value> {
1279        if self.nesting_depth >= 1 {
1280            bail!("Nested workflow() calls are limited to one level");
1281        }
1282        let script_path = match workflow_ref {
1283            WorkflowRef::ScriptPath { script_path } => {
1284                resolve_relative_script(&self.script_path, &script_path)
1285            }
1286            WorkflowRef::Name(name) => resolve_named_workflow(&name)?,
1287        };
1288        log::debug!("child workflow call script={}", script_path.display());
1289        let child = Box::pin(run_workflow_inner(RunWorkflowOptions {
1290            script_path,
1291            args: args.unwrap_or(Value::Null),
1292            agent_provider: Arc::clone(&self.agent_provider),
1293            model_map: self.model_map.clone(),
1294            budget_total: self.budget.total,
1295            budget_spent: self.budget.spent,
1296            nesting_depth: self.nesting_depth + 1,
1297            max_parallel_agent_requests: self.max_parallel_agent_requests,
1298            agent_runner: Some(Arc::clone(&self.agent_runner)),
1299            cancel_rx: self.cancel_rx.clone(),
1300            event_sink: self.event_sink.clone(),
1301            event_parent_step_id: Some(parent_step_id),
1302            event_stream_start: Some(self.event_start),
1303            session_log_sink: self.session_log_sink.clone(),
1304        }))
1305        .await?;
1306        self.budget = child.budget;
1307        self.logs.extend(child.logs);
1308        self.phases.extend(child.phases);
1309        self.agent_calls.extend(child.agent_calls);
1310        self.workflow_calls.extend(child.workflow_calls);
1311        merge_token_usage(&mut self.token_usage, &child.token_usage);
1312        for (phase, usage) in child.token_usage_by_phase {
1313            merge_token_usage(self.token_usage_by_phase.entry(phase).or_default(), &usage);
1314        }
1315        self.agent_runs.extend(child.agent_runs);
1316        Ok(child.output.result)
1317    }
1318}
1319
1320async fn run_agent_runner_with_retry(
1321    agent_runner: Arc<dyn WorkflowAgentRunner>,
1322    default_provider: Arc<dyn AgentProvider>,
1323    provider_override: Option<String>,
1324    input: AgentProviderRunInput,
1325    mut cancel_rx: Option<watch::Receiver<bool>>,
1326) -> anyhow::Result<AgentProviderResult> {
1327    let retry = agent_retry_policy(&input.options)?;
1328    let mut final_result = None;
1329    for attempt in 1..=retry.max_attempts {
1330        let attempt_result = agent_runner
1331            .run_agent(
1332                Arc::clone(&default_provider),
1333                provider_override.clone(),
1334                input.clone(),
1335            )
1336            .await;
1337        match attempt_result {
1338            Ok(result) => {
1339                final_result = Some(Ok(result));
1340                break;
1341            }
1342            Err(error) if attempt < retry.max_attempts => {
1343                log::debug!(
1344                    "agent call failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1345                    retry.max_attempts,
1346                    retry.backoff_ms
1347                );
1348                sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1349            }
1350            Err(error) => {
1351                final_result = Some(Err(error));
1352                break;
1353            }
1354        }
1355    }
1356    final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1357}
1358
1359async fn sleep_retry_backoff(
1360    backoff_ms: u64,
1361    cancel_rx: &mut Option<watch::Receiver<bool>>,
1362) -> anyhow::Result<()> {
1363    if backoff_ms == 0 {
1364        return Ok(());
1365    }
1366    let Some(cancel_rx) = cancel_rx.as_mut() else {
1367        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1368        return Ok(());
1369    };
1370    if *cancel_rx.borrow() {
1371        bail!("workflow cancelled");
1372    }
1373    let sleep = tokio::time::sleep(Duration::from_millis(backoff_ms));
1374    tokio::pin!(sleep);
1375    loop {
1376        tokio::select! {
1377            _ = &mut sleep => return Ok(()),
1378            changed = cancel_rx.changed() => {
1379                match changed {
1380                    Ok(()) if *cancel_rx.borrow() => bail!("workflow cancelled"),
1381                    Ok(()) => continue,
1382                    Err(_) => {
1383                        sleep.await;
1384                        return Ok(());
1385                    }
1386                }
1387            }
1388        }
1389    }
1390}
1391
1392pub(crate) async fn run_agent_provider_with_retry(
1393    default_provider: Arc<dyn AgentProvider>,
1394    provider_override: Option<String>,
1395    input: AgentProviderRunInput,
1396    mut cancel_rx: Option<watch::Receiver<bool>>,
1397) -> anyhow::Result<AgentProviderResult> {
1398    let retry = agent_retry_policy(&input.options)?;
1399    let provider = resolve_agent_provider(default_provider, provider_override)?;
1400    let mut final_result = None;
1401    for attempt in 1..=retry.max_attempts {
1402        let attempt_result =
1403            run_agent_with_optional_isolation(Arc::clone(&provider), input.clone()).await;
1404        match attempt_result {
1405            Ok(result) => {
1406                final_result = Some(Ok(result));
1407                break;
1408            }
1409            Err(error) if attempt < retry.max_attempts => {
1410                log::debug!(
1411                    "agent provider failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1412                    retry.max_attempts,
1413                    retry.backoff_ms
1414                );
1415                sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1416            }
1417            Err(error) => {
1418                final_result = Some(Err(error));
1419                break;
1420            }
1421        }
1422    }
1423    final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1424}
1425
1426pub(crate) async fn run_agent_provider(
1427    default_provider: Arc<dyn AgentProvider>,
1428    provider_override: Option<String>,
1429    input: AgentProviderRunInput,
1430) -> anyhow::Result<AgentProviderResult> {
1431    let provider = resolve_agent_provider(default_provider, provider_override)?;
1432    run_agent_with_optional_isolation(provider, input).await
1433}
1434
1435fn resolve_agent_provider(
1436    default_provider: Arc<dyn AgentProvider>,
1437    provider_override: Option<String>,
1438) -> anyhow::Result<Arc<dyn AgentProvider>> {
1439    if let Some(provider_override) = provider_override {
1440        Ok(Arc::from(create_agent_provider(&provider_override)?))
1441    } else {
1442        Ok(default_provider)
1443    }
1444}
1445
/// Retry settings for a single agent call, parsed from the `retry` option.
#[derive(Clone, Copy, Debug)]
pub(crate) struct AgentRetryPolicy {
    /// Total attempts including the first (at least 1 when produced by
    /// `agent_retry_policy`).
    pub max_attempts: u32,
    /// Delay between attempts, in milliseconds.
    pub backoff_ms: u64,
}
1451
1452pub(crate) fn agent_retry_policy(options: &Option<Value>) -> anyhow::Result<AgentRetryPolicy> {
1453    let default = AgentRetryPolicy {
1454        max_attempts: 1,
1455        backoff_ms: 0,
1456    };
1457    let Some(retry) = options.as_ref().and_then(|options| options.get("retry")) else {
1458        return Ok(default);
1459    };
1460    if retry.is_null() {
1461        return Ok(default);
1462    }
1463    let object = retry
1464        .as_object()
1465        .ok_or_else(|| anyhow!("agent retry option must be an object"))?;
1466    let max_attempts = match object.get("maxAttempts") {
1467        Some(value) => {
1468            let value = value
1469                .as_u64()
1470                .ok_or_else(|| anyhow!("agent retry.maxAttempts must be a positive integer"))?;
1471            if value == 0 || value > u32::MAX as u64 {
1472                bail!("agent retry.maxAttempts must be between 1 and {}", u32::MAX);
1473            }
1474            value as u32
1475        }
1476        None => default.max_attempts,
1477    };
1478    let backoff_ms = match object.get("backoffMs") {
1479        Some(value) => value
1480            .as_u64()
1481            .ok_or_else(|| anyhow!("agent retry.backoffMs must be a non-negative integer"))?,
1482        None => default.backoff_ms,
1483    };
1484    Ok(AgentRetryPolicy {
1485        max_attempts,
1486        backoff_ms,
1487    })
1488}
1489
1490async fn run_agent_with_optional_isolation(
1491    provider: Arc<dyn AgentProvider>,
1492    input: AgentProviderRunInput,
1493) -> anyhow::Result<AgentProviderResult> {
1494    if let Some(request) = sandbox_isolation_request(&input.options)? {
1495        return run_agent_with_sandbox_isolation(provider, input, request).await;
1496    }
1497
1498    if requests_worktree_isolation(&input.options) {
1499        return run_agent_with_worktree_isolation(provider, input).await;
1500    }
1501
1502    run_agent_with_schema_validation(provider, input).await
1503}
1504
1505async fn run_agent_with_worktree_isolation(
1506    provider: Arc<dyn AgentProvider>,
1507    input: AgentProviderRunInput,
1508) -> anyhow::Result<AgentProviderResult> {
1509    let isolation = WorktreeIsolation::create(input.context.cwd.as_deref())?;
1510    let isolation_info = isolation.info();
1511    let mut isolated_input = input;
1512    isolated_input.context.cwd = Some(isolation.cwd.clone());
1513    isolated_input.environment =
1514        Arc::new(LocalExecutionEnvironment::new(Some(isolation.cwd.clone()))?);
1515    let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1516    if let Ok(result) = &mut result {
1517        result.isolation = Some(isolation_info);
1518    }
1519    if let Err(error) = isolation.cleanup() {
1520        log::warn!("failed to cleanup isolated agent worktree: {error:#}");
1521    }
1522    result
1523}
1524
1525async fn run_agent_with_sandbox_isolation(
1526    provider: Arc<dyn AgentProvider>,
1527    input: AgentProviderRunInput,
1528    request: SandboxIsolationRequest,
1529) -> anyhow::Result<AgentProviderResult> {
1530    let program = sandbox_provider_program(&request);
1531    let sandbox_group_id = format!("sbxgrp_{}", ulid::Ulid::new());
1532    let cwd = input.context.cwd.clone().ok_or_else(|| {
1533        anyhow!("agent sandbox isolation requires the workflow cwd to be available")
1534    })?;
1535    let sandbox_env = SandboxExecutionEnvironment::open(
1536        program,
1537        OpenSandboxRequest {
1538            metadata: SandboxMetadata::new(
1539                format!("req_{}", ulid::Ulid::new()),
1540                sandbox_group_id.clone(),
1541            ),
1542            profile: ProfileRef {
1543                provider: request.provider.clone(),
1544                name: request.profile.clone(),
1545            },
1546            workspace_sync: WorkspaceSync { host_path: cwd },
1547            cwd: request.cwd.clone(),
1548        },
1549    )
1550    .await
1551    .with_context(|| {
1552        format!(
1553            "failed to open sandbox profile `{}` with provider `{}`",
1554            request.profile, request.provider
1555        )
1556    })?;
1557
1558    let session = sandbox_env.session().clone();
1559    let mut isolated_input = input;
1560    isolated_input.context.cwd = sandbox_env.cwd().map(|path| PathBuf::from(path.as_str()));
1561    isolated_input.environment = Arc::new(sandbox_env.clone());
1562    let isolation_info = AgentRunIsolation {
1563        kind: "sandbox".to_string(),
1564        branch: None,
1565        worktree_path: None,
1566        cwd: session.cwd.clone(),
1567        profile: Some(request.profile.clone()),
1568        provider: Some(request.provider.clone()),
1569        session_id: Some(session.id.clone()),
1570    };
1571
1572    let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1573    if let Ok(result) = &mut result {
1574        result.isolation = Some(isolation_info);
1575    }
1576
1577    if let Err(error) = sandbox_env.close().await {
1578        log::warn!("failed to close sandbox-isolated agent session: {error:#}");
1579    }
1580
1581    result
1582}
1583
1584fn requests_worktree_isolation(options: &Option<Value>) -> bool {
1585    options
1586        .as_ref()
1587        .and_then(|options| options.get("isolation"))
1588        .and_then(Value::as_str)
1589        == Some("worktree")
1590}
1591
/// Parsed form of an agent's `isolation: { type: "sandbox", profile, cwd }` option.
#[derive(Debug, Clone)]
struct SandboxIsolationRequest {
    /// Sandbox provider name: the part of `isolation.profile` before the first `/`, restricted to
    /// lowercase ASCII letters, digits, and hyphens.
    provider: String,
    /// Provider-local profile name: everything after the first `/` in `isolation.profile`;
    /// never empty.
    profile: String,
    /// Optional `isolation.cwd` string, forwarded as the working directory requested in the
    /// sandbox.
    cwd: Option<String>,
}
1598
1599fn sandbox_isolation_request(
1600    options: &Option<Value>,
1601) -> anyhow::Result<Option<SandboxIsolationRequest>> {
1602    let Some(isolation) = options
1603        .as_ref()
1604        .and_then(|options| options.get("isolation"))
1605    else {
1606        return Ok(None);
1607    };
1608    let Some(object) = isolation.as_object() else {
1609        return Ok(None);
1610    };
1611    if object.get("type").and_then(Value::as_str) != Some("sandbox") {
1612        return Ok(None);
1613    }
1614    let profile = object
1615        .get("profile")
1616        .and_then(Value::as_str)
1617        .ok_or_else(|| anyhow!("agent sandbox isolation requires isolation.profile"))?
1618        .to_string();
1619    let cwd = object
1620        .get("cwd")
1621        .and_then(Value::as_str)
1622        .map(ToString::to_string);
1623    let (provider, profile) = parse_sandbox_profile_ref(&profile)?;
1624    Ok(Some(SandboxIsolationRequest {
1625        provider,
1626        profile,
1627        cwd,
1628    }))
1629}
1630
1631fn parse_sandbox_profile_ref(value: &str) -> anyhow::Result<(String, String)> {
1632    let (provider, profile) = value.split_once('/').ok_or_else(|| {
1633        anyhow!("agent sandbox isolation.profile must use <provider>/<profile>, got `{value}`")
1634    })?;
1635    validate_sandbox_provider_name(provider)?;
1636    if profile.is_empty() {
1637        bail!(
1638            "agent sandbox isolation.profile must include a non-empty provider-local profile name"
1639        );
1640    }
1641    Ok((provider.to_string(), profile.to_string()))
1642}
1643
1644fn validate_sandbox_provider_name(provider: &str) -> anyhow::Result<()> {
1645    let valid = !provider.is_empty()
1646        && provider
1647            .bytes()
1648            .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'-')
1649        && provider
1650            .as_bytes()
1651            .first()
1652            .is_some_and(u8::is_ascii_alphanumeric)
1653        && provider
1654            .as_bytes()
1655            .last()
1656            .is_some_and(u8::is_ascii_alphanumeric);
1657    if !valid {
1658        bail!("invalid sandbox provider name `{provider}`; expected lowercase letters, digits, and hyphens, starting and ending with an alphanumeric character");
1659    }
1660    Ok(())
1661}
1662
1663fn sandbox_provider_program(request: &SandboxIsolationRequest) -> String {
1664    format!("smol-sandbox-{}", request.provider)
1665}
1666
/// A temporary git worktree, checked out on its own branch, used to isolate one agent run.
///
/// Built by `WorktreeIsolation::create`. Torn down by `cleanup`, or best-effort by the `Drop`
/// impl when `cleanup` did not complete.
struct WorktreeIsolation {
    // Canonicalized root of the repository the worktree was added to; git commands run here.
    repo_root: PathBuf,
    // Location of the worktree checkout (the `worktree` subdirectory of `_temp_dir`).
    worktree_root: PathBuf,
    // Agent working directory inside the worktree, at the same repository-relative position as
    // the original cwd.
    cwd: PathBuf,
    // Name of the branch created for this worktree (`smol-wf/agent-run/<ulid>`).
    branch_name: String,
    // Set once `cleanup` succeeded, so `Drop` does not repeat the git teardown.
    cleaned: bool,
    // Owns the temp directory holding the worktree. As a field it is dropped after
    // `Drop::drop` has run the git cleanup.
    _temp_dir: tempfile::TempDir,
}
1675
1676impl WorktreeIsolation {
1677    fn create(cwd: Option<&Path>) -> anyhow::Result<Self> {
1678        let cwd = cwd
1679            .map(Path::to_path_buf)
1680            .unwrap_or(std::env::current_dir()?)
1681            .canonicalize()
1682            .context("failed to canonicalize workflow cwd for worktree isolation")?;
1683        let repo_root = git_output(&cwd, &["rev-parse", "--show-toplevel"]).context(
1684            "agent isolation='worktree' requires the workflow cwd to be inside a git repository",
1685        )?;
1686        let repo_root = PathBuf::from(repo_root.trim())
1687            .canonicalize()
1688            .context("failed to canonicalize git repository root for worktree isolation")?;
1689        let relative_cwd = cwd.strip_prefix(&repo_root).with_context(|| {
1690            format!(
1691                "workflow cwd {} is not under git repository root {}",
1692                cwd.display(),
1693                repo_root.display()
1694            )
1695        })?;
1696
1697        let temp_dir = tempfile::Builder::new()
1698            .prefix("smol-wf-agent-worktree-")
1699            .tempdir()
1700            .context("failed to create temp directory for agent worktree isolation")?;
1701        let worktree_root = temp_dir.path().join("worktree");
1702        let worktree_arg = path_arg(&worktree_root);
1703        let branch_name = format!(
1704            "smol-wf/agent-run/{}",
1705            ulid::Ulid::new().to_string().to_ascii_lowercase()
1706        );
1707        git_status(
1708            &repo_root,
1709            &[
1710                "worktree",
1711                "add",
1712                "--quiet",
1713                "-b",
1714                &branch_name,
1715                &worktree_arg,
1716                "HEAD",
1717            ],
1718        )
1719        .context("failed to create isolated git worktree for agent run")?;
1720        let isolated_cwd = if relative_cwd.as_os_str().is_empty() {
1721            worktree_root.clone()
1722        } else {
1723            worktree_root.join(relative_cwd)
1724        };
1725        Ok(Self {
1726            repo_root,
1727            worktree_root,
1728            cwd: isolated_cwd,
1729            branch_name,
1730            cleaned: false,
1731            _temp_dir: temp_dir,
1732        })
1733    }
1734
1735    fn info(&self) -> AgentRunIsolation {
1736        AgentRunIsolation {
1737            kind: "worktree".to_string(),
1738            branch: Some(self.branch_name.clone()),
1739            worktree_path: Some(path_arg(&self.worktree_root)),
1740            cwd: Some(path_arg(&self.cwd)),
1741            profile: None,
1742            provider: None,
1743            session_id: None,
1744        }
1745    }
1746
1747    fn cleanup(mut self) -> anyhow::Result<()> {
1748        self.remove_worktree()?;
1749        self.delete_branch()?;
1750        self.cleaned = true;
1751        Ok(())
1752    }
1753
1754    fn remove_worktree(&self) -> anyhow::Result<()> {
1755        let worktree_arg = path_arg(&self.worktree_root);
1756        git_status(
1757            &self.repo_root,
1758            &["worktree", "remove", "--force", &worktree_arg],
1759        )
1760        .context("failed to remove isolated git worktree")
1761    }
1762
1763    fn delete_branch(&self) -> anyhow::Result<()> {
1764        git_status(&self.repo_root, &["branch", "-D", &self.branch_name])
1765            .context("failed to delete isolated agent worktree branch")
1766    }
1767}
1768
1769impl Drop for WorktreeIsolation {
1770    fn drop(&mut self) {
1771        if !self.cleaned {
1772            if let Err(error) = self.remove_worktree() {
1773                log::warn!("failed to cleanup isolated agent worktree during drop: {error:#}");
1774            }
1775            if let Err(error) = self.delete_branch() {
1776                log::warn!(
1777                    "failed to delete isolated agent worktree branch during drop: {error:#}"
1778                );
1779            }
1780        }
1781    }
1782}
1783
/// Renders a path as an owned `String` for use as a command-line argument.
///
/// Any non-UTF-8 sequences are replaced with U+FFFD.
fn path_arg(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1787
1788fn git_output(cwd: &Path, args: &[&str]) -> anyhow::Result<String> {
1789    let output = StdCommand::new("git")
1790        .args(args)
1791        .current_dir(cwd)
1792        .output()
1793        .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1794    if output.status.success() {
1795        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
1796    } else {
1797        bail!(
1798            "git {} failed with {}{}",
1799            args.join(" "),
1800            status_text(output.status.code()),
1801            command_stderr(&output.stderr)
1802        )
1803    }
1804}
1805
1806fn git_status(cwd: &Path, args: &[&str]) -> anyhow::Result<()> {
1807    let output = StdCommand::new("git")
1808        .args(args)
1809        .current_dir(cwd)
1810        .output()
1811        .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1812    if output.status.success() {
1813        Ok(())
1814    } else {
1815        bail!(
1816            "git {} failed with {}{}",
1817            args.join(" "),
1818            status_text(output.status.code()),
1819            command_stderr(&output.stderr)
1820        )
1821    }
1822}
1823
/// Describes a process exit status for error messages.
///
/// Returns `code N` when an exit code is available. Returns `signal` when there is none, which
/// happens when the process was terminated by a signal.
fn status_text(code: Option<i32>) -> String {
    match code {
        Some(code) => format!("code {code}"),
        None => String::from("signal"),
    }
}
1828
/// Formats captured stderr as an error-message suffix.
///
/// Returns `": <trimmed stderr>"`, or an empty string when stderr is blank after trimming.
fn command_stderr(stderr: &[u8]) -> String {
    let text = String::from_utf8_lossy(stderr);
    match text.trim() {
        "" => String::new(),
        trimmed => format!(": {trimmed}"),
    }
}
1838
1839async fn run_agent_with_schema_validation(
1840    provider: Arc<dyn AgentProvider>,
1841    input: AgentProviderRunInput,
1842) -> anyhow::Result<AgentProviderResult> {
1843    let Some(schema) = input
1844        .options
1845        .as_ref()
1846        .and_then(|options| options.get("schema"))
1847        .cloned()
1848    else {
1849        return provider.run(input).await;
1850    };
1851
1852    let max_attempts = 2;
1853    let original_prompt = input.prompt.clone();
1854    let mut attempt_input = input;
1855    let mut last_errors = Vec::new();
1856
1857    for attempt in 1..=max_attempts {
1858        let result = provider.run(attempt_input.clone()).await?;
1859        match validate_structured_output(&schema, &result.output) {
1860            Ok(()) => return Ok(result),
1861            Err(errors) => {
1862                last_errors = errors;
1863                if attempt < max_attempts {
1864                    attempt_input.prompt =
1865                        with_structured_output_retry_prompt(&original_prompt, &last_errors);
1866                }
1867            }
1868        }
1869    }
1870
1871    bail!(
1872        "{}",
1873        format_structured_output_validation_error(&last_errors)
1874    )
1875}
1876
1877fn validate_structured_output(schema: &Value, output: &Value) -> Result<(), Vec<String>> {
1878    let validator = jsonschema::validator_for(schema)
1879        .map_err(|error| vec![format!("/ schema is invalid: {}", error)])?;
1880    let errors = validator
1881        .iter_errors(output)
1882        .map(|error| {
1883            let path = error.instance_path().to_string();
1884            let path = if path.is_empty() {
1885                "/".to_string()
1886            } else {
1887                path
1888            };
1889            format!("{path} {error}")
1890        })
1891        .collect::<Vec<_>>();
1892
1893    if errors.is_empty() {
1894        Ok(())
1895    } else {
1896        Err(errors)
1897    }
1898}
1899
/// Builds the user-facing error for structured output that failed schema validation, joining
/// the individual messages with `"; "`.
fn format_structured_output_validation_error(errors: &[String]) -> String {
    let joined = errors.join("; ");
    format!("Structured output did not match JSON Schema: {joined}")
}
1906
/// Appends schema-validation feedback to `prompt` for a retry attempt.
///
/// The result is the prompt, a blank line, three fixed instruction lines, then one `- <error>`
/// line per validation error, all separated by `\n`.
fn with_structured_output_retry_prompt(prompt: &str, errors: &[String]) -> String {
    let mut text = String::from(prompt);
    text.push_str("\n\n");
    text.push_str("Previous structured output failed JSON Schema validation.");
    text.push('\n');
    text.push_str("Return a corrected structured output that satisfies the original JSON Schema.");
    text.push('\n');
    text.push_str("Validation errors:");
    for error in errors {
        text.push_str("\n- ");
        text.push_str(error);
    }
    text
}
1918
/// A model selector (`[provider/]model[?provider=..&thinking=..]`) after alias expansion and
/// parsing.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResolvedModelSelector {
    // The `model` value exactly as the workflow supplied it (possibly an alias).
    requested: String,
    // The selector string that was parsed (the alias target, or `requested` itself).
    selector: String,
    // Bare model id, without any provider qualifier or query.
    model_id: String,
    // Provider from either the `provider/` prefix or the `?provider=` parameter.
    model_provider: Option<String>,
    // Value of the `?thinking=` parameter, if given.
    thinking: Option<String>,
}

impl ResolvedModelSelector {
    /// The model id, prefixed with `<provider>/` when a provider qualifier is present.
    fn provider_model(&self) -> String {
        if let Some(provider) = self.model_provider.as_deref() {
            format!("{provider}/{}", self.model_id)
        } else {
            self.model_id.clone()
        }
    }
}
1936
1937fn resolve_model_options(
1938    options: Option<Value>,
1939    agent_provider: &str,
1940    model_map: &BTreeMap<String, String>,
1941) -> anyhow::Result<Option<Value>> {
1942    let Some(model) = options
1943        .as_ref()
1944        .and_then(Value::as_object)
1945        .and_then(|object| object.get("model"))
1946        .and_then(Value::as_str)
1947        .map(ToString::to_string)
1948    else {
1949        return Ok(options);
1950    };
1951
1952    let mapped_selector = model_map.get(&model).cloned();
1953    let alias_matched = mapped_selector.is_some();
1954    let selector = mapped_selector.unwrap_or_else(|| model.clone());
1955    let resolved = parse_model_selector(&model, &selector)?;
1956    validate_model_selector_for_provider(&resolved, agent_provider)?;
1957
1958    let mut object = options
1959        .and_then(|value| value.as_object().cloned())
1960        .unwrap_or_default();
1961    object.insert(
1962        "model".to_string(),
1963        Value::String(resolved.provider_model()),
1964    );
1965
1966    let selector_has_extra_parts = alias_matched
1967        || resolved.selector.contains('?')
1968        || resolved.model_provider.is_some()
1969        || resolved.thinking.is_some();
1970    if selector_has_extra_parts {
1971        object.insert(
1972            "requestedModel".to_string(),
1973            Value::String(resolved.requested.clone()),
1974        );
1975        object.insert(
1976            "modelSelector".to_string(),
1977            Value::String(resolved.selector.clone()),
1978        );
1979    } else {
1980        object.remove("requestedModel");
1981        object.remove("modelSelector");
1982    }
1983
1984    if let Some(provider) = resolved.model_provider {
1985        object.insert("modelProvider".to_string(), Value::String(provider));
1986    } else {
1987        object.remove("modelProvider");
1988    }
1989    if let Some(thinking) = resolved.thinking {
1990        object.insert("thinking".to_string(), Value::String(thinking));
1991    } else {
1992        object.remove("thinking");
1993    }
1994    Ok(Some(Value::Object(object)))
1995}
1996
1997fn parse_model_selector(requested: &str, selector: &str) -> anyhow::Result<ResolvedModelSelector> {
1998    let (model_part, query) = selector.split_once('?').unwrap_or((selector, ""));
1999    if model_part.trim().is_empty() {
2000        bail!("model selector must include a model id: {selector}");
2001    }
2002
2003    let (slash_provider, model_id) = match model_part.split_once('/') {
2004        Some((provider, model_id)) if !provider.is_empty() && !model_id.is_empty() => {
2005            (Some(provider.to_string()), model_id.to_string())
2006        }
2007        Some(_) => bail!("model selector provider/model form is invalid: {selector}"),
2008        None => (None, model_part.to_string()),
2009    };
2010
2011    let mut query_provider = None::<String>;
2012    let mut thinking = None::<String>;
2013    if !query.is_empty() {
2014        for pair in query.split('&') {
2015            if pair.is_empty() {
2016                continue;
2017            }
2018            let (key, value) = pair.split_once('=').ok_or_else(|| {
2019                anyhow!("model selector query parameter must use key=value: {pair}")
2020            })?;
2021            let key = percent_decode(key)?;
2022            let value = percent_decode(value)?;
2023            if value.is_empty() {
2024                bail!("model selector query parameter `{key}` must not be empty");
2025            }
2026            match key.as_str() {
2027                "provider" => set_unique_query_value(&mut query_provider, key, value)?,
2028                "thinking" => set_unique_query_value(&mut thinking, key, value)?,
2029                _ => bail!("unknown model selector query parameter `{key}`"),
2030            }
2031        }
2032    }
2033
2034    let model_provider = match (slash_provider, query_provider) {
2035        (Some(slash), Some(query)) if slash != query => bail!(
2036            "conflicting model provider qualifiers in selector `{selector}`: `{slash}` and `{query}`"
2037        ),
2038        (Some(provider), Some(_)) | (Some(provider), None) | (None, Some(provider)) => {
2039            Some(provider)
2040        }
2041        (None, None) => None,
2042    };
2043
2044    Ok(ResolvedModelSelector {
2045        requested: requested.to_string(),
2046        selector: selector.to_string(),
2047        model_id,
2048        model_provider,
2049        thinking,
2050    })
2051}
2052
2053fn set_unique_query_value(
2054    target: &mut Option<String>,
2055    key: String,
2056    value: String,
2057) -> anyhow::Result<()> {
2058    if target.replace(value).is_some() {
2059        bail!("duplicate model selector query parameter `{key}`");
2060    }
2061    Ok(())
2062}
2063
2064fn percent_decode(value: &str) -> anyhow::Result<String> {
2065    let bytes = value.as_bytes();
2066    let mut output = Vec::with_capacity(bytes.len());
2067    let mut index = 0;
2068    while index < bytes.len() {
2069        match bytes[index] {
2070            b'%' => {
2071                if index + 2 >= bytes.len() {
2072                    bail!("invalid percent escape in model selector query: {value}");
2073                }
2074                let high = hex_value(bytes[index + 1]).ok_or_else(|| {
2075                    anyhow!("invalid percent escape in model selector query: {value}")
2076                })?;
2077                let low = hex_value(bytes[index + 2]).ok_or_else(|| {
2078                    anyhow!("invalid percent escape in model selector query: {value}")
2079                })?;
2080                output.push((high << 4) | low);
2081                index += 3;
2082            }
2083            b'+' => {
2084                output.push(b' ');
2085                index += 1;
2086            }
2087            byte => {
2088                output.push(byte);
2089                index += 1;
2090            }
2091        }
2092    }
2093    String::from_utf8(output).context("model selector query is not valid UTF-8")
2094}
2095
/// Value of an ASCII hex digit (`0-9`, `a-f`, `A-F`), or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `char::to_digit(16)` accepts exactly those ASCII digits and letters. Bytes >= 0x80 map to
    // non-ASCII chars, which it rejects.
    char::from(byte)
        .to_digit(16)
        // A base-16 digit is at most 15, so narrowing to `u8` is lossless.
        .map(|digit| digit as u8)
}
2104
2105fn validate_model_selector_for_provider(
2106    resolved: &ResolvedModelSelector,
2107    agent_provider: &str,
2108) -> anyhow::Result<()> {
2109    match agent_provider {
2110        "codex" => {
2111            if resolved.model_provider.is_some() {
2112                bail!("Codex model selectors do not support ?provider=... or provider/model form");
2113            }
2114            if resolved.thinking.is_some() {
2115                bail!("Codex model selectors do not support thinking=...");
2116            }
2117        }
2118        "claude-code" if resolved.model_provider.is_some() => {
2119            bail!(
2120                "Claude Code model selectors do not support ?provider=... or provider/model form"
2121            );
2122        }
2123        "opencode" if resolved.model_provider.is_none() => {
2124            bail!("OpenCode model selectors must use provider/model or ?provider=...");
2125        }
2126        "debug" | "pi" => {}
2127        _ => {}
2128    }
2129    Ok(())
2130}
2131
2132fn apply_phase_defaults(options: Option<Value>, metadata: &WorkflowMetadata) -> Option<Value> {
2133    let phase_name = options
2134        .as_ref()
2135        .and_then(|options| options.get("phase"))
2136        .and_then(Value::as_str)
2137        .map(ToString::to_string);
2138    let phase_metadata = phase_name.as_ref().and_then(|phase_name| {
2139        metadata
2140            .phases
2141            .iter()
2142            .find(|phase| phase.title == *phase_name)
2143    });
2144
2145    if phase_name.is_none() && phase_metadata.is_none() {
2146        return options;
2147    }
2148
2149    let mut object = options
2150        .and_then(|value| value.as_object().cloned())
2151        .unwrap_or_default();
2152
2153    if let Some(phase_name) = phase_name {
2154        object
2155            .entry("phase".to_string())
2156            .or_insert(Value::String(phase_name));
2157    }
2158    if let Some(model) = phase_metadata.and_then(|phase| phase.model.clone()) {
2159        object
2160            .entry("model".to_string())
2161            .or_insert(Value::String(model));
2162    }
2163    if let Some(provider) = phase_metadata.and_then(|phase| phase.provider.clone()) {
2164        object
2165            .entry("provider".to_string())
2166            .or_insert(Value::String(provider));
2167    }
2168
2169    Some(Value::Object(object))
2170}
2171
/// Resolves `script_path` relative to the directory containing `current_script_path`.
///
/// Absolute paths are returned as-is. When `current_script_path` has no parent (e.g. `/`), the
/// path is resolved against `.`.
fn resolve_relative_script(current_script_path: &Path, script_path: &str) -> PathBuf {
    let candidate = Path::new(script_path);
    if candidate.is_absolute() {
        return candidate.to_path_buf();
    }
    let base_dir = match current_script_path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    base_dir.join(candidate)
}
2183
2184fn resolve_named_workflow(name: &str) -> anyhow::Result<PathBuf> {
2185    let workflows_dir = PathBuf::from(".claude/workflows");
2186    for entry in fs::read_dir(&workflows_dir).unwrap_or_else(|_| fs::read_dir(".").unwrap()) {
2187        let entry = entry?;
2188        let path = entry.path();
2189        if path.extension().and_then(|extension| extension.to_str()) != Some("js") {
2190            continue;
2191        }
2192        if read_workflow_metadata(&path)?.is_some_and(|metadata| metadata.name == name) {
2193            return Ok(path);
2194        }
2195    }
2196    bail!("Unknown workflow: {name}")
2197}