1use crate::agent_providers::{AgentProvider, AgentProviderResult, AgentProviderRunInput};
8use crate::durable::json::{
9 DurableRunMode, FailureReasonJSON, LocalTaskParamsJSON, WorkflowRunJSON,
10};
11use crate::events::{WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink};
12use crate::metadata::read_workflow_metadata;
13use crate::workflow::{
14 run_agent_provider_with_retry, run_workflow, RunWorkflowOptions, RunWorkflowResult,
15 WorkflowAgentRunner,
16};
17use anyhow::{bail, Context};
18use rusqlite::OptionalExtension;
19use serde_json::Value;
20use std::collections::{BTreeMap, HashMap};
21use std::path::PathBuf;
22use std::sync::{Arc, Mutex};
23use std::time::{Duration, Instant};
24use tokio::sync::watch;
25
26use super::sqlite::{new_id, now_ms, SqliteDurableStore};
27
/// Task name written to `sw_workflow_tasks.task_name` for tasks inserted by this module.
const WORKFLOW_TASK_NAME: &str = "workflow.run";
/// Claim scope written to `sw_workflow_tasks.claim_scope` for tasks inserted by this module.
const LOCAL_CLAIM_SCOPE: &str = "local";
/// Base lease, in milliseconds, for a claimed agent step before retry backoff is added.
const DEFAULT_STEP_LEASE_MS: u64 = 60_000;
31
/// Inputs for [`run_local_durable_workflow`].
pub struct LocalDurableRunOptions {
    /// Workflow script to run; also persisted in the task params and run JSON.
    pub script_path: PathBuf,
    /// Optional working directory forwarded to `run_workflow`.
    pub workflow_cwd: Option<PathBuf>,
    /// Workflow arguments; forwarded to `run_workflow` and persisted with the run.
    pub args: Value,
    /// Default agent provider forwarded to `run_workflow`.
    pub agent_provider: Arc<dyn AgentProvider>,
    /// Model map forwarded unchanged to `run_workflow`.
    pub model_map: BTreeMap<String, String>,
    /// Optional budget total; forwarded to `run_workflow` and persisted with the run.
    pub budget_total: Option<u64>,
    /// Optional limit forwarded to `run_workflow` as `max_parallel_agent_requests`.
    pub max_parallel_agent_requests: Option<usize>,
    /// When set, the existing run with this id is resumed instead of inserting a new task/run.
    pub resume_run_id: Option<String>,
    /// Optional cancellation signal; a `true` value after a workflow error records the
    /// attempt as cancelled rather than failed.
    pub cancel_rx: Option<watch::Receiver<bool>>,
    /// Optional event sink; wrapped so emitted events carry the run id.
    pub event_sink: Option<Arc<dyn crate::workflow::WorkflowEventSink>>,
    /// Optional session log sink forwarded unchanged to `run_workflow`.
    pub session_log_sink: Option<Arc<dyn crate::workflow::AgentSessionLogSink>>,
}
46
47impl LocalDurableRunOptions {
48 pub fn new(script_path: PathBuf, args: Value, agent_provider: Arc<dyn AgentProvider>) -> Self {
49 Self {
50 script_path,
51 workflow_cwd: None,
52 args,
53 agent_provider,
54 model_map: BTreeMap::new(),
55 budget_total: None,
56 max_parallel_agent_requests: None,
57 resume_run_id: None,
58 cancel_rx: None,
59 event_sink: None,
60 session_log_sink: None,
61 }
62 }
63}
64
/// Event sink wrapper that stamps events with a run id and elapsed time before
/// forwarding them to `inner`.
struct RunScopedWorkflowEventSink {
    /// Downstream sink that receives the scoped events.
    inner: Arc<dyn WorkflowEventSink>,
    /// Run id filled into event metadata when the event carries none.
    run_id: String,
    /// Creation instant of this sink; elapsed times are measured from here.
    start: Instant,
}
70
71impl RunScopedWorkflowEventSink {
72 fn new(inner: Arc<dyn WorkflowEventSink>, run_id: String) -> Self {
73 Self {
74 inner,
75 run_id,
76 start: Instant::now(),
77 }
78 }
79}
80
81#[async_trait::async_trait]
82impl WorkflowEventSink for RunScopedWorkflowEventSink {
83 async fn emit(&self, event: WorkflowEvent) -> anyhow::Result<()> {
84 let workflow_depth = event
85 .metadata
86 .as_ref()
87 .and_then(|metadata| metadata.workflow_depth)
88 .unwrap_or(0);
89 if workflow_depth == 0
90 && matches!(
91 event.event_type.as_str(),
92 "workflow.started" | "workflow.result" | "workflow.error"
93 )
94 {
95 return Ok(());
96 }
97 self.emit_scoped(event).await
98 }
99}
100
101impl RunScopedWorkflowEventSink {
102 async fn emit_scoped(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
103 let metadata = event
104 .metadata
105 .get_or_insert_with(WorkflowEventMetadata::default);
106 if metadata.run_id.is_none() {
107 metadata.run_id = Some(self.run_id.clone());
108 }
109 if metadata.workflow_depth.is_none() {
110 metadata.workflow_depth = Some(0);
111 }
112 if event.event_type.as_str() != "workflow.started" && event.elapsed_nanos.is_none() {
113 event.elapsed_nanos = Some(elapsed_nanos(self.start));
114 }
115 self.inner.emit(event).await
116 }
117}
118
/// Returns the nanoseconds elapsed since `start`, clamped to `u64::MAX`.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos: u128 = start.elapsed().as_nanos();
    match u64::try_from(nanos) {
        Ok(value) => value,
        // More nanoseconds than fit in a u64: report the maximum instead of wrapping.
        Err(_) => u64::MAX,
    }
}
122
123fn durable_agent_step_lease_expires_at(
124 now: i64,
125 retry_policy: crate::workflow::AgentRetryPolicy,
126) -> i64 {
127 let retry_backoff_budget = retry_policy
128 .backoff_ms
129 .saturating_mul(u64::from(retry_policy.max_attempts.saturating_sub(1)));
130 let lease_ms = DEFAULT_STEP_LEASE_MS.saturating_add(retry_backoff_budget);
131 let lease_ms = i64::try_from(lease_ms).unwrap_or(i64::MAX);
132 now.saturating_add(lease_ms)
133}
134
135fn rfc3339_now() -> anyhow::Result<String> {
136 Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
137}
138
/// Outcome of a successful [`run_local_durable_workflow`] call.
#[derive(Debug)]
pub struct LocalDurableRunResult {
    /// Id of the durable task the run belongs to.
    pub task_id: String,
    /// Id of the durable run that was executed or resumed.
    pub run_id: String,
    /// Attempt number used for this execution (1 for a fresh run).
    pub attempts: u32,
    /// Result returned by `run_workflow`.
    pub workflow: RunWorkflowResult,
}
147
/// Agent runner that checkpoints agent and sleep steps in a SQLite durable store.
#[derive(Debug)]
pub struct SqliteDurableAgentRunner {
    /// Path of the SQLite database; a store is opened from it for each operation.
    db_path: PathBuf,
    /// Run whose steps are being recorded.
    run_id: String,
    /// Root run id stored alongside each step.
    root_run_id: String,
    /// Worker id recorded on every step this runner claims.
    worker_id: String,
    /// Optional cancellation signal handed to the agent provider call.
    cancel_rx: Option<watch::Receiver<bool>>,
    /// How many times each base checkpoint name has been requested by this runner.
    occurrences: Mutex<HashMap<String, u64>>,
}
157
158impl SqliteDurableAgentRunner {
159 pub fn new(
160 db_path: PathBuf,
161 run_id: String,
162 root_run_id: String,
163 worker_id: String,
164 cancel_rx: Option<watch::Receiver<bool>>,
165 ) -> Self {
166 Self {
167 db_path,
168 run_id,
169 root_run_id,
170 worker_id,
171 cancel_rx,
172 occurrences: Mutex::new(HashMap::new()),
173 }
174 }
175
176 fn next_checkpoint_name(&self, base_checkpoint_name: String) -> anyhow::Result<String> {
177 let mut occurrences = self
178 .occurrences
179 .lock()
180 .map_err(|_| anyhow::anyhow!("durable occurrence counter lock was poisoned"))?;
181 let count = occurrences.entry(base_checkpoint_name.clone()).or_insert(0);
182 *count += 1;
183 if *count == 1 {
184 Ok(base_checkpoint_name)
185 } else {
186 Ok(format!("{base_checkpoint_name}#{count}"))
187 }
188 }
189}
190
191#[async_trait::async_trait]
192impl WorkflowAgentRunner for SqliteDurableAgentRunner {
193 fn retry_in_runtime(&self) -> bool {
194 false
195 }
196
197 async fn run_agent(
198 &self,
199 default_provider: Arc<dyn AgentProvider>,
200 provider_override: Option<String>,
201 input: AgentProviderRunInput,
202 ) -> anyhow::Result<AgentProviderResult> {
203 let provider_name = provider_override
204 .as_deref()
205 .unwrap_or_else(|| default_provider.name())
206 .to_string();
207 let input_signature = agent_input_signature(&provider_name, &input);
208 let input_signature_json = canonical_json_string(&input_signature)?;
209 let input_signature_hash = short_blake3_hex(&input_signature_json);
210 let base_checkpoint_name = format!("step:sig_{input_signature_hash}");
211 let checkpoint_name = self.next_checkpoint_name(base_checkpoint_name)?;
212 let input_json = serde_json::to_value(&input_signature)
213 .context("failed to serialize durable agent input")?;
214 let retry_policy = crate::workflow::agent_retry_policy(&input.options)?;
215
216 loop {
217 let claim = {
218 let mut store = SqliteDurableStore::open(&self.db_path)?;
219 store.claim_or_replay_agent_step(AgentStepClaimInput {
220 run_id: &self.run_id,
221 root_run_id: &self.root_run_id,
222 checkpoint_name: &checkpoint_name,
223 input_signature_hash: &input_signature_hash,
224 input_signature_json: &input_signature_json,
225 input_json: &input_json,
226 worker_id: &self.worker_id,
227 lease_expires_at: durable_agent_step_lease_expires_at(now_ms(), retry_policy),
228 now: now_ms(),
229 })?
230 };
231
232 match claim {
233 AgentStepClaim::Replay(result) => return Ok(*result),
234 AgentStepClaim::Run { step_id } => {
235 let provider_result = run_durable_agent_provider(
236 default_provider,
237 provider_override,
238 input,
239 self.cancel_rx.clone(),
240 )
241 .await;
242 let mut store = SqliteDurableStore::open(&self.db_path)?;
243 match provider_result {
244 Ok(result) => {
245 store.complete_agent_step(AgentStepCompleteInput {
246 step_id: &step_id,
247 run_id: &self.run_id,
248 root_run_id: &self.root_run_id,
249 result: &result,
250 now: now_ms(),
251 })?;
252 return Ok(result);
253 }
254 Err(error) => {
255 let failure_reason = serde_json::to_value(FailureReasonJSON {
256 message: error.to_string(),
257 })?;
258 store.fail_agent_step(AgentStepFailInput {
259 step_id: &step_id,
260 failure_reason: &failure_reason,
261 now: now_ms(),
262 })?;
263 return Err(error);
264 }
265 }
266 }
267 AgentStepClaim::Wait => {
268 tokio::time::sleep(Duration::from_millis(50)).await;
269 }
270 }
271 }
272 }
273
274 async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
275 let input_signature = sleep_input_signature(duration_ms);
276 let input_signature_json = canonical_json_string(&input_signature)?;
277 let input_signature_hash = short_blake3_hex(&input_signature_json);
278 let base_checkpoint_name = format!("step:sig_{input_signature_hash}");
279 let checkpoint_name = self.next_checkpoint_name(base_checkpoint_name)?;
280
281 let now = now_ms();
282 let claim = {
283 let mut store = SqliteDurableStore::open(&self.db_path)?;
284 store.claim_or_replay_sleep_step(SleepStepClaimInput {
285 run_id: &self.run_id,
286 root_run_id: &self.root_run_id,
287 checkpoint_name: &checkpoint_name,
288 input_signature_hash: &input_signature_hash,
289 input_signature_json: &input_signature_json,
290 duration_ms,
291 worker_id: &self.worker_id,
292 lease_expires_at: now + 60_000,
293 now,
294 })?
295 };
296
297 match claim {
298 SleepStepClaim::Replay => Ok(()),
299 SleepStepClaim::WaitUntil { step_id, wake_at } => {
300 let now = now_ms();
301 if wake_at > now {
302 tokio::time::sleep(Duration::from_millis((wake_at - now) as u64)).await;
303 }
304 let mut store = SqliteDurableStore::open(&self.db_path)?;
305 store.complete_sleep_step(SleepStepCompleteInput {
306 step_id: &step_id,
307 duration_ms,
308 wake_at,
309 now: now_ms(),
310 })?;
311 Ok(())
312 }
313 }
314 }
315}
316
317pub async fn run_local_durable_workflow(
319 store: &mut SqliteDurableStore,
320 options: LocalDurableRunOptions,
321) -> anyhow::Result<LocalDurableRunResult> {
322 store.init()?;
323
324 let owner_id = new_id("owner");
325 let now = now_ms();
326 let params_json = serde_json::to_value(LocalTaskParamsJSON {
327 mode: DurableRunMode::Local,
328 script_path: options.script_path.clone(),
329 args: options.args.clone(),
330 budget_total: options.budget_total,
331 })?;
332 let workflow_metadata = read_workflow_metadata(&options.script_path).ok().flatten();
333 let workflow_run_json = serde_json::to_value(WorkflowRunJSON {
334 mode: DurableRunMode::Local,
335 script_path: options.script_path.clone(),
336 metadata: workflow_metadata,
337 })?;
338
339 let (task_id, run_id, first_attempt) = if let Some(run_id) = options.resume_run_id.clone() {
340 let (task_id, current_attempts) = store.prepare_resume_run(&run_id, &owner_id, now)?;
341 (task_id, run_id, current_attempts + 1)
342 } else {
343 let task_id = new_id("task");
344 let run_id = new_id("run");
345 store.insert_local_task_and_run(LocalTaskAndRunInsert {
346 task_id: &task_id,
347 run_id: &run_id,
348 owner_id: &owner_id,
349 params_json: ¶ms_json,
350 workflow_run_json: &workflow_run_json,
351 args_json: &options.args,
352 budget_total: options.budget_total,
353 max_attempts: 1,
354 now,
355 })?;
356 (task_id, run_id, 1)
357 };
358
359 let workflow_event_sink = options.event_sink.as_ref().map(|sink| {
360 Arc::new(RunScopedWorkflowEventSink::new(
361 Arc::clone(sink),
362 run_id.clone(),
363 ))
364 });
365 if let Some(event_sink) = workflow_event_sink.as_ref() {
366 event_sink
367 .emit_scoped(WorkflowEvent::started(rfc3339_now()?))
368 .await
369 .context("failed to emit workflow started event")?;
370 }
371
372 let attempt = first_attempt;
373 let attempt_id = new_id("attempt");
374 store.start_attempt(LocalAttemptStart {
375 task_id: &task_id,
376 run_id: &run_id,
377 attempt_id: &attempt_id,
378 owner_id: &owner_id,
379 attempt,
380 lease_expires_at: now_ms() + 60_000,
381 now: now_ms(),
382 })?;
383
384 let agent_runner = store.path().map(|db_path| {
385 Arc::new(SqliteDurableAgentRunner::new(
386 db_path.to_path_buf(),
387 run_id.clone(),
388 run_id.clone(),
389 owner_id.clone(),
390 options.cancel_rx.clone(),
391 )) as Arc<dyn WorkflowAgentRunner>
392 });
393
394 let result = run_workflow(RunWorkflowOptions {
395 script_path: options.script_path.clone(),
396 workflow_cwd: options.workflow_cwd.clone(),
397 args: options.args.clone(),
398 agent_provider: Arc::clone(&options.agent_provider),
399 model_map: options.model_map.clone(),
400 budget_total: options.budget_total,
401 budget_spent: 0,
402 nesting_depth: 0,
403 max_parallel_agent_requests: options.max_parallel_agent_requests,
404 agent_runner,
405 cancel_rx: options.cancel_rx.clone(),
406 event_sink: workflow_event_sink
407 .as_ref()
408 .map(|sink| Arc::clone(sink) as Arc<dyn WorkflowEventSink>),
409 event_parent_step_id: None,
410 event_stream_start: workflow_event_sink.as_ref().map(|sink| sink.start),
411 session_log_sink: options.session_log_sink.clone(),
412 })
413 .await;
414
415 match result {
416 Ok(workflow) => {
417 let completed_payload = serde_json::to_value(&workflow.output)
418 .context("failed to serialize durable workflow output")?;
419 store.complete_attempt_and_task(LocalAttemptComplete {
420 task_id: &task_id,
421 run_id: &run_id,
422 attempt_id: &attempt_id,
423 completed_payload: &completed_payload,
424 budget_spent: workflow.budget.spent,
425 now: now_ms(),
426 })?;
427 if let Some(event_sink) = workflow_event_sink.as_ref() {
428 event_sink
429 .emit_scoped(WorkflowEvent::result(
430 workflow.token_usage.input_tokens,
431 workflow.token_usage.output_tokens,
432 workflow.token_usage.total_tokens,
433 workflow.output.result.clone(),
434 ))
435 .await
436 .context("failed to emit workflow result event")?;
437 }
438 Ok(LocalDurableRunResult {
439 task_id,
440 run_id,
441 attempts: attempt,
442 workflow,
443 })
444 }
445 Err(error) => {
446 let failure_reason = serde_json::to_value(FailureReasonJSON {
447 message: error.to_string(),
448 })?;
449 if cancellation_requested(&options.cancel_rx) {
450 store.cancel_attempt_and_task(LocalAttemptCancel {
451 task_id: &task_id,
452 run_id: &run_id,
453 attempt_id: &attempt_id,
454 failure_reason: &failure_reason,
455 now: now_ms(),
456 })?;
457 } else {
458 store.fail_attempt(LocalAttemptFail {
459 task_id: &task_id,
460 run_id: &run_id,
461 attempt_id: &attempt_id,
462 failure_reason: &failure_reason,
463 terminal: true,
464 now: now_ms(),
465 })?;
466 }
467 if let Some(event_sink) = workflow_event_sink.as_ref() {
468 event_sink
469 .emit_scoped(WorkflowEvent::error(error.to_string(), None))
470 .await
471 .context("failed to emit workflow error event")?;
472 }
473 Err(error)
474 }
475 }
476}
477
/// Values for inserting a new local task and its run in one transaction.
pub struct LocalTaskAndRunInsert<'a> {
    /// Id of the new task row.
    pub task_id: &'a str,
    /// Id of the new run row; also stored as the run's `root_run_id`.
    pub run_id: &'a str,
    /// Owner recorded as `submitted_by_owner_id` on the task.
    pub owner_id: &'a str,
    /// Task parameters, serialized into `params_json`.
    pub params_json: &'a Value,
    /// Run description, serialized into `workflow_run_json`.
    pub workflow_run_json: &'a Value,
    /// Workflow arguments, serialized into `args_json`.
    pub args_json: &'a Value,
    /// Optional budget total stored on the run.
    pub budget_total: Option<u64>,
    /// Value stored in the task's `max_attempts` column.
    pub max_attempts: u32,
    /// Timestamp used for both `created_at` and `updated_at`.
    pub now: i64,
}
489
/// Values for claiming a local task and recording a new running attempt.
pub struct LocalAttemptStart<'a> {
    /// Task to claim.
    pub task_id: &'a str,
    /// Run to mark as running.
    pub run_id: &'a str,
    /// Id of the new attempt row.
    pub attempt_id: &'a str,
    /// Claiming owner; the task is claimed only when this matches its
    /// `submitted_by_owner_id`, and it is stored as the attempt's `worker_id`.
    pub owner_id: &'a str,
    /// Attempt number stored on the attempt row.
    pub attempt: u32,
    /// Lease expiry written to both the task and the attempt.
    pub lease_expires_at: i64,
    /// Timestamp used for `updated_at` and the attempt's `started_at`.
    pub now: i64,
}
499
/// Values for marking an attempt, its run, and its task as completed.
pub struct LocalAttemptComplete<'a> {
    /// Task to mark completed.
    pub task_id: &'a str,
    /// Run to mark completed.
    pub run_id: &'a str,
    /// Attempt to mark completed.
    pub attempt_id: &'a str,
    /// Payload serialized into `completed_payload_json` on the run and the task.
    pub completed_payload: &'a Value,
    /// Budget spent, stored on the run.
    pub budget_spent: u64,
    /// Timestamp used for `completed_at` and `updated_at`.
    pub now: i64,
}
508
/// Values for recording a failed attempt.
pub struct LocalAttemptFail<'a> {
    /// Task whose state is updated.
    pub task_id: &'a str,
    /// Run whose state is updated.
    pub run_id: &'a str,
    /// Attempt to mark failed.
    pub attempt_id: &'a str,
    /// Failure description serialized into `failure_reason_json`.
    pub failure_reason: &'a Value,
    /// `true` moves the run and task to `failed`; `false` returns them to `pending`.
    pub terminal: bool,
    /// Timestamp used for `completed_at` and `updated_at`.
    pub now: i64,
}
517
/// Values for marking an attempt, its run, and its task as cancelled.
pub struct LocalAttemptCancel<'a> {
    /// Task to mark cancelled.
    pub task_id: &'a str,
    /// Run to mark cancelled.
    pub run_id: &'a str,
    /// Attempt to mark cancelled.
    pub attempt_id: &'a str,
    /// Reason serialized into `failure_reason_json`.
    pub failure_reason: &'a Value,
    /// Timestamp used for `completed_at` and `updated_at`.
    pub now: i64,
}
525
526fn cancellation_requested(cancel_rx: &Option<watch::Receiver<bool>>) -> bool {
527 cancel_rx
528 .as_ref()
529 .is_some_and(|cancel_rx| *cancel_rx.borrow())
530}
531
532impl SqliteDurableStore {
533 pub fn insert_local_task_and_run(
534 &mut self,
535 input: LocalTaskAndRunInsert<'_>,
536 ) -> anyhow::Result<()> {
537 let tx = self
538 .connection_mut()
539 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
540 .context("failed to begin local durable task transaction")?;
541 tx.execute(
542 r#"
543 INSERT INTO sw_workflow_tasks (
544 task_id,
545 task_name,
546 state,
547 params_json,
548 submitted_by_owner_id,
549 claimed_by_owner_id,
550 claim_scope,
551 created_at,
552 updated_at,
553 max_attempts
554 )
555 VALUES (?1, ?2, 'pending', ?3, ?4, NULL, ?5, ?6, ?6, ?7)
556 "#,
557 rusqlite::params![
558 input.task_id,
559 WORKFLOW_TASK_NAME,
560 serde_json::to_string(input.params_json)?,
561 input.owner_id,
562 LOCAL_CLAIM_SCOPE,
563 input.now,
564 input.max_attempts,
565 ],
566 )
567 .context("failed to insert durable workflow task")?;
568 tx.execute(
569 r#"
570 INSERT INTO sw_workflow_runs (
571 run_id,
572 task_id,
573 root_run_id,
574 state,
575 workflow_run_json,
576 args_json,
577 budget_total,
578 budget_spent,
579 created_at,
580 updated_at
581 )
582 VALUES (?1, ?2, ?1, 'pending', ?3, ?4, ?5, 0, ?6, ?6)
583 "#,
584 rusqlite::params![
585 input.run_id,
586 input.task_id,
587 serde_json::to_string(input.workflow_run_json)?,
588 serde_json::to_string(input.args_json)?,
589 input.budget_total.map(|value| value as i64),
590 input.now,
591 ],
592 )
593 .context("failed to insert durable workflow run")?;
594 tx.commit()
595 .context("failed to commit local durable task transaction")
596 }
597
598 pub fn prepare_resume_run(
599 &mut self,
600 run_id: &str,
601 owner_id: &str,
602 now: i64,
603 ) -> anyhow::Result<(String, u32)> {
604 let db_label = self
605 .path()
606 .map(|path| path.display().to_string())
607 .unwrap_or_else(|| "<in-memory>".to_string());
608 let tx = self
609 .connection_mut()
610 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
611 .context("failed to begin durable resume transaction")?;
612 let task_id: String = tx
613 .query_row(
614 r#"
615 SELECT task_id
616 FROM sw_workflow_runs
617 WHERE run_id = ?1
618 "#,
619 rusqlite::params![run_id],
620 |row| row.get(0),
621 )
622 .optional()
623 .context("failed to query durable run to resume")?
624 .ok_or_else(|| {
625 anyhow::anyhow!("workflow run {run_id} was not found in {db_label}; check --db")
626 })?;
627 let current_attempts: u32 =
628 tx.query_row(
629 r#"
630 SELECT COUNT(*)
631 FROM sw_workflow_attempts
632 WHERE run_id = ?1
633 "#,
634 rusqlite::params![run_id],
635 |row| row.get::<_, i64>(0),
636 )
637 .context("failed to count durable run attempts")? as u32;
638 tx.execute(
639 r#"
640 UPDATE sw_workflow_tasks
641 SET state = 'pending',
642 claimed_by_owner_id = NULL,
643 lease_expires_at = NULL,
644 updated_at = ?1
645 WHERE task_id = ?2
646 AND state IN ('pending', 'running', 'failed')
647 "#,
648 rusqlite::params![now, task_id],
649 )
650 .context("failed to prepare durable task for resume")?;
651 tx.execute(
652 r#"
653 UPDATE sw_workflow_runs
654 SET state = 'pending',
655 updated_at = ?1
656 WHERE run_id = ?2
657 AND state IN ('pending', 'running', 'failed')
658 "#,
659 rusqlite::params![now, run_id],
660 )
661 .context("failed to prepare durable run for resume")?;
662 tx.execute(
663 r#"
664 UPDATE sw_workflow_tasks
665 SET submitted_by_owner_id = ?1
666 WHERE task_id = ?2
667 AND claim_scope = 'local'
668 "#,
669 rusqlite::params![owner_id, task_id],
670 )
671 .context("failed to adopt durable task owner")?;
672 tx.commit().context("failed to commit durable resume")?;
673 Ok((task_id, current_attempts))
674 }
675
676 pub fn start_attempt(&mut self, input: LocalAttemptStart<'_>) -> anyhow::Result<()> {
677 let tx = self
678 .connection_mut()
679 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
680 .context("failed to begin durable attempt start transaction")?;
681 tx.execute(
682 r#"
683 UPDATE sw_workflow_tasks
684 SET state = 'running',
685 claimed_by_owner_id = ?1,
686 lease_expires_at = ?2,
687 updated_at = ?3
688 WHERE task_id = ?4
689 AND claim_scope = 'local'
690 AND submitted_by_owner_id = ?1
691 AND state IN ('pending', 'running')
692 "#,
693 rusqlite::params![
694 input.owner_id,
695 input.lease_expires_at,
696 input.now,
697 input.task_id
698 ],
699 )
700 .context("failed to claim local durable task")?;
701 tx.execute(
702 r#"
703 UPDATE sw_workflow_runs
704 SET state = 'running',
705 updated_at = ?1
706 WHERE run_id = ?2
707 "#,
708 rusqlite::params![input.now, input.run_id],
709 )
710 .context("failed to mark durable workflow run running")?;
711 tx.execute(
712 r#"
713 INSERT INTO sw_workflow_attempts (
714 attempt_id,
715 run_id,
716 task_id,
717 attempt,
718 worker_id,
719 state,
720 lease_expires_at,
721 started_at
722 )
723 VALUES (?1, ?2, ?3, ?4, ?5, 'running', ?6, ?7)
724 "#,
725 rusqlite::params![
726 input.attempt_id,
727 input.run_id,
728 input.task_id,
729 input.attempt,
730 input.owner_id,
731 input.lease_expires_at,
732 input.now,
733 ],
734 )
735 .context("failed to insert durable workflow attempt")?;
736 tx.commit()
737 .context("failed to commit durable attempt start transaction")
738 }
739
740 pub fn complete_attempt_and_task(
741 &mut self,
742 input: LocalAttemptComplete<'_>,
743 ) -> anyhow::Result<()> {
744 let payload = serde_json::to_string(input.completed_payload)?;
745 let tx = self
746 .connection_mut()
747 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
748 .context("failed to begin durable completion transaction")?;
749 tx.execute(
750 r#"
751 UPDATE sw_workflow_attempts
752 SET state = 'completed',
753 completed_at = ?1
754 WHERE attempt_id = ?2
755 "#,
756 rusqlite::params![input.now, input.attempt_id],
757 )
758 .context("failed to mark durable attempt completed")?;
759 tx.execute(
760 r#"
761 UPDATE sw_workflow_runs
762 SET state = 'completed',
763 budget_spent = ?1,
764 completed_payload_json = ?2,
765 updated_at = ?3
766 WHERE run_id = ?4
767 "#,
768 rusqlite::params![input.budget_spent as i64, payload, input.now, input.run_id],
769 )
770 .context("failed to mark durable run completed")?;
771 tx.execute(
772 r#"
773 UPDATE sw_workflow_tasks
774 SET state = 'completed',
775 completed_payload_json = ?1,
776 lease_expires_at = NULL,
777 updated_at = ?2
778 WHERE task_id = ?3
779 "#,
780 rusqlite::params![payload, input.now, input.task_id],
781 )
782 .context("failed to mark durable task completed")?;
783 tx.commit()
784 .context("failed to commit durable completion transaction")
785 }
786
787 pub fn fail_attempt(&mut self, input: LocalAttemptFail<'_>) -> anyhow::Result<()> {
788 let failure = serde_json::to_string(input.failure_reason)?;
789 let tx = self
790 .connection_mut()
791 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
792 .context("failed to begin durable failure transaction")?;
793 tx.execute(
794 r#"
795 UPDATE sw_workflow_attempts
796 SET state = 'failed',
797 completed_at = ?1,
798 failure_reason_json = ?2
799 WHERE attempt_id = ?3
800 "#,
801 rusqlite::params![input.now, failure, input.attempt_id],
802 )
803 .context("failed to mark durable attempt failed")?;
804 let next_state = if input.terminal { "failed" } else { "pending" };
805 tx.execute(
806 r#"
807 UPDATE sw_workflow_runs
808 SET state = ?1,
809 failure_reason_json = ?2,
810 updated_at = ?3
811 WHERE run_id = ?4
812 "#,
813 rusqlite::params![next_state, failure, input.now, input.run_id],
814 )
815 .context("failed to update durable run failure state")?;
816 tx.execute(
817 r#"
818 UPDATE sw_workflow_tasks
819 SET state = ?1,
820 failure_reason_json = ?2,
821 lease_expires_at = NULL,
822 updated_at = ?3
823 WHERE task_id = ?4
824 "#,
825 rusqlite::params![next_state, failure, input.now, input.task_id],
826 )
827 .context("failed to update durable task failure state")?;
828 tx.commit()
829 .context("failed to commit durable failure transaction")
830 }
831
832 pub fn cancel_attempt_and_task(&mut self, input: LocalAttemptCancel<'_>) -> anyhow::Result<()> {
833 let failure = serde_json::to_string(input.failure_reason)?;
834 let tx = self
835 .connection_mut()
836 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
837 .context("failed to begin durable cancellation transaction")?;
838 tx.execute(
839 r#"
840 UPDATE sw_workflow_attempts
841 SET state = 'cancelled',
842 completed_at = ?1,
843 failure_reason_json = ?2
844 WHERE attempt_id = ?3
845 "#,
846 rusqlite::params![input.now, failure, input.attempt_id],
847 )
848 .context("failed to mark durable attempt cancelled")?;
849 tx.execute(
850 r#"
851 UPDATE sw_workflow_runs
852 SET state = 'cancelled',
853 failure_reason_json = ?1,
854 updated_at = ?2
855 WHERE run_id = ?3
856 "#,
857 rusqlite::params![failure, input.now, input.run_id],
858 )
859 .context("failed to mark durable run cancelled")?;
860 tx.execute(
861 r#"
862 UPDATE sw_workflow_tasks
863 SET state = 'cancelled',
864 failure_reason_json = ?1,
865 lease_expires_at = NULL,
866 updated_at = ?2
867 WHERE task_id = ?3
868 "#,
869 rusqlite::params![failure, input.now, input.task_id],
870 )
871 .context("failed to mark durable task cancelled")?;
872 tx.commit()
873 .context("failed to commit durable cancellation transaction")
874 }
875
    /// Decides, inside one immediate transaction, what to do with an agent step
    /// identified by `(run_id, checkpoint_name)`.
    ///
    /// Returns:
    /// - `AgentStepClaim::Replay` with the stored result when the step is `completed`;
    /// - `AgentStepClaim::Wait` when the step is `running` and its lease is later than `input.now`;
    /// - `AgentStepClaim::Run` after (re)claiming the step for `input.worker_id`, either by
    ///   updating an existing row or by inserting a new one.
    ///
    /// # Errors
    /// Fails when the stored input signature differs from the supplied one, when a
    /// completed step has no stored result, when the stored state is not recognised,
    /// or when any database or (de)serialization operation fails.
    pub fn claim_or_replay_agent_step(
        &mut self,
        input: AgentStepClaimInput<'_>,
    ) -> anyhow::Result<AgentStepClaim> {
        let tx = self
            .connection_mut()
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .context("failed to begin durable agent step claim transaction")?;

        // Look for a step already recorded at this checkpoint.
        let existing = tx
            .query_row(
                r#"
                SELECT step_id, state, input_signature_json, result_json, lease_expires_at
                FROM sw_workflow_steps
                WHERE run_id = ?1 AND checkpoint_name = ?2
                "#,
                rusqlite::params![input.run_id, input.checkpoint_name],
                |row| {
                    Ok((
                        row.get::<_, String>(0)?,
                        row.get::<_, String>(1)?,
                        row.get::<_, String>(2)?,
                        row.get::<_, Option<String>>(3)?,
                        row.get::<_, Option<i64>>(4)?,
                    ))
                },
            )
            .optional()
            .context("failed to query durable agent step")?;

        if let Some((step_id, state, stored_signature, result_json, lease_expires_at)) = existing {
            // The same checkpoint must always be reached with the same input signature.
            if stored_signature != input.input_signature_json {
                bail!(
                    "durable step input signature mismatch for {}",
                    input.checkpoint_name
                );
            }
            match state.as_str() {
                "completed" => {
                    let result_json = result_json.ok_or_else(|| {
                        anyhow::anyhow!("completed durable agent step missing result_json")
                    })?;
                    let result = serde_json::from_str::<AgentProviderResult>(&result_json)
                        .context("failed to deserialize durable agent result")?;
                    tx.commit().context("failed to commit replay transaction")?;
                    return Ok(AgentStepClaim::Replay(Box::new(result)));
                }
                // Still running under an unexpired lease: do not claim it.
                "running" if lease_expires_at.is_some_and(|lease| lease > input.now) => {
                    tx.commit().context("failed to commit wait transaction")?;
                    return Ok(AgentStepClaim::Wait);
                }
                // A `running` step reaching this arm has a missing or expired lease.
                "running" | "failed" | "cancelled" | "pending" => {
                    tx.execute(
                        r#"
                        UPDATE sw_workflow_steps
                        SET state = 'running',
                            worker_id = ?1,
                            lease_expires_at = ?2,
                            attempts = attempts + 1,
                            last_attempt_at = ?3,
                            updated_at = ?3,
                            failure_reason_json = NULL
                        WHERE step_id = ?4
                        "#,
                        rusqlite::params![
                            input.worker_id,
                            input.lease_expires_at,
                            input.now,
                            step_id
                        ],
                    )
                    .context("failed to reclaim durable agent step")?;
                    tx.commit()
                        .context("failed to commit durable agent reclaim transaction")?;
                    return Ok(AgentStepClaim::Run { step_id });
                }
                other => bail!("unknown durable step state: {other}"),
            }
        }

        // No row yet: insert the step as `running` with its first attempt counted.
        let step_id = new_id("step");
        tx.execute(
            r#"
            INSERT INTO sw_workflow_steps (
                step_id,
                run_id,
                root_run_id,
                step_kind,
                checkpoint_name,
                input_signature_hash,
                input_signature_json,
                state,
                input_json,
                worker_id,
                lease_expires_at,
                attempts,
                last_attempt_at,
                created_at,
                updated_at
            )
            VALUES (?1, ?2, ?3, 'agent', ?4, ?5, ?6, 'running', ?7, ?8, ?9, 1, ?10, ?10, ?10)
            "#,
            rusqlite::params![
                step_id,
                input.run_id,
                input.root_run_id,
                input.checkpoint_name,
                input.input_signature_hash,
                input.input_signature_json,
                serde_json::to_string(input.input_json)?,
                input.worker_id,
                input.lease_expires_at,
                input.now,
            ],
        )
        .context("failed to insert durable agent step")?;
        tx.commit()
            .context("failed to commit durable agent step insert")?;
        Ok(AgentStepClaim::Run { step_id })
    }
996
997 pub fn complete_agent_step(&mut self, input: AgentStepCompleteInput<'_>) -> anyhow::Result<()> {
998 let result_json = serde_json::to_string(&compact_agent_result_for_replay(input.result))?;
999 let output_tokens = input
1000 .result
1001 .usage
1002 .as_ref()
1003 .and_then(|usage| usage.output_tokens)
1004 .unwrap_or(0);
1005 let usage_json = input
1006 .result
1007 .usage
1008 .as_ref()
1009 .map(serde_json::to_string)
1010 .transpose()?;
1011 let budget_entry_id = new_id("budget");
1012 let tx = self
1013 .connection_mut()
1014 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1015 .context("failed to begin durable agent step completion transaction")?;
1016 tx.execute(
1017 r#"
1018 UPDATE sw_workflow_steps
1019 SET state = 'completed',
1020 result_json = ?1,
1021 lease_expires_at = NULL,
1022 updated_at = ?2
1023 WHERE step_id = ?3
1024 "#,
1025 rusqlite::params![result_json, input.now, input.step_id],
1026 )
1027 .context("failed to mark durable agent step completed")?;
1028 tx.execute(
1029 r#"
1030 INSERT OR IGNORE INTO sw_budget_ledger (
1031 budget_entry_id,
1032 run_id,
1033 root_run_id,
1034 step_id,
1035 source_type,
1036 source_id,
1037 output_tokens,
1038 usage_json,
1039 created_at
1040 )
1041 VALUES (?1, ?2, ?3, ?4, 'agent_step', ?4, ?5, ?6, ?7)
1042 "#,
1043 rusqlite::params![
1044 budget_entry_id,
1045 input.run_id,
1046 input.root_run_id,
1047 input.step_id,
1048 output_tokens as i64,
1049 usage_json,
1050 input.now,
1051 ],
1052 )
1053 .context("failed to insert durable budget ledger entry")?;
1054 tx.commit()
1055 .context("failed to commit durable agent step completion")
1056 }
1057
    /// Decides, inside one immediate transaction, what to do with a sleep step
    /// identified by `(run_id, checkpoint_name)`.
    ///
    /// Returns `SleepStepClaim::Replay` when the step is already `completed`, and
    /// `SleepStepClaim::WaitUntil` with the wake time otherwise. For an existing step
    /// the wake time is read from the stored `wakeAt`; for a new step it is
    /// `input.now + input.duration_ms`.
    ///
    /// # Errors
    /// Fails when the stored input signature differs from the supplied one, when the
    /// stored input has no integer `wakeAt`, when the stored state is not recognised,
    /// when `duration_ms` does not fit in `i64`, or when any database or
    /// (de)serialization operation fails.
    pub fn claim_or_replay_sleep_step(
        &mut self,
        input: SleepStepClaimInput<'_>,
    ) -> anyhow::Result<SleepStepClaim> {
        let tx = self
            .connection_mut()
            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
            .context("failed to begin durable sleep step claim transaction")?;

        // Look for a step already recorded at this checkpoint.
        let existing = tx
            .query_row(
                r#"
                SELECT step_id, state, input_signature_json, input_json, lease_expires_at
                FROM sw_workflow_steps
                WHERE run_id = ?1 AND checkpoint_name = ?2
                "#,
                rusqlite::params![input.run_id, input.checkpoint_name],
                |row| {
                    Ok((
                        row.get::<_, String>(0)?,
                        row.get::<_, String>(1)?,
                        row.get::<_, String>(2)?,
                        row.get::<_, String>(3)?,
                        row.get::<_, Option<i64>>(4)?,
                    ))
                },
            )
            .optional()
            .context("failed to query durable sleep step")?;

        if let Some((step_id, state, stored_signature, input_json, lease_expires_at)) = existing {
            // The same checkpoint must always be reached with the same input signature.
            if stored_signature != input.input_signature_json {
                bail!(
                    "durable sleep step input signature mismatch for {}",
                    input.checkpoint_name
                );
            }
            match state.as_str() {
                "completed" => {
                    tx.commit()
                        .context("failed to commit sleep replay transaction")?;
                    return Ok(SleepStepClaim::Replay);
                }
                "running" | "pending" | "failed" | "cancelled" => {
                    // Reuse the wake time persisted when the step was first inserted.
                    let input_value = serde_json::from_str::<Value>(&input_json)
                        .context("failed to deserialize durable sleep input")?;
                    let wake_at = input_value
                        .get("wakeAt")
                        .and_then(Value::as_i64)
                        .ok_or_else(|| anyhow::anyhow!("durable sleep step missing wakeAt"))?;
                    if wake_at < 0 {
                        log::warn!(
                            "durable sleep step {} has negative wakeAt {}; resolving immediately",
                            step_id,
                            wake_at
                        );
                    }
                    // Running under an unexpired lease with time still to wait:
                    // return the wake time without touching the row.
                    if state == "running"
                        && lease_expires_at.is_some_and(|lease| lease > input.now)
                        && wake_at > input.now
                    {
                        tx.commit()
                            .context("failed to commit sleep wait transaction")?;
                        return Ok(SleepStepClaim::WaitUntil { step_id, wake_at });
                    }
                    // Otherwise take the step over for this worker and count another attempt.
                    tx.execute(
                        r#"
                        UPDATE sw_workflow_steps
                        SET state = 'running',
                            worker_id = ?1,
                            lease_expires_at = ?2,
                            attempts = attempts + 1,
                            last_attempt_at = ?3,
                            updated_at = ?3,
                            failure_reason_json = NULL
                        WHERE step_id = ?4
                        "#,
                        rusqlite::params![
                            input.worker_id,
                            input.lease_expires_at,
                            input.now,
                            step_id
                        ],
                    )
                    .context("failed to reclaim durable sleep step")?;
                    tx.commit()
                        .context("failed to commit durable sleep reclaim transaction")?;
                    return Ok(SleepStepClaim::WaitUntil { step_id, wake_at });
                }
                other => bail!("unknown durable step state: {other}"),
            }
        }

        // No row yet: compute the wake time once and persist it with the step.
        let step_id = new_id("step");
        let duration_ms_i64 =
            i64::try_from(input.duration_ms).context("sleep duration is too large to persist")?;
        let wake_at = input.now.saturating_add(duration_ms_i64);
        let input_json = serde_json::json!({
            "durationMs": input.duration_ms,
            "wakeAt": wake_at,
        });
        tx.execute(
            r#"
            INSERT INTO sw_workflow_steps (
                step_id,
                run_id,
                root_run_id,
                step_kind,
                checkpoint_name,
                input_signature_hash,
                input_signature_json,
                state,
                input_json,
                worker_id,
                lease_expires_at,
                attempts,
                last_attempt_at,
                created_at,
                updated_at
            )
            VALUES (?1, ?2, ?3, 'sleep', ?4, ?5, ?6, 'running', ?7, ?8, ?9, 1, ?10, ?10, ?10)
            "#,
            rusqlite::params![
                step_id,
                input.run_id,
                input.root_run_id,
                input.checkpoint_name,
                input.input_signature_hash,
                input.input_signature_json,
                serde_json::to_string(&input_json)?,
                input.worker_id,
                input.lease_expires_at,
                input.now,
            ],
        )
        .context("failed to insert durable sleep step")?;
        tx.commit()
            .context("failed to commit durable sleep step insert")?;
        Ok(SleepStepClaim::WaitUntil { step_id, wake_at })
    }
1198
1199 pub fn complete_sleep_step(&mut self, input: SleepStepCompleteInput<'_>) -> anyhow::Result<()> {
1200 let result_json = serde_json::to_string(&serde_json::json!({
1201 "ok": true,
1202 "durationMs": input.duration_ms,
1203 "wakeAt": input.wake_at,
1204 "completedAt": input.now,
1205 }))?;
1206 let updated = self
1207 .connection_mut()
1208 .execute(
1209 r#"
1210 UPDATE sw_workflow_steps
1211 SET state = 'completed',
1212 result_json = ?1,
1213 lease_expires_at = NULL,
1214 updated_at = ?2
1215 WHERE step_id = ?3
1216 AND state = 'running'
1217 "#,
1218 rusqlite::params![result_json, input.now, input.step_id],
1219 )
1220 .context("failed to mark durable sleep step completed")?;
1221 if updated == 1 {
1222 return Ok(());
1223 }
1224
1225 let state = self
1226 .connection()
1227 .query_row(
1228 "SELECT state FROM sw_workflow_steps WHERE step_id = ?1",
1229 rusqlite::params![input.step_id],
1230 |row| row.get::<_, String>(0),
1231 )
1232 .optional()
1233 .context("failed to inspect durable sleep step after completion race")?;
1234 match state.as_deref() {
1235 Some("completed") => Ok(()),
1236 Some(state) => bail!("cannot complete durable sleep step in state {state}"),
1237 None => bail!(
1238 "cannot complete missing durable sleep step {}",
1239 input.step_id
1240 ),
1241 }
1242 }
1243
1244 pub fn fail_agent_step(&mut self, input: AgentStepFailInput<'_>) -> anyhow::Result<()> {
1245 let failure = serde_json::to_string(input.failure_reason)?;
1246 self.connection_mut()
1247 .execute(
1248 r#"
1249 UPDATE sw_workflow_steps
1250 SET state = 'failed',
1251 failure_reason_json = ?1,
1252 lease_expires_at = NULL,
1253 updated_at = ?2
1254 WHERE step_id = ?3
1255 "#,
1256 rusqlite::params![failure, input.now, input.step_id],
1257 )
1258 .context("failed to mark durable agent step failed")?;
1259 Ok(())
1260 }
1261}
1262
/// Outcome of an attempt to claim an agent step for execution.
#[derive(Debug)]
pub enum AgentStepClaim {
    /// A stored provider result is handed back instead of running the step again.
    /// NOTE(review): presumably returned when the step is already completed; the
    /// claiming code is not visible here, so confirm.
    Replay(Box<AgentProviderResult>),
    /// The caller owns the step identified by `step_id` and should execute it.
    Run { step_id: String },
    /// The step is currently held by someone else; the caller should not run it.
    Wait,
}
1269
/// Outcome of an attempt to claim a durable sleep step.
///
/// Derives `Clone`, `PartialEq` and `Eq` so callers and tests can compare claims
/// directly instead of pattern-matching each field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SleepStepClaim {
    /// The sleep step is already completed; there is nothing left to wait for.
    Replay,
    /// The sleep is still pending: wait until `wake_at` (same clock as the claim's
    /// `now`, in milliseconds) and then complete the step identified by `step_id`.
    WaitUntil { step_id: String, wake_at: i64 },
}
1275
1276fn compact_agent_result_for_replay(result: &AgentProviderResult) -> AgentProviderResult {
1277 AgentProviderResult {
1278 output: result.output.clone(),
1279 session_id: result.session_id.clone(),
1280 model: result.model.clone(),
1281 usage: result.usage.clone(),
1282 isolation: result.isolation.clone(),
1283 raw: None,
1284 }
1285}
1286
1287pub struct AgentStepClaimInput<'a> {
1288 pub run_id: &'a str,
1289 pub root_run_id: &'a str,
1290 pub checkpoint_name: &'a str,
1291 pub input_signature_hash: &'a str,
1292 pub input_signature_json: &'a str,
1293 pub input_json: &'a Value,
1294 pub worker_id: &'a str,
1295 pub lease_expires_at: i64,
1296 pub now: i64,
1297}
1298
1299pub struct AgentStepCompleteInput<'a> {
1300 pub step_id: &'a str,
1301 pub run_id: &'a str,
1302 pub root_run_id: &'a str,
1303 pub result: &'a AgentProviderResult,
1304 pub now: i64,
1305}
1306
1307pub struct AgentStepFailInput<'a> {
1308 pub step_id: &'a str,
1309 pub failure_reason: &'a Value,
1310 pub now: i64,
1311}
1312
/// Borrowed parameters for claiming a durable sleep step.
///
/// All fields are references or plain integers, so the bundle is cheap to copy and
/// can be printed for diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct SleepStepClaimInput<'a> {
    /// Run the sleep step belongs to.
    pub run_id: &'a str,
    /// Root run of the workflow tree; stored on newly inserted steps.
    pub root_run_id: &'a str,
    /// Name identifying the step within its run; used to look up an existing step.
    pub checkpoint_name: &'a str,
    /// Hash of the input signature; stored on newly inserted steps.
    pub input_signature_hash: &'a str,
    /// Serialized input signature; must equal the stored one when the step exists.
    pub input_signature_json: &'a str,
    /// Requested sleep length in milliseconds; added to `now` to compute the wake time.
    pub duration_ms: u64,
    /// Identifier of the worker attempting the claim.
    pub worker_id: &'a str,
    /// Lease deadline recorded when the step is inserted or reclaimed.
    pub lease_expires_at: i64,
    /// Current timestamp in milliseconds.
    pub now: i64,
}
1324
/// Borrowed parameters for marking a durable sleep step as completed.
///
/// All fields are references or plain integers, so the bundle is cheap to copy and
/// can be printed for diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct SleepStepCompleteInput<'a> {
    /// Step being completed.
    pub step_id: &'a str,
    /// Sleep length in milliseconds; recorded as `durationMs` in the stored result.
    pub duration_ms: u64,
    /// Wake-up time; recorded as `wakeAt` in the stored result.
    pub wake_at: i64,
    /// Current timestamp; recorded as `completedAt` and written to `updated_at`.
    pub now: i64,
}
1331
1332async fn run_durable_agent_provider(
1333 default_provider: Arc<dyn AgentProvider>,
1334 provider_override: Option<String>,
1335 input: AgentProviderRunInput,
1336 cancel_rx: Option<watch::Receiver<bool>>,
1337) -> anyhow::Result<AgentProviderResult> {
1338 run_agent_provider_with_retry(default_provider, provider_override, input, cancel_rx).await
1339}
1340
1341fn agent_input_signature(provider_name: &str, input: &AgentProviderRunInput) -> Value {
1342 serde_json::json!({
1343 "signatureVersion": 2,
1344 "kind": "agent",
1345 "workflowScope": "root",
1346 "provider": provider_name,
1347 "prompt": input.prompt,
1348 "options": input.options,
1349 "context": {
1350 "phase": input.context.phase,
1351 "cwd": input.context.cwd.as_ref().map(|path| path.to_string_lossy().into_owned()),
1352 }
1353 })
1354}
1355
1356fn sleep_input_signature(duration_ms: u64) -> Value {
1357 serde_json::json!({
1358 "signatureVersion": 1,
1359 "kind": "sleep",
1360 "workflowScope": "root",
1361 "durationMs": duration_ms,
1362 })
1363}
1364
1365fn short_blake3_hex(input: &str) -> String {
1366 let hash = blake3::hash(input.as_bytes());
1367 hash.as_bytes()[..12]
1368 .iter()
1369 .map(|byte| format!("{byte:02x}"))
1370 .collect()
1371}
1372
1373fn canonical_json_string(value: &Value) -> anyhow::Result<String> {
1374 let canonical = canonical_json_value(value);
1375 serde_json::to_string(&canonical).context("failed to serialize canonical JSON")
1376}
1377
1378fn canonical_json_value(value: &Value) -> Value {
1379 match value {
1380 Value::Object(object) => {
1381 let mut sorted = BTreeMap::new();
1382 for (key, value) in object {
1383 sorted.insert(key.clone(), canonical_json_value(value));
1384 }
1385 Value::Object(sorted.into_iter().collect())
1386 }
1387 Value::Array(array) => Value::Array(array.iter().map(canonical_json_value).collect()),
1388 value => value.clone(),
1389 }
1390}
1391
1392#[cfg(test)]
1393mod tests {
1394 use super::*;
1395 use crate::agent_providers::{
1396 AgentProvider, AgentProviderResult, AgentProviderRunInput, AgentProviderSchemaMode,
1397 AgentProviderUsageMode,
1398 };
1399 use serde_json::json;
1400 use std::fs;
1401 use std::sync::{
1402 atomic::{AtomicUsize, Ordering},
1403 Arc, Barrier, Mutex,
1404 };
1405 use std::thread;
1406 use std::time::{Duration, Instant};
1407
    /// Test provider that counts how many times it has been run.
    struct CountingProvider {
        /// Number of `run` invocations so far.
        calls: AtomicUsize,
    }
1411
    /// Test provider that fails on its first run and succeeds afterwards.
    struct FlakyOnceProvider {
        /// Number of `run` invocations so far.
        calls: AtomicUsize,
    }
1415
    /// Test provider that sleeps for a prompt-encoded delay and records when each
    /// run starts and ends.
    struct TimedProvider {
        /// Start/end events in the order they were recorded.
        events: Mutex<Vec<TimedProviderEvent>>,
    }
1419
    /// Test provider whose results carry a session id and a raw event payload.
    struct SessionProvider {
        /// Session id reported in every result.
        session_id: &'static str,
        /// When set, `true` is sent on this channel at the start of each run.
        cancel_on_run: Option<watch::Sender<bool>>,
        /// Delay in milliseconds before the result is returned; `0` means no delay.
        delay_ms: u64,
    }
1425
    /// One start or end marker recorded by `TimedProvider`.
    #[derive(Debug, Clone)]
    struct TimedProviderEvent {
        /// Prompt of the run the event belongs to.
        prompt: String,
        /// Either `"start"` or `"end"`.
        kind: &'static str,
        /// Moment the event was recorded.
        at: Instant,
    }
1432
1433 #[async_trait::async_trait]
1434 impl AgentProvider for CountingProvider {
1435 fn name(&self) -> &str {
1436 "counting"
1437 }
1438 fn schema_mode(&self) -> AgentProviderSchemaMode {
1439 AgentProviderSchemaMode::Builtin
1440 }
1441 fn usage_mode(&self) -> AgentProviderUsageMode {
1442 AgentProviderUsageMode::Builtin
1443 }
1444 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1445 let count = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
1446 Ok(AgentProviderResult {
1447 output: json!(format!("{}:{count}", input.prompt)),
1448 session_id: None,
1449 model: None,
1450 usage: None,
1451 isolation: None,
1452 raw: None,
1453 })
1454 }
1455 }
1456
1457 #[async_trait::async_trait]
1458 impl AgentProvider for FlakyOnceProvider {
1459 fn name(&self) -> &str {
1460 "flaky-once"
1461 }
1462 fn schema_mode(&self) -> AgentProviderSchemaMode {
1463 AgentProviderSchemaMode::Builtin
1464 }
1465 fn usage_mode(&self) -> AgentProviderUsageMode {
1466 AgentProviderUsageMode::Builtin
1467 }
1468 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1469 let call = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
1470 if call == 1 {
1471 bail!("temporary provider failure")
1472 }
1473 Ok(AgentProviderResult {
1474 output: json!(format!("recovered: {}", input.prompt)),
1475 session_id: None,
1476 model: None,
1477 usage: None,
1478 isolation: None,
1479 raw: None,
1480 })
1481 }
1482 }
1483
1484 #[async_trait::async_trait]
1485 impl AgentProvider for SessionProvider {
1486 fn name(&self) -> &str {
1487 "session"
1488 }
1489 fn schema_mode(&self) -> AgentProviderSchemaMode {
1490 AgentProviderSchemaMode::Builtin
1491 }
1492 fn usage_mode(&self) -> AgentProviderUsageMode {
1493 AgentProviderUsageMode::Builtin
1494 }
1495 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1496 if let Some(cancel_tx) = &self.cancel_on_run {
1497 let _ = cancel_tx.send(true);
1498 }
1499 if self.delay_ms > 0 {
1500 tokio::time::sleep(Duration::from_millis(self.delay_ms)).await;
1501 }
1502 Ok(AgentProviderResult {
1503 output: json!(format!("session: {}", input.prompt)),
1504 session_id: Some(self.session_id.to_string()),
1505 model: Some("session-model".to_string()),
1506 usage: Some(crate::agent_providers::AgentUsage {
1507 output_tokens: Some(1),
1508 ..Default::default()
1509 }),
1510 isolation: None,
1511 raw: Some(json!({
1512 "events": [
1513 { "sessionId": self.session_id, "prompt": input.prompt }
1514 ]
1515 })),
1516 })
1517 }
1518 }
1519
1520 #[async_trait::async_trait]
1521 impl AgentProvider for TimedProvider {
1522 fn name(&self) -> &str {
1523 "timed"
1524 }
1525 fn schema_mode(&self) -> AgentProviderSchemaMode {
1526 AgentProviderSchemaMode::Builtin
1527 }
1528 fn usage_mode(&self) -> AgentProviderUsageMode {
1529 AgentProviderUsageMode::Builtin
1530 }
1531 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1532 let delay_ms = input
1533 .prompt
1534 .split(":delay:")
1535 .nth(1)
1536 .and_then(|suffix| suffix.split(':').next())
1537 .and_then(|value| value.parse::<u64>().ok())
1538 .unwrap_or(0);
1539 self.events.lock().unwrap().push(TimedProviderEvent {
1540 prompt: input.prompt.clone(),
1541 kind: "start",
1542 at: Instant::now(),
1543 });
1544 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1545 self.events.lock().unwrap().push(TimedProviderEvent {
1546 prompt: input.prompt.clone(),
1547 kind: "end",
1548 at: Instant::now(),
1549 });
1550 Ok(AgentProviderResult {
1551 output: json!(format!("{}:done", input.prompt)),
1552 session_id: None,
1553 model: None,
1554 usage: Some(crate::agent_providers::AgentUsage {
1555 output_tokens: Some(1),
1556 ..Default::default()
1557 }),
1558 isolation: None,
1559 raw: None,
1560 })
1561 }
1562 }
1563
    /// Session log sink for tests that writes raw sessions below a root directory.
    struct TestSessionLogSink {
        /// Directory under which per-provider session files are written.
        root: std::path::PathBuf,
        /// When set, `true` is sent on this channel after each successful write.
        saved_tx: Option<watch::Sender<bool>>,
    }
