1use crate::agent_providers::{
2 create_agent_provider, AgentProvider, AgentProviderContext, AgentProviderResult,
3 AgentProviderRunInput, AgentRunIsolation, AgentUsage, AgentUsageCost,
4};
5use crate::environment::{
6 AgentExecutionEnvironment, EnvironmentPath, ExecRequest, LocalExecutionEnvironment,
7 NullExecEventSink, SandboxExecutionEnvironment,
8};
9use crate::js_runtime::rquickjs::RQuickJSWorkflowRuntime;
10use crate::js_runtime::{
11 WorkflowBudgetSnapshot, WorkflowJSRuntime, WorkflowModuleInput, WorkflowModuleOutput,
12 WorkflowRef, WorkflowRuntimeCall, WorkflowRuntimeExecution, WorkflowRuntimePoll,
13 WorkflowRuntimeRequest, WorkflowRuntimeRequestResolution, WorkflowSandboxExecOutput,
14 WorkflowSandboxExecRequest,
15};
16use crate::metadata::{read_workflow_metadata, WorkflowMetadata};
17use anyhow::{anyhow, bail, Context};
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use smol_workflow_sandbox::{
21 Metadata as SandboxMetadata, OpenSandboxRequest, ProfileRef, WorkspaceSync,
22};
23use std::collections::{BTreeMap, BTreeSet, VecDeque};
24use std::fs;
25use std::path::{Path, PathBuf};
26use std::process::Command as StdCommand;
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29use tokio::sync::{mpsc, watch};
30use tokio::task::{JoinSet, LocalSet};
31
32pub use crate::events::{
33 WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink, WorkflowEventType,
34};
35
36#[async_trait::async_trait]
37pub trait AgentSessionLogSink: Send + Sync {
38 async fn write_agent_result(
39 &self,
40 provider: &str,
41 result: &AgentProviderResult,
42 ) -> anyhow::Result<()>;
43}
44
45#[async_trait::async_trait]
46pub trait WorkflowAgentRunner: Send + Sync {
47 async fn run_agent(
48 &self,
49 default_provider: Arc<dyn AgentProvider>,
50 provider_override: Option<String>,
51 input: AgentProviderRunInput,
52 ) -> anyhow::Result<AgentProviderResult>;
53
54 fn retry_in_runtime(&self) -> bool {
68 true
69 }
70
71 async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
72 tokio::time::sleep(std::time::Duration::from_millis(duration_ms)).await;
73 Ok(())
74 }
75}
76
77#[derive(Debug, Default)]
78pub struct DirectWorkflowAgentRunner;
79
80#[async_trait::async_trait]
81impl WorkflowAgentRunner for DirectWorkflowAgentRunner {
82 async fn run_agent(
83 &self,
84 default_provider: Arc<dyn AgentProvider>,
85 provider_override: Option<String>,
86 input: AgentProviderRunInput,
87 ) -> anyhow::Result<AgentProviderResult> {
88 run_agent_provider(default_provider, provider_override, input).await
89 }
90}
91
92pub struct RunWorkflowOptions {
93 pub script_path: PathBuf,
94 pub workflow_cwd: Option<PathBuf>,
97 pub args: Value,
98 pub agent_provider: Arc<dyn AgentProvider>,
99 pub model_map: BTreeMap<String, String>,
100 pub budget_total: Option<u64>,
101 pub budget_spent: u64,
102 pub nesting_depth: usize,
103 pub max_parallel_agent_requests: Option<usize>,
104 pub agent_runner: Option<Arc<dyn WorkflowAgentRunner>>,
105 pub cancel_rx: Option<watch::Receiver<bool>>,
106 pub event_sink: Option<Arc<dyn WorkflowEventSink>>,
107 pub event_parent_step_id: Option<String>,
108 pub event_stream_start: Option<Instant>,
109 pub session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
110}
111
112#[derive(Debug)]
113pub struct RunWorkflowResult {
114 pub output: WorkflowModuleOutput,
115 pub logs: Vec<Vec<Value>>,
116 pub phases: Vec<WorkflowPhaseCall>,
117 pub agent_calls: Vec<WorkflowRuntimeRequest>,
118 pub workflow_calls: Vec<WorkflowRuntimeRequest>,
119 pub budget: WorkflowBudgetSnapshot,
120 pub token_usage: WorkflowTokenUsage,
121 pub token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
122 pub agent_runs: Vec<WorkflowAgentRunSummary>,
123}
124
125#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
126#[serde(rename_all = "camelCase")]
127pub struct WorkflowTokenUsage {
128 pub input_tokens: u64,
129 pub output_tokens: u64,
130 pub cache_read_tokens: u64,
131 pub cache_write_tokens: u64,
132 pub total_tokens: u64,
133 #[serde(skip_serializing_if = "Option::is_none")]
134 pub cost: Option<AgentUsageCost>,
135}
136
137#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
138#[serde(rename_all = "camelCase")]
139pub struct WorkflowAgentRunSummary {
140 pub id: String,
141 #[serde(skip_serializing_if = "Option::is_none")]
142 pub phase: Option<String>,
143 #[serde(skip_serializing_if = "Option::is_none")]
144 pub provider: Option<String>,
145 #[serde(skip_serializing_if = "Option::is_none")]
146 pub model: Option<String>,
147 #[serde(skip_serializing_if = "Option::is_none")]
148 pub provider_session_id: Option<String>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 pub usage: Option<AgentUsage>,
151 #[serde(skip_serializing_if = "Option::is_none")]
152 pub isolation: Option<AgentRunIsolation>,
153}
154
155#[derive(Debug, Clone, PartialEq)]
156pub struct WorkflowPhaseCall {
157 pub name: String,
158 pub options: Option<Value>,
159}
160
161pub async fn run_workflow(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
162 LocalSet::new().run_until(run_workflow_inner(options)).await
163}
164
165async fn run_workflow_inner(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
166 log::debug!(
167 "run_workflow start script={} provider={} nesting_depth={} budget_total={:?} budget_spent={}",
168 options.script_path.display(),
169 options.agent_provider.name(),
170 options.nesting_depth,
171 options.budget_total,
172 options.budget_spent
173 );
174 let script_path = fs::canonicalize(&options.script_path).with_context(|| {
175 format!(
176 "failed to resolve workflow script {}",
177 options.script_path.display()
178 )
179 })?;
180 let metadata = read_workflow_metadata(&script_path)?.ok_or_else(|| {
181 anyhow!("Workflow script must export valid literal metadata as `export const meta = {{ name, description, ... }}`")
182 })?;
183 log::debug!(
184 "workflow metadata loaded name={} phases={}",
185 metadata.name,
186 metadata.phases.len()
187 );
188 let workflow_cwd = match options.workflow_cwd {
189 Some(cwd) => Some(
190 fs::canonicalize(&cwd)
191 .with_context(|| format!("failed to resolve workflow cwd {}", cwd.display()))?,
192 ),
193 None => script_path.parent().map(Path::to_path_buf),
194 };
195 let source = fs::read_to_string(&script_path)
196 .with_context(|| format!("failed to read workflow script {}", script_path.display()))?;
197 let runtime = RQuickJSWorkflowRuntime::new();
198 let execution = runtime.start_module(WorkflowModuleInput {
199 source,
200 source_name: script_path.to_string_lossy().into_owned(),
201 args: options.args,
202 budget: WorkflowBudgetSnapshot {
203 total: options.budget_total,
204 spent: options.budget_spent,
205 },
206 sandbox: Default::default(),
207 })?;
208
209 let (js_commands, js_command_rx) = mpsc::channel::<JsCommand>(64);
210 let (js_event_tx, mut js_events) = mpsc::channel::<JsEvent>(64);
211 let js_task = tokio::task::spawn_local(js_runtime_actor(execution, js_command_rx, js_event_tx));
212
213 let emit_lifecycle_events = options.event_sink.is_some();
214 let event_start = options.event_stream_start.unwrap_or_else(Instant::now);
215
216 let mut state = RunState {
217 script_path,
218 workflow_cwd,
219 metadata,
220 event_start,
221 agent_provider: options.agent_provider,
222 model_map: options.model_map,
223 logs: Vec::new(),
224 phases: Vec::new(),
225 agent_calls: Vec::new(),
226 workflow_calls: Vec::new(),
227 budget: WorkflowBudgetSnapshot {
228 total: options.budget_total,
229 spent: options.budget_spent,
230 },
231 token_usage: WorkflowTokenUsage::default(),
232 token_usage_by_phase: Default::default(),
233 agent_runs: Vec::new(),
234 active_request_ids: BTreeSet::new(),
235 nesting_depth: options.nesting_depth,
236 max_parallel_agent_requests: options.max_parallel_agent_requests,
237 agent_runner: options
238 .agent_runner
239 .unwrap_or_else(|| Arc::new(DirectWorkflowAgentRunner)),
240 cancel_rx: options.cancel_rx,
241 event_sink: options.event_sink,
242 event_parent_step_id: options.event_parent_step_id,
243 session_log_sink: options.session_log_sink,
244 };
245
246 let mut pending_requests = VecDeque::<WorkflowRuntimeRequest>::new();
247 let mut agent_tasks = JoinSet::<AgentTaskCompletion>::new();
248 let mut sleep_tasks = JoinSet::<SleepTaskCompletion>::new();
249
250 if emit_lifecycle_events {
251 if let Err(error) = state
252 .emit_event(WorkflowEvent::started(rfc3339_now()?))
253 .await
254 {
255 let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
256 let _ = js_task.await;
257 return Err(error);
258 }
259 }
260
261 let workflow_result: anyhow::Result<RunWorkflowResult> = loop {
262 if let Err(error) = state
263 .start_pending_requests(
264 &mut pending_requests,
265 &mut agent_tasks,
266 &mut sleep_tasks,
267 &js_commands,
268 )
269 .await
270 {
271 break Err(error);
272 }
273
274 tokio::select! {
275 biased;
276 () = wait_for_cancellation(&mut state.cancel_rx) => {
277 break state.cancel_workflow(
278 &mut pending_requests,
279 &mut agent_tasks,
280 &mut sleep_tasks,
281 &js_commands,
282 &mut js_events,
283 ).await;
284 }
285 event = js_events.recv() => {
286 let event = match event {
287 Some(event) => event,
288 None => break Err(anyhow!("JavaScript runtime actor stopped unexpectedly")),
289 };
290 match state.handle_js_event(event, &mut pending_requests).await {
291 Ok(Some(result)) => break Ok(result),
292 Ok(None) => {}
293 Err(error) => break Err(error),
294 }
295 }
296 completion = agent_tasks.join_next(), if !agent_tasks.is_empty() => {
297 let completion = match completion {
298 Some(Ok(completion)) => completion,
299 Some(Err(error)) => break Err(anyhow!("agent provider task failed: {error}")),
300 None => break Err(anyhow!("agent task set ended unexpectedly")),
301 };
302 let AgentTaskCompletion { id, input, provider, result } = completion;
303 state.active_request_ids.remove(&id);
304 let resolution = match result {
305 Ok(result) => match state.apply_agent_result(&id, &input, provider, result).await {
306 Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
307 value,
308 budget: state.budget.clone(),
309 },
310 Err(error) => WorkflowRuntimeRequestResolution::Err {
311 message: error.to_string(),
312 },
313 },
314 Err(error) => {
315 let message = error.to_string();
316 if let Err(emit_error) = state.emit_agent_failed_event(&id, provider.as_deref(), &message).await {
317 log::debug!("failed to emit agent failure event: {emit_error:#}");
318 }
319 WorkflowRuntimeRequestResolution::Err { message }
320 },
321 };
322 if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
323 break Err(error);
324 }
325 }
326 completion = sleep_tasks.join_next(), if !sleep_tasks.is_empty() => {
327 let completion = match completion {
328 Some(Ok(completion)) => completion,
329 Some(Err(error)) => break Err(anyhow!("sleep task failed: {error}")),
330 None => break Err(anyhow!("sleep task set ended unexpectedly")),
331 };
332 let SleepTaskCompletion { id, result } = completion;
333 state.active_request_ids.remove(&id);
334 let resolution = match result {
335 Ok(()) => WorkflowRuntimeRequestResolution::OkUndefined,
336 Err(error) => WorkflowRuntimeRequestResolution::Err {
337 message: error.to_string(),
338 },
339 };
340 if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
341 break Err(error);
342 }
343 }
344 }
345 };
346
347 let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
348 let _ = js_task.await;
349
350 if emit_lifecycle_events {
351 match &workflow_result {
352 Ok(result) => {
353 state
354 .emit_event(WorkflowEvent::result(
355 result.token_usage.input_tokens,
356 result.token_usage.output_tokens,
357 result.token_usage.total_tokens,
358 result.output.result.clone(),
359 ))
360 .await?
361 }
362 Err(error) => {
363 state
364 .emit_event(WorkflowEvent::error(error.to_string(), None))
365 .await?;
366 }
367 }
368 }
369
370 workflow_result
371}
372
373enum JsCommand {
374 ResolveRequest {
375 id: String,
376 resolution: WorkflowRuntimeRequestResolution,
377 },
378 Shutdown,
379}
380
381enum JsEvent {
382 Call(WorkflowRuntimeCall),
383 Request(WorkflowRuntimeRequest),
384 Complete(WorkflowModuleOutput),
385 Error(String),
386}
387
388async fn js_runtime_actor(
389 mut execution: Box<dyn WorkflowRuntimeExecution>,
390 mut commands: mpsc::Receiver<JsCommand>,
391 events: mpsc::Sender<JsEvent>,
392) {
393 let mut outstanding_requests = 0usize;
394 loop {
395 match execution.poll() {
396 Ok(WorkflowRuntimePoll::Call(call)) => {
397 if events.send(JsEvent::Call(call)).await.is_err() {
398 return;
399 }
400 }
401 Ok(WorkflowRuntimePoll::Request(request)) => {
402 let requests = match execution.take_pending_requests() {
403 Ok(requests) if requests.is_empty() => vec![request],
404 Ok(requests) => requests,
405 Err(error) => {
406 let _ = events.send(JsEvent::Error(error.to_string())).await;
407 return;
408 }
409 };
410 outstanding_requests = outstanding_requests.saturating_add(requests.len());
411 for request in requests {
412 if events.send(JsEvent::Request(request)).await.is_err() {
413 return;
414 }
415 }
416 }
417 Ok(WorkflowRuntimePoll::Complete(output)) => {
418 let _ = events.send(JsEvent::Complete(output)).await;
419 return;
420 }
421 Ok(WorkflowRuntimePoll::Pending) => {
422 if outstanding_requests == 0 {
423 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
424 continue;
425 }
426 match commands.recv().await {
427 Some(JsCommand::ResolveRequest { id, resolution }) => {
428 outstanding_requests = outstanding_requests.saturating_sub(1);
429 if let Err(error) = execution.resolve_request(&id, resolution) {
430 let _ = events.send(JsEvent::Error(error.to_string())).await;
431 return;
432 }
433 }
434 Some(JsCommand::Shutdown) | None => return,
435 }
436 }
437 Err(error) => {
438 let _ = events.send(JsEvent::Error(error.to_string())).await;
439 return;
440 }
441 }
442 }
443}
444
445async fn send_js_command(
446 commands: &mpsc::Sender<JsCommand>,
447 command: JsCommand,
448) -> anyhow::Result<()> {
449 commands
450 .send(command)
451 .await
452 .map_err(|_| anyhow!("JavaScript runtime actor stopped unexpectedly"))
453}
454
455struct RunState {
456 script_path: PathBuf,
457 workflow_cwd: Option<PathBuf>,
458 metadata: WorkflowMetadata,
459 event_start: Instant,
460 agent_provider: Arc<dyn AgentProvider>,
461 model_map: BTreeMap<String, String>,
462 logs: Vec<Vec<Value>>,
463 phases: Vec<WorkflowPhaseCall>,
464 agent_calls: Vec<WorkflowRuntimeRequest>,
465 workflow_calls: Vec<WorkflowRuntimeRequest>,
466 budget: WorkflowBudgetSnapshot,
467 token_usage: WorkflowTokenUsage,
468 token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
469 agent_runs: Vec<WorkflowAgentRunSummary>,
470 active_request_ids: BTreeSet<String>,
471 nesting_depth: usize,
472 max_parallel_agent_requests: Option<usize>,
473 agent_runner: Arc<dyn WorkflowAgentRunner>,
474 cancel_rx: Option<watch::Receiver<bool>>,
475 event_sink: Option<Arc<dyn WorkflowEventSink>>,
476 event_parent_step_id: Option<String>,
477 session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
478}
479
480struct PreparedAgentRun {
481 provider_override: Option<String>,
482 input: AgentProviderRunInput,
483}
484
485struct AgentTaskCompletion {
486 id: String,
487 input: AgentProviderRunInput,
488 provider: Option<String>,
489 result: anyhow::Result<AgentProviderResult>,
490}
491
492struct SleepTaskCompletion {
493 id: String,
494 result: anyhow::Result<()>,
495}
496
497fn add_usage(total: &mut WorkflowTokenUsage, usage: Option<&AgentUsage>) {
498 let Some(usage) = usage else {
499 return;
500 };
501
502 total.input_tokens = total
503 .input_tokens
504 .saturating_add(usage.input_tokens.unwrap_or_default());
505 total.output_tokens = total
506 .output_tokens
507 .saturating_add(usage.output_tokens.unwrap_or_default());
508 total.cache_read_tokens = total
509 .cache_read_tokens
510 .saturating_add(usage.cache_read_tokens.unwrap_or_default());
511 total.cache_write_tokens = total
512 .cache_write_tokens
513 .saturating_add(usage.cache_write_tokens.unwrap_or_default());
514 total.total_tokens = total
515 .total_tokens
516 .saturating_add(usage.total_tokens.unwrap_or_default());
517
518 if let Some(cost) = usage.cost.as_ref() {
519 total.cost = Some(merge_cost(total.cost.as_ref(), cost));
520 }
521}
522
523fn merge_token_usage(total: &mut WorkflowTokenUsage, usage: &WorkflowTokenUsage) {
524 total.input_tokens = total.input_tokens.saturating_add(usage.input_tokens);
525 total.output_tokens = total.output_tokens.saturating_add(usage.output_tokens);
526 total.cache_read_tokens = total
527 .cache_read_tokens
528 .saturating_add(usage.cache_read_tokens);
529 total.cache_write_tokens = total
530 .cache_write_tokens
531 .saturating_add(usage.cache_write_tokens);
532 total.total_tokens = total.total_tokens.saturating_add(usage.total_tokens);
533 if let Some(cost) = usage.cost.as_ref() {
534 total.cost = Some(merge_cost(total.cost.as_ref(), cost));
535 }
536}
537
538fn merge_cost(left: Option<&AgentUsageCost>, right: &AgentUsageCost) -> AgentUsageCost {
539 AgentUsageCost {
540 input: sum_f64(left.and_then(|cost| cost.input), right.input),
541 output: sum_f64(left.and_then(|cost| cost.output), right.output),
542 cache_read: sum_f64(left.and_then(|cost| cost.cache_read), right.cache_read),
543 cache_write: sum_f64(left.and_then(|cost| cost.cache_write), right.cache_write),
544 total: sum_f64(left.and_then(|cost| cost.total), right.total),
545 currency: right
546 .currency
547 .clone()
548 .or_else(|| left.and_then(|cost| cost.currency.clone())),
549 }
550}
551
/// Nanoseconds elapsed since `start`, clamped to `u64::MAX`.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos = start.elapsed().as_nanos();
    nanos.try_into().unwrap_or(u64::MAX)
}
555
556fn rfc3339_now() -> anyhow::Result<String> {
557 Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
558}
559
560fn raw_agent_event_payloads(raw: &Value) -> Vec<Value> {
561 if let Some(events) = raw.get("events").and_then(Value::as_array) {
562 events.clone()
563 } else if let Some(items) = raw.as_array() {
564 items.clone()
565 } else {
566 vec![raw.clone()]
567 }
568}
569
570fn agent_session_event_payload(provider_event: Value, metadata: &WorkflowEventMetadata) -> Value {
571 let mut payload = serde_json::Map::new();
572 if let Some(provider) = metadata.provider.as_ref() {
573 payload.insert("provider".to_string(), Value::String(provider.clone()));
574 }
575 if let Some(session_id) = metadata.session_id.as_ref() {
576 payload.insert("sessionId".to_string(), Value::String(session_id.clone()));
577 }
578 if let Some(run_id) = metadata.run_id.as_ref() {
579 payload.insert("runId".to_string(), Value::String(run_id.clone()));
580 }
581 if let Some(step_id) = metadata.step_id.as_ref() {
582 payload.insert("stepId".to_string(), Value::String(step_id.clone()));
583 }
584 payload.insert("providerEvent".to_string(), provider_event);
585 Value::Object(payload)
586}
587
/// Keeps at most `max_chars` characters of `value`, appending `…` when cut.
///
/// Counting is by `char`, so multi-byte characters are never split.
fn truncate_for_event(value: &str, max_chars: usize) -> String {
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}…", &value[..cut]),
        None => value.to_string(),
    }
}
597
598fn format_log_message(values: &[Value]) -> String {
599 values
600 .iter()
601 .map(|value| match value {
602 Value::String(value) => value.clone(),
603 value => serde_json::to_string(value).unwrap_or_else(|_| String::from("<unprintable>")),
604 })
605 .collect::<Vec<_>>()
606 .join(" ")
607}
608
/// Adds two optional amounts, treating a missing side as zero.
///
/// Returns `None` only when both sides are missing.
fn sum_f64(left: Option<f64>, right: Option<f64>) -> Option<f64> {
    if left.is_none() && right.is_none() {
        return None;
    }
    Some(left.unwrap_or(0.0) + right.unwrap_or(0.0))
}
615
616async fn wait_for_cancellation(cancel_rx: &mut Option<watch::Receiver<bool>>) {
617 let Some(cancel_rx) = cancel_rx else {
618 std::future::pending::<()>().await;
619 return;
620 };
621 while !*cancel_rx.borrow() {
622 if cancel_rx.changed().await.is_err() {
623 return;
624 }
625 }
626}
627
628impl RunState {
629 async fn handle_js_event(
630 &mut self,
631 event: JsEvent,
632 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
633 ) -> anyhow::Result<Option<RunWorkflowResult>> {
634 match event {
635 JsEvent::Call(call) => self.handle_call(call).await?,
636 JsEvent::Request(request) => {
637 log::debug!(
638 "workflow runtime request id={} kind={}",
639 request.id(),
640 request.kind()
641 );
642 pending_requests.push_back(request);
643 }
644 JsEvent::Complete(output) => {
645 log::debug!(
646 "run_workflow complete script={} budget_spent={}",
647 self.script_path.display(),
648 self.budget.spent
649 );
650 return Ok(Some(RunWorkflowResult {
651 output,
652 logs: std::mem::take(&mut self.logs),
653 phases: std::mem::take(&mut self.phases),
654 agent_calls: std::mem::take(&mut self.agent_calls),
655 workflow_calls: std::mem::take(&mut self.workflow_calls),
656 budget: self.budget.clone(),
657 token_usage: std::mem::take(&mut self.token_usage),
658 token_usage_by_phase: std::mem::take(&mut self.token_usage_by_phase),
659 agent_runs: std::mem::take(&mut self.agent_runs),
660 }));
661 }
662 JsEvent::Error(message) => bail!(message),
663 }
664 Ok(None)
665 }
666
667 async fn start_pending_requests(
668 &mut self,
669 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
670 agent_tasks: &mut JoinSet<AgentTaskCompletion>,
671 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
672 js_commands: &mpsc::Sender<JsCommand>,
673 ) -> anyhow::Result<()> {
674 loop {
675 let Some(request) = pending_requests.front() else {
676 return Ok(());
677 };
678 if matches!(request, WorkflowRuntimeRequest::Agent { .. })
679 && !self.agent_capacity_available(agent_tasks.len())
680 {
681 return Ok(());
682 }
683
684 let request = pending_requests
685 .pop_front()
686 .expect("pending request should exist");
687 match request {
688 WorkflowRuntimeRequest::Agent { .. } => match self.prepare_agent_request(request) {
689 Ok((id, prepared)) => {
690 self.emit_agent_started_event(&id, &prepared).await?;
691 self.spawn_agent_task(agent_tasks, id, prepared);
692 }
693 Err((id, error)) => {
694 send_js_command(
695 js_commands,
696 JsCommand::ResolveRequest {
697 id,
698 resolution: WorkflowRuntimeRequestResolution::Err {
699 message: error.to_string(),
700 },
701 },
702 )
703 .await?;
704 }
705 },
706 WorkflowRuntimeRequest::Sleep { id, duration_ms } => {
707 self.spawn_sleep_task(sleep_tasks, id, duration_ms);
708 }
709 WorkflowRuntimeRequest::Workflow {
710 id,
711 workflow_ref,
712 args,
713 } => {
714 self.workflow_calls.push(WorkflowRuntimeRequest::Workflow {
715 id: id.clone(),
716 workflow_ref: workflow_ref.clone(),
717 args: args.clone(),
718 });
719 let parent_event_step_id = self.event_step_id(&id);
720 let resolution = match self
721 .handle_workflow(parent_event_step_id, workflow_ref, args)
722 .await
723 {
724 Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
725 value,
726 budget: self.budget.clone(),
727 },
728 Err(error) => WorkflowRuntimeRequestResolution::Err {
729 message: error.to_string(),
730 },
731 };
732 send_js_command(js_commands, JsCommand::ResolveRequest { id, resolution })
733 .await?;
734 }
735 WorkflowRuntimeRequest::SandboxExec {
736 id,
737 profile,
738 request,
739 } => {
740 let resolution = match self.handle_sandbox_exec(&profile, request).await {
741 Ok(value) => WorkflowRuntimeRequestResolution::Ok(value),
742 Err(error) => WorkflowRuntimeRequestResolution::Err {
743 message: error.to_string(),
744 },
745 };
746 send_js_command(js_commands, JsCommand::ResolveRequest { id, resolution })
747 .await?;
748 }
749 }
750 }
751 }
752
753 async fn cancel_workflow(
754 &mut self,
755 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
756 agent_tasks: &mut JoinSet<AgentTaskCompletion>,
757 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
758 js_commands: &mpsc::Sender<JsCommand>,
759 js_events: &mut mpsc::Receiver<JsEvent>,
760 ) -> anyhow::Result<RunWorkflowResult> {
761 log::debug!(
762 "workflow cancellation requested script={}",
763 self.script_path.display()
764 );
765
766 if pending_requests.is_empty()
767 && self.active_request_ids.is_empty()
768 && agent_tasks.is_empty()
769 && sleep_tasks.is_empty()
770 && self
771 .reject_next_runtime_request_for_cancellation(js_commands, js_events)
772 .await
773 {
774 bail!("workflow cancelled");
775 }
776
777 self.reject_pending_requests_for_cancellation(pending_requests, js_commands)
778 .await;
779 sleep_tasks.abort_all();
780 self.reject_active_sleep_requests_for_cancellation(sleep_tasks, js_commands)
781 .await;
782
783 if self.session_log_sink.is_some() {
784 while let Some(completion) = agent_tasks.join_next().await {
785 match completion {
786 Ok(AgentTaskCompletion {
787 id,
788 input,
789 provider,
790 result: Ok(result),
791 }) => {
792 self.active_request_ids.remove(&id);
793 if let Err(error) = self
794 .emit_agent_result_events(&id, provider.as_deref(), &result)
795 .await
796 {
797 log::debug!("failed to emit drained agent events during cancellation: {error:#}");
798 }
799 if let Err(error) = self
800 .emit_agent_completed_event(&id, provider.as_deref(), &result)
801 .await
802 {
803 log::debug!("failed to emit drained agent completion during cancellation: {error:#}");
804 }
805 self.record_agent_run(&id, &input, provider, &result);
806 self.reject_request_for_cancellation(id, js_commands).await;
807 }
808 Ok(AgentTaskCompletion {
809 id,
810 provider,
811 result: Err(error),
812 ..
813 }) => {
814 self.active_request_ids.remove(&id);
815 let message = error.to_string();
816 if let Err(error) = self
817 .emit_agent_failed_event(&id, provider.as_deref(), &message)
818 .await
819 {
820 log::debug!("failed to emit drained agent failure during cancellation: {error:#}");
821 }
822 log::debug!("agent task failed while draining cancellation: {message}");
823 self.reject_request_for_cancellation(id, js_commands).await;
824 }
825 Err(error) => {
826 log::debug!("agent task join failed while draining cancellation: {error}");
827 }
828 }
829 }
830 } else {
831 let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
832 agent_tasks.abort_all();
833 for id in ids {
834 self.active_request_ids.remove(&id);
835 self.reject_request_for_cancellation(id, js_commands).await;
836 }
837 }
838
839 self.reject_remaining_active_requests_for_cancellation(js_commands)
840 .await;
841 self.drain_runtime_after_cancellation(js_events).await;
842 let _ = send_js_command(js_commands, JsCommand::Shutdown).await;
843 bail!("workflow cancelled")
844 }
845
846 async fn reject_next_runtime_request_for_cancellation(
847 &mut self,
848 js_commands: &mpsc::Sender<JsCommand>,
849 js_events: &mut mpsc::Receiver<JsEvent>,
850 ) -> bool {
851 loop {
852 match js_events.recv().await {
853 Some(JsEvent::Call(call)) => {
854 let _ = self.handle_call(call).await;
855 }
856 Some(JsEvent::Request(request)) => {
857 self.reject_request_for_cancellation(request.id().to_string(), js_commands)
858 .await;
859 return false;
860 }
861 Some(JsEvent::Complete(_)) | Some(JsEvent::Error(_)) | None => return true,
862 }
863 }
864 }
865
866 async fn reject_pending_requests_for_cancellation(
867 &mut self,
868 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
869 js_commands: &mpsc::Sender<JsCommand>,
870 ) {
871 while let Some(request) = pending_requests.pop_front() {
872 self.reject_request_for_cancellation(request.id().to_string(), js_commands)
873 .await;
874 }
875 }
876
877 async fn reject_active_sleep_requests_for_cancellation(
878 &mut self,
879 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
880 js_commands: &mpsc::Sender<JsCommand>,
881 ) {
882 while let Some(completion) = sleep_tasks.join_next().await {
883 if let Ok(SleepTaskCompletion { id, .. }) = completion {
884 self.active_request_ids.remove(&id);
885 self.reject_request_for_cancellation(id, js_commands).await;
886 }
887 }
888 }
889
890 async fn reject_remaining_active_requests_for_cancellation(
891 &mut self,
892 js_commands: &mpsc::Sender<JsCommand>,
893 ) {
894 let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
895 for id in ids {
896 self.active_request_ids.remove(&id);
897 self.reject_request_for_cancellation(id, js_commands).await;
898 }
899 }
900
901 async fn reject_request_for_cancellation(
902 &self,
903 id: String,
904 js_commands: &mpsc::Sender<JsCommand>,
905 ) {
906 let _ = send_js_command(
907 js_commands,
908 JsCommand::ResolveRequest {
909 id,
910 resolution: WorkflowRuntimeRequestResolution::Err {
911 message: "workflow cancelled".to_string(),
912 },
913 },
914 )
915 .await;
916 }
917
918 async fn drain_runtime_after_cancellation(&mut self, js_events: &mut mpsc::Receiver<JsEvent>) {
919 while let Some(event) = js_events.recv().await {
920 match event {
921 JsEvent::Call(call) => {
922 let _ = self.handle_call(call).await;
923 }
924 JsEvent::Request(request) => {
925 log::debug!(
926 "ignoring request after cancellation id={} kind={}",
927 request.id(),
928 request.kind()
929 );
930 }
931 JsEvent::Complete(_) | JsEvent::Error(_) => break,
932 }
933 }
934 }
935
936 fn event_step_id(&self, runtime_request_id: &str) -> String {
937 let parent = self.event_parent_step_id.as_deref().unwrap_or("");
938 let hash = blake3::hash(
939 format!("{parent}:{}:{runtime_request_id}", self.nesting_depth).as_bytes(),
940 );
941 format!("step_{}", &hash.to_hex()[..16])
942 }
943
944 async fn emit_event(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
945 if (event.event_type.as_str() != "workflow.started" || self.nesting_depth > 0)
946 && event.elapsed_nanos.is_none()
947 {
948 event.elapsed_nanos = Some(elapsed_nanos(self.event_start));
949 }
950 let metadata = event
951 .metadata
952 .get_or_insert_with(WorkflowEventMetadata::default);
953 if metadata.workflow_depth.is_none() {
954 metadata.workflow_depth = Some(u32::try_from(self.nesting_depth).unwrap_or(u32::MAX));
955 }
956 if metadata.parent_step_id.is_none() {
957 metadata.parent_step_id = self.event_parent_step_id.clone();
958 }
959 if let Some(event_sink) = self.event_sink.as_ref() {
960 event_sink.emit(event).await?;
961 }
962 Ok(())
963 }
964
965 async fn handle_call(&mut self, call: WorkflowRuntimeCall) -> anyhow::Result<()> {
966 match call {
967 WorkflowRuntimeCall::Log { values } => {
968 self.emit_event(WorkflowEvent::log(format_log_message(&values)))
969 .await?;
970 self.logs.push(values);
971 }
972 WorkflowRuntimeCall::Phase { name, options } => {
973 let phase = WorkflowPhaseCall { name, options };
974 self.emit_event(WorkflowEvent::phase(
975 phase.name.clone(),
976 phase.options.clone(),
977 ))
978 .await?;
979 self.phases.push(phase);
980 }
981 }
982 Ok(())
983 }
984
985 fn agent_capacity_available(&self, in_flight: usize) -> bool {
986 let max_parallel = self
987 .max_parallel_agent_requests
988 .filter(|value| *value > 0)
989 .unwrap_or(usize::MAX);
990 in_flight < max_parallel
991 }
992
993 fn prepare_agent_request(
994 &mut self,
995 request: WorkflowRuntimeRequest,
996 ) -> Result<(String, PreparedAgentRun), (String, anyhow::Error)> {
997 match request {
998 WorkflowRuntimeRequest::Agent {
999 id,
1000 prompt,
1001 options,
1002 } => {
1003 self.agent_calls.push(WorkflowRuntimeRequest::Agent {
1004 id: id.clone(),
1005 prompt: prompt.clone(),
1006 options: options.clone(),
1007 });
1008 match self.prepare_agent_run(prompt, options) {
1009 Ok(prepared) => Ok((id, prepared)),
1010 Err(error) => Err((id, error)),
1011 }
1012 }
1013 WorkflowRuntimeRequest::Workflow { .. }
1014 | WorkflowRuntimeRequest::Sleep { .. }
1015 | WorkflowRuntimeRequest::SandboxExec { .. } => {
1016 unreachable!("prepare_agent_request only accepts agent requests")
1017 }
1018 }
1019 }
1020
1021 fn spawn_agent_task(
1022 &mut self,
1023 agent_tasks: &mut JoinSet<AgentTaskCompletion>,
1024 id: String,
1025 prepared: PreparedAgentRun,
1026 ) {
1027 let default_provider_name = self.agent_provider.name().to_string();
1028 let default_provider = Arc::clone(&self.agent_provider);
1029 let agent_runner = Arc::clone(&self.agent_runner);
1030 let retry_in_runtime = agent_runner.retry_in_runtime();
1031 let cancel_rx = self.cancel_rx.clone();
1032 let completion_input = prepared.input.clone();
1033 let completion_provider = prepared
1034 .provider_override
1035 .clone()
1036 .or(Some(default_provider_name));
1037 let session_log_sink = self.session_log_sink.clone();
1038 let max_parallel = self
1039 .max_parallel_agent_requests
1040 .filter(|value| *value > 0)
1041 .unwrap_or(usize::MAX);
1042 log::debug!(
1043 "starting agent request id={} in_flight_after_start={} max_parallel={}",
1044 id,
1045 agent_tasks.len() + 1,
1046 max_parallel
1047 );
1048 self.active_request_ids.insert(id.clone());
1049 agent_tasks.spawn(async move {
1050 let result = if retry_in_runtime {
1051 run_agent_runner_with_retry(
1052 Arc::clone(&agent_runner),
1053 default_provider,
1054 prepared.provider_override,
1055 prepared.input,
1056 cancel_rx,
1057 )
1058 .await
1059 } else {
1060 agent_runner
1061 .run_agent(default_provider, prepared.provider_override, prepared.input)
1062 .await
1063 };
1064 let result = match result {
1065 Ok(result) => {
1066 if let Some(session_log_sink) = session_log_sink.as_ref() {
1067 let provider_name = completion_provider
1068 .as_deref()
1069 .expect("completion provider should always be set");
1070 match session_log_sink
1071 .write_agent_result(provider_name, &result)
1072 .await
1073 {
1074 Ok(()) => Ok(result),
1075 Err(error) => Err(error),
1076 }
1077 } else {
1078 Ok(result)
1079 }
1080 }
1081 Err(error) => Err(error),
1082 };
1083 AgentTaskCompletion {
1084 id,
1085 input: completion_input,
1086 provider: completion_provider,
1087 result,
1088 }
1089 });
1090 }
1091
1092 fn spawn_sleep_task(
1093 &mut self,
1094 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
1095 id: String,
1096 duration_ms: u64,
1097 ) {
1098 let agent_runner = Arc::clone(&self.agent_runner);
1099 log::debug!(
1100 "starting sleep request id={} duration_ms={}",
1101 id,
1102 duration_ms
1103 );
1104 self.active_request_ids.insert(id.clone());
1105 sleep_tasks.spawn(async move {
1106 SleepTaskCompletion {
1107 id,
1108 result: agent_runner.sleep(duration_ms).await,
1109 }
1110 });
1111 }
1112
1113 fn prepare_agent_run(
1114 &self,
1115 prompt: String,
1116 options: Option<Value>,
1117 ) -> anyhow::Result<PreparedAgentRun> {
1118 let options = apply_phase_defaults(options, &self.metadata);
1119 let context = AgentProviderContext {
1120 phase: options
1121 .as_ref()
1122 .and_then(|options| options.get("phase"))
1123 .and_then(Value::as_str)
1124 .map(ToString::to_string),
1125 cwd: self.workflow_cwd.clone(),
1126 };
1127 let provider_override = options
1128 .as_ref()
1129 .and_then(|options| options.get("provider"))
1130 .and_then(Value::as_str)
1131 .map(ToString::to_string);
1132 let provider_name = provider_override
1133 .as_deref()
1134 .unwrap_or_else(|| self.agent_provider.name());
1135 let options = resolve_model_options(options, provider_name, &self.model_map)?;
1136 agent_retry_policy(&options)?;
1137 log::debug!(
1138 "agent call provider={} phase={:?} model={:?} prompt_len={}",
1139 provider_name,
1140 context.phase.as_deref(),
1141 options
1142 .as_ref()
1143 .and_then(|options| options.get("model"))
1144 .and_then(Value::as_str),
1145 prompt.len()
1146 );
1147 Ok(PreparedAgentRun {
1148 provider_override,
1149 input: AgentProviderRunInput {
1150 prompt,
1151 options,
1152 environment: Arc::new(LocalExecutionEnvironment::new(context.cwd.clone())?),
1153 context,
1154 },
1155 })
1156 }
1157
1158 async fn emit_agent_started_event(
1159 &self,
1160 id: &str,
1161 prepared: &PreparedAgentRun,
1162 ) -> anyhow::Result<()> {
1163 let provider = prepared
1164 .provider_override
1165 .as_deref()
1166 .unwrap_or_else(|| self.agent_provider.name());
1167 let metadata = self.agent_event_metadata(id, Some(provider), None);
1168 self.emit_event(WorkflowEvent::agent_started(
1169 serde_json::json!({
1170 "phase": prepared.input.context.phase,
1171 "promptPreview": truncate_for_event(&prepared.input.prompt, 200),
1172 }),
1173 metadata,
1174 ))
1175 .await
1176 }
1177
1178 async fn apply_agent_result(
1179 &mut self,
1180 id: &str,
1181 input: &AgentProviderRunInput,
1182 provider: Option<String>,
1183 result: AgentProviderResult,
1184 ) -> anyhow::Result<Value> {
1185 if let Some(output_tokens) = result.usage.as_ref().and_then(|usage| usage.output_tokens) {
1186 self.budget.spent = self.budget.spent.saturating_add(output_tokens);
1187 }
1188 self.emit_agent_result_events(id, provider.as_deref(), &result)
1189 .await?;
1190 self.emit_agent_completed_event(id, provider.as_deref(), &result)
1191 .await?;
1192 self.record_agent_run(id, input, provider, &result);
1193 log::debug!(
1194 "agent call complete session_id={:?} output_tokens={:?} budget_spent={}",
1195 result.session_id,
1196 result.usage.as_ref().and_then(|usage| usage.output_tokens),
1197 self.budget.spent
1198 );
1199 Ok(result.output)
1200 }
1201
1202 async fn emit_agent_result_events(
1203 &self,
1204 id: &str,
1205 provider: Option<&str>,
1206 result: &AgentProviderResult,
1207 ) -> anyhow::Result<()> {
1208 let Some(raw) = result.raw.as_ref() else {
1209 return Ok(());
1210 };
1211 let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1212 for provider_event in raw_agent_event_payloads(raw) {
1213 let event_data = agent_session_event_payload(provider_event, &metadata);
1214 self.emit_event(WorkflowEvent::agent_event(event_data, metadata.clone()))
1215 .await?;
1216 }
1217 Ok(())
1218 }
1219
1220 async fn emit_agent_completed_event(
1221 &self,
1222 id: &str,
1223 provider: Option<&str>,
1224 result: &AgentProviderResult,
1225 ) -> anyhow::Result<()> {
1226 let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1227 self.emit_event(WorkflowEvent::agent_completed(
1228 serde_json::json!({
1229 "sessionId": result.session_id,
1230 "model": result.model,
1231 "usage": result.usage,
1232 }),
1233 metadata,
1234 ))
1235 .await
1236 }
1237
1238 async fn emit_agent_failed_event(
1239 &self,
1240 id: &str,
1241 provider: Option<&str>,
1242 message: &str,
1243 ) -> anyhow::Result<()> {
1244 let metadata = self.agent_event_metadata(id, provider, None);
1245 self.emit_event(WorkflowEvent::agent_failed(
1246 serde_json::json!({ "message": message }),
1247 metadata,
1248 ))
1249 .await
1250 }
1251
1252 fn agent_event_metadata(
1253 &self,
1254 id: &str,
1255 provider: Option<&str>,
1256 session_id: Option<String>,
1257 ) -> WorkflowEventMetadata {
1258 WorkflowEventMetadata {
1259 run_id: None,
1260 step_id: Some(self.event_step_id(id)),
1261 provider: Some(
1262 provider
1263 .unwrap_or_else(|| self.agent_provider.name())
1264 .to_string(),
1265 ),
1266 session_id,
1267 workflow_depth: None,
1268 parent_step_id: None,
1269 }
1270 }
1271
1272 fn record_agent_run(
1273 &mut self,
1274 id: &str,
1275 input: &AgentProviderRunInput,
1276 provider: Option<String>,
1277 result: &AgentProviderResult,
1278 ) {
1279 add_usage(&mut self.token_usage, result.usage.as_ref());
1280 if let Some(phase) = input.context.phase.as_ref() {
1281 let phase_usage = self.token_usage_by_phase.entry(phase.clone()).or_default();
1282 add_usage(phase_usage, result.usage.as_ref());
1283 }
1284 let model = result.model.clone().or_else(|| {
1285 input
1286 .options
1287 .as_ref()
1288 .and_then(|options| options.get("model"))
1289 .and_then(Value::as_str)
1290 .map(ToString::to_string)
1291 });
1292 self.agent_runs.push(WorkflowAgentRunSummary {
1293 id: id.to_string(),
1294 phase: input.context.phase.clone(),
1295 provider,
1296 model,
1297 provider_session_id: result.session_id.clone(),
1298 usage: result.usage.clone(),
1299 isolation: result.isolation.clone(),
1300 });
1301 }
1302
1303 async fn handle_sandbox_exec(
1304 &self,
1305 profile: &str,
1306 request: WorkflowSandboxExecRequest,
1307 ) -> anyhow::Result<Value> {
1308 let (provider, profile_name) = parse_sandbox_profile_ref(profile)?;
1309 let program = format!("smol-sandbox-{provider}");
1310 let sandbox_group_id = format!("sbxgrp_{}", ulid::Ulid::new());
1311 let workspace_path = self
1312 .workflow_cwd
1313 .clone()
1314 .ok_or_else(|| anyhow!("sandbox.exec requires the workflow cwd to be available"))?;
1315 let sandbox_env = SandboxExecutionEnvironment::open(
1316 program,
1317 OpenSandboxRequest {
1318 metadata: SandboxMetadata::new(
1319 format!("sandbox-exec-{}", ulid::Ulid::new()),
1320 sandbox_group_id,
1321 ),
1322 profile: ProfileRef {
1323 provider,
1324 name: profile_name,
1325 },
1326 workspace_sync: WorkspaceSync {
1327 host_path: workspace_path,
1328 },
1329 cwd: request.cwd.clone(),
1330 },
1331 )
1332 .await
1333 .with_context(|| format!("failed to open sandbox profile `{profile}`"))?;
1334
1335 let mut argv = Vec::with_capacity(1 + request.args.len());
1336 argv.push(request.command);
1337 argv.extend(request.args);
1338 let exec_request = ExecRequest {
1339 argv,
1340 cwd: request.cwd.map(EnvironmentPath),
1341 env: request.env,
1342 stdin: request.stdin.map(String::into_bytes),
1343 };
1344 let mut sink = NullExecEventSink;
1345 let exec_result = sandbox_env.exec(exec_request, &mut sink).await;
1346 let close_result = sandbox_env.close().await;
1347 let output = exec_result?;
1348 close_result.context("failed to close sandbox after sandbox.exec")?;
1349
1350 let value = serde_json::to_value(WorkflowSandboxExecOutput {
1351 exit_code: output.exit_code,
1352 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
1353 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
1354 })?;
1355 Ok(value)
1356 }
1357
1358 async fn handle_workflow(
1359 &mut self,
1360 parent_step_id: String,
1361 workflow_ref: WorkflowRef,
1362 args: Option<Value>,
1363 ) -> anyhow::Result<Value> {
1364 if self.nesting_depth >= 1 {
1365 bail!("Nested workflow() calls are limited to one level");
1366 }
1367 let script_path = match workflow_ref {
1368 WorkflowRef::ScriptPath { script_path } => {
1369 resolve_relative_script(&self.script_path, &script_path)
1370 }
1371 WorkflowRef::Name(name) => resolve_named_workflow(&name)?,
1372 };
1373 log::debug!("child workflow call script={}", script_path.display());
1374 let child = Box::pin(run_workflow_inner(RunWorkflowOptions {
1375 script_path,
1376 workflow_cwd: self.workflow_cwd.clone(),
1377 args: args.unwrap_or(Value::Null),
1378 agent_provider: Arc::clone(&self.agent_provider),
1379 model_map: self.model_map.clone(),
1380 budget_total: self.budget.total,
1381 budget_spent: self.budget.spent,
1382 nesting_depth: self.nesting_depth + 1,
1383 max_parallel_agent_requests: self.max_parallel_agent_requests,
1384 agent_runner: Some(Arc::clone(&self.agent_runner)),
1385 cancel_rx: self.cancel_rx.clone(),
1386 event_sink: self.event_sink.clone(),
1387 event_parent_step_id: Some(parent_step_id),
1388 event_stream_start: Some(self.event_start),
1389 session_log_sink: self.session_log_sink.clone(),
1390 }))
1391 .await?;
1392 self.budget = child.budget;
1393 self.logs.extend(child.logs);
1394 self.phases.extend(child.phases);
1395 self.agent_calls.extend(child.agent_calls);
1396 self.workflow_calls.extend(child.workflow_calls);
1397 merge_token_usage(&mut self.token_usage, &child.token_usage);
1398 for (phase, usage) in child.token_usage_by_phase {
1399 merge_token_usage(self.token_usage_by_phase.entry(phase).or_default(), &usage);
1400 }
1401 self.agent_runs.extend(child.agent_runs);
1402 Ok(child.output.result)
1403 }
1404}
1405
1406async fn run_agent_runner_with_retry(
1407 agent_runner: Arc<dyn WorkflowAgentRunner>,
1408 default_provider: Arc<dyn AgentProvider>,
1409 provider_override: Option<String>,
1410 input: AgentProviderRunInput,
1411 mut cancel_rx: Option<watch::Receiver<bool>>,
1412) -> anyhow::Result<AgentProviderResult> {
1413 let retry = agent_retry_policy(&input.options)?;
1414 let mut final_result = None;
1415 for attempt in 1..=retry.max_attempts {
1416 let attempt_result = agent_runner
1417 .run_agent(
1418 Arc::clone(&default_provider),
1419 provider_override.clone(),
1420 input.clone(),
1421 )
1422 .await;
1423 match attempt_result {
1424 Ok(result) => {
1425 final_result = Some(Ok(result));
1426 break;
1427 }
1428 Err(error) if attempt < retry.max_attempts => {
1429 log::debug!(
1430 "agent call failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1431 retry.max_attempts,
1432 retry.backoff_ms
1433 );
1434 sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1435 }
1436 Err(error) => {
1437 final_result = Some(Err(error));
1438 break;
1439 }
1440 }
1441 }
1442 final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1443}
1444
1445async fn sleep_retry_backoff(
1446 backoff_ms: u64,
1447 cancel_rx: &mut Option<watch::Receiver<bool>>,
1448) -> anyhow::Result<()> {
1449 if backoff_ms == 0 {
1450 return Ok(());
1451 }
1452 let Some(cancel_rx) = cancel_rx.as_mut() else {
1453 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1454 return Ok(());
1455 };
1456 if *cancel_rx.borrow() {
1457 bail!("workflow cancelled");
1458 }
1459 let sleep = tokio::time::sleep(Duration::from_millis(backoff_ms));
1460 tokio::pin!(sleep);
1461 loop {
1462 tokio::select! {
1463 _ = &mut sleep => return Ok(()),
1464 changed = cancel_rx.changed() => {
1465 match changed {
1466 Ok(()) if *cancel_rx.borrow() => bail!("workflow cancelled"),
1467 Ok(()) => continue,
1468 Err(_) => {
1469 sleep.await;
1470 return Ok(());
1471 }
1472 }
1473 }
1474 }
1475 }
1476}
1477
1478pub(crate) async fn run_agent_provider_with_retry(
1479 default_provider: Arc<dyn AgentProvider>,
1480 provider_override: Option<String>,
1481 input: AgentProviderRunInput,
1482 mut cancel_rx: Option<watch::Receiver<bool>>,
1483) -> anyhow::Result<AgentProviderResult> {
1484 let retry = agent_retry_policy(&input.options)?;
1485 let provider = resolve_agent_provider(default_provider, provider_override)?;
1486 let mut final_result = None;
1487 for attempt in 1..=retry.max_attempts {
1488 let attempt_result =
1489 run_agent_with_optional_isolation(Arc::clone(&provider), input.clone()).await;
1490 match attempt_result {
1491 Ok(result) => {
1492 final_result = Some(Ok(result));
1493 break;
1494 }
1495 Err(error) if attempt < retry.max_attempts => {
1496 log::debug!(
1497 "agent provider failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1498 retry.max_attempts,
1499 retry.backoff_ms
1500 );
1501 sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1502 }
1503 Err(error) => {
1504 final_result = Some(Err(error));
1505 break;
1506 }
1507 }
1508 }
1509 final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1510}
1511
1512pub(crate) async fn run_agent_provider(
1513 default_provider: Arc<dyn AgentProvider>,
1514 provider_override: Option<String>,
1515 input: AgentProviderRunInput,
1516) -> anyhow::Result<AgentProviderResult> {
1517 let provider = resolve_agent_provider(default_provider, provider_override)?;
1518 run_agent_with_optional_isolation(provider, input).await
1519}
1520
1521fn resolve_agent_provider(
1522 default_provider: Arc<dyn AgentProvider>,
1523 provider_override: Option<String>,
1524) -> anyhow::Result<Arc<dyn AgentProvider>> {
1525 if let Some(provider_override) = provider_override {
1526 Ok(Arc::from(create_agent_provider(&provider_override)?))
1527 } else {
1528 Ok(default_provider)
1529 }
1530}
1531
/// Retry settings for a single agent call, parsed by `agent_retry_policy` from the call's
/// `retry` option.
#[derive(Debug, Clone, Copy)]
pub(crate) struct AgentRetryPolicy {
    /// Total number of attempts, including the first; `agent_retry_policy` never yields 0.
    pub max_attempts: u32,
    /// Delay between attempts in milliseconds; 0 retries without waiting.
    pub backoff_ms: u64,
}
1537
1538pub(crate) fn agent_retry_policy(options: &Option<Value>) -> anyhow::Result<AgentRetryPolicy> {
1539 let default = AgentRetryPolicy {
1540 max_attempts: 1,
1541 backoff_ms: 0,
1542 };
1543 let Some(retry) = options.as_ref().and_then(|options| options.get("retry")) else {
1544 return Ok(default);
1545 };
1546 if retry.is_null() {
1547 return Ok(default);
1548 }
1549 let object = retry
1550 .as_object()
1551 .ok_or_else(|| anyhow!("agent retry option must be an object"))?;
1552 let max_attempts = match object.get("maxAttempts") {
1553 Some(value) => {
1554 let value = value
1555 .as_u64()
1556 .ok_or_else(|| anyhow!("agent retry.maxAttempts must be a positive integer"))?;
1557 if value == 0 || value > u32::MAX as u64 {
1558 bail!("agent retry.maxAttempts must be between 1 and {}", u32::MAX);
1559 }
1560 value as u32
1561 }
1562 None => default.max_attempts,
1563 };
1564 let backoff_ms = match object.get("backoffMs") {
1565 Some(value) => value
1566 .as_u64()
1567 .ok_or_else(|| anyhow!("agent retry.backoffMs must be a non-negative integer"))?,
1568 None => default.backoff_ms,
1569 };
1570 Ok(AgentRetryPolicy {
1571 max_attempts,
1572 backoff_ms,
1573 })
1574}
1575
1576async fn run_agent_with_optional_isolation(
1577 provider: Arc<dyn AgentProvider>,
1578 input: AgentProviderRunInput,
1579) -> anyhow::Result<AgentProviderResult> {
1580 if let Some(request) = sandbox_isolation_request(&input.options)? {
1581 return run_agent_with_sandbox_isolation(provider, input, request).await;
1582 }
1583
1584 if requests_worktree_isolation(&input.options) {
1585 return run_agent_with_worktree_isolation(provider, input).await;
1586 }
1587
1588 run_agent_with_schema_validation(provider, input).await
1589}
1590
1591async fn run_agent_with_worktree_isolation(
1592 provider: Arc<dyn AgentProvider>,
1593 input: AgentProviderRunInput,
1594) -> anyhow::Result<AgentProviderResult> {
1595 let isolation = WorktreeIsolation::create(input.context.cwd.as_deref())?;
1596 let isolation_info = isolation.info();
1597 let mut isolated_input = input;
1598 isolated_input.context.cwd = Some(isolation.cwd.clone());
1599 isolated_input.environment =
1600 Arc::new(LocalExecutionEnvironment::new(Some(isolation.cwd.clone()))?);
1601 let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1602 if let Ok(result) = &mut result {
1603 result.isolation = Some(isolation_info);
1604 }
1605 if let Err(error) = isolation.cleanup() {
1606 log::warn!("failed to cleanup isolated agent worktree: {error:#}");
1607 }
1608 result
1609}
1610
1611async fn run_agent_with_sandbox_isolation(
1612 provider: Arc<dyn AgentProvider>,
1613 input: AgentProviderRunInput,
1614 request: SandboxIsolationRequest,
1615) -> anyhow::Result<AgentProviderResult> {
1616 let program = sandbox_provider_program(&request);
1617 let sandbox_group_id = format!("sbxgrp_{}", ulid::Ulid::new());
1618 let cwd = input.context.cwd.clone().ok_or_else(|| {
1619 anyhow!("agent sandbox isolation requires the workflow cwd to be available")
1620 })?;
1621 let sandbox_env = SandboxExecutionEnvironment::open(
1622 program,
1623 OpenSandboxRequest {
1624 metadata: SandboxMetadata::new(
1625 format!("req_{}", ulid::Ulid::new()),
1626 sandbox_group_id.clone(),
1627 ),
1628 profile: ProfileRef {
1629 provider: request.provider.clone(),
1630 name: request.profile.clone(),
1631 },
1632 workspace_sync: WorkspaceSync { host_path: cwd },
1633 cwd: request.cwd.clone(),
1634 },
1635 )
1636 .await
1637 .with_context(|| {
1638 format!(
1639 "failed to open sandbox profile `{}` with provider `{}`",
1640 request.profile, request.provider
1641 )
1642 })?;
1643
1644 let session = sandbox_env.session().clone();
1645 let mut isolated_input = input;
1646 isolated_input.context.cwd = sandbox_env.cwd().map(|path| PathBuf::from(path.as_str()));
1647 isolated_input.environment = Arc::new(sandbox_env.clone());
1648 let isolation_info = AgentRunIsolation {
1649 kind: "sandbox".to_string(),
1650 branch: None,
1651 worktree_path: None,
1652 cwd: session.cwd.clone(),
1653 profile: Some(request.profile.clone()),
1654 provider: Some(request.provider.clone()),
1655 session_id: Some(session.id.clone()),
1656 };
1657
1658 let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1659 if let Ok(result) = &mut result {
1660 result.isolation = Some(isolation_info);
1661 }
1662
1663 if let Err(error) = sandbox_env.close().await {
1664 log::warn!("failed to close sandbox-isolated agent session: {error:#}");
1665 }
1666
1667 result
1668}
1669
1670fn requests_worktree_isolation(options: &Option<Value>) -> bool {
1671 options
1672 .as_ref()
1673 .and_then(|options| options.get("isolation"))
1674 .and_then(Value::as_str)
1675 == Some("worktree")
1676}
1677
/// A parsed `isolation: { "type": "sandbox", ... }` agent option.
#[derive(Debug, Clone)]
struct SandboxIsolationRequest {
    /// Sandbox provider name: the part of `isolation.profile` before the first `/`.
    provider: String,
    /// Provider-local profile name: the part of `isolation.profile` after the first `/`.
    profile: String,
    /// Optional working directory inside the sandbox, taken from `isolation.cwd`.
    cwd: Option<String>,
}
1684
1685fn sandbox_isolation_request(
1686 options: &Option<Value>,
1687) -> anyhow::Result<Option<SandboxIsolationRequest>> {
1688 let Some(isolation) = options
1689 .as_ref()
1690 .and_then(|options| options.get("isolation"))
1691 else {
1692 return Ok(None);
1693 };
1694 let Some(object) = isolation.as_object() else {
1695 return Ok(None);
1696 };
1697 if object.get("type").and_then(Value::as_str) != Some("sandbox") {
1698 return Ok(None);
1699 }
1700 let profile = object
1701 .get("profile")
1702 .and_then(Value::as_str)
1703 .ok_or_else(|| anyhow!("agent sandbox isolation requires isolation.profile"))?
1704 .to_string();
1705 let cwd = object
1706 .get("cwd")
1707 .and_then(Value::as_str)
1708 .map(ToString::to_string);
1709 let (provider, profile) = parse_sandbox_profile_ref(&profile)?;
1710 Ok(Some(SandboxIsolationRequest {
1711 provider,
1712 profile,
1713 cwd,
1714 }))
1715}
1716
1717fn parse_sandbox_profile_ref(value: &str) -> anyhow::Result<(String, String)> {
1718 let (provider, profile) = value.split_once('/').ok_or_else(|| {
1719 anyhow!("agent sandbox isolation.profile must use <provider>/<profile>, got `{value}`")
1720 })?;
1721 validate_sandbox_provider_name(provider)?;
1722 if profile.is_empty() {
1723 bail!(
1724 "agent sandbox isolation.profile must include a non-empty provider-local profile name"
1725 );
1726 }
1727 Ok((provider.to_string(), profile.to_string()))
1728}
1729
1730fn validate_sandbox_provider_name(provider: &str) -> anyhow::Result<()> {
1731 let valid = !provider.is_empty()
1732 && provider
1733 .bytes()
1734 .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'-')
1735 && provider
1736 .as_bytes()
1737 .first()
1738 .is_some_and(u8::is_ascii_alphanumeric)
1739 && provider
1740 .as_bytes()
1741 .last()
1742 .is_some_and(u8::is_ascii_alphanumeric);
1743 if !valid {
1744 bail!("invalid sandbox provider name `{provider}`; expected lowercase letters, digits, and hyphens, starting and ending with an alphanumeric character");
1745 }
1746 Ok(())
1747}
1748
1749fn sandbox_provider_program(request: &SandboxIsolationRequest) -> String {
1750 format!("smol-sandbox-{}", request.provider)
1751}
1752
/// A temporary git worktree, on a freshly created branch, used to isolate one agent run.
///
/// `cleanup` removes the worktree and its branch; if that never completes, `Drop` retries
/// both steps best-effort and only logs failures.
struct WorktreeIsolation {
    /// Canonicalized root of the repository the worktree was created from.
    repo_root: PathBuf,
    /// Location of the worktree checkout (inside `_temp_dir`).
    worktree_root: PathBuf,
    /// Agent working directory inside the worktree, at the same position relative to the
    /// repository root as the original workflow cwd.
    cwd: PathBuf,
    /// Branch created for this worktree (`smol-wf/agent-run/<ulid>`).
    branch_name: String,
    /// Set once `cleanup` succeeded, so `Drop` does not repeat the git cleanup.
    cleaned: bool,
    /// Owns the temporary directory. Fields drop after `Drop::drop` runs, so the directory
    /// is deleted only after the git cleanup has been attempted.
    _temp_dir: tempfile::TempDir,
}
1761
1762impl WorktreeIsolation {
1763 fn create(cwd: Option<&Path>) -> anyhow::Result<Self> {
1764 let cwd = cwd
1765 .map(Path::to_path_buf)
1766 .unwrap_or(std::env::current_dir()?)
1767 .canonicalize()
1768 .context("failed to canonicalize workflow cwd for worktree isolation")?;
1769 let repo_root = git_output(&cwd, &["rev-parse", "--show-toplevel"]).context(
1770 "agent isolation='worktree' requires the workflow cwd to be inside a git repository",
1771 )?;
1772 let repo_root = PathBuf::from(repo_root.trim())
1773 .canonicalize()
1774 .context("failed to canonicalize git repository root for worktree isolation")?;
1775 let relative_cwd = cwd.strip_prefix(&repo_root).with_context(|| {
1776 format!(
1777 "workflow cwd {} is not under git repository root {}",
1778 cwd.display(),
1779 repo_root.display()
1780 )
1781 })?;
1782
1783 let temp_dir = tempfile::Builder::new()
1784 .prefix("smol-wf-agent-worktree-")
1785 .tempdir()
1786 .context("failed to create temp directory for agent worktree isolation")?;
1787 let worktree_root = temp_dir.path().join("worktree");
1788 let worktree_arg = path_arg(&worktree_root);
1789 let branch_name = format!(
1790 "smol-wf/agent-run/{}",
1791 ulid::Ulid::new().to_string().to_ascii_lowercase()
1792 );
1793 git_status(
1794 &repo_root,
1795 &[
1796 "worktree",
1797 "add",
1798 "--quiet",
1799 "-b",
1800 &branch_name,
1801 &worktree_arg,
1802 "HEAD",
1803 ],
1804 )
1805 .context("failed to create isolated git worktree for agent run")?;
1806 let isolated_cwd = if relative_cwd.as_os_str().is_empty() {
1807 worktree_root.clone()
1808 } else {
1809 worktree_root.join(relative_cwd)
1810 };
1811 Ok(Self {
1812 repo_root,
1813 worktree_root,
1814 cwd: isolated_cwd,
1815 branch_name,
1816 cleaned: false,
1817 _temp_dir: temp_dir,
1818 })
1819 }
1820
1821 fn info(&self) -> AgentRunIsolation {
1822 AgentRunIsolation {
1823 kind: "worktree".to_string(),
1824 branch: Some(self.branch_name.clone()),
1825 worktree_path: Some(path_arg(&self.worktree_root)),
1826 cwd: Some(path_arg(&self.cwd)),
1827 profile: None,
1828 provider: None,
1829 session_id: None,
1830 }
1831 }
1832
1833 fn cleanup(mut self) -> anyhow::Result<()> {
1834 self.remove_worktree()?;
1835 self.delete_branch()?;
1836 self.cleaned = true;
1837 Ok(())
1838 }
1839
1840 fn remove_worktree(&self) -> anyhow::Result<()> {
1841 let worktree_arg = path_arg(&self.worktree_root);
1842 git_status(
1843 &self.repo_root,
1844 &["worktree", "remove", "--force", &worktree_arg],
1845 )
1846 .context("failed to remove isolated git worktree")
1847 }
1848
1849 fn delete_branch(&self) -> anyhow::Result<()> {
1850 git_status(&self.repo_root, &["branch", "-D", &self.branch_name])
1851 .context("failed to delete isolated agent worktree branch")
1852 }
1853}
1854
1855impl Drop for WorktreeIsolation {
1856 fn drop(&mut self) {
1857 if !self.cleaned {
1858 if let Err(error) = self.remove_worktree() {
1859 log::warn!("failed to cleanup isolated agent worktree during drop: {error:#}");
1860 }
1861 if let Err(error) = self.delete_branch() {
1862 log::warn!(
1863 "failed to delete isolated agent worktree branch during drop: {error:#}"
1864 );
1865 }
1866 }
1867 }
1868}
1869
/// Converts `path` into an owned command-line argument string; non-UTF-8 sequences are
/// replaced with U+FFFD.
fn path_arg(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1873
1874fn git_output(cwd: &Path, args: &[&str]) -> anyhow::Result<String> {
1875 let output = StdCommand::new("git")
1876 .args(args)
1877 .current_dir(cwd)
1878 .output()
1879 .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1880 if output.status.success() {
1881 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
1882 } else {
1883 bail!(
1884 "git {} failed with {}{}",
1885 args.join(" "),
1886 status_text(output.status.code()),
1887 command_stderr(&output.stderr)
1888 )
1889 }
1890}
1891
1892fn git_status(cwd: &Path, args: &[&str]) -> anyhow::Result<()> {
1893 let output = StdCommand::new("git")
1894 .args(args)
1895 .current_dir(cwd)
1896 .output()
1897 .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1898 if output.status.success() {
1899 Ok(())
1900 } else {
1901 bail!(
1902 "git {} failed with {}{}",
1903 args.join(" "),
1904 status_text(output.status.code()),
1905 command_stderr(&output.stderr)
1906 )
1907 }
1908}
1909
/// Describes how a process ended: `code N`, or `signal` when there is no exit code.
fn status_text(code: Option<i32>) -> String {
    match code {
        Some(code) => format!("code {code}"),
        None => String::from("signal"),
    }
}
1914
/// Formats captured stderr as an error-message suffix: `": <trimmed stderr>"`, or an empty
/// string when stderr is blank.
fn command_stderr(stderr: &[u8]) -> String {
    let text = String::from_utf8_lossy(stderr);
    match text.trim() {
        "" => String::new(),
        trimmed => format!(": {trimmed}"),
    }
}
1924
1925async fn run_agent_with_schema_validation(
1926 provider: Arc<dyn AgentProvider>,
1927 input: AgentProviderRunInput,
1928) -> anyhow::Result<AgentProviderResult> {
1929 let Some(schema) = input
1930 .options
1931 .as_ref()
1932 .and_then(|options| options.get("schema"))
1933 .cloned()
1934 else {
1935 return provider.run(input).await;
1936 };
1937
1938 let max_attempts = 2;
1939 let original_prompt = input.prompt.clone();
1940 let mut attempt_input = input;
1941 let mut last_errors = Vec::new();
1942
1943 for attempt in 1..=max_attempts {
1944 let result = provider.run(attempt_input.clone()).await?;
1945 match validate_structured_output(&schema, &result.output) {
1946 Ok(()) => return Ok(result),
1947 Err(errors) => {
1948 last_errors = errors;
1949 if attempt < max_attempts {
1950 attempt_input.prompt =
1951 with_structured_output_retry_prompt(&original_prompt, &last_errors);
1952 }
1953 }
1954 }
1955 }
1956
1957 bail!(
1958 "{}",
1959 format_structured_output_validation_error(&last_errors)
1960 )
1961}
1962
1963fn validate_structured_output(schema: &Value, output: &Value) -> Result<(), Vec<String>> {
1964 let validator = jsonschema::validator_for(schema)
1965 .map_err(|error| vec![format!("/ schema is invalid: {}", error)])?;
1966 let errors = validator
1967 .iter_errors(output)
1968 .map(|error| {
1969 let path = error.instance_path().to_string();
1970 let path = if path.is_empty() {
1971 "/".to_string()
1972 } else {
1973 path
1974 };
1975 format!("{path} {error}")
1976 })
1977 .collect::<Vec<_>>();
1978
1979 if errors.is_empty() {
1980 Ok(())
1981 } else {
1982 Err(errors)
1983 }
1984}
1985
/// Builds the final error message listing every schema-validation error, joined by `"; "`.
fn format_structured_output_validation_error(errors: &[String]) -> String {
    let joined = errors.join("; ");
    format!("Structured output did not match JSON Schema: {joined}")
}
1992
/// Appends retry instructions and a bulleted list of validation errors to `prompt`.
///
/// Output layout: the prompt, a blank line, three instruction lines, then one `- <error>`
/// line per error, all separated by `\n` with no trailing newline.
fn with_structured_output_retry_prompt(prompt: &str, errors: &[String]) -> String {
    let mut text = String::from(prompt);
    text.push_str("\n\nPrevious structured output failed JSON Schema validation.");
    text.push_str("\nReturn a corrected structured output that satisfies the original JSON Schema.");
    text.push_str("\nValidation errors:");
    for error in errors {
        text.push_str("\n- ");
        text.push_str(error);
    }
    text
}
2004
/// A model selector after alias lookup and parsing.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResolvedModelSelector {
    /// The model string the workflow asked for (presumably before alias lookup).
    requested: String,
    /// The selector that was parsed: the alias target, or the requested string itself.
    selector: String,
    /// Bare model id, with any `provider/` prefix and `?query` removed.
    model_id: String,
    /// Model provider from a `provider/` prefix or a `provider=` query parameter, if any.
    model_provider: Option<String>,
    /// Value of the `thinking=` query parameter, if any.
    thinking: Option<String>,
}

