1use crate::agent_providers::{
2 create_agent_provider, AgentProvider, AgentProviderContext, AgentProviderResult,
3 AgentProviderRunInput, AgentRunIsolation, AgentUsage, AgentUsageCost,
4};
5use crate::environment::{
6 AgentExecutionEnvironment, LocalExecutionEnvironment, SandboxExecutionEnvironment,
7};
8use crate::js_runtime::rquickjs::RQuickJSWorkflowRuntime;
9use crate::js_runtime::{
10 WorkflowBudgetSnapshot, WorkflowJSRuntime, WorkflowModuleInput, WorkflowModuleOutput,
11 WorkflowRef, WorkflowRuntimeCall, WorkflowRuntimeExecution, WorkflowRuntimePoll,
12 WorkflowRuntimeRequest, WorkflowRuntimeRequestResolution,
13};
14use crate::metadata::{read_workflow_metadata, WorkflowMetadata};
15use anyhow::{anyhow, bail, Context};
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use smol_workflow_sandbox::{
19 Metadata as SandboxMetadata, OpenSandboxRequest, ProfileRef, WorkspaceSync,
20};
21use std::collections::{BTreeMap, BTreeSet, VecDeque};
22use std::fs;
23use std::path::{Path, PathBuf};
24use std::process::Command as StdCommand;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tokio::sync::{mpsc, watch};
28use tokio::task::{JoinSet, LocalSet};
29
30pub use crate::events::{
31 WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink, WorkflowEventType,
32};
33
/// Receives the raw result of every successful agent run for session logging.
///
/// `spawn_agent_task` calls this inside the agent task right after a run
/// succeeds, passing the provider name that was used (the per-call override
/// or the default provider's name). Returning an error turns that agent
/// completion into an error.
#[async_trait::async_trait]
pub trait AgentSessionLogSink: Send + Sync {
    /// Persist `result` produced by `provider`.
    async fn write_agent_result(
        &self,
        provider: &str,
        result: &AgentProviderResult,
    ) -> anyhow::Result<()>;
}
42
/// Strategy used by the workflow loop to execute agent and sleep requests.
#[async_trait::async_trait]
pub trait WorkflowAgentRunner: Send + Sync {
    /// Runs one agent request.
    ///
    /// `default_provider` is the workflow's configured provider;
    /// `provider_override` is the per-call provider name, if the script set one.
    async fn run_agent(
        &self,
        default_provider: Arc<dyn AgentProvider>,
        provider_override: Option<String>,
        input: AgentProviderRunInput,
    ) -> anyhow::Result<AgentProviderResult>;

    /// Whether the workflow runtime should wrap `run_agent` in its own retry
    /// loop (`run_agent_runner_with_retry`) instead of calling it directly.
    fn retry_in_runtime(&self) -> bool {
        true
    }

