1use crate::agent_providers::{
2 create_agent_provider, AgentProvider, AgentProviderContext, AgentProviderResult,
3 AgentProviderRunInput, AgentRunIsolation, AgentUsage, AgentUsageCost,
4};
5use crate::environment::{
6 AgentExecutionEnvironment, LocalExecutionEnvironment, SandboxExecutionEnvironment,
7};
8use crate::js_runtime::rquickjs::RQuickJSWorkflowRuntime;
9use crate::js_runtime::{
10 WorkflowBudgetSnapshot, WorkflowJSRuntime, WorkflowModuleInput, WorkflowModuleOutput,
11 WorkflowRef, WorkflowRuntimeCall, WorkflowRuntimeExecution, WorkflowRuntimePoll,
12 WorkflowRuntimeRequest, WorkflowRuntimeRequestResolution,
13};
14use crate::metadata::{read_workflow_metadata, WorkflowMetadata};
15use anyhow::{anyhow, bail, Context};
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use smol_workflow_sandbox::{
19 Metadata as SandboxMetadata, OpenSandboxRequest, ProfileRef, WorkspaceSync,
20};
21use std::collections::{BTreeMap, BTreeSet, VecDeque};
22use std::fs;
23use std::path::{Path, PathBuf};
24use std::process::Command as StdCommand;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{mpsc, watch};
28use tokio::task::{JoinSet, LocalSet};
29
30pub use crate::events::{
31 WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink, WorkflowEventType,
32};
33
34#[async_trait::async_trait]
35pub trait AgentSessionLogSink: Send + Sync {
36 async fn write_agent_result(
37 &self,
38 provider: &str,
39 result: &AgentProviderResult,
40 ) -> anyhow::Result<()>;
41}
42
43#[async_trait::async_trait]
44pub trait WorkflowAgentRunner: Send + Sync {
45 async fn run_agent(
46 &self,
47 default_provider: Arc<dyn AgentProvider>,
48 provider_override: Option<String>,
49 input: AgentProviderRunInput,
50 ) -> anyhow::Result<AgentProviderResult>;
51
52 fn retry_in_runtime(&self) -> bool {
66 true
67 }
68
69 async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
70 tokio::time::sleep(std::time::Duration::from_millis(duration_ms)).await;
71 Ok(())
72 }
73}
74
75#[derive(Debug, Default)]
76pub struct DirectWorkflowAgentRunner;
77
78#[async_trait::async_trait]
79impl WorkflowAgentRunner for DirectWorkflowAgentRunner {
80 async fn run_agent(
81 &self,
82 default_provider: Arc<dyn AgentProvider>,
83 provider_override: Option<String>,
84 input: AgentProviderRunInput,
85 ) -> anyhow::Result<AgentProviderResult> {
86 run_agent_provider(default_provider, provider_override, input).await
87 }
88}
89
90pub struct RunWorkflowOptions {
91 pub script_path: PathBuf,
92 pub workflow_cwd: Option<PathBuf>,
95 pub args: Value,
96 pub agent_provider: Arc<dyn AgentProvider>,
97 pub model_map: BTreeMap<String, String>,
98 pub budget_total: Option<u64>,
99 pub budget_spent: u64,
100 pub nesting_depth: usize,
101 pub max_parallel_agent_requests: Option<usize>,
102 pub agent_runner: Option<Arc<dyn WorkflowAgentRunner>>,
103 pub cancel_rx: Option<watch::Receiver<bool>>,
104 pub event_sink: Option<Arc<dyn WorkflowEventSink>>,
105 pub event_parent_step_id: Option<String>,
106 pub event_stream_start: Option<Instant>,
107 pub session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
108}
109
110#[derive(Debug)]
111pub struct RunWorkflowResult {
112 pub output: WorkflowModuleOutput,
113 pub logs: Vec<Vec<Value>>,
114 pub phases: Vec<WorkflowPhaseCall>,
115 pub agent_calls: Vec<WorkflowRuntimeRequest>,
116 pub workflow_calls: Vec<WorkflowRuntimeRequest>,
117 pub budget: WorkflowBudgetSnapshot,
118 pub token_usage: WorkflowTokenUsage,
119 pub token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
120 pub agent_runs: Vec<WorkflowAgentRunSummary>,
121}
122
123#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
124#[serde(rename_all = "camelCase")]
125pub struct WorkflowTokenUsage {
126 pub input_tokens: u64,
127 pub output_tokens: u64,
128 pub cache_read_tokens: u64,
129 pub cache_write_tokens: u64,
130 pub total_tokens: u64,
131 #[serde(skip_serializing_if = "Option::is_none")]
132 pub cost: Option<AgentUsageCost>,
133}
134
135#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
136#[serde(rename_all = "camelCase")]
137pub struct WorkflowAgentRunSummary {
138 pub id: String,
139 #[serde(skip_serializing_if = "Option::is_none")]
140 pub phase: Option<String>,
141 #[serde(skip_serializing_if = "Option::is_none")]
142 pub provider: Option<String>,
143 #[serde(skip_serializing_if = "Option::is_none")]
144 pub model: Option<String>,
145 #[serde(skip_serializing_if = "Option::is_none")]
146 pub provider_session_id: Option<String>,
147 #[serde(skip_serializing_if = "Option::is_none")]
148 pub usage: Option<AgentUsage>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 pub isolation: Option<AgentRunIsolation>,
151}
152
153#[derive(Debug, Clone, PartialEq)]
154pub struct WorkflowPhaseCall {
155 pub name: String,
156 pub options: Option<Value>,
157}
158
159pub async fn run_workflow(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
160 LocalSet::new().run_until(run_workflow_inner(options)).await
161}
162
163async fn run_workflow_inner(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
164 log::debug!(
165 "run_workflow start script={} provider={} nesting_depth={} budget_total={:?} budget_spent={}",
166 options.script_path.display(),
167 options.agent_provider.name(),
168 options.nesting_depth,
169 options.budget_total,
170 options.budget_spent
171 );
172 let script_path = fs::canonicalize(&options.script_path).with_context(|| {
173 format!(
174 "failed to resolve workflow script {}",
175 options.script_path.display()
176 )
177 })?;
178 let metadata = read_workflow_metadata(&script_path)?.ok_or_else(|| {
179 anyhow!("Workflow script must export valid literal metadata as `export const meta = {{ name, description, ... }}`")
180 })?;
181 log::debug!(
182 "workflow metadata loaded name={} phases={}",
183 metadata.name,
184 metadata.phases.len()
185 );
186 let workflow_cwd = match options.workflow_cwd {
187 Some(cwd) => Some(
188 fs::canonicalize(&cwd)
189 .with_context(|| format!("failed to resolve workflow cwd {}", cwd.display()))?,
190 ),
191 None => script_path.parent().map(Path::to_path_buf),
192 };
193 let source = fs::read_to_string(&script_path)
194 .with_context(|| format!("failed to read workflow script {}", script_path.display()))?;
195 let runtime = RQuickJSWorkflowRuntime::new();
196 let execution = runtime.start_module(WorkflowModuleInput {
197 source,
198 source_name: script_path.to_string_lossy().into_owned(),
199 args: options.args,
200 budget: WorkflowBudgetSnapshot {
201 total: options.budget_total,
202 spent: options.budget_spent,
203 },
204 sandbox: Default::default(),
205 })?;
206
207 let (js_commands, js_command_rx) = mpsc::channel::<JsCommand>(64);
208 let (js_event_tx, mut js_events) = mpsc::channel::<JsEvent>(64);
209 let js_task = tokio::task::spawn_local(js_runtime_actor(execution, js_command_rx, js_event_tx));
210
211 let emit_lifecycle_events = options.event_sink.is_some();
212 let event_start = options.event_stream_start.unwrap_or_else(Instant::now);
213
214 let mut state = RunState {
215 script_path,
216 workflow_cwd,
217 metadata,
218 event_start,
219 agent_provider: options.agent_provider,
220 model_map: options.model_map,
221 logs: Vec::new(),
222 phases: Vec::new(),
223 agent_calls: Vec::new(),
224 workflow_calls: Vec::new(),
225 budget: WorkflowBudgetSnapshot {
226 total: options.budget_total,
227 spent: options.budget_spent,
228 },
229 token_usage: WorkflowTokenUsage::default(),
230 token_usage_by_phase: Default::default(),
231 agent_runs: Vec::new(),
232 active_request_ids: BTreeSet::new(),
233 nesting_depth: options.nesting_depth,
234 max_parallel_agent_requests: options.max_parallel_agent_requests,
235 agent_runner: options
236 .agent_runner
237 .unwrap_or_else(|| Arc::new(DirectWorkflowAgentRunner)),
238 cancel_rx: options.cancel_rx,
239 event_sink: options.event_sink,
240 event_parent_step_id: options.event_parent_step_id,
241 session_log_sink: options.session_log_sink,
242 };
243
244 let mut pending_requests = VecDeque::<WorkflowRuntimeRequest>::new();
245 let mut agent_tasks = JoinSet::<AgentTaskCompletion>::new();
246 let mut sleep_tasks = JoinSet::<SleepTaskCompletion>::new();
247
248 if emit_lifecycle_events {
249 if let Err(error) = state
250 .emit_event(WorkflowEvent::started(rfc3339_now()?))
251 .await
252 {
253 let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
254 let _ = js_task.await;
255 return Err(error);
256 }
257 }
258
259 let workflow_result: anyhow::Result<RunWorkflowResult> = loop {
260 if let Err(error) = state
261 .start_pending_requests(
262 &mut pending_requests,
263 &mut agent_tasks,
264 &mut sleep_tasks,
265 &js_commands,
266 )
267 .await
268 {
269 break Err(error);
270 }
271
272 tokio::select! {
273 biased;
274 () = wait_for_cancellation(&mut state.cancel_rx) => {
275 break state.cancel_workflow(
276 &mut pending_requests,
277 &mut agent_tasks,
278 &mut sleep_tasks,
279 &js_commands,
280 &mut js_events,
281 ).await;
282 }
283 event = js_events.recv() => {
284 let event = match event {
285 Some(event) => event,
286 None => break Err(anyhow!("JavaScript runtime actor stopped unexpectedly")),
287 };
288 match state.handle_js_event(event, &mut pending_requests).await {
289 Ok(Some(result)) => break Ok(result),
290 Ok(None) => {}
291 Err(error) => break Err(error),
292 }
293 }
294 completion = agent_tasks.join_next(), if !agent_tasks.is_empty() => {
295 let completion = match completion {
296 Some(Ok(completion)) => completion,
297 Some(Err(error)) => break Err(anyhow!("agent provider task failed: {error}")),
298 None => break Err(anyhow!("agent task set ended unexpectedly")),
299 };
300 let AgentTaskCompletion { id, input, provider, result } = completion;
301 state.active_request_ids.remove(&id);
302 let resolution = match result {
303 Ok(result) => match state.apply_agent_result(&id, &input, provider, result).await {
304 Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
305 value,
306 budget: state.budget.clone(),
307 },
308 Err(error) => WorkflowRuntimeRequestResolution::Err {
309 message: error.to_string(),
310 },
311 },
312 Err(error) => {
313 let message = error.to_string();
314 if let Err(emit_error) = state.emit_agent_failed_event(&id, provider.as_deref(), &message).await {
315 log::debug!("failed to emit agent failure event: {emit_error:#}");
316 }
317 WorkflowRuntimeRequestResolution::Err { message }
318 },
319 };
320 if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
321 break Err(error);
322 }
323 }
324 completion = sleep_tasks.join_next(), if !sleep_tasks.is_empty() => {
325 let completion = match completion {
326 Some(Ok(completion)) => completion,
327 Some(Err(error)) => break Err(anyhow!("sleep task failed: {error}")),
328 None => break Err(anyhow!("sleep task set ended unexpectedly")),
329 };
330 let SleepTaskCompletion { id, result } = completion;
331 state.active_request_ids.remove(&id);
332 let resolution = match result {
333 Ok(()) => WorkflowRuntimeRequestResolution::OkUndefined,
334 Err(error) => WorkflowRuntimeRequestResolution::Err {
335 message: error.to_string(),
336 },
337 };
338 if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
339 break Err(error);
340 }
341 }
342 }
343 };
344
345 let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
346 let _ = js_task.await;
347
348 if emit_lifecycle_events {
349 match &workflow_result {
350 Ok(result) => {
351 state
352 .emit_event(WorkflowEvent::result(
353 result.token_usage.input_tokens,
354 result.token_usage.output_tokens,
355 result.token_usage.total_tokens,
356 result.output.result.clone(),
357 ))
358 .await?
359 }
360 Err(error) => {
361 state
362 .emit_event(WorkflowEvent::error(error.to_string(), None))
363 .await?;
364 }
365 }
366 }
367
368 workflow_result
369}
370
371enum JsCommand {
372 ResolveRequest {
373 id: String,
374 resolution: WorkflowRuntimeRequestResolution,
375 },
376 Shutdown,
377}
378
379enum JsEvent {
380 Call(WorkflowRuntimeCall),
381 Request(WorkflowRuntimeRequest),
382 Complete(WorkflowModuleOutput),
383 Error(String),
384}
385
386async fn js_runtime_actor(
387 mut execution: Box<dyn WorkflowRuntimeExecution>,
388 mut commands: mpsc::Receiver<JsCommand>,
389 events: mpsc::Sender<JsEvent>,
390) {
391 let mut outstanding_requests = 0usize;
392 loop {
393 match execution.poll() {
394 Ok(WorkflowRuntimePoll::Call(call)) => {
395 if events.send(JsEvent::Call(call)).await.is_err() {
396 return;
397 }
398 }
399 Ok(WorkflowRuntimePoll::Request(request)) => {
400 let requests = match execution.take_pending_requests() {
401 Ok(requests) if requests.is_empty() => vec![request],
402 Ok(requests) => requests,
403 Err(error) => {
404 let _ = events.send(JsEvent::Error(error.to_string())).await;
405 return;
406 }
407 };
408 outstanding_requests = outstanding_requests.saturating_add(requests.len());
409 for request in requests {
410 if events.send(JsEvent::Request(request)).await.is_err() {
411 return;
412 }
413 }
414 }
415 Ok(WorkflowRuntimePoll::Complete(output)) => {
416 let _ = events.send(JsEvent::Complete(output)).await;
417 return;
418 }
419 Ok(WorkflowRuntimePoll::Pending) => {
420 if outstanding_requests == 0 {
421 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
422 continue;
423 }
424 match commands.recv().await {
425 Some(JsCommand::ResolveRequest { id, resolution }) => {
426 outstanding_requests = outstanding_requests.saturating_sub(1);
427 if let Err(error) = execution.resolve_request(&id, resolution) {
428 let _ = events.send(JsEvent::Error(error.to_string())).await;
429 return;
430 }
431 }
432 Some(JsCommand::Shutdown) | None => return,
433 }
434 }
435 Err(error) => {
436 let _ = events.send(JsEvent::Error(error.to_string())).await;
437 return;
438 }
439 }
440 }
441}
442
443async fn send_js_command(
444 commands: &mpsc::Sender<JsCommand>,
445 command: JsCommand,
446) -> anyhow::Result<()> {
447 commands
448 .send(command)
449 .await
450 .map_err(|_| anyhow!("JavaScript runtime actor stopped unexpectedly"))
451}
452
453struct RunState {
454 script_path: PathBuf,
455 workflow_cwd: Option<PathBuf>,
456 metadata: WorkflowMetadata,
457 event_start: Instant,
458 agent_provider: Arc<dyn AgentProvider>,
459 model_map: BTreeMap<String, String>,
460 logs: Vec<Vec<Value>>,
461 phases: Vec<WorkflowPhaseCall>,
462 agent_calls: Vec<WorkflowRuntimeRequest>,
463 workflow_calls: Vec<WorkflowRuntimeRequest>,
464 budget: WorkflowBudgetSnapshot,
465 token_usage: WorkflowTokenUsage,
466 token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
467 agent_runs: Vec<WorkflowAgentRunSummary>,
468 active_request_ids: BTreeSet<String>,
469 nesting_depth: usize,
470 max_parallel_agent_requests: Option<usize>,
471 agent_runner: Arc<dyn WorkflowAgentRunner>,
472 cancel_rx: Option<watch::Receiver<bool>>,
473 event_sink: Option<Arc<dyn WorkflowEventSink>>,
474 event_parent_step_id: Option<String>,
475 session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
476}
477
478struct PreparedAgentRun {
479 provider_override: Option<String>,
480 input: AgentProviderRunInput,
481}
482
483struct AgentTaskCompletion {
484 id: String,
485 input: AgentProviderRunInput,
486 provider: Option<String>,
487 result: anyhow::Result<AgentProviderResult>,
488}
489
490struct SleepTaskCompletion {
491 id: String,
492 result: anyhow::Result<()>,
493}
494
495fn add_usage(total: &mut WorkflowTokenUsage, usage: Option<&AgentUsage>) {
496 let Some(usage) = usage else {
497 return;
498 };
499
500 total.input_tokens = total
501 .input_tokens
502 .saturating_add(usage.input_tokens.unwrap_or_default());
503 total.output_tokens = total
504 .output_tokens
505 .saturating_add(usage.output_tokens.unwrap_or_default());
506 total.cache_read_tokens = total
507 .cache_read_tokens
508 .saturating_add(usage.cache_read_tokens.unwrap_or_default());
509 total.cache_write_tokens = total
510 .cache_write_tokens
511 .saturating_add(usage.cache_write_tokens.unwrap_or_default());
512 total.total_tokens = total
513 .total_tokens
514 .saturating_add(usage.total_tokens.unwrap_or_default());
515
516 if let Some(cost) = usage.cost.as_ref() {
517 total.cost = Some(merge_cost(total.cost.as_ref(), cost));
518 }
519}
520
521fn merge_token_usage(total: &mut WorkflowTokenUsage, usage: &WorkflowTokenUsage) {
522 total.input_tokens = total.input_tokens.saturating_add(usage.input_tokens);
523 total.output_tokens = total.output_tokens.saturating_add(usage.output_tokens);
524 total.cache_read_tokens = total
525 .cache_read_tokens
526 .saturating_add(usage.cache_read_tokens);
527 total.cache_write_tokens = total
528 .cache_write_tokens
529 .saturating_add(usage.cache_write_tokens);
530 total.total_tokens = total.total_tokens.saturating_add(usage.total_tokens);
531 if let Some(cost) = usage.cost.as_ref() {
532 total.cost = Some(merge_cost(total.cost.as_ref(), cost));
533 }
534}
535
536fn merge_cost(left: Option<&AgentUsageCost>, right: &AgentUsageCost) -> AgentUsageCost {
537 AgentUsageCost {
538 input: sum_f64(left.and_then(|cost| cost.input), right.input),
539 output: sum_f64(left.and_then(|cost| cost.output), right.output),
540 cache_read: sum_f64(left.and_then(|cost| cost.cache_read), right.cache_read),
541 cache_write: sum_f64(left.and_then(|cost| cost.cache_write), right.cache_write),
542 total: sum_f64(left.and_then(|cost| cost.total), right.total),
543 currency: right
544 .currency
545 .clone()
546 .or_else(|| left.and_then(|cost| cost.currency.clone())),
547 }
548}
549
/// Nanoseconds elapsed since `start`, clamped to `u64::MAX`.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos = start.elapsed().as_nanos();
    // Clamp first so the narrowing cast below cannot truncate.
    nanos.min(u128::from(u64::MAX)) as u64
}
553
554fn rfc3339_now() -> anyhow::Result<String> {
555 Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
556}
557
558fn raw_agent_event_payloads(raw: &Value) -> Vec<Value> {
559 if let Some(events) = raw.get("events").and_then(Value::as_array) {
560 events.clone()
561 } else if let Some(items) = raw.as_array() {
562 items.clone()
563 } else {
564 vec![raw.clone()]
565 }
566}
567
568fn agent_session_event_payload(provider_event: Value, metadata: &WorkflowEventMetadata) -> Value {
569 let mut payload = serde_json::Map::new();
570 if let Some(provider) = metadata.provider.as_ref() {
571 payload.insert("provider".to_string(), Value::String(provider.clone()));
572 }
573 if let Some(session_id) = metadata.session_id.as_ref() {
574 payload.insert("sessionId".to_string(), Value::String(session_id.clone()));
575 }
576 if let Some(run_id) = metadata.run_id.as_ref() {
577 payload.insert("runId".to_string(), Value::String(run_id.clone()));
578 }
579 if let Some(step_id) = metadata.step_id.as_ref() {
580 payload.insert("stepId".to_string(), Value::String(step_id.clone()));
581 }
582 payload.insert("providerEvent".to_string(), provider_event);
583 Value::Object(payload)
584}
585
/// Returns `value` limited to `max_chars` Unicode scalar values.
///
/// When anything was cut, a trailing `…` is appended.
fn truncate_for_event(value: &str, max_chars: usize) -> String {
    // The byte offset of character number `max_chars` exists only when the
    // string is longer than the limit.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}…", &value[..cut]),
        None => value.to_string(),
    }
}
595
596fn format_log_message(values: &[Value]) -> String {
597 values
598 .iter()
599 .map(|value| match value {
600 Value::String(value) => value.clone(),
601 value => serde_json::to_string(value).unwrap_or_else(|_| String::from("<unprintable>")),
602 })
603 .collect::<Vec<_>>()
604 .join(" ")
605}
606
/// Adds two optional amounts, treating a missing side as zero.
///
/// Returns `None` only when both sides are missing.
fn sum_f64(left: Option<f64>, right: Option<f64>) -> Option<f64> {
    if left.or(right).is_none() {
        return None;
    }
    Some(left.unwrap_or_default() + right.unwrap_or_default())
}
613
614async fn wait_for_cancellation(cancel_rx: &mut Option<watch::Receiver<bool>>) {
615 let Some(cancel_rx) = cancel_rx else {
616 std::future::pending::<()>().await;
617 return;
618 };
619 while !*cancel_rx.borrow() {
620 if cancel_rx.changed().await.is_err() {
621 return;
622 }
623 }
624}
625
626impl RunState {
627 async fn handle_js_event(
628 &mut self,
629 event: JsEvent,
630 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
631 ) -> anyhow::Result<Option<RunWorkflowResult>> {
632 match event {
633 JsEvent::Call(call) => self.handle_call(call).await?,
634 JsEvent::Request(request) => {
635 log::debug!(
636 "workflow runtime request id={} kind={}",
637 request.id(),
638 request.kind()
639 );
640 pending_requests.push_back(request);
641 }
642 JsEvent::Complete(output) => {
643 log::debug!(
644 "run_workflow complete script={} budget_spent={}",
645 self.script_path.display(),
646 self.budget.spent
647 );
648 return Ok(Some(RunWorkflowResult {
649 output,
650 logs: std::mem::take(&mut self.logs),
651 phases: std::mem::take(&mut self.phases),
652 agent_calls: std::mem::take(&mut self.agent_calls),
653 workflow_calls: std::mem::take(&mut self.workflow_calls),
654 budget: self.budget.clone(),
655 token_usage: std::mem::take(&mut self.token_usage),
656 token_usage_by_phase: std::mem::take(&mut self.token_usage_by_phase),
657 agent_runs: std::mem::take(&mut self.agent_runs),
658 }));
659 }
660 JsEvent::Error(message) => bail!(message),
661 }
662 Ok(None)
663 }
664
665 async fn start_pending_requests(
666 &mut self,
667 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
668 agent_tasks: &mut JoinSet<AgentTaskCompletion>,
669 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
670 js_commands: &mpsc::Sender<JsCommand>,
671 ) -> anyhow::Result<()> {
672 loop {
673 let Some(request) = pending_requests.front() else {
674 return Ok(());
675 };
676 if matches!(request, WorkflowRuntimeRequest::Agent { .. })
677 && !self.agent_capacity_available(agent_tasks.len())
678 {
679 return Ok(());
680 }
681
682 let request = pending_requests
683 .pop_front()
684 .expect("pending request should exist");
685 match request {
686 WorkflowRuntimeRequest::Agent { .. } => match self.prepare_agent_request(request) {
687 Ok((id, prepared)) => {
688 self.emit_agent_started_event(&id, &prepared).await?;
689 self.spawn_agent_task(agent_tasks, id, prepared);
690 }
691 Err((id, error)) => {
692 send_js_command(
693 js_commands,
694 JsCommand::ResolveRequest {
695 id,
696 resolution: WorkflowRuntimeRequestResolution::Err {
697 message: error.to_string(),
698 },
699 },
700 )
701 .await?;
702 }
703 },
704 WorkflowRuntimeRequest::Sleep { id, duration_ms } => {
705 self.spawn_sleep_task(sleep_tasks, id, duration_ms);
706 }
707 WorkflowRuntimeRequest::Workflow {
708 id,
709 workflow_ref,
710 args,
711 } => {
712 self.workflow_calls.push(WorkflowRuntimeRequest::Workflow {
713 id: id.clone(),
714 workflow_ref: workflow_ref.clone(),
715 args: args.clone(),
716 });
717 let parent_event_step_id = self.event_step_id(&id);
718 let resolution = match self
719 .handle_workflow(parent_event_step_id, workflow_ref, args)
720 .await
721 {
722 Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
723 value,
724 budget: self.budget.clone(),
725 },
726 Err(error) => WorkflowRuntimeRequestResolution::Err {
727 message: error.to_string(),
728 },
729 };
730 send_js_command(js_commands, JsCommand::ResolveRequest { id, resolution })
731 .await?;
732 }
733 }
734 }
735 }
736
737 async fn cancel_workflow(
738 &mut self,
739 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
740 agent_tasks: &mut JoinSet<AgentTaskCompletion>,
741 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
742 js_commands: &mpsc::Sender<JsCommand>,
743 js_events: &mut mpsc::Receiver<JsEvent>,
744 ) -> anyhow::Result<RunWorkflowResult> {
745 log::debug!(
746 "workflow cancellation requested script={}",
747 self.script_path.display()
748 );
749
750 if pending_requests.is_empty()
751 && self.active_request_ids.is_empty()
752 && agent_tasks.is_empty()
753 && sleep_tasks.is_empty()
754 && self
755 .reject_next_runtime_request_for_cancellation(js_commands, js_events)
756 .await
757 {
758 bail!("workflow cancelled");
759 }
760
761 self.reject_pending_requests_for_cancellation(pending_requests, js_commands)
762 .await;
763 sleep_tasks.abort_all();
764 self.reject_active_sleep_requests_for_cancellation(sleep_tasks, js_commands)
765 .await;
766
767 if self.session_log_sink.is_some() {
768 while let Some(completion) = agent_tasks.join_next().await {
769 match completion {
770 Ok(AgentTaskCompletion {
771 id,
772 input,
773 provider,
774 result: Ok(result),
775 }) => {
776 self.active_request_ids.remove(&id);
777 if let Err(error) = self
778 .emit_agent_result_events(&id, provider.as_deref(), &result)
779 .await
780 {
781 log::debug!("failed to emit drained agent events during cancellation: {error:#}");
782 }
783 if let Err(error) = self
784 .emit_agent_completed_event(&id, provider.as_deref(), &result)
785 .await
786 {
787 log::debug!("failed to emit drained agent completion during cancellation: {error:#}");
788 }
789 self.record_agent_run(&id, &input, provider, &result);
790 self.reject_request_for_cancellation(id, js_commands).await;
791 }
792 Ok(AgentTaskCompletion {
793 id,
794 provider,
795 result: Err(error),
796 ..
797 }) => {
798 self.active_request_ids.remove(&id);
799 let message = error.to_string();
800 if let Err(error) = self
801 .emit_agent_failed_event(&id, provider.as_deref(), &message)
802 .await
803 {
804 log::debug!("failed to emit drained agent failure during cancellation: {error:#}");
805 }
806 log::debug!("agent task failed while draining cancellation: {message}");
807 self.reject_request_for_cancellation(id, js_commands).await;
808 }
809 Err(error) => {
810 log::debug!("agent task join failed while draining cancellation: {error}");
811 }
812 }
813 }
814 } else {
815 let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
816 agent_tasks.abort_all();
817 for id in ids {
818 self.active_request_ids.remove(&id);
819 self.reject_request_for_cancellation(id, js_commands).await;
820 }
821 }
822
823 self.reject_remaining_active_requests_for_cancellation(js_commands)
824 .await;
825 self.drain_runtime_after_cancellation(js_events).await;
826 let _ = send_js_command(js_commands, JsCommand::Shutdown).await;
827 bail!("workflow cancelled")
828 }
829
830 async fn reject_next_runtime_request_for_cancellation(
831 &mut self,
832 js_commands: &mpsc::Sender<JsCommand>,
833 js_events: &mut mpsc::Receiver<JsEvent>,
834 ) -> bool {
835 loop {
836 match js_events.recv().await {
837 Some(JsEvent::Call(call)) => {
838 let _ = self.handle_call(call).await;
839 }
840 Some(JsEvent::Request(request)) => {
841 self.reject_request_for_cancellation(request.id().to_string(), js_commands)
842 .await;
843 return false;
844 }
845 Some(JsEvent::Complete(_)) | Some(JsEvent::Error(_)) | None => return true,
846 }
847 }
848 }
849
850 async fn reject_pending_requests_for_cancellation(
851 &mut self,
852 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
853 js_commands: &mpsc::Sender<JsCommand>,
854 ) {
855 while let Some(request) = pending_requests.pop_front() {
856 self.reject_request_for_cancellation(request.id().to_string(), js_commands)
857 .await;
858 }
859 }
860
861 async fn reject_active_sleep_requests_for_cancellation(
862 &mut self,
863 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
864 js_commands: &mpsc::Sender<JsCommand>,
865 ) {
866 while let Some(completion) = sleep_tasks.join_next().await {
867 if let Ok(SleepTaskCompletion { id, .. }) = completion {
868 self.active_request_ids.remove(&id);
869 self.reject_request_for_cancellation(id, js_commands).await;
870 }
871 }
872 }
873
874 async fn reject_remaining_active_requests_for_cancellation(
875 &mut self,
876 js_commands: &mpsc::Sender<JsCommand>,
877 ) {
878 let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
879 for id in ids {
880 self.active_request_ids.remove(&id);
881 self.reject_request_for_cancellation(id, js_commands).await;
882 }
883 }
884
885 async fn reject_request_for_cancellation(
886 &self,
887 id: String,
888 js_commands: &mpsc::Sender<JsCommand>,
889 ) {
890 let _ = send_js_command(
891 js_commands,
892 JsCommand::ResolveRequest {
893 id,
894 resolution: WorkflowRuntimeRequestResolution::Err {
895 message: "workflow cancelled".to_string(),
896 },
897 },
898 )
899 .await;
900 }
901
902 async fn drain_runtime_after_cancellation(&mut self, js_events: &mut mpsc::Receiver<JsEvent>) {
903 while let Some(event) = js_events.recv().await {
904 match event {
905 JsEvent::Call(call) => {
906 let _ = self.handle_call(call).await;
907 }
908 JsEvent::Request(request) => {
909 log::debug!(
910 "ignoring request after cancellation id={} kind={}",
911 request.id(),
912 request.kind()
913 );
914 }
915 JsEvent::Complete(_) | JsEvent::Error(_) => break,
916 }
917 }
918 }
919
920 fn event_step_id(&self, runtime_request_id: &str) -> String {
921 let parent = self.event_parent_step_id.as_deref().unwrap_or("");
922 let hash = blake3::hash(
923 format!("{parent}:{}:{runtime_request_id}", self.nesting_depth).as_bytes(),
924 );
925 format!("step_{}", &hash.to_hex()[..16])
926 }
927
928 async fn emit_event(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
929 if (event.event_type.as_str() != "workflow.started" || self.nesting_depth > 0)
930 && event.elapsed_nanos.is_none()
931 {
932 event.elapsed_nanos = Some(elapsed_nanos(self.event_start));
933 }
934 let metadata = event
935 .metadata
936 .get_or_insert_with(WorkflowEventMetadata::default);
937 if metadata.workflow_depth.is_none() {
938 metadata.workflow_depth = Some(u32::try_from(self.nesting_depth).unwrap_or(u32::MAX));
939 }
940 if metadata.parent_step_id.is_none() {
941 metadata.parent_step_id = self.event_parent_step_id.clone();
942 }
943 if let Some(event_sink) = self.event_sink.as_ref() {
944 event_sink.emit(event).await?;
945 }
946 Ok(())
947 }
948
949 async fn handle_call(&mut self, call: WorkflowRuntimeCall) -> anyhow::Result<()> {
950 match call {
951 WorkflowRuntimeCall::Log { values } => {
952 self.emit_event(WorkflowEvent::log(format_log_message(&values)))
953 .await?;
954 self.logs.push(values);
955 }
956 WorkflowRuntimeCall::Phase { name, options } => {
957 let phase = WorkflowPhaseCall { name, options };
958 self.emit_event(WorkflowEvent::phase(
959 phase.name.clone(),
960 phase.options.clone(),
961 ))
962 .await?;
963 self.phases.push(phase);
964 }
965 }
966 Ok(())
967 }
968
969 fn agent_capacity_available(&self, in_flight: usize) -> bool {
970 let max_parallel = self
971 .max_parallel_agent_requests
972 .filter(|value| *value > 0)
973 .unwrap_or(usize::MAX);
974 in_flight < max_parallel
975 }
976
977 fn prepare_agent_request(
978 &mut self,
979 request: WorkflowRuntimeRequest,
980 ) -> Result<(String, PreparedAgentRun), (String, anyhow::Error)> {
981 match request {
982 WorkflowRuntimeRequest::Agent {
983 id,
984 prompt,
985 options,
986 } => {
987 self.agent_calls.push(WorkflowRuntimeRequest::Agent {
988 id: id.clone(),
989 prompt: prompt.clone(),
990 options: options.clone(),
991 });
992 match self.prepare_agent_run(prompt, options) {
993 Ok(prepared) => Ok((id, prepared)),
994 Err(error) => Err((id, error)),
995 }
996 }
997 WorkflowRuntimeRequest::Workflow { .. } | WorkflowRuntimeRequest::Sleep { .. } => {
998 unreachable!("prepare_agent_request only accepts agent requests")
999 }
1000 }
1001 }
1002
1003 fn spawn_agent_task(
1004 &mut self,
1005 agent_tasks: &mut JoinSet<AgentTaskCompletion>,
1006 id: String,
1007 prepared: PreparedAgentRun,
1008 ) {
1009 let default_provider_name = self.agent_provider.name().to_string();
1010 let default_provider = Arc::clone(&self.agent_provider);
1011 let agent_runner = Arc::clone(&self.agent_runner);
1012 let retry_in_runtime = agent_runner.retry_in_runtime();
1013 let cancel_rx = self.cancel_rx.clone();
1014 let completion_input = prepared.input.clone();
1015 let completion_provider = prepared
1016 .provider_override
1017 .clone()
1018 .or(Some(default_provider_name));
1019 let session_log_sink = self.session_log_sink.clone();
1020 let max_parallel = self
1021 .max_parallel_agent_requests
1022 .filter(|value| *value > 0)
1023 .unwrap_or(usize::MAX);
1024 log::debug!(
1025 "starting agent request id={} in_flight_after_start={} max_parallel={}",
1026 id,
1027 agent_tasks.len() + 1,
1028 max_parallel
1029 );
1030 self.active_request_ids.insert(id.clone());
1031 agent_tasks.spawn(async move {
1032 let result = if retry_in_runtime {
1033 run_agent_runner_with_retry(
1034 Arc::clone(&agent_runner),
1035 default_provider,
1036 prepared.provider_override,
1037 prepared.input,
1038 cancel_rx,
1039 )
1040 .await
1041 } else {
1042 agent_runner
1043 .run_agent(default_provider, prepared.provider_override, prepared.input)
1044 .await
1045 };
1046 let result = match result {
1047 Ok(result) => {
1048 if let Some(session_log_sink) = session_log_sink.as_ref() {
1049 let provider_name = completion_provider
1050 .as_deref()
1051 .expect("completion provider should always be set");
1052 match session_log_sink
1053 .write_agent_result(provider_name, &result)
1054 .await
1055 {
1056 Ok(()) => Ok(result),
1057 Err(error) => Err(error),
1058 }
1059 } else {
1060 Ok(result)
1061 }
1062 }
1063 Err(error) => Err(error),
1064 };
1065 AgentTaskCompletion {
1066 id,
1067 input: completion_input,
1068 provider: completion_provider,
1069 result,
1070 }
1071 });
1072 }
1073
1074 fn spawn_sleep_task(
1075 &mut self,
1076 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
1077 id: String,
1078 duration_ms: u64,
1079 ) {
1080 let agent_runner = Arc::clone(&self.agent_runner);
1081 log::debug!(
1082 "starting sleep request id={} duration_ms={}",
1083 id,
1084 duration_ms
1085 );
1086 self.active_request_ids.insert(id.clone());
1087 sleep_tasks.spawn(async move {
1088 SleepTaskCompletion {
1089 id,
1090 result: agent_runner.sleep(duration_ms).await,
1091 }
1092 });
1093 }
1094
1095 fn prepare_agent_run(
1096 &self,
1097 prompt: String,
1098 options: Option<Value>,
1099 ) -> anyhow::Result<PreparedAgentRun> {
1100 let options = apply_phase_defaults(options, &self.metadata);
1101 let context = AgentProviderContext {
1102 phase: options
1103 .as_ref()
1104 .and_then(|options| options.get("phase"))
1105 .and_then(Value::as_str)
1106 .map(ToString::to_string),
1107 cwd: self.workflow_cwd.clone(),
1108 };
1109 let provider_override = options
1110 .as_ref()
1111 .and_then(|options| options.get("provider"))
1112 .and_then(Value::as_str)
1113 .map(ToString::to_string);
1114 let provider_name = provider_override
1115 .as_deref()
1116 .unwrap_or_else(|| self.agent_provider.name());
1117 let options = resolve_model_options(options, provider_name, &self.model_map)?;
1118 agent_retry_policy(&options)?;
1119 log::debug!(
1120 "agent call provider={} phase={:?} model={:?} prompt_len={}",
1121 provider_name,
1122 context.phase.as_deref(),
1123 options
1124 .as_ref()
1125 .and_then(|options| options.get("model"))
1126 .and_then(Value::as_str),
1127 prompt.len()
1128 );
1129 Ok(PreparedAgentRun {
1130 provider_override,
1131 input: AgentProviderRunInput {
1132 prompt,
1133 options,
1134 environment: Arc::new(LocalExecutionEnvironment::new(context.cwd.clone())?),
1135 context,
1136 },
1137 })
1138 }
1139
1140 async fn emit_agent_started_event(
1141 &self,
1142 id: &str,
1143 prepared: &PreparedAgentRun,
1144 ) -> anyhow::Result<()> {
1145 let provider = prepared
1146 .provider_override
1147 .as_deref()
1148 .unwrap_or_else(|| self.agent_provider.name());
1149 let metadata = self.agent_event_metadata(id, Some(provider), None);
1150 self.emit_event(WorkflowEvent::agent_started(
1151 serde_json::json!({
1152 "phase": prepared.input.context.phase,
1153 "promptPreview": truncate_for_event(&prepared.input.prompt, 200),
1154 }),
1155 metadata,
1156 ))
1157 .await
1158 }
1159
1160 async fn apply_agent_result(
1161 &mut self,
1162 id: &str,
1163 input: &AgentProviderRunInput,
1164 provider: Option<String>,
1165 result: AgentProviderResult,
1166 ) -> anyhow::Result<Value> {
1167 if let Some(output_tokens) = result.usage.as_ref().and_then(|usage| usage.output_tokens) {
1168 self.budget.spent = self.budget.spent.saturating_add(output_tokens);
1169 }
1170 self.emit_agent_result_events(id, provider.as_deref(), &result)
1171 .await?;
1172 self.emit_agent_completed_event(id, provider.as_deref(), &result)
1173 .await?;
1174 self.record_agent_run(id, input, provider, &result);
1175 log::debug!(
1176 "agent call complete session_id={:?} output_tokens={:?} budget_spent={}",
1177 result.session_id,
1178 result.usage.as_ref().and_then(|usage| usage.output_tokens),
1179 self.budget.spent
1180 );
1181 Ok(result.output)
1182 }
1183
1184 async fn emit_agent_result_events(
1185 &self,
1186 id: &str,
1187 provider: Option<&str>,
1188 result: &AgentProviderResult,
1189 ) -> anyhow::Result<()> {
1190 let Some(raw) = result.raw.as_ref() else {
1191 return Ok(());
1192 };
1193 let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1194 for provider_event in raw_agent_event_payloads(raw) {
1195 let event_data = agent_session_event_payload(provider_event, &metadata);
1196 self.emit_event(WorkflowEvent::agent_event(event_data, metadata.clone()))
1197 .await?;
1198 }
1199 Ok(())
1200 }
1201
1202 async fn emit_agent_completed_event(
1203 &self,
1204 id: &str,
1205 provider: Option<&str>,
1206 result: &AgentProviderResult,
1207 ) -> anyhow::Result<()> {
1208 let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1209 self.emit_event(WorkflowEvent::agent_completed(
1210 serde_json::json!({
1211 "sessionId": result.session_id,
1212 "model": result.model,
1213 "usage": result.usage,
1214 }),
1215 metadata,
1216 ))
1217 .await
1218 }
1219
1220 async fn emit_agent_failed_event(
1221 &self,
1222 id: &str,
1223 provider: Option<&str>,
1224 message: &str,
1225 ) -> anyhow::Result<()> {
1226 let metadata = self.agent_event_metadata(id, provider, None);
1227 self.emit_event(WorkflowEvent::agent_failed(
1228 serde_json::json!({ "message": message }),
1229 metadata,
1230 ))
1231 .await
1232 }
1233
1234 fn agent_event_metadata(
1235 &self,
1236 id: &str,
1237 provider: Option<&str>,
1238 session_id: Option<String>,
1239 ) -> WorkflowEventMetadata {
1240 WorkflowEventMetadata {
1241 run_id: None,
1242 step_id: Some(self.event_step_id(id)),
1243 provider: Some(
1244 provider
1245 .unwrap_or_else(|| self.agent_provider.name())
1246 .to_string(),
1247 ),
1248 session_id,
1249 workflow_depth: None,
1250 parent_step_id: None,
1251 }
1252 }
1253
1254 fn record_agent_run(
1255 &mut self,
1256 id: &str,
1257 input: &AgentProviderRunInput,
1258 provider: Option<String>,
1259 result: &AgentProviderResult,
1260 ) {
1261 add_usage(&mut self.token_usage, result.usage.as_ref());
1262 if let Some(phase) = input.context.phase.as_ref() {
1263 let phase_usage = self.token_usage_by_phase.entry(phase.clone()).or_default();
1264 add_usage(phase_usage, result.usage.as_ref());
1265 }
1266 let model = result.model.clone().or_else(|| {
1267 input
1268 .options
1269 .as_ref()
1270 .and_then(|options| options.get("model"))
1271 .and_then(Value::as_str)
1272 .map(ToString::to_string)
1273 });
1274 self.agent_runs.push(WorkflowAgentRunSummary {
1275 id: id.to_string(),
1276 phase: input.context.phase.clone(),
1277 provider,
1278 model,
1279 provider_session_id: result.session_id.clone(),
1280 usage: result.usage.clone(),
1281 isolation: result.isolation.clone(),
1282 });
1283 }
1284
1285 async fn handle_workflow(
1286 &mut self,
1287 parent_step_id: String,
1288 workflow_ref: WorkflowRef,
1289 args: Option<Value>,
1290 ) -> anyhow::Result<Value> {
1291 if self.nesting_depth >= 1 {
1292 bail!("Nested workflow() calls are limited to one level");
1293 }
1294 let script_path = match workflow_ref {
1295 WorkflowRef::ScriptPath { script_path } => {
1296 resolve_relative_script(&self.script_path, &script_path)
1297 }
1298 WorkflowRef::Name(name) => resolve_named_workflow(&name)?,
1299 };
1300 log::debug!("child workflow call script={}", script_path.display());
1301 let child = Box::pin(run_workflow_inner(RunWorkflowOptions {
1302 script_path,
1303 workflow_cwd: self.workflow_cwd.clone(),
1304 args: args.unwrap_or(Value::Null),
1305 agent_provider: Arc::clone(&self.agent_provider),
1306 model_map: self.model_map.clone(),
1307 budget_total: self.budget.total,
1308 budget_spent: self.budget.spent,
1309 nesting_depth: self.nesting_depth + 1,
1310 max_parallel_agent_requests: self.max_parallel_agent_requests,
1311 agent_runner: Some(Arc::clone(&self.agent_runner)),
1312 cancel_rx: self.cancel_rx.clone(),
1313 event_sink: self.event_sink.clone(),
1314 event_parent_step_id: Some(parent_step_id),
1315 event_stream_start: Some(self.event_start),
1316 session_log_sink: self.session_log_sink.clone(),
1317 }))
1318 .await?;
1319 self.budget = child.budget;
1320 self.logs.extend(child.logs);
1321 self.phases.extend(child.phases);
1322 self.agent_calls.extend(child.agent_calls);
1323 self.workflow_calls.extend(child.workflow_calls);
1324 merge_token_usage(&mut self.token_usage, &child.token_usage);
1325 for (phase, usage) in child.token_usage_by_phase {
1326 merge_token_usage(self.token_usage_by_phase.entry(phase).or_default(), &usage);
1327 }
1328 self.agent_runs.extend(child.agent_runs);
1329 Ok(child.output.result)
1330 }
1331}
1332
1333async fn run_agent_runner_with_retry(
1334 agent_runner: Arc<dyn WorkflowAgentRunner>,
1335 default_provider: Arc<dyn AgentProvider>,
1336 provider_override: Option<String>,
1337 input: AgentProviderRunInput,
1338 mut cancel_rx: Option<watch::Receiver<bool>>,
1339) -> anyhow::Result<AgentProviderResult> {
1340 let retry = agent_retry_policy(&input.options)?;
1341 let mut final_result = None;
1342 for attempt in 1..=retry.max_attempts {
1343 let attempt_result = agent_runner
1344 .run_agent(
1345 Arc::clone(&default_provider),
1346 provider_override.clone(),
1347 input.clone(),
1348 )
1349 .await;
1350 match attempt_result {
1351 Ok(result) => {
1352 final_result = Some(Ok(result));
1353 break;
1354 }
1355 Err(error) if attempt < retry.max_attempts => {
1356 log::debug!(
1357 "agent call failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1358 retry.max_attempts,
1359 retry.backoff_ms
1360 );
1361 sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1362 }
1363 Err(error) => {
1364 final_result = Some(Err(error));
1365 break;
1366 }
1367 }
1368 }
1369 final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1370}
1371
1372async fn sleep_retry_backoff(
1373 backoff_ms: u64,
1374 cancel_rx: &mut Option<watch::Receiver<bool>>,
1375) -> anyhow::Result<()> {
1376 if backoff_ms == 0 {
1377 return Ok(());
1378 }
1379 let Some(cancel_rx) = cancel_rx.as_mut() else {
1380 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1381 return Ok(());
1382 };
1383 if *cancel_rx.borrow() {
1384 bail!("workflow cancelled");
1385 }
1386 let sleep = tokio::time::sleep(Duration::from_millis(backoff_ms));
1387 tokio::pin!(sleep);
1388 loop {
1389 tokio::select! {
1390 _ = &mut sleep => return Ok(()),
1391 changed = cancel_rx.changed() => {
1392 match changed {
1393 Ok(()) if *cancel_rx.borrow() => bail!("workflow cancelled"),
1394 Ok(()) => continue,
1395 Err(_) => {
1396 sleep.await;
1397 return Ok(());
1398 }
1399 }
1400 }
1401 }
1402 }
1403}
1404
1405pub(crate) async fn run_agent_provider_with_retry(
1406 default_provider: Arc<dyn AgentProvider>,
1407 provider_override: Option<String>,
1408 input: AgentProviderRunInput,
1409 mut cancel_rx: Option<watch::Receiver<bool>>,
1410) -> anyhow::Result<AgentProviderResult> {
1411 let retry = agent_retry_policy(&input.options)?;
1412 let provider = resolve_agent_provider(default_provider, provider_override)?;
1413 let mut final_result = None;
1414 for attempt in 1..=retry.max_attempts {
1415 let attempt_result =
1416 run_agent_with_optional_isolation(Arc::clone(&provider), input.clone()).await;
1417 match attempt_result {
1418 Ok(result) => {
1419 final_result = Some(Ok(result));
1420 break;
1421 }
1422 Err(error) if attempt < retry.max_attempts => {
1423 log::debug!(
1424 "agent provider failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1425 retry.max_attempts,
1426 retry.backoff_ms
1427 );
1428 sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1429 }
1430 Err(error) => {
1431 final_result = Some(Err(error));
1432 break;
1433 }
1434 }
1435 }
1436 final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1437}
1438
1439pub(crate) async fn run_agent_provider(
1440 default_provider: Arc<dyn AgentProvider>,
1441 provider_override: Option<String>,
1442 input: AgentProviderRunInput,
1443) -> anyhow::Result<AgentProviderResult> {
1444 let provider = resolve_agent_provider(default_provider, provider_override)?;
1445 run_agent_with_optional_isolation(provider, input).await
1446}
1447
1448fn resolve_agent_provider(
1449 default_provider: Arc<dyn AgentProvider>,
1450 provider_override: Option<String>,
1451) -> anyhow::Result<Arc<dyn AgentProvider>> {
1452 if let Some(provider_override) = provider_override {
1453 Ok(Arc::from(create_agent_provider(&provider_override)?))
1454 } else {
1455 Ok(default_provider)
1456 }
1457}
1458
/// Retry settings for one agent call, parsed from its `retry` option.
#[derive(Clone, Copy, Debug)]
pub(crate) struct AgentRetryPolicy {
    /// Total number of attempts, including the first. `agent_retry_policy`
    /// always produces a value of at least 1.
    pub max_attempts: u32,
    /// Delay between attempts, in milliseconds.
    pub backoff_ms: u64,
}
1464
1465pub(crate) fn agent_retry_policy(options: &Option<Value>) -> anyhow::Result<AgentRetryPolicy> {
1466 let default = AgentRetryPolicy {
1467 max_attempts: 1,
1468 backoff_ms: 0,
1469 };
1470 let Some(retry) = options.as_ref().and_then(|options| options.get("retry")) else {
1471 return Ok(default);
1472 };
1473 if retry.is_null() {
1474 return Ok(default);
1475 }
1476 let object = retry
1477 .as_object()
1478 .ok_or_else(|| anyhow!("agent retry option must be an object"))?;
1479 let max_attempts = match object.get("maxAttempts") {
1480 Some(value) => {
1481 let value = value
1482 .as_u64()
1483 .ok_or_else(|| anyhow!("agent retry.maxAttempts must be a positive integer"))?;
1484 if value == 0 || value > u32::MAX as u64 {
1485 bail!("agent retry.maxAttempts must be between 1 and {}", u32::MAX);
1486 }
1487 value as u32
1488 }
1489 None => default.max_attempts,
1490 };
1491 let backoff_ms = match object.get("backoffMs") {
1492 Some(value) => value
1493 .as_u64()
1494 .ok_or_else(|| anyhow!("agent retry.backoffMs must be a non-negative integer"))?,
1495 None => default.backoff_ms,
1496 };
1497 Ok(AgentRetryPolicy {
1498 max_attempts,
1499 backoff_ms,
1500 })
1501}
1502
1503async fn run_agent_with_optional_isolation(
1504 provider: Arc<dyn AgentProvider>,
1505 input: AgentProviderRunInput,
1506) -> anyhow::Result<AgentProviderResult> {
1507 if let Some(request) = sandbox_isolation_request(&input.options)? {
1508 return run_agent_with_sandbox_isolation(provider, input, request).await;
1509 }
1510
1511 if requests_worktree_isolation(&input.options) {
1512 return run_agent_with_worktree_isolation(provider, input).await;
1513 }
1514
1515 run_agent_with_schema_validation(provider, input).await
1516}
1517
1518async fn run_agent_with_worktree_isolation(
1519 provider: Arc<dyn AgentProvider>,
1520 input: AgentProviderRunInput,
1521) -> anyhow::Result<AgentProviderResult> {
1522 let isolation = WorktreeIsolation::create(input.context.cwd.as_deref())?;
1523 let isolation_info = isolation.info();
1524 let mut isolated_input = input;
1525 isolated_input.context.cwd = Some(isolation.cwd.clone());
1526 isolated_input.environment =
1527 Arc::new(LocalExecutionEnvironment::new(Some(isolation.cwd.clone()))?);
1528 let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1529 if let Ok(result) = &mut result {
1530 result.isolation = Some(isolation_info);
1531 }
1532 if let Err(error) = isolation.cleanup() {
1533 log::warn!("failed to cleanup isolated agent worktree: {error:#}");
1534 }
1535 result
1536}
1537
1538async fn run_agent_with_sandbox_isolation(
1539 provider: Arc<dyn AgentProvider>,
1540 input: AgentProviderRunInput,
1541 request: SandboxIsolationRequest,
1542) -> anyhow::Result<AgentProviderResult> {
1543 let program = sandbox_provider_program(&request);
1544 let sandbox_group_id = format!("sbxgrp_{}", ulid::Ulid::new());
1545 let cwd = input.context.cwd.clone().ok_or_else(|| {
1546 anyhow!("agent sandbox isolation requires the workflow cwd to be available")
1547 })?;
1548 let sandbox_env = SandboxExecutionEnvironment::open(
1549 program,
1550 OpenSandboxRequest {
1551 metadata: SandboxMetadata::new(
1552 format!("req_{}", ulid::Ulid::new()),
1553 sandbox_group_id.clone(),
1554 ),
1555 profile: ProfileRef {
1556 provider: request.provider.clone(),
1557 name: request.profile.clone(),
1558 },
1559 workspace_sync: WorkspaceSync { host_path: cwd },
1560 cwd: request.cwd.clone(),
1561 },
1562 )
1563 .await
1564 .with_context(|| {
1565 format!(
1566 "failed to open sandbox profile `{}` with provider `{}`",
1567 request.profile, request.provider
1568 )
1569 })?;
1570
1571 let session = sandbox_env.session().clone();
1572 let mut isolated_input = input;
1573 isolated_input.context.cwd = sandbox_env.cwd().map(|path| PathBuf::from(path.as_str()));
1574 isolated_input.environment = Arc::new(sandbox_env.clone());
1575 let isolation_info = AgentRunIsolation {
1576 kind: "sandbox".to_string(),
1577 branch: None,
1578 worktree_path: None,
1579 cwd: session.cwd.clone(),
1580 profile: Some(request.profile.clone()),
1581 provider: Some(request.provider.clone()),
1582 session_id: Some(session.id.clone()),
1583 };
1584
1585 let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1586 if let Ok(result) = &mut result {
1587 result.isolation = Some(isolation_info);
1588 }
1589
1590 if let Err(error) = sandbox_env.close().await {
1591 log::warn!("failed to close sandbox-isolated agent session: {error:#}");
1592 }
1593
1594 result
1595}
1596
1597fn requests_worktree_isolation(options: &Option<Value>) -> bool {
1598 options
1599 .as_ref()
1600 .and_then(|options| options.get("isolation"))
1601 .and_then(Value::as_str)
1602 == Some("worktree")
1603}
1604
/// Parsed form of an `isolation: { type: "sandbox", ... }` agent option.
#[derive(Clone, Debug)]
struct SandboxIsolationRequest {
    /// Sandbox provider name (the part of `isolation.profile` before `/`).
    provider: String,
    /// Provider-local profile name (the part of `isolation.profile` after `/`).
    profile: String,
    /// Optional `isolation.cwd`, forwarded as `OpenSandboxRequest::cwd`.
    cwd: Option<String>,
}
1611
1612fn sandbox_isolation_request(
1613 options: &Option<Value>,
1614) -> anyhow::Result<Option<SandboxIsolationRequest>> {
1615 let Some(isolation) = options
1616 .as_ref()
1617 .and_then(|options| options.get("isolation"))
1618 else {
1619 return Ok(None);
1620 };
1621 let Some(object) = isolation.as_object() else {
1622 return Ok(None);
1623 };
1624 if object.get("type").and_then(Value::as_str) != Some("sandbox") {
1625 return Ok(None);
1626 }
1627 let profile = object
1628 .get("profile")
1629 .and_then(Value::as_str)
1630 .ok_or_else(|| anyhow!("agent sandbox isolation requires isolation.profile"))?
1631 .to_string();
1632 let cwd = object
1633 .get("cwd")
1634 .and_then(Value::as_str)
1635 .map(ToString::to_string);
1636 let (provider, profile) = parse_sandbox_profile_ref(&profile)?;
1637 Ok(Some(SandboxIsolationRequest {
1638 provider,
1639 profile,
1640 cwd,
1641 }))
1642}
1643
1644fn parse_sandbox_profile_ref(value: &str) -> anyhow::Result<(String, String)> {
1645 let (provider, profile) = value.split_once('/').ok_or_else(|| {
1646 anyhow!("agent sandbox isolation.profile must use <provider>/<profile>, got `{value}`")
1647 })?;
1648 validate_sandbox_provider_name(provider)?;
1649 if profile.is_empty() {
1650 bail!(
1651 "agent sandbox isolation.profile must include a non-empty provider-local profile name"
1652 );
1653 }
1654 Ok((provider.to_string(), profile.to_string()))
1655}
1656
1657fn validate_sandbox_provider_name(provider: &str) -> anyhow::Result<()> {
1658 let valid = !provider.is_empty()
1659 && provider
1660 .bytes()
1661 .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'-')
1662 && provider
1663 .as_bytes()
1664 .first()
1665 .is_some_and(u8::is_ascii_alphanumeric)
1666 && provider
1667 .as_bytes()
1668 .last()
1669 .is_some_and(u8::is_ascii_alphanumeric);
1670 if !valid {
1671 bail!("invalid sandbox provider name `{provider}`; expected lowercase letters, digits, and hyphens, starting and ending with an alphanumeric character");
1672 }
1673 Ok(())
1674}
1675
1676fn sandbox_provider_program(request: &SandboxIsolationRequest) -> String {
1677 format!("smol-sandbox-{}", request.provider)
1678}
1679
/// A temporary git worktree created for one isolated agent run.
///
/// `cleanup` removes the worktree and its branch explicitly. If that never
/// completes (`cleaned` stays `false`), the `Drop` impl retries both steps
/// best-effort.
struct WorktreeIsolation {
    /// Canonical root of the source repository; the git cleanup commands run here.
    repo_root: PathBuf,
    /// Root directory of the new worktree, created inside `_temp_dir`.
    worktree_root: PathBuf,
    /// Directory the agent runs in: the original cwd's position inside the
    /// repository, mapped onto the worktree.
    cwd: PathBuf,
    /// Throwaway branch checked out in the worktree.
    branch_name: String,
    /// Set once `cleanup` has removed both the worktree and the branch.
    cleaned: bool,
    /// Owns the temporary parent directory; it is deleted when this value drops.
    _temp_dir: tempfile::TempDir,
}
1688
1689impl WorktreeIsolation {
1690 fn create(cwd: Option<&Path>) -> anyhow::Result<Self> {
1691 let cwd = cwd
1692 .map(Path::to_path_buf)
1693 .unwrap_or(std::env::current_dir()?)
1694 .canonicalize()
1695 .context("failed to canonicalize workflow cwd for worktree isolation")?;
1696 let repo_root = git_output(&cwd, &["rev-parse", "--show-toplevel"]).context(
1697 "agent isolation='worktree' requires the workflow cwd to be inside a git repository",
1698 )?;
1699 let repo_root = PathBuf::from(repo_root.trim())
1700 .canonicalize()
1701 .context("failed to canonicalize git repository root for worktree isolation")?;
1702 let relative_cwd = cwd.strip_prefix(&repo_root).with_context(|| {
1703 format!(
1704 "workflow cwd {} is not under git repository root {}",
1705 cwd.display(),
1706 repo_root.display()
1707 )
1708 })?;
1709
1710 let temp_dir = tempfile::Builder::new()
1711 .prefix("smol-wf-agent-worktree-")
1712 .tempdir()
1713 .context("failed to create temp directory for agent worktree isolation")?;
1714 let worktree_root = temp_dir.path().join("worktree");
1715 let worktree_arg = path_arg(&worktree_root);
1716 let branch_name = format!(
1717 "smol-wf/agent-run/{}",
1718 ulid::Ulid::new().to_string().to_ascii_lowercase()
1719 );
1720 git_status(
1721 &repo_root,
1722 &[
1723 "worktree",
1724 "add",
1725 "--quiet",
1726 "-b",
1727 &branch_name,
1728 &worktree_arg,
1729 "HEAD",
1730 ],
1731 )
1732 .context("failed to create isolated git worktree for agent run")?;
1733 let isolated_cwd = if relative_cwd.as_os_str().is_empty() {
1734 worktree_root.clone()
1735 } else {
1736 worktree_root.join(relative_cwd)
1737 };
1738 Ok(Self {
1739 repo_root,
1740 worktree_root,
1741 cwd: isolated_cwd,
1742 branch_name,
1743 cleaned: false,
1744 _temp_dir: temp_dir,
1745 })
1746 }
1747
1748 fn info(&self) -> AgentRunIsolation {
1749 AgentRunIsolation {
1750 kind: "worktree".to_string(),
1751 branch: Some(self.branch_name.clone()),
1752 worktree_path: Some(path_arg(&self.worktree_root)),
1753 cwd: Some(path_arg(&self.cwd)),
1754 profile: None,
1755 provider: None,
1756 session_id: None,
1757 }
1758 }
1759
1760 fn cleanup(mut self) -> anyhow::Result<()> {
1761 self.remove_worktree()?;
1762 self.delete_branch()?;
1763 self.cleaned = true;
1764 Ok(())
1765 }
1766
1767 fn remove_worktree(&self) -> anyhow::Result<()> {
1768 let worktree_arg = path_arg(&self.worktree_root);
1769 git_status(
1770 &self.repo_root,
1771 &["worktree", "remove", "--force", &worktree_arg],
1772 )
1773 .context("failed to remove isolated git worktree")
1774 }
1775
1776 fn delete_branch(&self) -> anyhow::Result<()> {
1777 git_status(&self.repo_root, &["branch", "-D", &self.branch_name])
1778 .context("failed to delete isolated agent worktree branch")
1779 }
1780}
1781
1782impl Drop for WorktreeIsolation {
1783 fn drop(&mut self) {
1784 if !self.cleaned {
1785 if let Err(error) = self.remove_worktree() {
1786 log::warn!("failed to cleanup isolated agent worktree during drop: {error:#}");
1787 }
1788 if let Err(error) = self.delete_branch() {
1789 log::warn!(
1790 "failed to delete isolated agent worktree branch during drop: {error:#}"
1791 );
1792 }
1793 }
1794 }
1795}
1796
/// Converts a path into an owned command-line argument.
///
/// Any non-UTF-8 bytes are replaced with U+FFFD.
fn path_arg(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1800
1801fn git_output(cwd: &Path, args: &[&str]) -> anyhow::Result<String> {
1802 let output = StdCommand::new("git")
1803 .args(args)
1804 .current_dir(cwd)
1805 .output()
1806 .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1807 if output.status.success() {
1808 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
1809 } else {
1810 bail!(
1811 "git {} failed with {}{}",
1812 args.join(" "),
1813 status_text(output.status.code()),
1814 command_stderr(&output.stderr)
1815 )
1816 }
1817}
1818
1819fn git_status(cwd: &Path, args: &[&str]) -> anyhow::Result<()> {
1820 let output = StdCommand::new("git")
1821 .args(args)
1822 .current_dir(cwd)
1823 .output()
1824 .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1825 if output.status.success() {
1826 Ok(())
1827 } else {
1828 bail!(
1829 "git {} failed with {}{}",
1830 args.join(" "),
1831 status_text(output.status.code()),
1832 command_stderr(&output.stderr)
1833 )
1834 }
1835}
1836
/// Describes a process exit: `code <n>` when an exit code exists, otherwise
/// `signal`.
fn status_text(code: Option<i32>) -> String {
    match code {
        Some(exit_code) => format!("code {exit_code}"),
        None => "signal".to_string(),
    }
}
1841
/// Formats captured stderr as a `: <text>` suffix for error messages.
///
/// Returns an empty string when stderr is blank after trimming.
fn command_stderr(stderr: &[u8]) -> String {
    let text = String::from_utf8_lossy(stderr);
    match text.trim() {
        "" => String::new(),
        trimmed => format!(": {trimmed}"),
    }
}
1851
1852async fn run_agent_with_schema_validation(
1853 provider: Arc<dyn AgentProvider>,
1854 input: AgentProviderRunInput,
1855) -> anyhow::Result<AgentProviderResult> {
1856 let Some(schema) = input
1857 .options
1858 .as_ref()
1859 .and_then(|options| options.get("schema"))
1860 .cloned()
1861 else {
1862 return provider.run(input).await;
1863 };
1864
1865 let max_attempts = 2;
1866 let original_prompt = input.prompt.clone();
1867 let mut attempt_input = input;
1868 let mut last_errors = Vec::new();
1869
1870 for attempt in 1..=max_attempts {
1871 let result = provider.run(attempt_input.clone()).await?;
1872 match validate_structured_output(&schema, &result.output) {
1873 Ok(()) => return Ok(result),
1874 Err(errors) => {
1875 last_errors = errors;
1876 if attempt < max_attempts {
1877 attempt_input.prompt =
1878 with_structured_output_retry_prompt(&original_prompt, &last_errors);
1879 }
1880 }
1881 }
1882 }
1883
1884 bail!(
1885 "{}",
1886 format_structured_output_validation_error(&last_errors)
1887 )
1888}
1889
1890fn validate_structured_output(schema: &Value, output: &Value) -> Result<(), Vec<String>> {
1891 let validator = jsonschema::validator_for(schema)
1892 .map_err(|error| vec![format!("/ schema is invalid: {}", error)])?;
1893 let errors = validator
1894 .iter_errors(output)
1895 .map(|error| {
1896 let path = error.instance_path().to_string();
1897 let path = if path.is_empty() {
1898 "/".to_string()
1899 } else {
1900 path
1901 };
1902 format!("{path} {error}")
1903 })
1904 .collect::<Vec<_>>();
1905
1906 if errors.is_empty() {
1907 Ok(())
1908 } else {
1909 Err(errors)
1910 }
1911}
1912
/// Joins schema validation messages into the single error text reported to
/// callers.
fn format_structured_output_validation_error(errors: &[String]) -> String {
    let joined = errors.join("; ");
    format!("Structured output did not match JSON Schema: {joined}")
}
1919
/// Builds the retry prompt: the original prompt, a blank line, correction
/// instructions, and one `- <error>` line per validation error.
fn with_structured_output_retry_prompt(prompt: &str, errors: &[String]) -> String {
    const FEEDBACK_HEADER: [&str; 4] = [
        "",
        "Previous structured output failed JSON Schema validation.",
        "Return a corrected structured output that satisfies the original JSON Schema.",
        "Validation errors:",
    ];
    let mut text = String::from(prompt);
    for line in &FEEDBACK_HEADER {
        text.push('\n');
        text.push_str(line);
    }
    for error in errors {
        text.push_str("\n- ");
        text.push_str(error);
    }
    text
}
1931
/// A model selector after alias expansion and parsing.
#[derive(Clone, Debug, Eq, PartialEq)]
struct ResolvedModelSelector {
    /// The model name exactly as the workflow requested it.
    requested: String,
    /// The selector string that was parsed. This is the model-map target when
    /// `requested` was an alias, otherwise `requested` itself.
    selector: String,
    /// Model id with any `provider/` prefix and `?query` removed.
    model_id: String,
    /// Provider qualifier from the `provider/model` form or `?provider=...`.
    model_provider: Option<String>,
    /// Value of the `?thinking=...` query parameter.
    thinking: Option<String>,
}
1940
1941impl ResolvedModelSelector {
1942 fn provider_model(&self) -> String {
1943 match &self.model_provider {
1944 Some(provider) => format!("{provider}/{}", self.model_id),
1945 None => self.model_id.clone(),
1946 }
1947 }
1948}
1949
1950fn resolve_model_options(
1951 options: Option<Value>,
1952 agent_provider: &str,
1953 model_map: &BTreeMap<String, String>,
1954) -> anyhow::Result<Option<Value>> {
1955 let Some(model) = options
1956 .as_ref()
1957 .and_then(Value::as_object)
1958 .and_then(|object| object.get("model"))
1959 .and_then(Value::as_str)
1960 .map(ToString::to_string)
1961 else {
1962 return Ok(options);
1963 };
1964
1965 let mapped_selector = model_map.get(&model).cloned();
1966 let alias_matched = mapped_selector.is_some();
1967 let selector = mapped_selector.unwrap_or_else(|| model.clone());
1968 let resolved = parse_model_selector(&model, &selector)?;
1969 validate_model_selector_for_provider(&resolved, agent_provider)?;
1970
1971 let mut object = options
1972 .and_then(|value| value.as_object().cloned())
1973 .unwrap_or_default();
1974 object.insert(
1975 "model".to_string(),
1976 Value::String(resolved.provider_model()),
1977 );
1978
1979 let selector_has_extra_parts = alias_matched
1980 || resolved.selector.contains('?')
1981 || resolved.model_provider.is_some()
1982 || resolved.thinking.is_some();
1983 if selector_has_extra_parts {
1984 object.insert(
1985 "requestedModel".to_string(),
1986 Value::String(resolved.requested.clone()),
1987 );
1988 object.insert(
1989 "modelSelector".to_string(),
1990 Value::String(resolved.selector.clone()),
1991 );
1992 } else {
1993 object.remove("requestedModel");
1994 object.remove("modelSelector");
1995 }
1996
1997 if let Some(provider) = resolved.model_provider {
1998 object.insert("modelProvider".to_string(), Value::String(provider));
1999 } else {
2000 object.remove("modelProvider");
2001 }
2002 if let Some(thinking) = resolved.thinking {
2003 object.insert("thinking".to_string(), Value::String(thinking));
2004 } else {
2005 object.remove("thinking");
2006 }
2007 Ok(Some(Value::Object(object)))
2008}
2009
2010fn parse_model_selector(requested: &str, selector: &str) -> anyhow::Result<ResolvedModelSelector> {
2011 let (model_part, query) = selector.split_once('?').unwrap_or((selector, ""));
2012 if model_part.trim().is_empty() {
2013 bail!("model selector must include a model id: {selector}");
2014 }
2015
2016 let (slash_provider, model_id) = match model_part.split_once('/') {
2017 Some((provider, model_id)) if !provider.is_empty() && !model_id.is_empty() => {
2018 (Some(provider.to_string()), model_id.to_string())
2019 }
2020 Some(_) => bail!("model selector provider/model form is invalid: {selector}"),
2021 None => (None, model_part.to_string()),
2022 };
2023
2024 let mut query_provider = None::<String>;
2025 let mut thinking = None::<String>;
2026 if !query.is_empty() {
2027 for pair in query.split('&') {
2028 if pair.is_empty() {
2029 continue;
2030 }
2031 let (key, value) = pair.split_once('=').ok_or_else(|| {
2032 anyhow!("model selector query parameter must use key=value: {pair}")
2033 })?;
2034 let key = percent_decode(key)?;
2035 let value = percent_decode(value)?;
2036 if value.is_empty() {
2037 bail!("model selector query parameter `{key}` must not be empty");
2038 }
2039 match key.as_str() {
2040 "provider" => set_unique_query_value(&mut query_provider, key, value)?,
2041 "thinking" => set_unique_query_value(&mut thinking, key, value)?,
2042 _ => bail!("unknown model selector query parameter `{key}`"),
2043 }
2044 }
2045 }
2046
2047 let model_provider = match (slash_provider, query_provider) {
2048 (Some(slash), Some(query)) if slash != query => bail!(
2049 "conflicting model provider qualifiers in selector `{selector}`: `{slash}` and `{query}`"
2050 ),
2051 (Some(provider), Some(_)) | (Some(provider), None) | (None, Some(provider)) => {
2052 Some(provider)
2053 }
2054 (None, None) => None,
2055 };
2056
2057 Ok(ResolvedModelSelector {
2058 requested: requested.to_string(),
2059 selector: selector.to_string(),
2060 model_id,
2061 model_provider,
2062 thinking,
2063 })
2064}
2065
2066fn set_unique_query_value(
2067 target: &mut Option<String>,
2068 key: String,
2069 value: String,
2070) -> anyhow::Result<()> {
2071 if target.replace(value).is_some() {
2072 bail!("duplicate model selector query parameter `{key}`");
2073 }
2074 Ok(())
2075}
2076
2077fn percent_decode(value: &str) -> anyhow::Result<String> {
2078 let bytes = value.as_bytes();
2079 let mut output = Vec::with_capacity(bytes.len());
2080 let mut index = 0;
2081 while index < bytes.len() {
2082 match bytes[index] {
2083 b'%' => {
2084 if index + 2 >= bytes.len() {
2085 bail!("invalid percent escape in model selector query: {value}");
2086 }
2087 let high = hex_value(bytes[index + 1]).ok_or_else(|| {
2088 anyhow!("invalid percent escape in model selector query: {value}")
2089 })?;
2090 let low = hex_value(bytes[index + 2]).ok_or_else(|| {
2091 anyhow!("invalid percent escape in model selector query: {value}")
2092 })?;
2093 output.push((high << 4) | low);
2094 index += 3;
2095 }
2096 b'+' => {
2097 output.push(b' ');
2098 index += 1;
2099 }
2100 byte => {
2101 output.push(byte);
2102 index += 1;
2103 }
2104 }
2105 }
2106 String::from_utf8(output).context("model selector query is not valid UTF-8")
2107}
2108
/// Returns the numeric value of an ASCII hex digit (`0-9`, `a-f`, `A-F`), or
/// `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` only recognises ASCII hex digits and yields 0..=15, so the
    // conversion back to `u8` cannot fail for any digit it returns.
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
2117
2118fn validate_model_selector_for_provider(
2119 resolved: &ResolvedModelSelector,
2120 agent_provider: &str,
2121) -> anyhow::Result<()> {
2122 match agent_provider {
2123 "codex" => {
2124 if resolved.model_provider.is_some() {
2125 bail!("Codex model selectors do not support ?provider=... or provider/model form");
2126 }
2127 if resolved.thinking.is_some() {
2128 bail!("Codex model selectors do not support thinking=...");
2129 }
2130 }
2131 "claude-code" if resolved.model_provider.is_some() => {
2132 bail!(
2133 "Claude Code model selectors do not support ?provider=... or provider/model form"
2134 );
2135 }
2136 "opencode" if resolved.model_provider.is_none() => {
2137 bail!("OpenCode model selectors must use provider/model or ?provider=...");
2138 }
2139 "debug" | "pi" => {}
2140 _ => {}
2141 }
2142 Ok(())
2143}
2144
2145fn apply_phase_defaults(options: Option<Value>, metadata: &WorkflowMetadata) -> Option<Value> {
2146 let phase_name = options
2147 .as_ref()
2148 .and_then(|options| options.get("phase"))
2149 .and_then(Value::as_str)
2150 .map(ToString::to_string);
2151 let phase_metadata = phase_name.as_ref().and_then(|phase_name| {
2152 metadata
2153 .phases
2154 .iter()
2155 .find(|phase| phase.title == *phase_name)
2156 });
2157
2158 if phase_name.is_none() && phase_metadata.is_none() {
2159 return options;
2160 }
2161
2162 let mut object = options
2163 .and_then(|value| value.as_object().cloned())
2164 .unwrap_or_default();
2165
2166 if let Some(phase_name) = phase_name {
2167 object
2168 .entry("phase".to_string())
2169 .or_insert(Value::String(phase_name));
2170 }
2171 if let Some(model) = phase_metadata.and_then(|phase| phase.model.clone()) {
2172 object
2173 .entry("model".to_string())
2174 .or_insert(Value::String(model));
2175 }
2176 if let Some(provider) = phase_metadata.and_then(|phase| phase.provider.clone()) {
2177 object
2178 .entry("provider".to_string())
2179 .or_insert(Value::String(provider));
2180 }
2181
2182 Some(Value::Object(object))
2183}
2184
/// Resolves `script_path` against the directory containing
/// `current_script_path`.
///
/// Absolute paths are returned as-is. Relative paths are joined onto the
/// parent of `current_script_path`, or onto `.` when that path has no parent
/// (for example an empty path or a filesystem root).
fn resolve_relative_script(current_script_path: &Path, script_path: &str) -> PathBuf {
    let requested = Path::new(script_path);
    if requested.is_absolute() {
        return requested.to_path_buf();
    }

    let base_dir = match current_script_path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    base_dir.join(requested)
}
2196
2197fn resolve_named_workflow(name: &str) -> anyhow::Result<PathBuf> {
2198 let workflows_dir = PathBuf::from(".claude/workflows");
2199 for entry in fs::read_dir(&workflows_dir).unwrap_or_else(|_| fs::read_dir(".").unwrap()) {
2200 let entry = entry?;
2201 let path = entry.path();
2202 if path.extension().and_then(|extension| extension.to_str()) != Some("js") {
2203 continue;
2204 }
2205 if read_workflow_metadata(&path)?.is_some_and(|metadata| metadata.name == name) {
2206 return Ok(path);
2207 }
2208 }
2209 bail!("Unknown workflow: {name}")
2210}