1568
1569 #[async_trait::async_trait]
1570 impl crate::workflow::AgentSessionLogSink for TestSessionLogSink {
1571 async fn write_agent_result(
1572 &self,
1573 provider: &str,
1574 result: &AgentProviderResult,
1575 ) -> anyhow::Result<()> {
1576 write_test_raw_session(&self.root, provider, result)?;
1577 if let Some(saved_tx) = &self.saved_tx {
1578 let _ = saved_tx.send(true);
1579 }
1580 Ok(())
1581 }
1582 }
1583
1584 fn write_test_raw_session(
1585 root: &std::path::Path,
1586 provider: &str,
1587 result: &AgentProviderResult,
1588 ) -> anyhow::Result<()> {
1589 let session_id = result
1590 .session_id
1591 .as_deref()
1592 .ok_or_else(|| anyhow::anyhow!("session id missing"))?;
1593 let events = result
1594 .raw
1595 .as_ref()
1596 .and_then(|raw| raw.get("events"))
1597 .and_then(Value::as_array)
1598 .ok_or_else(|| anyhow::anyhow!("raw events missing"))?;
1599 let dir = root.join(provider);
1600 fs::create_dir_all(&dir)?;
1601 let mut lines = String::new();
1602 for event in events {
1603 lines.push_str(&serde_json::to_string(event)?);
1604 lines.push('\n');
1605 }
1606 fs::write(dir.join(format!("{session_id}.jsonl")), lines)?;
1607 Ok(())
1608 }
1609
1610 fn seed_durable_run(db_path: &std::path::Path) -> (String, String) {
1611 let mut store = SqliteDurableStore::open(db_path).expect("store should open");
1612 store.init().expect("schema should initialize");
1613 let task_id = new_id("task");
1614 let run_id = new_id("run");
1615 store
1616 .insert_local_task_and_run(LocalTaskAndRunInsert {
1617 task_id: &task_id,
1618 run_id: &run_id,
1619 owner_id: "owner",
1620 params_json: &json!({ "scriptPath": "workflow.mjs" }),
1621 workflow_run_json: &json!({ "scriptPath": "workflow.mjs" }),
1622 args_json: &json!({}),
1623 budget_total: Some(100),
1624 max_attempts: 3,
1625 now: 1,
1626 })
1627 .expect("run should be inserted");
1628 (task_id, run_id)
1629 }
1630
1631 fn claim_input<'a>(
1632 run_id: &'a str,
1633 checkpoint_name: &'a str,
1634 worker_id: &'a str,
1635 lease_expires_at: i64,
1636 now: i64,
1637 input_json: &'a Value,
1638 ) -> AgentStepClaimInput<'a> {
1639 AgentStepClaimInput {
1640 run_id,
1641 root_run_id: run_id,
1642 checkpoint_name,
1643 input_signature_hash: "sig",
1644 input_signature_json: r#"{"prompt":"hello"}"#,
1645 input_json,
1646 worker_id,
1647 lease_expires_at,
1648 now,
1649 }
1650 }
1651
1652 #[test]
1653 fn durable_agent_retry_backoff_extends_step_lease_deadline() {
1654 let no_retry = crate::workflow::AgentRetryPolicy {
1655 max_attempts: 1,
1656 backoff_ms: 0,
1657 };
1658 let long_backoff = crate::workflow::AgentRetryPolicy {
1659 max_attempts: 3,
1660 backoff_ms: 120_000,
1661 };
1662 assert_eq!(durable_agent_step_lease_expires_at(1_000, no_retry), 61_000);
1663 assert_eq!(
1664 durable_agent_step_lease_expires_at(1_000, long_backoff),
1665 301_000
1666 );
1667 }
1668
    /// Eight workers race to claim the same checkpoint, each on its own connection:
    /// exactly one must get `Run`, the others `Wait`, and nobody `Replay`.
    #[test]
    fn concurrent_agent_step_claims_allow_only_one_runner() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("durable.db");
        let (_task_id, run_id) = seed_durable_run(&db_path);
        let workers = 8;
        let barrier = Arc::new(Barrier::new(workers));
        let outcomes = Arc::new(Mutex::new(Vec::new()));

        thread::scope(|scope| {
            for worker in 0..workers {
                let barrier = Arc::clone(&barrier);
                let outcomes = Arc::clone(&outcomes);
                let db_path = db_path.clone();
                let run_id = run_id.clone();
                scope.spawn(move || {
                    let input_json = json!({ "prompt": "hello" });
                    // Release all workers together so the claims actually contend.
                    barrier.wait();
                    let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
                    let claim = store
                        .claim_or_replay_agent_step(claim_input(
                            &run_id,
                            "agent:shared",
                            &format!("worker-{worker}"),
                            10_000,
                            100,
                            &input_json,
                        ))
                        .expect("claim should succeed");
                    let label = match claim {
                        AgentStepClaim::Run { .. } => "run",
                        AgentStepClaim::Wait => "wait",
                        AgentStepClaim::Replay(_) => "replay",
                    };
                    outcomes.lock().unwrap().push(label);
                });
            }
        });

        let outcomes = outcomes.lock().unwrap();
        assert_eq!(
            outcomes.iter().filter(|&&outcome| outcome == "run").count(),
            1
        );
        assert_eq!(
            outcomes
                .iter()
                .filter(|&&outcome| outcome == "wait")
                .count(),
            workers - 1
        );
        assert_eq!(
            outcomes
                .iter()
                .filter(|&&outcome| outcome == "replay")
                .count(),
            0
        );

        // Only one step row may exist, and it must have been attempted exactly once.
        let store = SqliteDurableStore::open(&db_path).unwrap();
        let (steps, attempts): (i64, i64) = store
            .connection()
            .query_row(
                "SELECT COUNT(*), COALESCE(SUM(attempts), 0) FROM sw_workflow_steps",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .unwrap();
        assert_eq!(steps, 1);
        assert_eq!(attempts, 1);
    }