    /// Services a script-level sleep request; by default a tokio timer.
    async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
        tokio::time::sleep(std::time::Duration::from_millis(duration_ms)).await;
        Ok(())
    }
}
74
/// Default [`WorkflowAgentRunner`]: runs agents in-process via `run_agent_provider`.
#[derive(Debug, Default)]
pub struct DirectWorkflowAgentRunner;
77
#[async_trait::async_trait]
impl WorkflowAgentRunner for DirectWorkflowAgentRunner {
    /// Delegates straight to `run_agent_provider` with the same arguments.
    async fn run_agent(
        &self,
        default_provider: Arc<dyn AgentProvider>,
        provider_override: Option<String>,
        input: AgentProviderRunInput,
    ) -> anyhow::Result<AgentProviderResult> {
        run_agent_provider(default_provider, provider_override, input).await
    }
}
89
/// Inputs for [`run_workflow`].
pub struct RunWorkflowOptions {
    /// Path to the workflow script; canonicalized before loading.
    pub script_path: PathBuf,
    /// Arguments handed to the JS module.
    pub args: Value,
    /// Provider used when an agent call does not override it.
    pub agent_provider: Arc<dyn AgentProvider>,
    /// Model alias map; presumably consulted when preparing agent runs (TODO confirm).
    pub model_map: BTreeMap<String, String>,
    /// Budget ceiling, if any, for the initial budget snapshot.
    pub budget_total: Option<u64>,
    /// Budget already spent before this workflow starts.
    pub budget_spent: u64,
    /// 0 for a top-level workflow; nested workflows are deeper.
    pub nesting_depth: usize,
    /// Max concurrently running agent tasks; `None` or `Some(0)` means unlimited.
    pub max_parallel_agent_requests: Option<usize>,
    /// Runner for agents/sleeps; defaults to [`DirectWorkflowAgentRunner`].
    pub agent_runner: Option<Arc<dyn WorkflowAgentRunner>>,
    /// Cancellation flag; the workflow is cancelled once it reads `true`.
    pub cancel_rx: Option<watch::Receiver<bool>>,
    /// Event sink; when set, lifecycle (started/result/error) events are emitted.
    pub event_sink: Option<Arc<dyn WorkflowEventSink>>,
    /// Step id of the parent step, stamped into event metadata.
    pub event_parent_step_id: Option<String>,
    /// Reference instant for `elapsed_nanos`; defaults to "now".
    pub event_stream_start: Option<Instant>,
    /// Optional sink receiving every successful agent result.
    pub session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
}
106
/// Everything recorded while running a workflow to completion.
#[derive(Debug)]
pub struct RunWorkflowResult {
    /// The module's final output.
    pub output: WorkflowModuleOutput,
    /// Arguments of every script log call, in order.
    pub logs: Vec<Vec<Value>>,
    /// Every phase call, in order.
    pub phases: Vec<WorkflowPhaseCall>,
    /// Every agent request received from the runtime.
    pub agent_calls: Vec<WorkflowRuntimeRequest>,
    /// Every nested-workflow request received from the runtime.
    pub workflow_calls: Vec<WorkflowRuntimeRequest>,
    /// Budget state at completion.
    pub budget: WorkflowBudgetSnapshot,
    /// Token usage summed over all agent runs.
    pub token_usage: WorkflowTokenUsage,
    /// Token usage keyed by phase name.
    pub token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
    /// Per-run summaries of agent executions.
    pub agent_runs: Vec<WorkflowAgentRunSummary>,
}
119
/// Token counters accumulated across agent runs (serialized with camelCase keys).
///
/// Counters are summed with saturating addition by `add_usage` and
/// `merge_token_usage`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowTokenUsage {
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub cache_read_tokens: u64,
    pub cache_write_tokens: u64,
    pub total_tokens: u64,
    /// Accumulated monetary cost; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cost: Option<AgentUsageCost>,
}
131
/// Summary of one agent run (serialized with camelCase keys; `None` fields omitted).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowAgentRunSummary {
    /// Runtime request id of the agent call.
    pub id: String,
    /// Phase the call belonged to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub phase: Option<String>,
    /// Provider name used for the run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider: Option<String>,
    /// Model used for the run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Session id reported by the provider, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_session_id: Option<String>,
    /// Usage reported by the provider.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<AgentUsage>,
    /// Isolation details of the run, if reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub isolation: Option<AgentRunIsolation>,
}
149
/// A `phase(name, options)` call made by the workflow script.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowPhaseCall {
    pub name: String,
    pub options: Option<Value>,
}
155
156pub async fn run_workflow(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
157 LocalSet::new().run_until(run_workflow_inner(options)).await
158}
159
160async fn run_workflow_inner(options: RunWorkflowOptions) -> anyhow::Result<RunWorkflowResult> {
161 log::debug!(
162 "run_workflow start script={} provider={} nesting_depth={} budget_total={:?} budget_spent={}",
163 options.script_path.display(),
164 options.agent_provider.name(),
165 options.nesting_depth,
166 options.budget_total,
167 options.budget_spent
168 );
169 let script_path = fs::canonicalize(&options.script_path).with_context(|| {
170 format!(
171 "failed to resolve workflow script {}",
172 options.script_path.display()
173 )
174 })?;
175 let metadata = read_workflow_metadata(&script_path)?.ok_or_else(|| {
176 anyhow!("Workflow script must export valid literal metadata as `export const meta = {{ name, description, ... }}`")
177 })?;
178 log::debug!(
179 "workflow metadata loaded name={} phases={}",
180 metadata.name,
181 metadata.phases.len()
182 );
183 let source = fs::read_to_string(&script_path)
184 .with_context(|| format!("failed to read workflow script {}", script_path.display()))?;
185 let runtime = RQuickJSWorkflowRuntime::new();
186 let execution = runtime.start_module(WorkflowModuleInput {
187 source,
188 source_name: script_path.to_string_lossy().into_owned(),
189 args: options.args,
190 budget: WorkflowBudgetSnapshot {
191 total: options.budget_total,
192 spent: options.budget_spent,
193 },
194 sandbox: Default::default(),
195 })?;
196
197 let (js_commands, js_command_rx) = mpsc::channel::<JsCommand>(64);
198 let (js_event_tx, mut js_events) = mpsc::channel::<JsEvent>(64);
199 let js_task = tokio::task::spawn_local(js_runtime_actor(execution, js_command_rx, js_event_tx));
200
201 let emit_lifecycle_events = options.event_sink.is_some();
202 let event_start = options.event_stream_start.unwrap_or_else(Instant::now);
203
204 let mut state = RunState {
205 script_path,
206 metadata,
207 event_start,
208 agent_provider: options.agent_provider,
209 model_map: options.model_map,
210 logs: Vec::new(),
211 phases: Vec::new(),
212 agent_calls: Vec::new(),
213 workflow_calls: Vec::new(),
214 budget: WorkflowBudgetSnapshot {
215 total: options.budget_total,
216 spent: options.budget_spent,
217 },
218 token_usage: WorkflowTokenUsage::default(),
219 token_usage_by_phase: Default::default(),
220 agent_runs: Vec::new(),
221 active_request_ids: BTreeSet::new(),
222 nesting_depth: options.nesting_depth,
223 max_parallel_agent_requests: options.max_parallel_agent_requests,
224 agent_runner: options
225 .agent_runner
226 .unwrap_or_else(|| Arc::new(DirectWorkflowAgentRunner)),
227 cancel_rx: options.cancel_rx,
228 event_sink: options.event_sink,
229 event_parent_step_id: options.event_parent_step_id,
230 session_log_sink: options.session_log_sink,
231 };
232
233 let mut pending_requests = VecDeque::<WorkflowRuntimeRequest>::new();
234 let mut agent_tasks = JoinSet::<AgentTaskCompletion>::new();
235 let mut sleep_tasks = JoinSet::<SleepTaskCompletion>::new();
236
237 if emit_lifecycle_events {
238 if let Err(error) = state
239 .emit_event(WorkflowEvent::started(rfc3339_now()?))
240 .await
241 {
242 let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
243 let _ = js_task.await;
244 return Err(error);
245 }
246 }
247
248 let workflow_result: anyhow::Result<RunWorkflowResult> = loop {
249 if let Err(error) = state
250 .start_pending_requests(
251 &mut pending_requests,
252 &mut agent_tasks,
253 &mut sleep_tasks,
254 &js_commands,
255 )
256 .await
257 {
258 break Err(error);
259 }
260
261 tokio::select! {
262 biased;
263 () = wait_for_cancellation(&mut state.cancel_rx) => {
264 break state.cancel_workflow(
265 &mut pending_requests,
266 &mut agent_tasks,
267 &mut sleep_tasks,
268 &js_commands,
269 &mut js_events,
270 ).await;
271 }
272 event = js_events.recv() => {
273 let event = match event {
274 Some(event) => event,
275 None => break Err(anyhow!("JavaScript runtime actor stopped unexpectedly")),
276 };
277 match state.handle_js_event(event, &mut pending_requests).await {
278 Ok(Some(result)) => break Ok(result),
279 Ok(None) => {}
280 Err(error) => break Err(error),
281 }
282 }
283 completion = agent_tasks.join_next(), if !agent_tasks.is_empty() => {
284 let completion = match completion {
285 Some(Ok(completion)) => completion,
286 Some(Err(error)) => break Err(anyhow!("agent provider task failed: {error}")),
287 None => break Err(anyhow!("agent task set ended unexpectedly")),
288 };
289 let AgentTaskCompletion { id, input, provider, result } = completion;
290 state.active_request_ids.remove(&id);
291 let resolution = match result {
292 Ok(result) => match state.apply_agent_result(&id, &input, provider, result).await {
293 Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
294 value,
295 budget: state.budget.clone(),
296 },
297 Err(error) => WorkflowRuntimeRequestResolution::Err {
298 message: error.to_string(),
299 },
300 },
301 Err(error) => {
302 let message = error.to_string();
303 if let Err(emit_error) = state.emit_agent_failed_event(&id, provider.as_deref(), &message).await {
304 log::debug!("failed to emit agent failure event: {emit_error:#}");
305 }
306 WorkflowRuntimeRequestResolution::Err { message }
307 },
308 };
309 if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
310 break Err(error);
311 }
312 }
313 completion = sleep_tasks.join_next(), if !sleep_tasks.is_empty() => {
314 let completion = match completion {
315 Some(Ok(completion)) => completion,
316 Some(Err(error)) => break Err(anyhow!("sleep task failed: {error}")),
317 None => break Err(anyhow!("sleep task set ended unexpectedly")),
318 };
319 let SleepTaskCompletion { id, result } = completion;
320 state.active_request_ids.remove(&id);
321 let resolution = match result {
322 Ok(()) => WorkflowRuntimeRequestResolution::OkUndefined,
323 Err(error) => WorkflowRuntimeRequestResolution::Err {
324 message: error.to_string(),
325 },
326 };
327 if let Err(error) = send_js_command(&js_commands, JsCommand::ResolveRequest { id, resolution }).await {
328 break Err(error);
329 }
330 }
331 }
332 };
333
334 let _ = send_js_command(&js_commands, JsCommand::Shutdown).await;
335 let _ = js_task.await;
336
337 if emit_lifecycle_events {
338 match &workflow_result {
339 Ok(result) => {
340 state
341 .emit_event(WorkflowEvent::result(
342 result.token_usage.input_tokens,
343 result.token_usage.output_tokens,
344 result.token_usage.total_tokens,
345 result.output.result.clone(),
346 ))
347 .await?
348 }
349 Err(error) => {
350 state
351 .emit_event(WorkflowEvent::error(error.to_string(), None))
352 .await?;
353 }
354 }
355 }
356
357 workflow_result
358}
359
/// Messages from the orchestration loop to [`js_runtime_actor`].
enum JsCommand {
    /// Settle runtime request `id` with `resolution`.
    ResolveRequest {
        id: String,
        resolution: WorkflowRuntimeRequestResolution,
    },
    /// Ask the actor to stop.
    Shutdown,
}
367
/// Messages from [`js_runtime_actor`] to the orchestration loop.
enum JsEvent {
    /// A synchronous runtime call (log or phase) to record.
    Call(WorkflowRuntimeCall),
    /// A host request (agent, sleep or nested workflow) awaiting a resolution.
    Request(WorkflowRuntimeRequest),
    /// The module finished with this output; the actor stops after sending it.
    Complete(WorkflowModuleOutput),
    /// The runtime failed with this message; the actor stops after sending it.
    Error(String),
}
374
375async fn js_runtime_actor(
376 mut execution: Box<dyn WorkflowRuntimeExecution>,
377 mut commands: mpsc::Receiver<JsCommand>,
378 events: mpsc::Sender<JsEvent>,
379) {
380 let mut outstanding_requests = 0usize;
381 loop {
382 match execution.poll() {
383 Ok(WorkflowRuntimePoll::Call(call)) => {
384 if events.send(JsEvent::Call(call)).await.is_err() {
385 return;
386 }
387 }
388 Ok(WorkflowRuntimePoll::Request(request)) => {
389 let requests = match execution.take_pending_requests() {
390 Ok(requests) if requests.is_empty() => vec![request],
391 Ok(requests) => requests,
392 Err(error) => {
393 let _ = events.send(JsEvent::Error(error.to_string())).await;
394 return;
395 }
396 };
397 outstanding_requests = outstanding_requests.saturating_add(requests.len());
398 for request in requests {
399 if events.send(JsEvent::Request(request)).await.is_err() {
400 return;
401 }
402 }
403 }
404 Ok(WorkflowRuntimePoll::Complete(output)) => {
405 let _ = events.send(JsEvent::Complete(output)).await;
406 return;
407 }
408 Ok(WorkflowRuntimePoll::Pending) => {
409 if outstanding_requests == 0 {
410 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
411 continue;
412 }
413 match commands.recv().await {
414 Some(JsCommand::ResolveRequest { id, resolution }) => {
415 outstanding_requests = outstanding_requests.saturating_sub(1);
416 if let Err(error) = execution.resolve_request(&id, resolution) {
417 let _ = events.send(JsEvent::Error(error.to_string())).await;
418 return;
419 }
420 }
421 Some(JsCommand::Shutdown) | None => return,
422 }
423 }
424 Err(error) => {
425 let _ = events.send(JsEvent::Error(error.to_string())).await;
426 return;
427 }
428 }
429 }
430}
431
432async fn send_js_command(
433 commands: &mpsc::Sender<JsCommand>,
434 command: JsCommand,
435) -> anyhow::Result<()> {
436 commands
437 .send(command)
438 .await
439 .map_err(|_| anyhow!("JavaScript runtime actor stopped unexpectedly"))
440}
441
/// Mutable state of one workflow run, owned by the orchestration loop.
struct RunState {
    /// Canonicalized script path; its parent is used as the agent `cwd`.
    script_path: PathBuf,
    /// Parsed `meta` export of the script.
    metadata: WorkflowMetadata,
    /// Reference instant for `elapsed_nanos` on emitted events.
    event_start: Instant,
    /// Default agent provider.
    agent_provider: Arc<dyn AgentProvider>,
    model_map: BTreeMap<String, String>,
    logs: Vec<Vec<Value>>,
    phases: Vec<WorkflowPhaseCall>,
    agent_calls: Vec<WorkflowRuntimeRequest>,
    workflow_calls: Vec<WorkflowRuntimeRequest>,
    /// Current budget; sent back to JS with every successful agent/workflow resolution.
    budget: WorkflowBudgetSnapshot,
    token_usage: WorkflowTokenUsage,
    token_usage_by_phase: std::collections::BTreeMap<String, WorkflowTokenUsage>,
    agent_runs: Vec<WorkflowAgentRunSummary>,
    /// Ids of agent and sleep requests spawned but not yet resolved.
    active_request_ids: BTreeSet<String>,
    nesting_depth: usize,
    /// Agent concurrency cap; `None` or `Some(0)` means unlimited.
    max_parallel_agent_requests: Option<usize>,
    agent_runner: Arc<dyn WorkflowAgentRunner>,
    cancel_rx: Option<watch::Receiver<bool>>,
    event_sink: Option<Arc<dyn WorkflowEventSink>>,
    event_parent_step_id: Option<String>,
    session_log_sink: Option<Arc<dyn AgentSessionLogSink>>,
}
465
/// An agent request that has been validated and is ready to spawn.
struct PreparedAgentRun {
    /// Provider name requested by the script, if it overrides the default.
    provider_override: Option<String>,
    input: AgentProviderRunInput,
}
470
/// Output of a spawned agent task, joined by the orchestration loop.
struct AgentTaskCompletion {
    /// Runtime request id of the agent call.
    id: String,
    /// Copy of the input the agent was run with.
    input: AgentProviderRunInput,
    /// Provider name used (override or default); always `Some` when built by `spawn_agent_task`.
    provider: Option<String>,
    result: anyhow::Result<AgentProviderResult>,
}
477
/// Output of a spawned sleep task, joined by the orchestration loop.
struct SleepTaskCompletion {
    /// Runtime request id of the sleep call.
    id: String,
    result: anyhow::Result<()>,
}
482
483fn add_usage(total: &mut WorkflowTokenUsage, usage: Option<&AgentUsage>) {
484 let Some(usage) = usage else {
485 return;
486 };
487
488 total.input_tokens = total
489 .input_tokens
490 .saturating_add(usage.input_tokens.unwrap_or_default());
491 total.output_tokens = total
492 .output_tokens
493 .saturating_add(usage.output_tokens.unwrap_or_default());
494 total.cache_read_tokens = total
495 .cache_read_tokens
496 .saturating_add(usage.cache_read_tokens.unwrap_or_default());
497 total.cache_write_tokens = total
498 .cache_write_tokens
499 .saturating_add(usage.cache_write_tokens.unwrap_or_default());
500 total.total_tokens = total
501 .total_tokens
502 .saturating_add(usage.total_tokens.unwrap_or_default());
503
504 if let Some(cost) = usage.cost.as_ref() {
505 total.cost = Some(merge_cost(total.cost.as_ref(), cost));
506 }
507}
508
509fn merge_token_usage(total: &mut WorkflowTokenUsage, usage: &WorkflowTokenUsage) {
510 total.input_tokens = total.input_tokens.saturating_add(usage.input_tokens);
511 total.output_tokens = total.output_tokens.saturating_add(usage.output_tokens);
512 total.cache_read_tokens = total
513 .cache_read_tokens
514 .saturating_add(usage.cache_read_tokens);
515 total.cache_write_tokens = total
516 .cache_write_tokens
517 .saturating_add(usage.cache_write_tokens);
518 total.total_tokens = total.total_tokens.saturating_add(usage.total_tokens);
519 if let Some(cost) = usage.cost.as_ref() {
520 total.cost = Some(merge_cost(total.cost.as_ref(), cost));
521 }
522}
523
524fn merge_cost(left: Option<&AgentUsageCost>, right: &AgentUsageCost) -> AgentUsageCost {
525 AgentUsageCost {
526 input: sum_f64(left.and_then(|cost| cost.input), right.input),
527 output: sum_f64(left.and_then(|cost| cost.output), right.output),
528 cache_read: sum_f64(left.and_then(|cost| cost.cache_read), right.cache_read),
529 cache_write: sum_f64(left.and_then(|cost| cost.cache_write), right.cache_write),
530 total: sum_f64(left.and_then(|cost| cost.total), right.total),
531 currency: right
532 .currency
533 .clone()
534 .or_else(|| left.and_then(|cost| cost.currency.clone())),
535 }
536}
537
/// Nanoseconds since `start`, clamped to `u64::MAX` instead of overflowing.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos: u128 = start.elapsed().as_nanos();
    u64::try_from(nanos).unwrap_or(u64::MAX)
}
541
542fn rfc3339_now() -> anyhow::Result<String> {
543 Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
544}
545
546fn raw_agent_event_payloads(raw: &Value) -> Vec<Value> {
547 if let Some(events) = raw.get("events").and_then(Value::as_array) {
548 events.clone()
549 } else if let Some(items) = raw.as_array() {
550 items.clone()
551 } else {
552 vec![raw.clone()]
553 }
554}
555
556fn agent_session_event_payload(provider_event: Value, metadata: &WorkflowEventMetadata) -> Value {
557 let mut payload = serde_json::Map::new();
558 if let Some(provider) = metadata.provider.as_ref() {
559 payload.insert("provider".to_string(), Value::String(provider.clone()));
560 }
561 if let Some(session_id) = metadata.session_id.as_ref() {
562 payload.insert("sessionId".to_string(), Value::String(session_id.clone()));
563 }
564 if let Some(run_id) = metadata.run_id.as_ref() {
565 payload.insert("runId".to_string(), Value::String(run_id.clone()));
566 }
567 if let Some(step_id) = metadata.step_id.as_ref() {
568 payload.insert("stepId".to_string(), Value::String(step_id.clone()));
569 }
570 payload.insert("providerEvent".to_string(), provider_event);
571 Value::Object(payload)
572}
573
/// Keeps at most `max_chars` Unicode scalar values of `value`, appending `…`
/// when anything was cut off.
fn truncate_for_event(value: &str, max_chars: usize) -> String {
    match value.char_indices().nth(max_chars) {
        // A character exists at position `max_chars`, so the text is too long.
        Some((cut_at, _)) => format!("{}…", &value[..cut_at]),
        None => value.to_string(),
    }
}
583
584fn format_log_message(values: &[Value]) -> String {
585 values
586 .iter()
587 .map(|value| match value {
588 Value::String(value) => value.clone(),
589 value => serde_json::to_string(value).unwrap_or_else(|_| String::from("<unprintable>")),
590 })
591 .collect::<Vec<_>>()
592 .join(" ")
593}
594
/// Adds two optional amounts, treating a missing side as zero.
///
/// Returns `None` only when both sides are missing.
fn sum_f64(left: Option<f64>, right: Option<f64>) -> Option<f64> {
    if left.is_none() && right.is_none() {
        return None;
    }
    Some(left.unwrap_or(0.0) + right.unwrap_or(0.0))
}
601
602async fn wait_for_cancellation(cancel_rx: &mut Option<watch::Receiver<bool>>) {
603 let Some(cancel_rx) = cancel_rx else {
604 std::future::pending::<()>().await;
605 return;
606 };
607 while !*cancel_rx.borrow() {
608 if cancel_rx.changed().await.is_err() {
609 return;
610 }
611 }
612}
613
614impl RunState {
615 async fn handle_js_event(
616 &mut self,
617 event: JsEvent,
618 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
619 ) -> anyhow::Result<Option<RunWorkflowResult>> {
620 match event {
621 JsEvent::Call(call) => self.handle_call(call).await?,
622 JsEvent::Request(request) => {
623 log::debug!(
624 "workflow runtime request id={} kind={}",
625 request.id(),
626 request.kind()
627 );
628 pending_requests.push_back(request);
629 }
630 JsEvent::Complete(output) => {
631 log::debug!(
632 "run_workflow complete script={} budget_spent={}",
633 self.script_path.display(),
634 self.budget.spent
635 );
636 return Ok(Some(RunWorkflowResult {
637 output,
638 logs: std::mem::take(&mut self.logs),
639 phases: std::mem::take(&mut self.phases),
640 agent_calls: std::mem::take(&mut self.agent_calls),
641 workflow_calls: std::mem::take(&mut self.workflow_calls),
642 budget: self.budget.clone(),
643 token_usage: std::mem::take(&mut self.token_usage),
644 token_usage_by_phase: std::mem::take(&mut self.token_usage_by_phase),
645 agent_runs: std::mem::take(&mut self.agent_runs),
646 }));
647 }
648 JsEvent::Error(message) => bail!(message),
649 }
650 Ok(None)
651 }
652
653 async fn start_pending_requests(
654 &mut self,
655 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
656 agent_tasks: &mut JoinSet<AgentTaskCompletion>,
657 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
658 js_commands: &mpsc::Sender<JsCommand>,
659 ) -> anyhow::Result<()> {
660 loop {
661 let Some(request) = pending_requests.front() else {
662 return Ok(());
663 };
664 if matches!(request, WorkflowRuntimeRequest::Agent { .. })
665 && !self.agent_capacity_available(agent_tasks.len())
666 {
667 return Ok(());
668 }
669
670 let request = pending_requests
671 .pop_front()
672 .expect("pending request should exist");
673 match request {
674 WorkflowRuntimeRequest::Agent { .. } => match self.prepare_agent_request(request) {
675 Ok((id, prepared)) => {
676 self.emit_agent_started_event(&id, &prepared).await?;
677 self.spawn_agent_task(agent_tasks, id, prepared);
678 }
679 Err((id, error)) => {
680 send_js_command(
681 js_commands,
682 JsCommand::ResolveRequest {
683 id,
684 resolution: WorkflowRuntimeRequestResolution::Err {
685 message: error.to_string(),
686 },
687 },
688 )
689 .await?;
690 }
691 },
692 WorkflowRuntimeRequest::Sleep { id, duration_ms } => {
693 self.spawn_sleep_task(sleep_tasks, id, duration_ms);
694 }
695 WorkflowRuntimeRequest::Workflow {
696 id,
697 workflow_ref,
698 args,
699 } => {
700 self.workflow_calls.push(WorkflowRuntimeRequest::Workflow {
701 id: id.clone(),
702 workflow_ref: workflow_ref.clone(),
703 args: args.clone(),
704 });
705 let parent_event_step_id = self.event_step_id(&id);
706 let resolution = match self
707 .handle_workflow(parent_event_step_id, workflow_ref, args)
708 .await
709 {
710 Ok(value) => WorkflowRuntimeRequestResolution::OkWithBudget {
711 value,
712 budget: self.budget.clone(),
713 },
714 Err(error) => WorkflowRuntimeRequestResolution::Err {
715 message: error.to_string(),
716 },
717 };
718 send_js_command(js_commands, JsCommand::ResolveRequest { id, resolution })
719 .await?;
720 }
721 }
722 }
723 }
724
    /// Tears down a running workflow after cancellation was requested.
    ///
    /// Every request the runtime is waiting on is rejected with
    /// "workflow cancelled". That covers queued requests, sleeps, agents and
    /// any leftover active ids. The runtime is then drained and told to shut
    /// down.
    ///
    /// # Errors
    /// Always returns `Err("workflow cancelled")`; the `Ok` type only matches
    /// the caller's loop result.
    async fn cancel_workflow(
        &mut self,
        pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
        agent_tasks: &mut JoinSet<AgentTaskCompletion>,
        sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
        js_commands: &mpsc::Sender<JsCommand>,
        js_events: &mut mpsc::Receiver<JsEvent>,
    ) -> anyhow::Result<RunWorkflowResult> {
        log::debug!(
            "workflow cancellation requested script={}",
            self.script_path.display()
        );

        // Nothing in flight: wait for the runtime's next event. If it ends
        // (complete/error/closed) we are done; if it issues a request, that
        // request is rejected and teardown continues below.
        if pending_requests.is_empty()
            && self.active_request_ids.is_empty()
            && agent_tasks.is_empty()
            && sleep_tasks.is_empty()
            && self
                .reject_next_runtime_request_for_cancellation(js_commands, js_events)
                .await
        {
            bail!("workflow cancelled");
        }

        self.reject_pending_requests_for_cancellation(pending_requests, js_commands)
            .await;
        // Aborted sleeps keep their ids in `active_request_ids`; they are
        // rejected later by `reject_remaining_active_requests_for_cancellation`.
        sleep_tasks.abort_all();
        self.reject_active_sleep_requests_for_cancellation(sleep_tasks, js_commands)
            .await;

        if self.session_log_sink.is_some() {
            // With a session log sink, in-flight agents are awaited rather than
            // aborted. Their results are emitted and recorded before the
            // request is rejected (presumably so completed work is not lost
            // from session logs — confirm intent).
            while let Some(completion) = agent_tasks.join_next().await {
                match completion {
                    Ok(AgentTaskCompletion {
                        id,
                        input,
                        provider,
                        result: Ok(result),
                    }) => {
                        self.active_request_ids.remove(&id);
                        if let Err(error) = self
                            .emit_agent_result_events(&id, provider.as_deref(), &result)
                            .await
                        {
                            log::debug!("failed to emit drained agent events during cancellation: {error:#}");
                        }
                        if let Err(error) = self
                            .emit_agent_completed_event(&id, provider.as_deref(), &result)
                            .await
                        {
                            log::debug!("failed to emit drained agent completion during cancellation: {error:#}");
                        }
                        self.record_agent_run(&id, &input, provider, &result);
                        self.reject_request_for_cancellation(id, js_commands).await;
                    }
                    Ok(AgentTaskCompletion {
                        id,
                        provider,
                        result: Err(error),
                        ..
                    }) => {
                        self.active_request_ids.remove(&id);
                        let message = error.to_string();
                        if let Err(error) = self
                            .emit_agent_failed_event(&id, provider.as_deref(), &message)
                            .await
                        {
                            log::debug!("failed to emit drained agent failure during cancellation: {error:#}");
                        }
                        log::debug!("agent task failed while draining cancellation: {message}");
                        self.reject_request_for_cancellation(id, js_commands).await;
                    }
                    Err(error) => {
                        // Join failure: the id stays in `active_request_ids` and is
                        // rejected by `reject_remaining_active_requests_for_cancellation`.
                        log::debug!("agent task join failed while draining cancellation: {error}");
                    }
                }
            }
        } else {
            // No session log: abort agents and reject every still-active id.
            let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
            agent_tasks.abort_all();
            for id in ids {
                self.active_request_ids.remove(&id);
                self.reject_request_for_cancellation(id, js_commands).await;
            }
        }

        self.reject_remaining_active_requests_for_cancellation(js_commands)
            .await;
        self.drain_runtime_after_cancellation(js_events).await;
        let _ = send_js_command(js_commands, JsCommand::Shutdown).await;
        bail!("workflow cancelled")
    }
817
818 async fn reject_next_runtime_request_for_cancellation(
819 &mut self,
820 js_commands: &mpsc::Sender<JsCommand>,
821 js_events: &mut mpsc::Receiver<JsEvent>,
822 ) -> bool {
823 loop {
824 match js_events.recv().await {
825 Some(JsEvent::Call(call)) => {
826 let _ = self.handle_call(call).await;
827 }
828 Some(JsEvent::Request(request)) => {
829 self.reject_request_for_cancellation(request.id().to_string(), js_commands)
830 .await;
831 return false;
832 }
833 Some(JsEvent::Complete(_)) | Some(JsEvent::Error(_)) | None => return true,
834 }
835 }
836 }
837
838 async fn reject_pending_requests_for_cancellation(
839 &mut self,
840 pending_requests: &mut VecDeque<WorkflowRuntimeRequest>,
841 js_commands: &mpsc::Sender<JsCommand>,
842 ) {
843 while let Some(request) = pending_requests.pop_front() {
844 self.reject_request_for_cancellation(request.id().to_string(), js_commands)
845 .await;
846 }
847 }
848
849 async fn reject_active_sleep_requests_for_cancellation(
850 &mut self,
851 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
852 js_commands: &mpsc::Sender<JsCommand>,
853 ) {
854 while let Some(completion) = sleep_tasks.join_next().await {
855 if let Ok(SleepTaskCompletion { id, .. }) = completion {
856 self.active_request_ids.remove(&id);
857 self.reject_request_for_cancellation(id, js_commands).await;
858 }
859 }
860 }
861
862 async fn reject_remaining_active_requests_for_cancellation(
863 &mut self,
864 js_commands: &mpsc::Sender<JsCommand>,
865 ) {
866 let ids: Vec<String> = self.active_request_ids.iter().cloned().collect();
867 for id in ids {
868 self.active_request_ids.remove(&id);
869 self.reject_request_for_cancellation(id, js_commands).await;
870 }
871 }
872
873 async fn reject_request_for_cancellation(
874 &self,
875 id: String,
876 js_commands: &mpsc::Sender<JsCommand>,
877 ) {
878 let _ = send_js_command(
879 js_commands,
880 JsCommand::ResolveRequest {
881 id,
882 resolution: WorkflowRuntimeRequestResolution::Err {
883 message: "workflow cancelled".to_string(),
884 },
885 },
886 )
887 .await;
888 }
889
890 async fn drain_runtime_after_cancellation(&mut self, js_events: &mut mpsc::Receiver<JsEvent>) {
891 while let Some(event) = js_events.recv().await {
892 match event {
893 JsEvent::Call(call) => {
894 let _ = self.handle_call(call).await;
895 }
896 JsEvent::Request(request) => {
897 log::debug!(
898 "ignoring request after cancellation id={} kind={}",
899 request.id(),
900 request.kind()
901 );
902 }
903 JsEvent::Complete(_) | JsEvent::Error(_) => break,
904 }
905 }
906 }
907
908 fn event_step_id(&self, runtime_request_id: &str) -> String {
909 let parent = self.event_parent_step_id.as_deref().unwrap_or("");
910 let hash = blake3::hash(
911 format!("{parent}:{}:{runtime_request_id}", self.nesting_depth).as_bytes(),
912 );
913 format!("step_{}", &hash.to_hex()[..16])
914 }
915
916 async fn emit_event(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
917 if (event.event_type.as_str() != "workflow.started" || self.nesting_depth > 0)
918 && event.elapsed_nanos.is_none()
919 {
920 event.elapsed_nanos = Some(elapsed_nanos(self.event_start));
921 }
922 let metadata = event
923 .metadata
924 .get_or_insert_with(WorkflowEventMetadata::default);
925 if metadata.workflow_depth.is_none() {
926 metadata.workflow_depth = Some(u32::try_from(self.nesting_depth).unwrap_or(u32::MAX));
927 }
928 if metadata.parent_step_id.is_none() {
929 metadata.parent_step_id = self.event_parent_step_id.clone();
930 }
931 if let Some(event_sink) = self.event_sink.as_ref() {
932 event_sink.emit(event).await?;
933 }
934 Ok(())
935 }
936
937 async fn handle_call(&mut self, call: WorkflowRuntimeCall) -> anyhow::Result<()> {
938 match call {
939 WorkflowRuntimeCall::Log { values } => {
940 self.emit_event(WorkflowEvent::log(format_log_message(&values)))
941 .await?;
942 self.logs.push(values);
943 }
944 WorkflowRuntimeCall::Phase { name, options } => {
945 let phase = WorkflowPhaseCall { name, options };
946 self.emit_event(WorkflowEvent::phase(
947 phase.name.clone(),
948 phase.options.clone(),
949 ))
950 .await?;
951 self.phases.push(phase);
952 }
953 }
954 Ok(())
955 }
956
957 fn agent_capacity_available(&self, in_flight: usize) -> bool {
958 let max_parallel = self
959 .max_parallel_agent_requests
960 .filter(|value| *value > 0)
961 .unwrap_or(usize::MAX);
962 in_flight < max_parallel
963 }
964
965 fn prepare_agent_request(
966 &mut self,
967 request: WorkflowRuntimeRequest,
968 ) -> Result<(String, PreparedAgentRun), (String, anyhow::Error)> {
969 match request {
970 WorkflowRuntimeRequest::Agent {
971 id,
972 prompt,
973 options,
974 } => {
975 self.agent_calls.push(WorkflowRuntimeRequest::Agent {
976 id: id.clone(),
977 prompt: prompt.clone(),
978 options: options.clone(),
979 });
980 match self.prepare_agent_run(prompt, options) {
981 Ok(prepared) => Ok((id, prepared)),
982 Err(error) => Err((id, error)),
983 }
984 }
985 WorkflowRuntimeRequest::Workflow { .. } | WorkflowRuntimeRequest::Sleep { .. } => {
986 unreachable!("prepare_agent_request only accepts agent requests")
987 }
988 }
989 }
990
991 fn spawn_agent_task(
992 &mut self,
993 agent_tasks: &mut JoinSet<AgentTaskCompletion>,
994 id: String,
995 prepared: PreparedAgentRun,
996 ) {
997 let default_provider_name = self.agent_provider.name().to_string();
998 let default_provider = Arc::clone(&self.agent_provider);
999 let agent_runner = Arc::clone(&self.agent_runner);
1000 let retry_in_runtime = agent_runner.retry_in_runtime();
1001 let cancel_rx = self.cancel_rx.clone();
1002 let completion_input = prepared.input.clone();
1003 let completion_provider = prepared
1004 .provider_override
1005 .clone()
1006 .or(Some(default_provider_name));
1007 let session_log_sink = self.session_log_sink.clone();
1008 let max_parallel = self
1009 .max_parallel_agent_requests
1010 .filter(|value| *value > 0)
1011 .unwrap_or(usize::MAX);
1012 log::debug!(
1013 "starting agent request id={} in_flight_after_start={} max_parallel={}",
1014 id,
1015 agent_tasks.len() + 1,
1016 max_parallel
1017 );
1018 self.active_request_ids.insert(id.clone());
1019 agent_tasks.spawn(async move {
1020 let result = if retry_in_runtime {
1021 run_agent_runner_with_retry(
1022 Arc::clone(&agent_runner),
1023 default_provider,
1024 prepared.provider_override,
1025 prepared.input,
1026 cancel_rx,
1027 )
1028 .await
1029 } else {
1030 agent_runner
1031 .run_agent(default_provider, prepared.provider_override, prepared.input)
1032 .await
1033 };
1034 let result = match result {
1035 Ok(result) => {
1036 if let Some(session_log_sink) = session_log_sink.as_ref() {
1037 let provider_name = completion_provider
1038 .as_deref()
1039 .expect("completion provider should always be set");
1040 match session_log_sink
1041 .write_agent_result(provider_name, &result)
1042 .await
1043 {
1044 Ok(()) => Ok(result),
1045 Err(error) => Err(error),
1046 }
1047 } else {
1048 Ok(result)
1049 }
1050 }
1051 Err(error) => Err(error),
1052 };
1053 AgentTaskCompletion {
1054 id,
1055 input: completion_input,
1056 provider: completion_provider,
1057 result,
1058 }
1059 });
1060 }
1061
1062 fn spawn_sleep_task(
1063 &mut self,
1064 sleep_tasks: &mut JoinSet<SleepTaskCompletion>,
1065 id: String,
1066 duration_ms: u64,
1067 ) {
1068 let agent_runner = Arc::clone(&self.agent_runner);
1069 log::debug!(
1070 "starting sleep request id={} duration_ms={}",
1071 id,
1072 duration_ms
1073 );
1074 self.active_request_ids.insert(id.clone());
1075 sleep_tasks.spawn(async move {
1076 SleepTaskCompletion {
1077 id,
1078 result: agent_runner.sleep(duration_ms).await,
1079 }
1080 });
1081 }
1082
1083 fn prepare_agent_run(
1084 &self,
1085 prompt: String,
1086 options: Option<Value>,
1087 ) -> anyhow::Result<PreparedAgentRun> {
1088 let options = apply_phase_defaults(options, &self.metadata);
1089 let context = AgentProviderContext {
1090 phase: options
1091 .as_ref()
1092 .and_then(|options| options.get("phase"))
1093 .and_then(Value::as_str)
1094 .map(ToString::to_string),
1095 cwd: self.script_path.parent().map(Path::to_path_buf),
1096 };
1097 let provider_override = options
1098 .as_ref()
1099 .and_then(|options| options.get("provider"))
1100 .and_then(Value::as_str)
1101 .map(ToString::to_string);
1102 let provider_name = provider_override
1103 .as_deref()
1104 .unwrap_or_else(|| self.agent_provider.name());
1105 let options = resolve_model_options(options, provider_name, &self.model_map)?;
1106 agent_retry_policy(&options)?;
1107 log::debug!(
1108 "agent call provider={} phase={:?} model={:?} prompt_len={}",
1109 provider_name,
1110 context.phase.as_deref(),
1111 options
1112 .as_ref()
1113 .and_then(|options| options.get("model"))
1114 .and_then(Value::as_str),
1115 prompt.len()
1116 );
1117 Ok(PreparedAgentRun {
1118 provider_override,
1119 input: AgentProviderRunInput {
1120 prompt,
1121 options,
1122 environment: Arc::new(LocalExecutionEnvironment::new(context.cwd.clone())?),
1123 context,
1124 },
1125 })
1126 }
1127
1128 async fn emit_agent_started_event(
1129 &self,
1130 id: &str,
1131 prepared: &PreparedAgentRun,
1132 ) -> anyhow::Result<()> {
1133 let provider = prepared
1134 .provider_override
1135 .as_deref()
1136 .unwrap_or_else(|| self.agent_provider.name());
1137 let metadata = self.agent_event_metadata(id, Some(provider), None);
1138 self.emit_event(WorkflowEvent::agent_started(
1139 serde_json::json!({
1140 "phase": prepared.input.context.phase,
1141 "promptPreview": truncate_for_event(&prepared.input.prompt, 200),
1142 }),
1143 metadata,
1144 ))
1145 .await
1146 }
1147
1148 async fn apply_agent_result(
1149 &mut self,
1150 id: &str,
1151 input: &AgentProviderRunInput,
1152 provider: Option<String>,
1153 result: AgentProviderResult,
1154 ) -> anyhow::Result<Value> {
1155 if let Some(output_tokens) = result.usage.as_ref().and_then(|usage| usage.output_tokens) {
1156 self.budget.spent = self.budget.spent.saturating_add(output_tokens);
1157 }
1158 self.emit_agent_result_events(id, provider.as_deref(), &result)
1159 .await?;
1160 self.emit_agent_completed_event(id, provider.as_deref(), &result)
1161 .await?;
1162 self.record_agent_run(id, input, provider, &result);
1163 log::debug!(
1164 "agent call complete session_id={:?} output_tokens={:?} budget_spent={}",
1165 result.session_id,
1166 result.usage.as_ref().and_then(|usage| usage.output_tokens),
1167 self.budget.spent
1168 );
1169 Ok(result.output)
1170 }
1171
1172 async fn emit_agent_result_events(
1173 &self,
1174 id: &str,
1175 provider: Option<&str>,
1176 result: &AgentProviderResult,
1177 ) -> anyhow::Result<()> {
1178 let Some(raw) = result.raw.as_ref() else {
1179 return Ok(());
1180 };
1181 let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1182 for provider_event in raw_agent_event_payloads(raw) {
1183 let event_data = agent_session_event_payload(provider_event, &metadata);
1184 self.emit_event(WorkflowEvent::agent_event(event_data, metadata.clone()))
1185 .await?;
1186 }
1187 Ok(())
1188 }
1189
1190 async fn emit_agent_completed_event(
1191 &self,
1192 id: &str,
1193 provider: Option<&str>,
1194 result: &AgentProviderResult,
1195 ) -> anyhow::Result<()> {
1196 let metadata = self.agent_event_metadata(id, provider, result.session_id.clone());
1197 self.emit_event(WorkflowEvent::agent_completed(
1198 serde_json::json!({
1199 "sessionId": result.session_id,
1200 "model": result.model,
1201 "usage": result.usage,
1202 }),
1203 metadata,
1204 ))
1205 .await
1206 }
1207
1208 async fn emit_agent_failed_event(
1209 &self,
1210 id: &str,
1211 provider: Option<&str>,
1212 message: &str,
1213 ) -> anyhow::Result<()> {
1214 let metadata = self.agent_event_metadata(id, provider, None);
1215 self.emit_event(WorkflowEvent::agent_failed(
1216 serde_json::json!({ "message": message }),
1217 metadata,
1218 ))
1219 .await
1220 }
1221
1222 fn agent_event_metadata(
1223 &self,
1224 id: &str,
1225 provider: Option<&str>,
1226 session_id: Option<String>,
1227 ) -> WorkflowEventMetadata {
1228 WorkflowEventMetadata {
1229 run_id: None,
1230 step_id: Some(self.event_step_id(id)),
1231 provider: Some(
1232 provider
1233 .unwrap_or_else(|| self.agent_provider.name())
1234 .to_string(),
1235 ),
1236 session_id,
1237 workflow_depth: None,
1238 parent_step_id: None,
1239 }
1240 }
1241
1242 fn record_agent_run(
1243 &mut self,
1244 id: &str,
1245 input: &AgentProviderRunInput,
1246 provider: Option<String>,
1247 result: &AgentProviderResult,
1248 ) {
1249 add_usage(&mut self.token_usage, result.usage.as_ref());
1250 if let Some(phase) = input.context.phase.as_ref() {
1251 let phase_usage = self.token_usage_by_phase.entry(phase.clone()).or_default();
1252 add_usage(phase_usage, result.usage.as_ref());
1253 }
1254 let model = result.model.clone().or_else(|| {
1255 input
1256 .options
1257 .as_ref()
1258 .and_then(|options| options.get("model"))
1259 .and_then(Value::as_str)
1260 .map(ToString::to_string)
1261 });
1262 self.agent_runs.push(WorkflowAgentRunSummary {
1263 id: id.to_string(),
1264 phase: input.context.phase.clone(),
1265 provider,
1266 model,
1267 provider_session_id: result.session_id.clone(),
1268 usage: result.usage.clone(),
1269 isolation: result.isolation.clone(),
1270 });
1271 }
1272
    /// Runs a child workflow requested by the script and merges its state into this run.
    ///
    /// `workflow_ref` is resolved either relative to this workflow's script or
    /// by name. The child shares this run's agent provider, model map, runner,
    /// cancellation channel and sinks. It starts from the current budget, and
    /// its events are parented under `parent_step_id`. When it finishes, its
    /// budget replaces this run's budget, and its logs, phases, agent/workflow
    /// calls, token usage and agent runs are appended. Returns the child's
    /// `output.result`.
    ///
    /// # Errors
    /// Fails if this run is already nested (only one level is allowed), if the
    /// named workflow cannot be resolved, or if the child run fails.
    async fn handle_workflow(
        &mut self,
        parent_step_id: String,
        workflow_ref: WorkflowRef,
        args: Option<Value>,
    ) -> anyhow::Result<Value> {
        // Only a top-level workflow (depth 0) may start a child workflow.
        if self.nesting_depth >= 1 {
            bail!("Nested workflow() calls are limited to one level");
        }
        let script_path = match workflow_ref {
            WorkflowRef::ScriptPath { script_path } => {
                resolve_relative_script(&self.script_path, &script_path)
            }
            WorkflowRef::Name(name) => resolve_named_workflow(&name)?,
        };
        log::debug!("child workflow call script={}", script_path.display());
        // NOTE(review): boxed presumably because this call is (indirectly)
        // recursive through the workflow runtime; confirm.
        let child = Box::pin(run_workflow_inner(RunWorkflowOptions {
            script_path,
            args: args.unwrap_or(Value::Null),
            agent_provider: Arc::clone(&self.agent_provider),
            model_map: self.model_map.clone(),
            budget_total: self.budget.total,
            budget_spent: self.budget.spent,
            nesting_depth: self.nesting_depth + 1,
            max_parallel_agent_requests: self.max_parallel_agent_requests,
            agent_runner: Some(Arc::clone(&self.agent_runner)),
            cancel_rx: self.cancel_rx.clone(),
            event_sink: self.event_sink.clone(),
            event_parent_step_id: Some(parent_step_id),
            event_stream_start: Some(self.event_start),
            session_log_sink: self.session_log_sink.clone(),
        }))
        .await?;
        // The child started from our spent budget, so its final budget is the new state.
        self.budget = child.budget;
        self.logs.extend(child.logs);
        self.phases.extend(child.phases);
        self.agent_calls.extend(child.agent_calls);
        self.workflow_calls.extend(child.workflow_calls);
        merge_token_usage(&mut self.token_usage, &child.token_usage);
        for (phase, usage) in child.token_usage_by_phase {
            merge_token_usage(self.token_usage_by_phase.entry(phase).or_default(), &usage);
        }
        self.agent_runs.extend(child.agent_runs);
        Ok(child.output.result)
    }
1318}
1319
1320async fn run_agent_runner_with_retry(
1321 agent_runner: Arc<dyn WorkflowAgentRunner>,
1322 default_provider: Arc<dyn AgentProvider>,
1323 provider_override: Option<String>,
1324 input: AgentProviderRunInput,
1325 mut cancel_rx: Option<watch::Receiver<bool>>,
1326) -> anyhow::Result<AgentProviderResult> {
1327 let retry = agent_retry_policy(&input.options)?;
1328 let mut final_result = None;
1329 for attempt in 1..=retry.max_attempts {
1330 let attempt_result = agent_runner
1331 .run_agent(
1332 Arc::clone(&default_provider),
1333 provider_override.clone(),
1334 input.clone(),
1335 )
1336 .await;
1337 match attempt_result {
1338 Ok(result) => {
1339 final_result = Some(Ok(result));
1340 break;
1341 }
1342 Err(error) if attempt < retry.max_attempts => {
1343 log::debug!(
1344 "agent call failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1345 retry.max_attempts,
1346 retry.backoff_ms
1347 );
1348 sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1349 }
1350 Err(error) => {
1351 final_result = Some(Err(error));
1352 break;
1353 }
1354 }
1355 }
1356 final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1357}
1358
1359async fn sleep_retry_backoff(
1360 backoff_ms: u64,
1361 cancel_rx: &mut Option<watch::Receiver<bool>>,
1362) -> anyhow::Result<()> {
1363 if backoff_ms == 0 {
1364 return Ok(());
1365 }
1366 let Some(cancel_rx) = cancel_rx.as_mut() else {
1367 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1368 return Ok(());
1369 };
1370 if *cancel_rx.borrow() {
1371 bail!("workflow cancelled");
1372 }
1373 let sleep = tokio::time::sleep(Duration::from_millis(backoff_ms));
1374 tokio::pin!(sleep);
1375 loop {
1376 tokio::select! {
1377 _ = &mut sleep => return Ok(()),
1378 changed = cancel_rx.changed() => {
1379 match changed {
1380 Ok(()) if *cancel_rx.borrow() => bail!("workflow cancelled"),
1381 Ok(()) => continue,
1382 Err(_) => {
1383 sleep.await;
1384 return Ok(());
1385 }
1386 }
1387 }
1388 }
1389 }
1390}
1391
1392pub(crate) async fn run_agent_provider_with_retry(
1393 default_provider: Arc<dyn AgentProvider>,
1394 provider_override: Option<String>,
1395 input: AgentProviderRunInput,
1396 mut cancel_rx: Option<watch::Receiver<bool>>,
1397) -> anyhow::Result<AgentProviderResult> {
1398 let retry = agent_retry_policy(&input.options)?;
1399 let provider = resolve_agent_provider(default_provider, provider_override)?;
1400 let mut final_result = None;
1401 for attempt in 1..=retry.max_attempts {
1402 let attempt_result =
1403 run_agent_with_optional_isolation(Arc::clone(&provider), input.clone()).await;
1404 match attempt_result {
1405 Ok(result) => {
1406 final_result = Some(Ok(result));
1407 break;
1408 }
1409 Err(error) if attempt < retry.max_attempts => {
1410 log::debug!(
1411 "agent provider failed on attempt {attempt}/{}; retrying after {}ms: {error:#}",
1412 retry.max_attempts,
1413 retry.backoff_ms
1414 );
1415 sleep_retry_backoff(retry.backoff_ms, &mut cancel_rx).await?;
1416 }
1417 Err(error) => {
1418 final_result = Some(Err(error));
1419 break;
1420 }
1421 }
1422 }
1423 final_result.unwrap_or_else(|| Err(anyhow!("agent retry loop finished without a result")))
1424}
1425
1426pub(crate) async fn run_agent_provider(
1427 default_provider: Arc<dyn AgentProvider>,
1428 provider_override: Option<String>,
1429 input: AgentProviderRunInput,
1430) -> anyhow::Result<AgentProviderResult> {
1431 let provider = resolve_agent_provider(default_provider, provider_override)?;
1432 run_agent_with_optional_isolation(provider, input).await
1433}
1434
1435fn resolve_agent_provider(
1436 default_provider: Arc<dyn AgentProvider>,
1437 provider_override: Option<String>,
1438) -> anyhow::Result<Arc<dyn AgentProvider>> {
1439 if let Some(provider_override) = provider_override {
1440 Ok(Arc::from(create_agent_provider(&provider_override)?))
1441 } else {
1442 Ok(default_provider)
1443 }
1444}
1445
/// Retry settings for an agent call, parsed from the `retry` agent option by
/// `agent_retry_policy`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct AgentRetryPolicy {
    /// Total number of attempts, including the first. It is at least 1 when
    /// produced by `agent_retry_policy`; the default is 1 (no retries).
    pub max_attempts: u32,
    /// Delay between attempts in milliseconds. The default is 0.
    pub backoff_ms: u64,
}
1451
1452pub(crate) fn agent_retry_policy(options: &Option<Value>) -> anyhow::Result<AgentRetryPolicy> {
1453 let default = AgentRetryPolicy {
1454 max_attempts: 1,
1455 backoff_ms: 0,
1456 };
1457 let Some(retry) = options.as_ref().and_then(|options| options.get("retry")) else {
1458 return Ok(default);
1459 };
1460 if retry.is_null() {
1461 return Ok(default);
1462 }
1463 let object = retry
1464 .as_object()
1465 .ok_or_else(|| anyhow!("agent retry option must be an object"))?;
1466 let max_attempts = match object.get("maxAttempts") {
1467 Some(value) => {
1468 let value = value
1469 .as_u64()
1470 .ok_or_else(|| anyhow!("agent retry.maxAttempts must be a positive integer"))?;
1471 if value == 0 || value > u32::MAX as u64 {
1472 bail!("agent retry.maxAttempts must be between 1 and {}", u32::MAX);
1473 }
1474 value as u32
1475 }
1476 None => default.max_attempts,
1477 };
1478 let backoff_ms = match object.get("backoffMs") {
1479 Some(value) => value
1480 .as_u64()
1481 .ok_or_else(|| anyhow!("agent retry.backoffMs must be a non-negative integer"))?,
1482 None => default.backoff_ms,
1483 };
1484 Ok(AgentRetryPolicy {
1485 max_attempts,
1486 backoff_ms,
1487 })
1488}
1489
1490async fn run_agent_with_optional_isolation(
1491 provider: Arc<dyn AgentProvider>,
1492 input: AgentProviderRunInput,
1493) -> anyhow::Result<AgentProviderResult> {
1494 if let Some(request) = sandbox_isolation_request(&input.options)? {
1495 return run_agent_with_sandbox_isolation(provider, input, request).await;
1496 }
1497
1498 if requests_worktree_isolation(&input.options) {
1499 return run_agent_with_worktree_isolation(provider, input).await;
1500 }
1501
1502 run_agent_with_schema_validation(provider, input).await
1503}
1504
1505async fn run_agent_with_worktree_isolation(
1506 provider: Arc<dyn AgentProvider>,
1507 input: AgentProviderRunInput,
1508) -> anyhow::Result<AgentProviderResult> {
1509 let isolation = WorktreeIsolation::create(input.context.cwd.as_deref())?;
1510 let isolation_info = isolation.info();
1511 let mut isolated_input = input;
1512 isolated_input.context.cwd = Some(isolation.cwd.clone());
1513 isolated_input.environment =
1514 Arc::new(LocalExecutionEnvironment::new(Some(isolation.cwd.clone()))?);
1515 let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
1516 if let Ok(result) = &mut result {
1517 result.isolation = Some(isolation_info);
1518 }
1519 if let Err(error) = isolation.cleanup() {
1520 log::warn!("failed to cleanup isolated agent worktree: {error:#}");
1521 }
1522 result
1523}
1524
/// Runs the agent inside a freshly opened sandbox session.
///
/// The steps are:
/// - Open a sandbox through the `smol-sandbox-<provider>` program with the
///   requested profile, syncing the workflow cwd as the workspace.
/// - Point the agent input's cwd and execution environment at the sandbox.
/// - Run the agent with schema validation.
/// - On success, attach sandbox details as `result.isolation`.
///
/// Once opened, the sandbox is closed whether or not the run succeeded; a
/// close failure is only logged.
///
/// # Errors
/// Fails if the workflow cwd is unknown, the sandbox cannot be opened, or the
/// agent run fails.
async fn run_agent_with_sandbox_isolation(
    provider: Arc<dyn AgentProvider>,
    input: AgentProviderRunInput,
    request: SandboxIsolationRequest,
) -> anyhow::Result<AgentProviderResult> {
    let program = sandbox_provider_program(&request);
    let sandbox_group_id = format!("sbxgrp_{}", ulid::Ulid::new());
    // The host-side cwd is the workspace synced into the sandbox, so it is required.
    let cwd = input.context.cwd.clone().ok_or_else(|| {
        anyhow!("agent sandbox isolation requires the workflow cwd to be available")
    })?;
    let sandbox_env = SandboxExecutionEnvironment::open(
        program,
        OpenSandboxRequest {
            metadata: SandboxMetadata::new(
                format!("req_{}", ulid::Ulid::new()),
                sandbox_group_id.clone(),
            ),
            profile: ProfileRef {
                provider: request.provider.clone(),
                name: request.profile.clone(),
            },
            workspace_sync: WorkspaceSync { host_path: cwd },
            cwd: request.cwd.clone(),
        },
    )
    .await
    .with_context(|| {
        format!(
            "failed to open sandbox profile `{}` with provider `{}`",
            request.profile, request.provider
        )
    })?;

    let session = sandbox_env.session().clone();
    let mut isolated_input = input;
    // From here on, the agent sees the sandbox's cwd and executes through the sandbox.
    isolated_input.context.cwd = sandbox_env.cwd().map(|path| PathBuf::from(path.as_str()));
    isolated_input.environment = Arc::new(sandbox_env.clone());
    let isolation_info = AgentRunIsolation {
        kind: "sandbox".to_string(),
        branch: None,
        worktree_path: None,
        cwd: session.cwd.clone(),
        profile: Some(request.profile.clone()),
        provider: Some(request.provider.clone()),
        session_id: Some(session.id.clone()),
    };

    // No `?` here: the sandbox must be closed even when the run fails.
    let mut result = run_agent_with_schema_validation(provider, isolated_input).await;
    if let Ok(result) = &mut result {
        result.isolation = Some(isolation_info);
    }

    if let Err(error) = sandbox_env.close().await {
        log::warn!("failed to close sandbox-isolated agent session: {error:#}");
    }

    result
}
1583
1584fn requests_worktree_isolation(options: &Option<Value>) -> bool {
1585 options
1586 .as_ref()
1587 .and_then(|options| options.get("isolation"))
1588 .and_then(Value::as_str)
1589 == Some("worktree")
1590}
1591
/// Parsed form of an `isolation: { type: "sandbox", profile, cwd }` agent option.
#[derive(Debug, Clone)]
struct SandboxIsolationRequest {
    /// Sandbox provider: the part of `isolation.profile` before the first `/`.
    /// It selects the `smol-sandbox-<provider>` program.
    provider: String,
    /// Provider-local profile name: the part of `isolation.profile` after the first `/`.
    profile: String,
    /// Optional `isolation.cwd`, forwarded to the sandbox open request.
    cwd: Option<String>,
}
1598
1599fn sandbox_isolation_request(
1600 options: &Option<Value>,
1601) -> anyhow::Result<Option<SandboxIsolationRequest>> {
1602 let Some(isolation) = options
1603 .as_ref()
1604 .and_then(|options| options.get("isolation"))
1605 else {
1606 return Ok(None);
1607 };
1608 let Some(object) = isolation.as_object() else {
1609 return Ok(None);
1610 };
1611 if object.get("type").and_then(Value::as_str) != Some("sandbox") {
1612 return Ok(None);
1613 }
1614 let profile = object
1615 .get("profile")
1616 .and_then(Value::as_str)
1617 .ok_or_else(|| anyhow!("agent sandbox isolation requires isolation.profile"))?
1618 .to_string();
1619 let cwd = object
1620 .get("cwd")
1621 .and_then(Value::as_str)
1622 .map(ToString::to_string);
1623 let (provider, profile) = parse_sandbox_profile_ref(&profile)?;
1624 Ok(Some(SandboxIsolationRequest {
1625 provider,
1626 profile,
1627 cwd,
1628 }))
1629}
1630
1631fn parse_sandbox_profile_ref(value: &str) -> anyhow::Result<(String, String)> {
1632 let (provider, profile) = value.split_once('/').ok_or_else(|| {
1633 anyhow!("agent sandbox isolation.profile must use <provider>/<profile>, got `{value}`")
1634 })?;
1635 validate_sandbox_provider_name(provider)?;
1636 if profile.is_empty() {
1637 bail!(
1638 "agent sandbox isolation.profile must include a non-empty provider-local profile name"
1639 );
1640 }
1641 Ok((provider.to_string(), profile.to_string()))
1642}
1643
1644fn validate_sandbox_provider_name(provider: &str) -> anyhow::Result<()> {
1645 let valid = !provider.is_empty()
1646 && provider
1647 .bytes()
1648 .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'-')
1649 && provider
1650 .as_bytes()
1651 .first()
1652 .is_some_and(u8::is_ascii_alphanumeric)
1653 && provider
1654 .as_bytes()
1655 .last()
1656 .is_some_and(u8::is_ascii_alphanumeric);
1657 if !valid {
1658 bail!("invalid sandbox provider name `{provider}`; expected lowercase letters, digits, and hyphens, starting and ending with an alphanumeric character");
1659 }
1660 Ok(())
1661}
1662
1663fn sandbox_provider_program(request: &SandboxIsolationRequest) -> String {
1664 format!("smol-sandbox-{}", request.provider)
1665}
1666
/// A temporary git worktree, on a freshly created branch, used for one agent run.
///
/// `cleanup` removes the worktree and deletes the branch. If `cleanup` was
/// not called, or did not complete, `Drop` attempts the same cleanup.
struct WorktreeIsolation {
    /// Root of the repository the worktree was created from; git commands run here.
    repo_root: PathBuf,
    /// Path of the worktree checkout inside `_temp_dir`.
    worktree_root: PathBuf,
    /// Working directory for the agent: the workflow cwd's position relative
    /// to the repository root, mapped into the worktree.
    cwd: PathBuf,
    /// Branch created for the worktree (`smol-wf/agent-run/<ulid>`).
    branch_name: String,
    /// Set once `cleanup` succeeds so `Drop` skips cleanup.
    cleaned: bool,
    /// Keeps the temp directory holding the worktree alive
    /// (`tempfile::TempDir` deletes its directory when dropped).
    _temp_dir: tempfile::TempDir,
}
1675
1676impl WorktreeIsolation {
1677 fn create(cwd: Option<&Path>) -> anyhow::Result<Self> {
1678 let cwd = cwd
1679 .map(Path::to_path_buf)
1680 .unwrap_or(std::env::current_dir()?)
1681 .canonicalize()
1682 .context("failed to canonicalize workflow cwd for worktree isolation")?;
1683 let repo_root = git_output(&cwd, &["rev-parse", "--show-toplevel"]).context(
1684 "agent isolation='worktree' requires the workflow cwd to be inside a git repository",
1685 )?;
1686 let repo_root = PathBuf::from(repo_root.trim())
1687 .canonicalize()
1688 .context("failed to canonicalize git repository root for worktree isolation")?;
1689 let relative_cwd = cwd.strip_prefix(&repo_root).with_context(|| {
1690 format!(
1691 "workflow cwd {} is not under git repository root {}",
1692 cwd.display(),
1693 repo_root.display()
1694 )
1695 })?;
1696
1697 let temp_dir = tempfile::Builder::new()
1698 .prefix("smol-wf-agent-worktree-")
1699 .tempdir()
1700 .context("failed to create temp directory for agent worktree isolation")?;
1701 let worktree_root = temp_dir.path().join("worktree");
1702 let worktree_arg = path_arg(&worktree_root);
1703 let branch_name = format!(
1704 "smol-wf/agent-run/{}",
1705 ulid::Ulid::new().to_string().to_ascii_lowercase()
1706 );
1707 git_status(
1708 &repo_root,
1709 &[
1710 "worktree",
1711 "add",
1712 "--quiet",
1713 "-b",
1714 &branch_name,
1715 &worktree_arg,
1716 "HEAD",
1717 ],
1718 )
1719 .context("failed to create isolated git worktree for agent run")?;
1720 let isolated_cwd = if relative_cwd.as_os_str().is_empty() {
1721 worktree_root.clone()
1722 } else {
1723 worktree_root.join(relative_cwd)
1724 };
1725 Ok(Self {
1726 repo_root,
1727 worktree_root,
1728 cwd: isolated_cwd,
1729 branch_name,
1730 cleaned: false,
1731 _temp_dir: temp_dir,
1732 })
1733 }
1734
1735 fn info(&self) -> AgentRunIsolation {
1736 AgentRunIsolation {
1737 kind: "worktree".to_string(),
1738 branch: Some(self.branch_name.clone()),
1739 worktree_path: Some(path_arg(&self.worktree_root)),
1740 cwd: Some(path_arg(&self.cwd)),
1741 profile: None,
1742 provider: None,
1743 session_id: None,
1744 }
1745 }
1746
1747 fn cleanup(mut self) -> anyhow::Result<()> {
1748 self.remove_worktree()?;
1749 self.delete_branch()?;
1750 self.cleaned = true;
1751 Ok(())
1752 }
1753
1754 fn remove_worktree(&self) -> anyhow::Result<()> {
1755 let worktree_arg = path_arg(&self.worktree_root);
1756 git_status(
1757 &self.repo_root,
1758 &["worktree", "remove", "--force", &worktree_arg],
1759 )
1760 .context("failed to remove isolated git worktree")
1761 }
1762
1763 fn delete_branch(&self) -> anyhow::Result<()> {
1764 git_status(&self.repo_root, &["branch", "-D", &self.branch_name])
1765 .context("failed to delete isolated agent worktree branch")
1766 }
1767}
1768
1769impl Drop for WorktreeIsolation {
1770 fn drop(&mut self) {
1771 if !self.cleaned {
1772 if let Err(error) = self.remove_worktree() {
1773 log::warn!("failed to cleanup isolated agent worktree during drop: {error:#}");
1774 }
1775 if let Err(error) = self.delete_branch() {
1776 log::warn!(
1777 "failed to delete isolated agent worktree branch during drop: {error:#}"
1778 );
1779 }
1780 }
1781 }
1782}
1783
/// Converts a path to an owned `String` for command-line and summary use.
/// Non-UTF-8 parts are replaced lossily.
fn path_arg(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
1787
1788fn git_output(cwd: &Path, args: &[&str]) -> anyhow::Result<String> {
1789 let output = StdCommand::new("git")
1790 .args(args)
1791 .current_dir(cwd)
1792 .output()
1793 .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1794 if output.status.success() {
1795 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
1796 } else {
1797 bail!(
1798 "git {} failed with {}{}",
1799 args.join(" "),
1800 status_text(output.status.code()),
1801 command_stderr(&output.stderr)
1802 )
1803 }
1804}
1805
1806fn git_status(cwd: &Path, args: &[&str]) -> anyhow::Result<()> {
1807 let output = StdCommand::new("git")
1808 .args(args)
1809 .current_dir(cwd)
1810 .output()
1811 .with_context(|| format!("failed to run git {}", args.join(" ")))?;
1812 if output.status.success() {
1813 Ok(())
1814 } else {
1815 bail!(
1816 "git {} failed with {}{}",
1817 args.join(" "),
1818 status_text(output.status.code()),
1819 command_stderr(&output.stderr)
1820 )
1821 }
1822}
1823
/// Describes a process exit: `code N`, or `signal` when no exit code is available.
fn status_text(code: Option<i32>) -> String {
    match code {
        Some(code) => format!("code {code}"),
        None => String::from("signal"),
    }
}
1828
/// Formats captured stderr as an error-message suffix.
///
/// Returns `": <trimmed stderr>"`, or an empty string when stderr is blank.
fn command_stderr(stderr: &[u8]) -> String {
    let text = String::from_utf8_lossy(stderr);
    match text.trim() {
        "" => String::new(),
        trimmed => format!(": {trimmed}"),
    }
}
1838
1839async fn run_agent_with_schema_validation(
1840 provider: Arc<dyn AgentProvider>,
1841 input: AgentProviderRunInput,
1842) -> anyhow::Result<AgentProviderResult> {
1843 let Some(schema) = input
1844 .options
1845 .as_ref()
1846 .and_then(|options| options.get("schema"))
1847 .cloned()
1848 else {
1849 return provider.run(input).await;
1850 };
1851
1852 let max_attempts = 2;
1853 let original_prompt = input.prompt.clone();
1854 let mut attempt_input = input;
1855 let mut last_errors = Vec::new();
1856
1857 for attempt in 1..=max_attempts {
1858 let result = provider.run(attempt_input.clone()).await?;
1859 match validate_structured_output(&schema, &result.output) {
1860 Ok(()) => return Ok(result),
1861 Err(errors) => {
1862 last_errors = errors;
1863 if attempt < max_attempts {
1864 attempt_input.prompt =
1865 with_structured_output_retry_prompt(&original_prompt, &last_errors);
1866 }
1867 }
1868 }
1869 }
1870
1871 bail!(
1872 "{}",
1873 format_structured_output_validation_error(&last_errors)
1874 )
1875}
1876
1877fn validate_structured_output(schema: &Value, output: &Value) -> Result<(), Vec<String>> {
1878 let validator = jsonschema::validator_for(schema)
1879 .map_err(|error| vec![format!("/ schema is invalid: {}", error)])?;
1880 let errors = validator
1881 .iter_errors(output)
1882 .map(|error| {
1883 let path = error.instance_path().to_string();
1884 let path = if path.is_empty() {
1885 "/".to_string()
1886 } else {
1887 path
1888 };
1889 format!("{path} {error}")
1890 })
1891 .collect::<Vec<_>>();
1892
1893 if errors.is_empty() {
1894 Ok(())
1895 } else {
1896 Err(errors)
1897 }
1898}
1899
/// Builds the final error message for structured output that failed validation.
///
/// The individual errors are joined with `"; "`.
fn format_structured_output_validation_error(errors: &[String]) -> String {
    let joined = errors.join("; ");
    format!("Structured output did not match JSON Schema: {joined}")
}
1906
/// Appends structured-output retry instructions and the validation errors to `prompt`.
///
/// After a blank line comes a three-line header, then one `- <error>` line
/// per error. Lines are separated by `\n`.
fn with_structured_output_retry_prompt(prompt: &str, errors: &[String]) -> String {
    let header = [
        "Previous structured output failed JSON Schema validation.",
        "Return a corrected structured output that satisfies the original JSON Schema.",
        "Validation errors:",
    ];
    let mut text = String::with_capacity(prompt.len() + 256);
    text.push_str(prompt);
    // Blank line between the original prompt and the retry instructions.
    text.push('\n');
    for line in header {
        text.push('\n');
        text.push_str(line);
    }
    for error in errors {
        text.push_str("\n- ");
        text.push_str(error);
    }
    text
}
1918
/// A requested model name after model-map lookup and selector parsing.
///
/// Selectors have the form `[provider/]model[?provider=..&thinking=..]`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResolvedModelSelector {
    /// The model name exactly as the workflow requested it.
    requested: String,
    /// The parsed selector: the model-map entry for `requested`, or `requested` itself.
    selector: String,
    /// The model id, without any provider prefix or query string.
    model_id: String,
    /// Provider from the `provider/` prefix or the `provider` query parameter.
    model_provider: Option<String>,
    /// Value of the `thinking` query parameter, if present.
    thinking: Option<String>,
}
1927
1928impl ResolvedModelSelector {
1929 fn provider_model(&self) -> String {
1930 match &self.model_provider {
1931 Some(provider) => format!("{provider}/{}", self.model_id),
1932 None => self.model_id.clone(),
1933 }
1934 }
1935}
1936
1937fn resolve_model_options(
1938 options: Option<Value>,
1939 agent_provider: &str,
1940 model_map: &BTreeMap<String, String>,
1941) -> anyhow::Result<Option<Value>> {
1942 let Some(model) = options
1943 .as_ref()
1944 .and_then(Value::as_object)
1945 .and_then(|object| object.get("model"))
1946 .and_then(Value::as_str)
1947 .map(ToString::to_string)
1948 else {
1949 return Ok(options);
1950 };
1951
1952 let mapped_selector = model_map.get(&model).cloned();
1953 let alias_matched = mapped_selector.is_some();
1954 let selector = mapped_selector.unwrap_or_else(|| model.clone());
1955 let resolved = parse_model_selector(&model, &selector)?;
1956 validate_model_selector_for_provider(&resolved, agent_provider)?;
1957
1958 let mut object = options
1959 .and_then(|value| value.as_object().cloned())
1960 .unwrap_or_default();
1961 object.insert(
1962 "model".to_string(),
1963 Value::String(resolved.provider_model()),
1964 );
1965
1966 let selector_has_extra_parts = alias_matched
1967 || resolved.selector.contains('?')
1968 || resolved.model_provider.is_some()
1969 || resolved.thinking.is_some();
1970 if selector_has_extra_parts {
1971 object.insert(
1972 "requestedModel".to_string(),
1973 Value::String(resolved.requested.clone()),
1974 );
1975 object.insert(
1976 "modelSelector".to_string(),
1977 Value::String(resolved.selector.clone()),
1978 );
1979 } else {
1980 object.remove("requestedModel");
1981 object.remove("modelSelector");
1982 }
1983
1984 if let Some(provider) = resolved.model_provider {
1985 object.insert("modelProvider".to_string(), Value::String(provider));
1986 } else {
1987 object.remove("modelProvider");
1988 }
1989 if let Some(thinking) = resolved.thinking {
1990 object.insert("thinking".to_string(), Value::String(thinking));
1991 } else {
1992 object.remove("thinking");
1993 }
1994 Ok(Some(Value::Object(object)))
1995}
1996
1997fn parse_model_selector(requested: &str, selector: &str) -> anyhow::Result<ResolvedModelSelector> {
1998 let (model_part, query) = selector.split_once('?').unwrap_or((selector, ""));
1999 if model_part.trim().is_empty() {
2000 bail!("model selector must include a model id: {selector}");
2001 }
2002
2003 let (slash_provider, model_id) = match model_part.split_once('/') {
2004 Some((provider, model_id)) if !provider.is_empty() && !model_id.is_empty() => {
2005 (Some(provider.to_string()), model_id.to_string())
2006 }
2007 Some(_) => bail!("model selector provider/model form is invalid: {selector}"),
2008 None => (None, model_part.to_string()),
2009 };
2010
2011 let mut query_provider = None::<String>;
2012 let mut thinking = None::<String>;
2013 if !query.is_empty() {
2014 for pair in query.split('&') {
2015 if pair.is_empty() {
2016 continue;
2017 }
2018 let (key, value) = pair.split_once('=').ok_or_else(|| {
2019 anyhow!("model selector query parameter must use key=value: {pair}")
2020 })?;
2021 let key = percent_decode(key)?;
2022 let value = percent_decode(value)?;
2023 if value.is_empty() {
2024 bail!("model selector query parameter `{key}` must not be empty");
2025 }
2026 match key.as_str() {
2027 "provider" => set_unique_query_value(&mut query_provider, key, value)?,
2028 "thinking" => set_unique_query_value(&mut thinking, key, value)?,
2029 _ => bail!("unknown model selector query parameter `{key}`"),
2030 }
2031 }
2032 }
2033
2034 let model_provider = match (slash_provider, query_provider) {
2035 (Some(slash), Some(query)) if slash != query => bail!(
2036 "conflicting model provider qualifiers in selector `{selector}`: `{slash}` and `{query}`"
2037 ),
2038 (Some(provider), Some(_)) | (Some(provider), None) | (None, Some(provider)) => {
2039 Some(provider)
2040 }
2041 (None, None) => None,
2042 };
2043
2044 Ok(ResolvedModelSelector {
2045 requested: requested.to_string(),
2046 selector: selector.to_string(),
2047 model_id,
2048 model_provider,
2049 thinking,
2050 })
2051}
2052
2053fn set_unique_query_value(
2054 target: &mut Option<String>,
2055 key: String,
2056 value: String,
2057) -> anyhow::Result<()> {
2058 if target.replace(value).is_some() {
2059 bail!("duplicate model selector query parameter `{key}`");
2060 }
2061 Ok(())
2062}
2063
2064fn percent_decode(value: &str) -> anyhow::Result<String> {
2065 let bytes = value.as_bytes();
2066 let mut output = Vec::with_capacity(bytes.len());
2067 let mut index = 0;
2068 while index < bytes.len() {
2069 match bytes[index] {
2070 b'%' => {
2071 if index + 2 >= bytes.len() {
2072 bail!("invalid percent escape in model selector query: {value}");
2073 }
2074 let high = hex_value(bytes[index + 1]).ok_or_else(|| {
2075 anyhow!("invalid percent escape in model selector query: {value}")
2076 })?;
2077 let low = hex_value(bytes[index + 2]).ok_or_else(|| {
2078 anyhow!("invalid percent escape in model selector query: {value}")
2079 })?;
2080 output.push((high << 4) | low);
2081 index += 3;
2082 }
2083 b'+' => {
2084 output.push(b' ');
2085 index += 1;
2086 }
2087 byte => {
2088 output.push(byte);
2089 index += 1;
2090 }
2091 }
2092 }
2093 String::from_utf8(output).context("model selector query is not valid UTF-8")
2094}
2095
/// Numeric value of one ASCII hexadecimal digit (`0-9`, `a-f`, `A-F`).
///
/// Returns `None` for any other byte, including non-ASCII bytes.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` accepts only ASCII hex digits and yields a value < 16,
    // so narrowing back to `u8` is lossless.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