impl ResolvedModelSelector {
    /// The model string passed to the agent: `provider/model_id` when a model provider is
    /// known, otherwise just `model_id`.
    fn provider_model(&self) -> String {
        self.model_provider.as_deref().map_or_else(
            || self.model_id.clone(),
            |provider| format!("{provider}/{}", self.model_id),
        )
    }
}
2022
/// Resolves the `model` option of an agent call into the form the provider receives.
///
/// `model` is first looked up in `model_map` (alias -> selector). The alias target, or the
/// raw model string when no alias matches, is parsed by `parse_model_selector` and checked
/// with `validate_model_selector_for_provider` against `agent_provider`. The returned
/// options object then carries:
/// - `model`: `provider/model_id`, or just `model_id`;
/// - `requestedModel` / `modelSelector`: set only when an alias matched or the selector
///   had a `?` query, a model provider, or a thinking level; otherwise removed;
/// - `modelProvider` / `thinking`: set from the parsed selector; otherwise removed.
///
/// Options without a string `model` (including `None` and non-object values) are returned
/// unchanged.
///
/// # Errors
/// Propagates selector parse errors and provider-compatibility errors.
fn resolve_model_options(
    options: Option<Value>,
    agent_provider: &str,
    model_map: &BTreeMap<String, String>,
) -> anyhow::Result<Option<Value>> {
    // Nothing to resolve unless `model` is present as a string.
    let Some(model) = options
        .as_ref()
        .and_then(Value::as_object)
        .and_then(|object| object.get("model"))
        .and_then(Value::as_str)
        .map(ToString::to_string)
    else {
        return Ok(options);
    };

    // An alias maps to a full selector; otherwise the requested model is the selector.
    let mapped_selector = model_map.get(&model).cloned();
    let alias_matched = mapped_selector.is_some();
    let selector = mapped_selector.unwrap_or_else(|| model.clone());
    let resolved = parse_model_selector(&model, &selector)?;
    validate_model_selector_for_provider(&resolved, agent_provider)?;

    // Work on an owned copy of the options object so resolved fields can be written back.
    let mut object = options
        .and_then(|value| value.as_object().cloned())
        .unwrap_or_default();
    object.insert(
        "model".to_string(),
        Value::String(resolved.provider_model()),
    );

    // Keep the original request and selector only when the selector carried more than a
    // bare model id; otherwise clear any caller-supplied values for these keys.
    let selector_has_extra_parts = alias_matched
        || resolved.selector.contains('?')
        || resolved.model_provider.is_some()
        || resolved.thinking.is_some();
    if selector_has_extra_parts {
        object.insert(
            "requestedModel".to_string(),
            Value::String(resolved.requested.clone()),
        );
        object.insert(
            "modelSelector".to_string(),
            Value::String(resolved.selector.clone()),
        );
    } else {
        object.remove("requestedModel");
        object.remove("modelSelector");
    }

    // Provider and thinking always reflect the parsed selector: set them or remove them.
    if let Some(provider) = resolved.model_provider {
        object.insert("modelProvider".to_string(), Value::String(provider));
    } else {
        object.remove("modelProvider");
    }
    if let Some(thinking) = resolved.thinking {
        object.insert("thinking".to_string(), Value::String(thinking));
    } else {
        object.remove("thinking");
    }
    Ok(Some(Value::Object(object)))
}
2082
2083fn parse_model_selector(requested: &str, selector: &str) -> anyhow::Result<ResolvedModelSelector> {
2084 let (model_part, query) = selector.split_once('?').unwrap_or((selector, ""));
2085 if model_part.trim().is_empty() {
2086 bail!("model selector must include a model id: {selector}");
2087 }
2088
2089 let (slash_provider, model_id) = match model_part.split_once('/') {
2090 Some((provider, model_id)) if !provider.is_empty() && !model_id.is_empty() => {
2091 (Some(provider.to_string()), model_id.to_string())
2092 }
2093 Some(_) => bail!("model selector provider/model form is invalid: {selector}"),
2094 None => (None, model_part.to_string()),
2095 };
2096
2097 let mut query_provider = None::<String>;
2098 let mut thinking = None::<String>;
2099 if !query.is_empty() {
2100 for pair in query.split('&') {
2101 if pair.is_empty() {
2102 continue;
2103 }
2104 let (key, value) = pair.split_once('=').ok_or_else(|| {
2105 anyhow!("model selector query parameter must use key=value: {pair}")
2106 })?;
2107 let key = percent_decode(key)?;
2108 let value = percent_decode(value)?;
2109 if value.is_empty() {
2110 bail!("model selector query parameter `{key}` must not be empty");
2111 }
2112 match key.as_str() {
2113 "provider" => set_unique_query_value(&mut query_provider, key, value)?,
2114 "thinking" => set_unique_query_value(&mut thinking, key, value)?,
2115 _ => bail!("unknown model selector query parameter `{key}`"),
2116 }
2117 }
2118 }
2119
2120 let model_provider = match (slash_provider, query_provider) {
2121 (Some(slash), Some(query)) if slash != query => bail!(
2122 "conflicting model provider qualifiers in selector `{selector}`: `{slash}` and `{query}`"
2123 ),
2124 (Some(provider), Some(_)) | (Some(provider), None) | (None, Some(provider)) => {
2125 Some(provider)
2126 }
2127 (None, None) => None,
2128 };
2129
2130 Ok(ResolvedModelSelector {
2131 requested: requested.to_string(),
2132 selector: selector.to_string(),
2133 model_id,
2134 model_provider,
2135 thinking,
2136 })
2137}
2138
2139fn set_unique_query_value(
2140 target: &mut Option<String>,
2141 key: String,
2142 value: String,
2143) -> anyhow::Result<()> {
2144 if target.replace(value).is_some() {
2145 bail!("duplicate model selector query parameter `{key}`");
2146 }
2147 Ok(())
2148}
2149
2150fn percent_decode(value: &str) -> anyhow::Result<String> {
2151 let bytes = value.as_bytes();
2152 let mut output = Vec::with_capacity(bytes.len());
2153 let mut index = 0;
2154 while index < bytes.len() {
2155 match bytes[index] {
2156 b'%' => {
2157 if index + 2 >= bytes.len() {
2158 bail!("invalid percent escape in model selector query: {value}");
2159 }
2160 let high = hex_value(bytes[index + 1]).ok_or_else(|| {
2161 anyhow!("invalid percent escape in model selector query: {value}")
2162 })?;
2163 let low = hex_value(bytes[index + 2]).ok_or_else(|| {
2164 anyhow!("invalid percent escape in model selector query: {value}")
2165 })?;
2166 output.push((high << 4) | low);
2167 index += 3;
2168 }
2169 b'+' => {
2170 output.push(b' ');
2171 index += 1;
2172 }
2173 byte => {
2174 output.push(byte);
2175 index += 1;
2176 }
2177 }
2178 }
2179 String::from_utf8(output).context("model selector query is not valid UTF-8")
2180}
2181
/// Decodes one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value (0-15); any other byte yields `None`.
fn hex_value(byte: u8) -> Option<u8> {
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
2190
2191fn validate_model_selector_for_provider(
2192 resolved: &ResolvedModelSelector,
2193 agent_provider: &str,
2194) -> anyhow::Result<()> {
2195 match agent_provider {
2196 "codex" => {
2197 if resolved.model_provider.is_some() {
2198 bail!("Codex model selectors do not support ?provider=... or provider/model form");
2199 }
2200 if resolved.thinking.is_some() {
2201 bail!("Codex model selectors do not support thinking=...");
2202 }
2203 }
2204 "claude-code" if resolved.model_provider.is_some() => {
2205 bail!(
2206 "Claude Code model selectors do not support ?provider=... or provider/model form"
2207 );
2208 }
2209 "opencode" if resolved.model_provider.is_none() => {
2210 bail!("OpenCode model selectors must use provider/model or ?provider=...");
2211 }
2212 "debug" | "pi" => {}
2213 _ => {}
2214 }
2215 Ok(())
2216}
2217
2218fn apply_phase_defaults(options: Option<Value>, metadata: &WorkflowMetadata) -> Option<Value> {
2219 let phase_name = options
2220 .as_ref()
2221 .and_then(|options| options.get("phase"))
2222 .and_then(Value::as_str)
2223 .map(ToString::to_string);
2224 let phase_metadata = phase_name.as_ref().and_then(|phase_name| {
2225 metadata
2226 .phases
2227 .iter()
2228 .find(|phase| phase.title == *phase_name)
2229 });
2230
2231 if phase_name.is_none() && phase_metadata.is_none() {
2232 return options;
2233 }
2234
2235 let mut object = options
2236 .and_then(|value| value.as_object().cloned())
2237 .unwrap_or_default();
2238
2239 if let Some(phase_name) = phase_name {
2240 object
2241 .entry("phase".to_string())
2242 .or_insert(Value::String(phase_name));
2243 }
2244 if let Some(model) = phase_metadata.and_then(|phase| phase.model.clone()) {
2245 object
2246 .entry("model".to_string())
2247 .or_insert(Value::String(model));
2248 }
2249 if let Some(provider) = phase_metadata.and_then(|phase| phase.provider.clone()) {
2250 object
2251 .entry("provider".to_string())
2252 .or_insert(Value::String(provider));
2253 }
2254
2255 Some(Value::Object(object))
2256}
2257
/// Resolves `script_path` against the directory containing `current_script_path`.
///
/// Absolute paths are returned as-is. Relative paths are joined onto the current script's
/// parent directory, or onto `.` when that path has no parent (e.g. `/`).
fn resolve_relative_script(current_script_path: &Path, script_path: &str) -> PathBuf {
    let candidate = Path::new(script_path);
    if candidate.is_absolute() {
        return candidate.to_path_buf();
    }
    let base_dir = current_script_path.parent().unwrap_or(Path::new("."));
    base_dir.join(candidate)
}
2269
2270fn resolve_named_workflow(name: &str) -> anyhow::Result<PathBuf> {
2271 let workflows_dir = PathBuf::from(".claude/workflows");
2272 for entry in fs::read_dir(&workflows_dir).unwrap_or_else(|_| fs::read_dir(".").unwrap()) {
2273 let entry = entry?;
2274 let path = entry.path();
2275 if path.extension().and_then(|extension| extension.to_str()) != Some("js") {
2276 continue;
2277 }
2278 if read_workflow_metadata(&path)?.is_some_and(|metadata| metadata.name == name) {
2279 return Ok(path);
2280 }
2281 }
2282 bail!("Unknown workflow: {name}")
2283}