1740
    /// Four workers complete the same claimed step concurrently: every call must
    /// succeed, yet only one budget ledger entry (7 output tokens) may be written.
    #[test]
    fn concurrent_agent_step_completion_writes_one_budget_ledger_entry() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("durable.db");
        let (_task_id, run_id) = seed_durable_run(&db_path);
        let input_json = json!({ "prompt": "hello" });
        // Claim the step once up front; the block drops this connection afterwards.
        let step_id = {
            let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
            match store
                .claim_or_replay_agent_step(claim_input(
                    &run_id,
                    "agent:complete",
                    "worker-0",
                    10_000,
                    100,
                    &input_json,
                ))
                .expect("claim should succeed")
            {
                AgentStepClaim::Run { step_id } => step_id,
                other => panic!("expected run claim, got {other:?}"),
            }
        };
        let workers = 4;
        let barrier = Arc::new(Barrier::new(workers));

        thread::scope(|scope| {
            for _ in 0..workers {
                let barrier = Arc::clone(&barrier);
                let db_path = db_path.clone();
                let run_id = run_id.clone();
                let step_id = step_id.clone();
                scope.spawn(move || {
                    let result = AgentProviderResult {
                        output: json!("done"),
                        session_id: None,
                        model: None,
                        usage: Some(crate::agent_providers::AgentUsage {
                            output_tokens: Some(7),
                            ..Default::default()
                        }),
                        isolation: None,
                        raw: None,
                    };
                    // Release all workers together so the completions actually contend.
                    barrier.wait();
                    let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
                    store
                        .complete_agent_step(AgentStepCompleteInput {
                            step_id: &step_id,
                            run_id: &run_id,
                            root_run_id: &run_id,
                            result: &result,
                            now: 200,
                        })
                        .expect("completion should be idempotent");
                });
            }
        });

        let store = SqliteDurableStore::open(&db_path).unwrap();
        let (entries, tokens): (i64, i64) = store
            .connection()
            .query_row(
                "SELECT COUNT(*), COALESCE(SUM(output_tokens), 0) FROM sw_budget_ledger",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .unwrap();
        assert_eq!(entries, 1);
        assert_eq!(tokens, 7);
    }