2104
2105fn validate_model_selector_for_provider(
2106 resolved: &ResolvedModelSelector,
2107 agent_provider: &str,
2108) -> anyhow::Result<()> {
2109 match agent_provider {
2110 "codex" => {
2111 if resolved.model_provider.is_some() {
2112 bail!("Codex model selectors do not support ?provider=... or provider/model form");
2113 }
2114 if resolved.thinking.is_some() {
2115 bail!("Codex model selectors do not support thinking=...");
2116 }
2117 }
2118 "claude-code" if resolved.model_provider.is_some() => {
2119 bail!(
2120 "Claude Code model selectors do not support ?provider=... or provider/model form"
2121 );
2122 }
2123 "opencode" if resolved.model_provider.is_none() => {
2124 bail!("OpenCode model selectors must use provider/model or ?provider=...");
2125 }
2126 "debug" | "pi" => {}
2127 _ => {}
2128 }
2129 Ok(())
2130}
2131
2132fn apply_phase_defaults(options: Option<Value>, metadata: &WorkflowMetadata) -> Option<Value> {
2133 let phase_name = options
2134 .as_ref()
2135 .and_then(|options| options.get("phase"))
2136 .and_then(Value::as_str)
2137 .map(ToString::to_string);
2138 let phase_metadata = phase_name.as_ref().and_then(|phase_name| {
2139 metadata
2140 .phases
2141 .iter()
2142 .find(|phase| phase.title == *phase_name)
2143 });
2144
2145 if phase_name.is_none() && phase_metadata.is_none() {
2146 return options;
2147 }
2148
2149 let mut object = options
2150 .and_then(|value| value.as_object().cloned())
2151 .unwrap_or_default();
2152
2153 if let Some(phase_name) = phase_name {
2154 object
2155 .entry("phase".to_string())
2156 .or_insert(Value::String(phase_name));
2157 }
2158 if let Some(model) = phase_metadata.and_then(|phase| phase.model.clone()) {
2159 object
2160 .entry("model".to_string())
2161 .or_insert(Value::String(model));
2162 }
2163 if let Some(provider) = phase_metadata.and_then(|phase| phase.provider.clone()) {
2164 object
2165 .entry("provider".to_string())
2166 .or_insert(Value::String(provider));
2167 }
2168
2169 Some(Value::Object(object))
2170}
2171
/// Resolve `script_path` as referenced from the script at `current_script_path`.
///
/// An absolute `script_path` is returned as-is. A relative one is joined onto
/// the directory containing `current_script_path`, or onto `.` when that path
/// has no parent. The result is not normalized or canonicalized.
fn resolve_relative_script(current_script_path: &Path, script_path: &str) -> PathBuf {
    let target = Path::new(script_path);
    if target.is_absolute() {
        return target.to_path_buf();
    }
    let base_dir = current_script_path.parent().unwrap_or(Path::new("."));
    base_dir.join(target)
}
2183
2184fn resolve_named_workflow(name: &str) -> anyhow::Result<PathBuf> {
2185 let workflows_dir = PathBuf::from(".claude/workflows");
2186 for entry in fs::read_dir(&workflows_dir).unwrap_or_else(|_| fs::read_dir(".").unwrap()) {
2187 let entry = entry?;
2188 let path = entry.path();
2189 if path.extension().and_then(|extension| extension.to_str()) != Some("js") {
2190 continue;
2191 }
2192 if read_workflow_metadata(&path)?.is_some_and(|metadata| metadata.name == name) {
2193 return Ok(path);
2194 }
2195 }
2196 bail!("Unknown workflow: {name}")
2197}