1812
1813 #[test]
1814 fn completed_agent_step_persists_compact_replay_result_without_raw() {
1815 let dir = tempfile::tempdir().unwrap();
1816 let db_path = dir.path().join("durable.db");
1817 let (_task_id, run_id) = seed_durable_run(&db_path);
1818 let input_json = json!({ "prompt": "hello" });
1819 let step_id = {
1820 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1821 match store
1822 .claim_or_replay_agent_step(claim_input(
1823 &run_id,
1824 "agent:compact",
1825 "worker-0",
1826 10_000,
1827 100,
1828 &input_json,
1829 ))
1830 .expect("claim should succeed")
1831 {
1832 AgentStepClaim::Run { step_id } => step_id,
1833 other => panic!("expected run claim, got {other:?}"),
1834 }
1835 };
1836 let result = AgentProviderResult {
1837 output: json!("done"),
1838 session_id: Some("provider-session-1".into()),
1839 model: Some("provider/model-from-result".into()),
1840 usage: Some(crate::agent_providers::AgentUsage {
1841 output_tokens: Some(7),
1842 ..Default::default()
1843 }),
1844 isolation: None,
1845 raw: Some(json!({ "events": ["large provider transcript"] })),
1846 };
1847
1848 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1849 store
1850 .complete_agent_step(AgentStepCompleteInput {
1851 step_id: &step_id,
1852 run_id: &run_id,
1853 root_run_id: &run_id,
1854 result: &result,
1855 now: 200,
1856 })
1857 .expect("completion should succeed");
1858 let stored: String = store
1859 .connection()
1860 .query_row(
1861 "SELECT result_json FROM sw_workflow_steps WHERE step_id = ?1",
1862 [&step_id],
1863 |row| row.get(0),
1864 )
1865 .unwrap();
1866 let stored: Value = serde_json::from_str(&stored).unwrap();
1867
1868 assert_eq!(stored["output"], json!("done"));
1869 assert_eq!(stored["sessionId"], json!("provider-session-1"));
1870 assert_eq!(stored["model"], json!("provider/model-from-result"));
1871 assert_eq!(stored["usage"]["outputTokens"], json!(7));
1872 assert!(stored.get("raw").is_none());
1873 }
1874
    /// A step whose lease has expired can be taken over by a different connection,
    /// keeping the same step id and bumping the attempt counter.
    #[test]
    fn expired_agent_step_lease_can_be_reclaimed_from_another_connection() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("durable.db");
        let (_task_id, run_id) = seed_durable_run(&db_path);
        let input_json = json!({ "prompt": "hello" });
        // worker-1 claims at now = 0 with a lease that expires at 100.
        let first_step_id = {
            let mut store = SqliteDurableStore::open(&db_path).unwrap();
            match store
                .claim_or_replay_agent_step(claim_input(
                    &run_id,
                    "agent:lease",
                    "worker-1",
                    100,
                    0,
                    &input_json,
                ))
                .unwrap()
            {
                AgentStepClaim::Run { step_id } => step_id,
                other => panic!("expected run claim, got {other:?}"),
            }
        };

        // At now = 50 the lease is still live, so worker-2 has to wait.
        let mut waiting_store = SqliteDurableStore::open(&db_path).unwrap();
        assert!(matches!(
            waiting_store
                .claim_or_replay_agent_step(claim_input(
                    &run_id,
                    "agent:lease",
                    "worker-2",
                    200,
                    50,
                    &input_json,
                ))
                .unwrap(),
            AgentStepClaim::Wait
        ));

        // At now = 101 the lease has expired, so worker-3 may take the step over.
        let mut reclaiming_store = SqliteDurableStore::open(&db_path).unwrap();
        let reclaimed_step_id = match reclaiming_store
            .claim_or_replay_agent_step(claim_input(
                &run_id,
                "agent:lease",
                "worker-3",
                300,
                101,
                &input_json,
            ))
            .unwrap()
        {
            AgentStepClaim::Run { step_id } => step_id,
            other => panic!("expected run claim, got {other:?}"),
        };
        assert_eq!(reclaimed_step_id, first_step_id);

        // The initial claim and the reclaim each count as one attempt.
        let attempts: i64 = reclaiming_store
            .connection()
            .query_row(
                "SELECT attempts FROM sw_workflow_steps WHERE step_id = ?1",
                rusqlite::params![reclaimed_step_id],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(attempts, 2);
    }
1941
    /// Six durable workflows run concurrently against one database file, each with
    /// three agent steps of different durations. All must complete on the first
    /// attempt, their provider steps must overlap, and every step must leave exactly
    /// one budget ledger entry.
    #[tokio::test]
    async fn more_than_five_durable_workflows_share_db_while_steps_have_varied_durations() {
        // Runs one workflow on its own connection to the shared database.
        async fn run_one(
            db_path: std::path::PathBuf,
            script_path: std::path::PathBuf,
            provider: Arc<TimedProvider>,
            id: usize,
            base_delay_ms: u64,
        ) -> anyhow::Result<LocalDurableRunResult> {
            let mut store = SqliteDurableStore::open(db_path)?;
            run_local_durable_workflow(
                &mut store,
                LocalDurableRunOptions::new(
                    script_path,
                    json!({ "id": id, "baseDelay": base_delay_ms }),
                    provider,
                ),
            )
            .await
        }

        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("durable.db");
        let script_path = dir.path().join("workflow.mjs");
        // Each prompt embeds its delay as `:delay:<ms>:`, which `TimedProvider` parses.
        fs::write(
            &script_path,
            r#"
export const meta = { name: "durable-concurrent", description: "durable concurrent" };
const id = args.id;
const baseDelay = args.baseDelay;
const first = await agent(`wf:${id}:delay:${baseDelay}:first`);
const second = await agent(`wf:${id}:delay:${baseDelay + 30}:second`);
const third = await agent(`wf:${id}:delay:${baseDelay + 10}:third`);
export default { id, first, second, third };
"#,
        )
        .unwrap();
        let provider = Arc::new(TimedProvider {
            events: Mutex::new(Vec::new()),
        });
        let started = Instant::now();
        let (r0, r1, r2, r3, r4, r5) = tokio::join!(
            run_one(
                db_path.clone(),
                script_path.clone(),
                Arc::clone(&provider),
                0,
                40
            ),
            run_one(
                db_path.clone(),
                script_path.clone(),
                Arc::clone(&provider),
                1,
                55
            ),
            run_one(
                db_path.clone(),
                script_path.clone(),
                Arc::clone(&provider),
                2,
                70
            ),
            run_one(
                db_path.clone(),
                script_path.clone(),
                Arc::clone(&provider),
                3,
                85
            ),
            run_one(
                db_path.clone(),
                script_path.clone(),
                Arc::clone(&provider),
                4,
                100
            ),
            run_one(
                db_path.clone(),
                script_path.clone(),
                Arc::clone(&provider),
                5,
                115
            ),
        );
        let elapsed = started.elapsed();
        let results = [r0, r1, r2, r3, r4, r5]
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(results.len(), 6);
        for (id, result) in results.iter().enumerate() {
            assert_eq!(result.workflow.output.result["id"], json!(id));
            assert_eq!(result.attempts, 1);
        }

        // 6 workflows x 3 agent steps = 18 start events and 18 end events.
        let events = provider.events.lock().unwrap().clone();
        assert_eq!(
            events.iter().filter(|event| event.kind == "start").count(),
            18
        );
        assert_eq!(
            events.iter().filter(|event| event.kind == "end").count(),
            18
        );
        assert!(events
            .iter()
            .any(|event| event.prompt == "wf:5:delay:145:second"));
        assert!(
            max_in_flight(&events) > 1,
            "expected overlapping provider steps, got events: {events:?}"
        );
        // Run back to back, the step delays alone add up to well over a second.
        assert!(
            elapsed < Duration::from_millis(900),
            "runs should overlap on shared DB instead of serializing all varied sleeps; elapsed={elapsed:?}"
        );

        let store = SqliteDurableStore::open(&db_path).unwrap();
        let (completed_runs, completed_tasks, completed_steps, budget_entries, output_tokens): (
            i64,
            i64,
            i64,
            i64,
            i64,
        ) = store
            .connection()
            .query_row(
                r#"
                SELECT
                    (SELECT COUNT(*) FROM sw_workflow_runs WHERE state = 'completed'),
                    (SELECT COUNT(*) FROM sw_workflow_tasks WHERE state = 'completed'),
                    (SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'),
                    (SELECT COUNT(*) FROM sw_budget_ledger),
                    (SELECT COALESCE(SUM(output_tokens), 0) FROM sw_budget_ledger)
                "#,
                [],
                |row| {
                    Ok((
                        row.get(0)?,
                        row.get(1)?,
                        row.get(2)?,
                        row.get(3)?,
                        row.get(4)?,
                    ))
                },
            )
            .unwrap();
        assert_eq!(completed_runs, 6);
        assert_eq!(completed_tasks, 6);
        assert_eq!(completed_steps, 18);
        assert_eq!(budget_entries, 18);
        assert_eq!(output_tokens, 18);
    }
2096
2097 fn max_in_flight(events: &[TimedProviderEvent]) -> usize {
2098 let mut events = events.to_vec();
2099 events.sort_by_key(|event| (event.at, if event.kind == "start" { 0 } else { 1 }));
2100 let mut current = 0usize;
2101 let mut max = 0usize;
2102 for event in events {
2103 if event.kind == "start" {
2104 current += 1;
2105 max = max.max(current);
2106 } else {
2107 current = current.saturating_sub(1);
2108 }
2109 }
2110 max
2111 }
2112
2113 #[tokio::test]
2114 async fn local_durable_run_persists_sleep_step_without_budget_entry() {
2115 let dir = tempfile::tempdir().unwrap();
2116 let db_path = dir.path().join("durable.db");
2117 let script_path = dir.path().join("sleep.workflow.js");
2118 fs::write(
2119 &script_path,
2120 r#"
2121import { sleep } from "workflow:extra";
2122export const meta = { name: "durable-sleep", description: "durable sleep" };
2123await sleep(5);
2124export default { slept: true };
2125"#,
2126 )
2127 .unwrap();
2128
2129 let mut store = SqliteDurableStore::open(&db_path).unwrap();
2130 let result = run_local_durable_workflow(
2131 &mut store,
2132 LocalDurableRunOptions::new(
2133 script_path,
2134 json!({}),
2135 Arc::new(CountingProvider {
2136 calls: AtomicUsize::new(0),
2137 }),
2138 ),
2139 )
2140 .await
2141 .expect("durable workflow should run");
2142
2143 assert_eq!(result.workflow.output.result, json!({ "slept": true }));
2144 let (sleep_steps, budget_entries): (i64, i64) = store
2145 .connection()
2146 .query_row(
2147 r#"
2148 SELECT
2149 (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'sleep' AND state = 'completed'),
2150 (SELECT COUNT(*) FROM sw_budget_ledger)
2151 "#,
2152 [],
2153 |row| Ok((row.get(0)?, row.get(1)?)),
2154 )
2155 .unwrap();
2156 assert_eq!(sleep_steps, 1);
2157 assert_eq!(budget_entries, 0);
2158 }
2159
2160 #[tokio::test]
2161 async fn local_durable_run_marks_cancelled_when_cancel_requested() {
2162 let dir = tempfile::tempdir().unwrap();
2163 let db_path = dir.path().join("durable.db");
2164 let script_path = dir.path().join("workflow.mjs");
2165 fs::write(
2166 &script_path,
2167 r#"
2168export const meta = { name: "durable-cancel", description: "durable cancel" };
2169export default { result: await agent("hello") };
2170"#,
2171 )
2172 .unwrap();
2173 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2174 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(true);
2175 let mut options = LocalDurableRunOptions::new(
2176 script_path,
2177 json!({}),
2178 Arc::new(CountingProvider {
2179 calls: AtomicUsize::new(0),
2180 }),
2181 );
2182 options.cancel_rx = Some(cancel_rx);
2183
2184 let error = run_local_durable_workflow(&mut store, options)
2185 .await
2186 .expect_err("cancelled durable workflow should return an error");
2187 assert_eq!(error.to_string(), "workflow cancelled");
2188
2189 let (run_state, task_state, attempt_state): (String, String, String) = store
2190 .connection()
2191 .query_row(
2192 r#"
2193 SELECT r.state, t.state, a.state
2194 FROM sw_workflow_runs r
2195 JOIN sw_workflow_tasks t ON t.task_id = r.task_id
2196 JOIN sw_workflow_attempts a ON a.run_id = r.run_id
2197 "#,
2198 [],
2199 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2200 )
2201 .unwrap();
2202 assert_eq!(run_state, "cancelled");
2203 assert_eq!(task_state, "cancelled");
2204 assert_eq!(attempt_state, "cancelled");
2205 }
2206
    /// The raw session of an agent step must be written while the workflow is still
    /// running (here: before its trailing sleep finishes), not only at the end.
    #[tokio::test]
    async fn local_durable_run_exports_raw_session_before_workflow_completes() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("durable.db");
        let raw_dir = dir.path().join("raw");
        let script_path = dir.path().join("workflow.mjs");
        fs::write(
            &script_path,
            r#"
import { sleep } from "workflow:extra";
export const meta = { name: "durable-early-session", description: "durable early session" };
await agent("before long sleep");
await sleep(100);
export default { ok: true };
"#,
        )
        .unwrap();
        let provider = Arc::new(SessionProvider {
            session_id: "early-session-1",
            cancel_on_run: None,
            delay_ms: 0,
        });
        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
        let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
        // The sink flips this channel to `true` once the session file is on disk.
        let (saved_tx, mut saved_rx) = tokio::sync::watch::channel(false);
        options.session_log_sink = Some(Arc::new(TestSessionLogSink {
            root: raw_dir.clone(),
            saved_tx: Some(saved_tx),
        }));

        let run = run_local_durable_workflow(&mut store, options);
        tokio::pin!(run);
        // Race the notification against the run: the notification has to win.
        tokio::select! {
            changed = saved_rx.changed() => {
                changed.expect("raw session notification should be sent");
                assert!(*saved_rx.borrow());
                assert!(
                    raw_dir.join("session/early-session-1.jsonl").exists(),
                    "raw session file should be written before workflow completion"
                );
            }
            result = &mut run => {
                panic!("workflow completed before raw session export notification: {result:?}");
            }
        }

        // Drive the still-pending run to completion.
        let result = run.await.expect("workflow should complete");
        assert_eq!(result.workflow.output.result, json!({ "ok": true }));
    }
2256
2257 #[tokio::test]
2258 async fn local_durable_run_exports_raw_session_before_failed_terminal_state() {
2259 let dir = tempfile::tempdir().unwrap();
2260 let db_path = dir.path().join("durable.db");
2261 let raw_dir = dir.path().join("raw");
2262 let script_path = dir.path().join("workflow.mjs");
2263 fs::write(
2264 &script_path,
2265 r#"
2266export const meta = { name: "durable-fail-session", description: "durable fail session" };
2267await agent("before failure");
2268throw new Error("middle failure");
2269"#,
2270 )
2271 .unwrap();
2272 let provider = Arc::new(SessionProvider {
2273 session_id: "failed-session-1",
2274 cancel_on_run: None,
2275 delay_ms: 0,
2276 });
2277 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2278 let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2279 options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2280 root: raw_dir.clone(),
2281 saved_tx: None,
2282 }));
2283
2284 let error = run_local_durable_workflow(&mut store, options)
2285 .await
2286 .expect_err("workflow should fail after exporting agent session");
2287 assert!(
2288 error.to_string().contains("middle failure"),
2289 "unexpected error: {error:#}"
2290 );
2291
2292 let raw_file = raw_dir.join("session/failed-session-1.jsonl");
2293 assert!(raw_file.exists(), "raw session file should be written");
2294 let raw_line = fs::read_to_string(raw_file).unwrap();
2295 assert!(raw_line.contains("before failure"));
2296 let run_state: String = store
2297 .connection()
2298 .query_row("SELECT state FROM sw_workflow_runs", [], |row| row.get(0))
2299 .unwrap();
2300 assert_eq!(run_state, "failed");
2301 }
2302
2303 #[tokio::test]
2304 async fn local_durable_run_exports_raw_session_before_cancelled_state() {
2305 let dir = tempfile::tempdir().unwrap();
2306 let db_path = dir.path().join("durable.db");
2307 let raw_dir = dir.path().join("raw");
2308 let script_path = dir.path().join("workflow.mjs");
2309 fs::write(
2310 &script_path,
2311 r#"
2312export const meta = { name: "durable-cancel-session", description: "durable cancel session" };
2313export default { result: await agent("before cancel") };
2314"#,
2315 )
2316 .unwrap();
2317 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2318 let provider = Arc::new(SessionProvider {
2319 session_id: "cancelled-session-1",
2320 cancel_on_run: Some(cancel_tx),
2321 delay_ms: 10,
2322 });
2323 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2324 let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2325 options.cancel_rx = Some(cancel_rx);
2326 options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2327 root: raw_dir.clone(),
2328 saved_tx: None,
2329 }));
2330
2331 let error = run_local_durable_workflow(&mut store, options)
2332 .await
2333 .expect_err("workflow should be cancelled after exporting agent session");
2334 assert_eq!(error.to_string(), "workflow cancelled");
2335
2336 let raw_file = raw_dir.join("session/cancelled-session-1.jsonl");
2337 assert!(raw_file.exists(), "raw session file should be written");
2338 let raw_line = fs::read_to_string(raw_file).unwrap();
2339 assert!(raw_line.contains("before cancel"));
2340 let run_state: String = store
2341 .connection()
2342 .query_row("SELECT state FROM sw_workflow_runs", [], |row| row.get(0))
2343 .unwrap();
2344 assert_eq!(run_state, "cancelled");
2345 }
2346
2347 #[tokio::test]
2348 async fn local_durable_run_persists_successful_task_and_run() {
2349 let dir = tempfile::tempdir().unwrap();
2350 let db_path = dir.path().join("durable.db");
2351 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2352 let script_path = dir.path().join("workflow.mjs");
2353 fs::write(
2354 &script_path,
2355 r#"
2356export const meta = { name: "durable-local", description: "durable local" };
2357export default { result: await agent("hello") };
2358"#,
2359 )
2360 .unwrap();
2361 let provider = Arc::new(CountingProvider {
2362 calls: AtomicUsize::new(0),
2363 });
2364 let result = run_local_durable_workflow(
2365 &mut store,
2366 LocalDurableRunOptions::new(script_path, json!({}), provider.clone()),
2367 )
2368 .await
2369 .unwrap();
2370 assert_eq!(
2371 result.workflow.output.result,
2372 json!({ "result": "hello:1" })
2373 );
2374 assert_eq!(result.attempts, 1);
2375 assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2376
2377 let state: String = store
2378 .connection()
2379 .query_row(
2380 "SELECT state FROM sw_workflow_tasks WHERE task_id = ?1",
2381 rusqlite::params![result.task_id],
2382 |row| row.get(0),
2383 )
2384 .unwrap();
2385 assert_eq!(state, "completed");
2386 let run_state: String = store
2387 .connection()
2388 .query_row(
2389 "SELECT state FROM sw_workflow_runs WHERE run_id = ?1",
2390 rusqlite::params![result.run_id],
2391 |row| row.get(0),
2392 )
2393 .unwrap();
2394 assert_eq!(run_state, "completed");
2395 let completed_steps: i64 = store
2396 .connection()
2397 .query_row(
2398 "SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'",
2399 [],
2400 |row| row.get(0),
2401 )
2402 .unwrap();
2403 assert_eq!(completed_steps, 1);
2404 }
2405
2406 #[tokio::test]
2407 async fn local_durable_run_uses_runtime_agent_retry_without_global_retry() {
2408 let dir = tempfile::tempdir().unwrap();
2409 let db_path = dir.path().join("durable.db");
2410 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2411 let script_path = dir.path().join("workflow.mjs");
2412 fs::write(
2413 &script_path,
2414 r#"
2415export const meta = { name: "durable-runtime-retry", description: "durable runtime retry" };
2416export default { result: await agent("hello", { retry: { maxAttempts: 2, backoffMs: 0 } }) };
2417"#,
2418 )
2419 .unwrap();
2420 let provider = Arc::new(FlakyOnceProvider {
2421 calls: AtomicUsize::new(0),
2422 });
2423
2424 let result = run_local_durable_workflow(
2425 &mut store,
2426 LocalDurableRunOptions::new(script_path, json!({}), provider.clone()),
2427 )
2428 .await
2429 .unwrap();
2430 assert_eq!(
2431 result.workflow.output.result,
2432 json!({ "result": "recovered: hello" })
2433 );
2434 assert_eq!(provider.calls.load(Ordering::SeqCst), 2);
2435
2436 let (workflow_attempts, completed_steps, failed_steps): (i64, i64, i64) = store
2437 .connection()
2438 .query_row(
2439 r#"
2440 SELECT
2441 (SELECT COUNT(*) FROM sw_workflow_attempts),
2442 (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'agent' AND state = 'completed'),
2443 (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'agent' AND state = 'failed')
2444 "#,
2445 [],
2446 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2447 )
2448 .unwrap();
2449 assert_eq!(workflow_attempts, 1);
2450 assert_eq!(completed_steps, 1);
2451 assert_eq!(failed_steps, 0);
2452 }
2453
2454 #[tokio::test]
2455 async fn local_durable_run_records_one_attempt_without_global_retry() {
2456 let dir = tempfile::tempdir().unwrap();
2457 let db_path = dir.path().join("durable.db");
2458 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2459 let script_path = dir.path().join("workflow.mjs");
2460 fs::write(
2461 &script_path,
2462 r#"
2463export const meta = { name: "durable-no-global-retry", description: "durable no global retry" };
2464await agent("hello");
2465throw new Error("boom");
2466export default { unreachable: true };
2467"#,
2468 )
2469 .unwrap();
2470 let provider = Arc::new(CountingProvider {
2471 calls: AtomicUsize::new(0),
2472 });
2473 let options = LocalDurableRunOptions::new(script_path, json!({}), provider.clone());
2474
2475 let error = run_local_durable_workflow(&mut store, options)
2476 .await
2477 .unwrap_err();
2478 assert!(error.to_string().contains("boom"));
2479 assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2480
2481 let attempts: i64 = store
2482 .connection()
2483 .query_row("SELECT COUNT(*) FROM sw_workflow_attempts", [], |row| {
2484 row.get(0)
2485 })
2486 .unwrap();
2487 assert_eq!(attempts, 1);
2488 let completed_steps: i64 = store
2489 .connection()
2490 .query_row(
2491 "SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'",
2492 [],
2493 |row| row.get(0),
2494 )
2495 .unwrap();
2496 assert_eq!(completed_steps, 1);
2497 }
2498
/// Verifies that resuming an unknown run id produces an error message naming the
/// run id, the database path, and a hint to check the `--db` option.
#[test]
fn prepare_resume_run_reports_missing_run_id_and_database() {
    let workspace = tempfile::tempdir().unwrap();
    let db_path = workspace.path().join("durable.db");
    let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
    store.init().expect("schema should initialize");

    let outcome = store.prepare_resume_run("run_missing", "owner", 1);
    let message = outcome.unwrap_err().to_string();
    let rendered_db_path = db_path.display().to_string();

    assert!(message.contains("workflow run run_missing was not found"));
    assert!(message.contains(&rendered_db_path));
    assert!(message.contains("check --db"));
}
2515
2516 #[tokio::test]
2517 async fn local_durable_run_resumes_existing_run_and_replays_steps() {
2518 let dir = tempfile::tempdir().unwrap();
2519 let db_path = dir.path().join("durable.db");
2520 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2521 let script_path = dir.path().join("workflow.mjs");
2522 fs::write(
2523 &script_path,
2524 r#"
2525export const meta = { name: "durable-resume", description: "durable resume" };
2526const value = await agent("hello");
2527throw new Error("boom");
2528export default { value };
2529"#,
2530 )
2531 .unwrap();
2532 let provider = Arc::new(CountingProvider {
2533 calls: AtomicUsize::new(0),
2534 });
2535 let first_options =
2536 LocalDurableRunOptions::new(script_path.clone(), json!({}), provider.clone());
2537 let first_error = run_local_durable_workflow(&mut store, first_options)
2538 .await
2539 .unwrap_err();
2540 assert!(first_error.to_string().contains("boom"));
2541 assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2542
2543 let run_id: String = store
2544 .connection()
2545 .query_row("SELECT run_id FROM sw_workflow_runs", [], |row| row.get(0))
2546 .unwrap();
2547
2548 fs::write(
2549 &script_path,
2550 r#"
2551export const meta = { name: "durable-resume", description: "durable resume" };
2552const value = await agent("hello");
2553export default { value };
2554"#,
2555 )
2556 .unwrap();
2557 let mut resume_options =
2558 LocalDurableRunOptions::new(script_path.clone(), json!({}), provider.clone());
2559 resume_options.resume_run_id = Some(run_id);
2560 let resumed = run_local_durable_workflow(&mut store, resume_options)
2561 .await
2562 .unwrap();
2563 assert_eq!(
2564 resumed.workflow.output.result,
2565 json!({ "value": "hello:1" })
2566 );
2567 assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2568 }
2569}