1use crate::agent_providers::{AgentProvider, AgentProviderResult, AgentProviderRunInput};
8use crate::durable::json::{
9 DurableRunMode, FailureReasonJSON, LocalTaskParamsJSON, WorkflowRunJSON,
10};
11use crate::events::{WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink};
12use crate::metadata::read_workflow_metadata;
13use crate::workflow::{
14 run_agent_provider_with_retry, run_workflow, RunWorkflowOptions, RunWorkflowResult,
15 WorkflowAgentRunner,
16};
17use anyhow::{bail, Context};
18use rusqlite::OptionalExtension;
19use serde_json::Value;
20use std::collections::{BTreeMap, HashMap};
21use std::path::PathBuf;
22use std::sync::{Arc, Mutex};
23use std::time::{Duration, Instant};
24use tokio::sync::watch;
25
26use super::sqlite::{new_id, now_ms, SqliteDurableStore};
27
/// Value written to `sw_workflow_tasks.task_name` for locally submitted workflow tasks.
const WORKFLOW_TASK_NAME: &str = "workflow.run";
/// Value written to `sw_workflow_tasks.claim_scope` for locally submitted workflow tasks.
const LOCAL_CLAIM_SCOPE: &str = "local";
/// Base lease, in milliseconds, used when computing a durable agent step's lease expiry.
const DEFAULT_STEP_LEASE_MS: u64 = 60 * 1_000;
31
/// Options for [`run_local_durable_workflow`].
///
/// Build with [`LocalDurableRunOptions::new`], then set any optional fields directly.
pub struct LocalDurableRunOptions {
    /// Path to the workflow script to run.
    pub script_path: PathBuf,
    /// Arguments passed to the workflow (and persisted with a newly created run).
    pub args: Value,
    /// Default agent provider for agent calls made by the workflow.
    pub agent_provider: Arc<dyn AgentProvider>,
    /// Model map forwarded unchanged to `run_workflow`.
    pub model_map: BTreeMap<String, String>,
    /// Optional total budget; forwarded to `run_workflow` and persisted with a new run.
    pub budget_total: Option<u64>,
    /// Optional limit forwarded to `run_workflow` as `max_parallel_agent_requests`.
    pub max_parallel_agent_requests: Option<usize>,
    /// When set, resume this existing run instead of creating a new task and run.
    pub resume_run_id: Option<String>,
    /// Cancellation signal; a current value of `true` counts as a cancellation request.
    pub cancel_rx: Option<watch::Receiver<bool>>,
    /// Optional workflow event sink; events are tagged with the run id before delivery.
    pub event_sink: Option<Arc<dyn crate::workflow::WorkflowEventSink>>,
    /// Optional agent session log sink, forwarded to `run_workflow`.
    pub session_log_sink: Option<Arc<dyn crate::workflow::AgentSessionLogSink>>,
}
45
46impl LocalDurableRunOptions {
47 pub fn new(script_path: PathBuf, args: Value, agent_provider: Arc<dyn AgentProvider>) -> Self {
48 Self {
49 script_path,
50 args,
51 agent_provider,
52 model_map: BTreeMap::new(),
53 budget_total: None,
54 max_parallel_agent_requests: None,
55 resume_run_id: None,
56 cancel_rx: None,
57 event_sink: None,
58 session_log_sink: None,
59 }
60 }
61}
62
/// Event sink wrapper that attributes events to one run.
///
/// It fills in a missing run id and workflow depth, and fills in elapsed time measured
/// from when the wrapper was created.
struct RunScopedWorkflowEventSink {
    /// Sink that receives the decorated events.
    inner: Arc<dyn WorkflowEventSink>,
    /// Run id stamped onto events whose metadata has no run id yet.
    run_id: String,
    /// Reference point for elapsed-time stamping; captured in `new`.
    start: Instant,
}
68
69impl RunScopedWorkflowEventSink {
70 fn new(inner: Arc<dyn WorkflowEventSink>, run_id: String) -> Self {
71 Self {
72 inner,
73 run_id,
74 start: Instant::now(),
75 }
76 }
77}
78
79#[async_trait::async_trait]
80impl WorkflowEventSink for RunScopedWorkflowEventSink {
81 async fn emit(&self, event: WorkflowEvent) -> anyhow::Result<()> {
82 let workflow_depth = event
83 .metadata
84 .as_ref()
85 .and_then(|metadata| metadata.workflow_depth)
86 .unwrap_or(0);
87 if workflow_depth == 0
88 && matches!(
89 event.event_type.as_str(),
90 "workflow.started" | "workflow.result" | "workflow.error"
91 )
92 {
93 return Ok(());
94 }
95 self.emit_scoped(event).await
96 }
97}
98
99impl RunScopedWorkflowEventSink {
100 async fn emit_scoped(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
101 let metadata = event
102 .metadata
103 .get_or_insert_with(WorkflowEventMetadata::default);
104 if metadata.run_id.is_none() {
105 metadata.run_id = Some(self.run_id.clone());
106 }
107 if metadata.workflow_depth.is_none() {
108 metadata.workflow_depth = Some(0);
109 }
110 if event.event_type.as_str() != "workflow.started" && event.elapsed_nanos.is_none() {
111 event.elapsed_nanos = Some(elapsed_nanos(self.start));
112 }
113 self.inner.emit(event).await
114 }
115}
116
/// Nanoseconds elapsed since `start`, saturating at `u64::MAX` instead of truncating.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos: u128 = start.elapsed().as_nanos();
    nanos.try_into().unwrap_or(u64::MAX)
}
120
121fn durable_agent_step_lease_expires_at(
122 now: i64,
123 retry_policy: crate::workflow::AgentRetryPolicy,
124) -> i64 {
125 let retry_backoff_budget = retry_policy
126 .backoff_ms
127 .saturating_mul(u64::from(retry_policy.max_attempts.saturating_sub(1)));
128 let lease_ms = DEFAULT_STEP_LEASE_MS.saturating_add(retry_backoff_budget);
129 let lease_ms = i64::try_from(lease_ms).unwrap_or(i64::MAX);
130 now.saturating_add(lease_ms)
131}
132
133fn rfc3339_now() -> anyhow::Result<String> {
134 Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
135}
136
/// Outcome of a successful [`run_local_durable_workflow`] call.
#[derive(Debug)]
pub struct LocalDurableRunResult {
    /// Durable task id: newly created, or the task of the resumed run.
    pub task_id: String,
    /// Durable run id.
    pub run_id: String,
    /// 1-based number of the attempt that produced this result (previous attempts + 1 on resume).
    pub attempts: u32,
    /// Result returned by `run_workflow`.
    pub workflow: RunWorkflowResult,
}
145
/// [`WorkflowAgentRunner`] that records agent calls and sleeps as durable steps in SQLite.
///
/// Completed steps are replayed from the store instead of being executed again.
#[derive(Debug)]
pub struct SqliteDurableAgentRunner {
    /// Database file; a fresh store connection is opened for each step operation.
    db_path: PathBuf,
    /// Run whose steps this runner records.
    run_id: String,
    /// Root run id recorded on each step.
    root_run_id: String,
    /// Worker id written onto steps this runner claims.
    worker_id: String,
    /// Cancellation signal; forwarded to agent provider calls.
    cancel_rx: Option<watch::Receiver<bool>>,
    /// Per-base-checkpoint call counter used to give repeated identical calls distinct names.
    occurrences: Mutex<HashMap<String, u64>>,
}
155
156impl SqliteDurableAgentRunner {
157 pub fn new(
158 db_path: PathBuf,
159 run_id: String,
160 root_run_id: String,
161 worker_id: String,
162 cancel_rx: Option<watch::Receiver<bool>>,
163 ) -> Self {
164 Self {
165 db_path,
166 run_id,
167 root_run_id,
168 worker_id,
169 cancel_rx,
170 occurrences: Mutex::new(HashMap::new()),
171 }
172 }
173
174 fn next_checkpoint_name(&self, base_checkpoint_name: String) -> anyhow::Result<String> {
175 let mut occurrences = self
176 .occurrences
177 .lock()
178 .map_err(|_| anyhow::anyhow!("durable occurrence counter lock was poisoned"))?;
179 let count = occurrences.entry(base_checkpoint_name.clone()).or_insert(0);
180 *count += 1;
181 if *count == 1 {
182 Ok(base_checkpoint_name)
183 } else {
184 Ok(format!("{base_checkpoint_name}#{count}"))
185 }
186 }
187}
188
189#[async_trait::async_trait]
190impl WorkflowAgentRunner for SqliteDurableAgentRunner {
191 fn retry_in_runtime(&self) -> bool {
192 false
193 }
194
195 async fn run_agent(
196 &self,
197 default_provider: Arc<dyn AgentProvider>,
198 provider_override: Option<String>,
199 input: AgentProviderRunInput,
200 ) -> anyhow::Result<AgentProviderResult> {
201 let provider_name = provider_override
202 .as_deref()
203 .unwrap_or_else(|| default_provider.name())
204 .to_string();
205 let input_signature = agent_input_signature(&provider_name, &input);
206 let input_signature_json = canonical_json_string(&input_signature)?;
207 let input_signature_hash = short_blake3_hex(&input_signature_json);
208 let base_checkpoint_name = format!("step:sig_{input_signature_hash}");
209 let checkpoint_name = self.next_checkpoint_name(base_checkpoint_name)?;
210 let input_json = serde_json::to_value(&input_signature)
211 .context("failed to serialize durable agent input")?;
212 let retry_policy = crate::workflow::agent_retry_policy(&input.options)?;
213
214 loop {
215 let claim = {
216 let mut store = SqliteDurableStore::open(&self.db_path)?;
217 store.claim_or_replay_agent_step(AgentStepClaimInput {
218 run_id: &self.run_id,
219 root_run_id: &self.root_run_id,
220 checkpoint_name: &checkpoint_name,
221 input_signature_hash: &input_signature_hash,
222 input_signature_json: &input_signature_json,
223 input_json: &input_json,
224 worker_id: &self.worker_id,
225 lease_expires_at: durable_agent_step_lease_expires_at(now_ms(), retry_policy),
226 now: now_ms(),
227 })?
228 };
229
230 match claim {
231 AgentStepClaim::Replay(result) => return Ok(*result),
232 AgentStepClaim::Run { step_id } => {
233 let provider_result = run_durable_agent_provider(
234 default_provider,
235 provider_override,
236 input,
237 self.cancel_rx.clone(),
238 )
239 .await;
240 let mut store = SqliteDurableStore::open(&self.db_path)?;
241 match provider_result {
242 Ok(result) => {
243 store.complete_agent_step(AgentStepCompleteInput {
244 step_id: &step_id,
245 run_id: &self.run_id,
246 root_run_id: &self.root_run_id,
247 result: &result,
248 now: now_ms(),
249 })?;
250 return Ok(result);
251 }
252 Err(error) => {
253 let failure_reason = serde_json::to_value(FailureReasonJSON {
254 message: error.to_string(),
255 })?;
256 store.fail_agent_step(AgentStepFailInput {
257 step_id: &step_id,
258 failure_reason: &failure_reason,
259 now: now_ms(),
260 })?;
261 return Err(error);
262 }
263 }
264 }
265 AgentStepClaim::Wait => {
266 tokio::time::sleep(Duration::from_millis(50)).await;
267 }
268 }
269 }
270 }
271
272 async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
273 let input_signature = sleep_input_signature(duration_ms);
274 let input_signature_json = canonical_json_string(&input_signature)?;
275 let input_signature_hash = short_blake3_hex(&input_signature_json);
276 let base_checkpoint_name = format!("step:sig_{input_signature_hash}");
277 let checkpoint_name = self.next_checkpoint_name(base_checkpoint_name)?;
278
279 let now = now_ms();
280 let claim = {
281 let mut store = SqliteDurableStore::open(&self.db_path)?;
282 store.claim_or_replay_sleep_step(SleepStepClaimInput {
283 run_id: &self.run_id,
284 root_run_id: &self.root_run_id,
285 checkpoint_name: &checkpoint_name,
286 input_signature_hash: &input_signature_hash,
287 input_signature_json: &input_signature_json,
288 duration_ms,
289 worker_id: &self.worker_id,
290 lease_expires_at: now + 60_000,
291 now,
292 })?
293 };
294
295 match claim {
296 SleepStepClaim::Replay => Ok(()),
297 SleepStepClaim::WaitUntil { step_id, wake_at } => {
298 let now = now_ms();
299 if wake_at > now {
300 tokio::time::sleep(Duration::from_millis((wake_at - now) as u64)).await;
301 }
302 let mut store = SqliteDurableStore::open(&self.db_path)?;
303 store.complete_sleep_step(SleepStepCompleteInput {
304 step_id: &step_id,
305 duration_ms,
306 wake_at,
307 now: now_ms(),
308 })?;
309 Ok(())
310 }
311 }
312 }
313}
314
315pub async fn run_local_durable_workflow(
317 store: &mut SqliteDurableStore,
318 options: LocalDurableRunOptions,
319) -> anyhow::Result<LocalDurableRunResult> {
320 store.init()?;
321
322 let owner_id = new_id("owner");
323 let now = now_ms();
324 let params_json = serde_json::to_value(LocalTaskParamsJSON {
325 mode: DurableRunMode::Local,
326 script_path: options.script_path.clone(),
327 args: options.args.clone(),
328 budget_total: options.budget_total,
329 })?;
330 let workflow_metadata = read_workflow_metadata(&options.script_path).ok().flatten();
331 let workflow_run_json = serde_json::to_value(WorkflowRunJSON {
332 mode: DurableRunMode::Local,
333 script_path: options.script_path.clone(),
334 metadata: workflow_metadata,
335 })?;
336
337 let (task_id, run_id, first_attempt) = if let Some(run_id) = options.resume_run_id.clone() {
338 let (task_id, current_attempts) = store.prepare_resume_run(&run_id, &owner_id, now)?;
339 (task_id, run_id, current_attempts + 1)
340 } else {
341 let task_id = new_id("task");
342 let run_id = new_id("run");
343 store.insert_local_task_and_run(LocalTaskAndRunInsert {
344 task_id: &task_id,
345 run_id: &run_id,
346 owner_id: &owner_id,
347 params_json: ¶ms_json,
348 workflow_run_json: &workflow_run_json,
349 args_json: &options.args,
350 budget_total: options.budget_total,
351 max_attempts: 1,
352 now,
353 })?;
354 (task_id, run_id, 1)
355 };
356
357 let workflow_event_sink = options.event_sink.as_ref().map(|sink| {
358 Arc::new(RunScopedWorkflowEventSink::new(
359 Arc::clone(sink),
360 run_id.clone(),
361 ))
362 });
363 if let Some(event_sink) = workflow_event_sink.as_ref() {
364 event_sink
365 .emit_scoped(WorkflowEvent::started(rfc3339_now()?))
366 .await
367 .context("failed to emit workflow started event")?;
368 }
369
370 let attempt = first_attempt;
371 let attempt_id = new_id("attempt");
372 store.start_attempt(LocalAttemptStart {
373 task_id: &task_id,
374 run_id: &run_id,
375 attempt_id: &attempt_id,
376 owner_id: &owner_id,
377 attempt,
378 lease_expires_at: now_ms() + 60_000,
379 now: now_ms(),
380 })?;
381
382 let agent_runner = store.path().map(|db_path| {
383 Arc::new(SqliteDurableAgentRunner::new(
384 db_path.to_path_buf(),
385 run_id.clone(),
386 run_id.clone(),
387 owner_id.clone(),
388 options.cancel_rx.clone(),
389 )) as Arc<dyn WorkflowAgentRunner>
390 });
391
392 let result = run_workflow(RunWorkflowOptions {
393 script_path: options.script_path.clone(),
394 args: options.args.clone(),
395 agent_provider: Arc::clone(&options.agent_provider),
396 model_map: options.model_map.clone(),
397 budget_total: options.budget_total,
398 budget_spent: 0,
399 nesting_depth: 0,
400 max_parallel_agent_requests: options.max_parallel_agent_requests,
401 agent_runner,
402 cancel_rx: options.cancel_rx.clone(),
403 event_sink: workflow_event_sink
404 .as_ref()
405 .map(|sink| Arc::clone(sink) as Arc<dyn WorkflowEventSink>),
406 event_parent_step_id: None,
407 event_stream_start: workflow_event_sink.as_ref().map(|sink| sink.start),
408 session_log_sink: options.session_log_sink.clone(),
409 })
410 .await;
411
412 match result {
413 Ok(workflow) => {
414 let completed_payload = serde_json::to_value(&workflow.output)
415 .context("failed to serialize durable workflow output")?;
416 store.complete_attempt_and_task(LocalAttemptComplete {
417 task_id: &task_id,
418 run_id: &run_id,
419 attempt_id: &attempt_id,
420 completed_payload: &completed_payload,
421 budget_spent: workflow.budget.spent,
422 now: now_ms(),
423 })?;
424 if let Some(event_sink) = workflow_event_sink.as_ref() {
425 event_sink
426 .emit_scoped(WorkflowEvent::result(
427 workflow.token_usage.input_tokens,
428 workflow.token_usage.output_tokens,
429 workflow.token_usage.total_tokens,
430 workflow.output.result.clone(),
431 ))
432 .await
433 .context("failed to emit workflow result event")?;
434 }
435 Ok(LocalDurableRunResult {
436 task_id,
437 run_id,
438 attempts: attempt,
439 workflow,
440 })
441 }
442 Err(error) => {
443 let failure_reason = serde_json::to_value(FailureReasonJSON {
444 message: error.to_string(),
445 })?;
446 if cancellation_requested(&options.cancel_rx) {
447 store.cancel_attempt_and_task(LocalAttemptCancel {
448 task_id: &task_id,
449 run_id: &run_id,
450 attempt_id: &attempt_id,
451 failure_reason: &failure_reason,
452 now: now_ms(),
453 })?;
454 } else {
455 store.fail_attempt(LocalAttemptFail {
456 task_id: &task_id,
457 run_id: &run_id,
458 attempt_id: &attempt_id,
459 failure_reason: &failure_reason,
460 terminal: true,
461 now: now_ms(),
462 })?;
463 }
464 if let Some(event_sink) = workflow_event_sink.as_ref() {
465 event_sink
466 .emit_scoped(WorkflowEvent::error(error.to_string(), None))
467 .await
468 .context("failed to emit workflow error event")?;
469 }
470 Err(error)
471 }
472 }
473}
474
/// Row values for [`SqliteDurableStore::insert_local_task_and_run`].
pub struct LocalTaskAndRunInsert<'a> {
    /// New task id (`sw_workflow_tasks.task_id`).
    pub task_id: &'a str,
    /// New run id; also written as the run's `root_run_id`.
    pub run_id: &'a str,
    /// Owner written as the task's `submitted_by_owner_id`.
    pub owner_id: &'a str,
    /// Serialized into `sw_workflow_tasks.params_json`.
    pub params_json: &'a Value,
    /// Serialized into `sw_workflow_runs.workflow_run_json`.
    pub workflow_run_json: &'a Value,
    /// Serialized into `sw_workflow_runs.args_json`.
    pub args_json: &'a Value,
    /// Optional budget cap written to `sw_workflow_runs.budget_total`.
    pub budget_total: Option<u64>,
    /// Written to `sw_workflow_tasks.max_attempts`.
    pub max_attempts: u32,
    /// Timestamp (as produced by `now_ms`) used for `created_at`/`updated_at` on both rows.
    pub now: i64,
}
486
/// Inputs for [`SqliteDurableStore::start_attempt`].
pub struct LocalAttemptStart<'a> {
    /// Task to claim for the attempt.
    pub task_id: &'a str,
    /// Run to mark `running`.
    pub run_id: &'a str,
    /// Id of the new attempt row.
    pub attempt_id: &'a str,
    /// Claiming owner: must equal the task's `submitted_by_owner_id` for the claim to
    /// apply, and is written as the attempt's `worker_id`.
    pub owner_id: &'a str,
    /// Attempt number stored on the attempt row.
    pub attempt: u32,
    /// Lease expiry written to both the task and the attempt.
    pub lease_expires_at: i64,
    /// Timestamp for the task/run `updated_at` and the attempt's `started_at`.
    pub now: i64,
}
496
/// Inputs for [`SqliteDurableStore::complete_attempt_and_task`].
pub struct LocalAttemptComplete<'a> {
    /// Task to mark `completed`.
    pub task_id: &'a str,
    /// Run to mark `completed`.
    pub run_id: &'a str,
    /// Attempt to mark `completed`.
    pub attempt_id: &'a str,
    /// Serialized into `completed_payload_json` on both the run and the task.
    pub completed_payload: &'a Value,
    /// Written to the run's `budget_spent`.
    pub budget_spent: u64,
    /// Completion timestamp.
    pub now: i64,
}
505
/// Inputs for [`SqliteDurableStore::fail_attempt`].
pub struct LocalAttemptFail<'a> {
    /// Task whose state is updated.
    pub task_id: &'a str,
    /// Run whose state is updated.
    pub run_id: &'a str,
    /// Attempt to mark `failed`.
    pub attempt_id: &'a str,
    /// Serialized into `failure_reason_json` on the attempt, run and task.
    pub failure_reason: &'a Value,
    /// `true` moves the run and task to `failed`; `false` moves them back to `pending`.
    pub terminal: bool,
    /// Failure timestamp.
    pub now: i64,
}
514
/// Inputs for [`SqliteDurableStore::cancel_attempt_and_task`].
pub struct LocalAttemptCancel<'a> {
    /// Task to mark `cancelled`.
    pub task_id: &'a str,
    /// Run to mark `cancelled`.
    pub run_id: &'a str,
    /// Attempt to mark `cancelled`.
    pub attempt_id: &'a str,
    /// Serialized into `failure_reason_json` on the attempt, run and task.
    pub failure_reason: &'a Value,
    /// Cancellation timestamp.
    pub now: i64,
}
522
523fn cancellation_requested(cancel_rx: &Option<watch::Receiver<bool>>) -> bool {
524 cancel_rx
525 .as_ref()
526 .is_some_and(|cancel_rx| *cancel_rx.borrow())
527}
528
529impl SqliteDurableStore {
530 pub fn insert_local_task_and_run(
531 &mut self,
532 input: LocalTaskAndRunInsert<'_>,
533 ) -> anyhow::Result<()> {
534 let tx = self
535 .connection_mut()
536 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
537 .context("failed to begin local durable task transaction")?;
538 tx.execute(
539 r#"
540 INSERT INTO sw_workflow_tasks (
541 task_id,
542 task_name,
543 state,
544 params_json,
545 submitted_by_owner_id,
546 claimed_by_owner_id,
547 claim_scope,
548 created_at,
549 updated_at,
550 max_attempts
551 )
552 VALUES (?1, ?2, 'pending', ?3, ?4, NULL, ?5, ?6, ?6, ?7)
553 "#,
554 rusqlite::params![
555 input.task_id,
556 WORKFLOW_TASK_NAME,
557 serde_json::to_string(input.params_json)?,
558 input.owner_id,
559 LOCAL_CLAIM_SCOPE,
560 input.now,
561 input.max_attempts,
562 ],
563 )
564 .context("failed to insert durable workflow task")?;
565 tx.execute(
566 r#"
567 INSERT INTO sw_workflow_runs (
568 run_id,
569 task_id,
570 root_run_id,
571 state,
572 workflow_run_json,
573 args_json,
574 budget_total,
575 budget_spent,
576 created_at,
577 updated_at
578 )
579 VALUES (?1, ?2, ?1, 'pending', ?3, ?4, ?5, 0, ?6, ?6)
580 "#,
581 rusqlite::params![
582 input.run_id,
583 input.task_id,
584 serde_json::to_string(input.workflow_run_json)?,
585 serde_json::to_string(input.args_json)?,
586 input.budget_total.map(|value| value as i64),
587 input.now,
588 ],
589 )
590 .context("failed to insert durable workflow run")?;
591 tx.commit()
592 .context("failed to commit local durable task transaction")
593 }
594
595 pub fn prepare_resume_run(
596 &mut self,
597 run_id: &str,
598 owner_id: &str,
599 now: i64,
600 ) -> anyhow::Result<(String, u32)> {
601 let db_label = self
602 .path()
603 .map(|path| path.display().to_string())
604 .unwrap_or_else(|| "<in-memory>".to_string());
605 let tx = self
606 .connection_mut()
607 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
608 .context("failed to begin durable resume transaction")?;
609 let task_id: String = tx
610 .query_row(
611 r#"
612 SELECT task_id
613 FROM sw_workflow_runs
614 WHERE run_id = ?1
615 "#,
616 rusqlite::params![run_id],
617 |row| row.get(0),
618 )
619 .optional()
620 .context("failed to query durable run to resume")?
621 .ok_or_else(|| {
622 anyhow::anyhow!("workflow run {run_id} was not found in {db_label}; check --db")
623 })?;
624 let current_attempts: u32 =
625 tx.query_row(
626 r#"
627 SELECT COUNT(*)
628 FROM sw_workflow_attempts
629 WHERE run_id = ?1
630 "#,
631 rusqlite::params![run_id],
632 |row| row.get::<_, i64>(0),
633 )
634 .context("failed to count durable run attempts")? as u32;
635 tx.execute(
636 r#"
637 UPDATE sw_workflow_tasks
638 SET state = 'pending',
639 claimed_by_owner_id = NULL,
640 lease_expires_at = NULL,
641 updated_at = ?1
642 WHERE task_id = ?2
643 AND state IN ('pending', 'running', 'failed')
644 "#,
645 rusqlite::params![now, task_id],
646 )
647 .context("failed to prepare durable task for resume")?;
648 tx.execute(
649 r#"
650 UPDATE sw_workflow_runs
651 SET state = 'pending',
652 updated_at = ?1
653 WHERE run_id = ?2
654 AND state IN ('pending', 'running', 'failed')
655 "#,
656 rusqlite::params![now, run_id],
657 )
658 .context("failed to prepare durable run for resume")?;
659 tx.execute(
660 r#"
661 UPDATE sw_workflow_tasks
662 SET submitted_by_owner_id = ?1
663 WHERE task_id = ?2
664 AND claim_scope = 'local'
665 "#,
666 rusqlite::params![owner_id, task_id],
667 )
668 .context("failed to adopt durable task owner")?;
669 tx.commit().context("failed to commit durable resume")?;
670 Ok((task_id, current_attempts))
671 }
672
673 pub fn start_attempt(&mut self, input: LocalAttemptStart<'_>) -> anyhow::Result<()> {
674 let tx = self
675 .connection_mut()
676 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
677 .context("failed to begin durable attempt start transaction")?;
678 tx.execute(
679 r#"
680 UPDATE sw_workflow_tasks
681 SET state = 'running',
682 claimed_by_owner_id = ?1,
683 lease_expires_at = ?2,
684 updated_at = ?3
685 WHERE task_id = ?4
686 AND claim_scope = 'local'
687 AND submitted_by_owner_id = ?1
688 AND state IN ('pending', 'running')
689 "#,
690 rusqlite::params![
691 input.owner_id,
692 input.lease_expires_at,
693 input.now,
694 input.task_id
695 ],
696 )
697 .context("failed to claim local durable task")?;
698 tx.execute(
699 r#"
700 UPDATE sw_workflow_runs
701 SET state = 'running',
702 updated_at = ?1
703 WHERE run_id = ?2
704 "#,
705 rusqlite::params![input.now, input.run_id],
706 )
707 .context("failed to mark durable workflow run running")?;
708 tx.execute(
709 r#"
710 INSERT INTO sw_workflow_attempts (
711 attempt_id,
712 run_id,
713 task_id,
714 attempt,
715 worker_id,
716 state,
717 lease_expires_at,
718 started_at
719 )
720 VALUES (?1, ?2, ?3, ?4, ?5, 'running', ?6, ?7)
721 "#,
722 rusqlite::params![
723 input.attempt_id,
724 input.run_id,
725 input.task_id,
726 input.attempt,
727 input.owner_id,
728 input.lease_expires_at,
729 input.now,
730 ],
731 )
732 .context("failed to insert durable workflow attempt")?;
733 tx.commit()
734 .context("failed to commit durable attempt start transaction")
735 }
736
737 pub fn complete_attempt_and_task(
738 &mut self,
739 input: LocalAttemptComplete<'_>,
740 ) -> anyhow::Result<()> {
741 let payload = serde_json::to_string(input.completed_payload)?;
742 let tx = self
743 .connection_mut()
744 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
745 .context("failed to begin durable completion transaction")?;
746 tx.execute(
747 r#"
748 UPDATE sw_workflow_attempts
749 SET state = 'completed',
750 completed_at = ?1
751 WHERE attempt_id = ?2
752 "#,
753 rusqlite::params![input.now, input.attempt_id],
754 )
755 .context("failed to mark durable attempt completed")?;
756 tx.execute(
757 r#"
758 UPDATE sw_workflow_runs
759 SET state = 'completed',
760 budget_spent = ?1,
761 completed_payload_json = ?2,
762 updated_at = ?3
763 WHERE run_id = ?4
764 "#,
765 rusqlite::params![input.budget_spent as i64, payload, input.now, input.run_id],
766 )
767 .context("failed to mark durable run completed")?;
768 tx.execute(
769 r#"
770 UPDATE sw_workflow_tasks
771 SET state = 'completed',
772 completed_payload_json = ?1,
773 lease_expires_at = NULL,
774 updated_at = ?2
775 WHERE task_id = ?3
776 "#,
777 rusqlite::params![payload, input.now, input.task_id],
778 )
779 .context("failed to mark durable task completed")?;
780 tx.commit()
781 .context("failed to commit durable completion transaction")
782 }
783
784 pub fn fail_attempt(&mut self, input: LocalAttemptFail<'_>) -> anyhow::Result<()> {
785 let failure = serde_json::to_string(input.failure_reason)?;
786 let tx = self
787 .connection_mut()
788 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
789 .context("failed to begin durable failure transaction")?;
790 tx.execute(
791 r#"
792 UPDATE sw_workflow_attempts
793 SET state = 'failed',
794 completed_at = ?1,
795 failure_reason_json = ?2
796 WHERE attempt_id = ?3
797 "#,
798 rusqlite::params![input.now, failure, input.attempt_id],
799 )
800 .context("failed to mark durable attempt failed")?;
801 let next_state = if input.terminal { "failed" } else { "pending" };
802 tx.execute(
803 r#"
804 UPDATE sw_workflow_runs
805 SET state = ?1,
806 failure_reason_json = ?2,
807 updated_at = ?3
808 WHERE run_id = ?4
809 "#,
810 rusqlite::params![next_state, failure, input.now, input.run_id],
811 )
812 .context("failed to update durable run failure state")?;
813 tx.execute(
814 r#"
815 UPDATE sw_workflow_tasks
816 SET state = ?1,
817 failure_reason_json = ?2,
818 lease_expires_at = NULL,
819 updated_at = ?3
820 WHERE task_id = ?4
821 "#,
822 rusqlite::params![next_state, failure, input.now, input.task_id],
823 )
824 .context("failed to update durable task failure state")?;
825 tx.commit()
826 .context("failed to commit durable failure transaction")
827 }
828
829 pub fn cancel_attempt_and_task(&mut self, input: LocalAttemptCancel<'_>) -> anyhow::Result<()> {
830 let failure = serde_json::to_string(input.failure_reason)?;
831 let tx = self
832 .connection_mut()
833 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
834 .context("failed to begin durable cancellation transaction")?;
835 tx.execute(
836 r#"
837 UPDATE sw_workflow_attempts
838 SET state = 'cancelled',
839 completed_at = ?1,
840 failure_reason_json = ?2
841 WHERE attempt_id = ?3
842 "#,
843 rusqlite::params![input.now, failure, input.attempt_id],
844 )
845 .context("failed to mark durable attempt cancelled")?;
846 tx.execute(
847 r#"
848 UPDATE sw_workflow_runs
849 SET state = 'cancelled',
850 failure_reason_json = ?1,
851 updated_at = ?2
852 WHERE run_id = ?3
853 "#,
854 rusqlite::params![failure, input.now, input.run_id],
855 )
856 .context("failed to mark durable run cancelled")?;
857 tx.execute(
858 r#"
859 UPDATE sw_workflow_tasks
860 SET state = 'cancelled',
861 failure_reason_json = ?1,
862 lease_expires_at = NULL,
863 updated_at = ?2
864 WHERE task_id = ?3
865 "#,
866 rusqlite::params![failure, input.now, input.task_id],
867 )
868 .context("failed to mark durable task cancelled")?;
869 tx.commit()
870 .context("failed to commit durable cancellation transaction")
871 }
872
/// Claims or replays the agent step `input.checkpoint_name` of run `input.run_id`.
///
/// Runs in one immediate transaction:
/// - No step with that name exists: inserts a new `running` agent step (attempts = 1)
///   leased to `input.worker_id`, and returns [`AgentStepClaim::Run`].
/// - The step is `completed`: deserializes its stored result and returns
///   [`AgentStepClaim::Replay`].
/// - The step is `running` with a lease later than `input.now`: returns
///   [`AgentStepClaim::Wait`] without changing the row.
/// - Any other `running`, `failed`, `cancelled` or `pending` step: the step is
///   re-leased to this worker, `attempts` is incremented, the failure reason is cleared,
///   and [`AgentStepClaim::Run`] is returned.
///
/// # Errors
/// Fails if the stored input signature differs from `input.input_signature_json`, if a
/// completed step has a missing or undeserializable result, if the stored state is
/// unrecognised, or on any database/serialization error.
pub fn claim_or_replay_agent_step(
    &mut self,
    input: AgentStepClaimInput<'_>,
) -> anyhow::Result<AgentStepClaim> {
    // IMMEDIATE takes SQLite's write lock when the transaction begins, so the
    // read-then-write below is not interleaved with another writer.
    let tx = self
        .connection_mut()
        .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
        .context("failed to begin durable agent step claim transaction")?;

    let existing = tx
        .query_row(
            r#"
            SELECT step_id, state, input_signature_json, result_json, lease_expires_at
            FROM sw_workflow_steps
            WHERE run_id = ?1 AND checkpoint_name = ?2
            "#,
            rusqlite::params![input.run_id, input.checkpoint_name],
            |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, String>(2)?,
                    row.get::<_, Option<String>>(3)?,
                    row.get::<_, Option<i64>>(4)?,
                ))
            },
        )
        .optional()
        .context("failed to query durable agent step")?;

    if let Some((step_id, state, stored_signature, result_json, lease_expires_at)) = existing {
        // The recorded step under this checkpoint name must have been made with the same
        // input; anything else is refused rather than replayed.
        if stored_signature != input.input_signature_json {
            bail!(
                "durable step input signature mismatch for {}",
                input.checkpoint_name
            );
        }
        match state.as_str() {
            "completed" => {
                let result_json = result_json.ok_or_else(|| {
                    anyhow::anyhow!("completed durable agent step missing result_json")
                })?;
                let result = serde_json::from_str::<AgentProviderResult>(&result_json)
                    .context("failed to deserialize durable agent result")?;
                tx.commit().context("failed to commit replay transaction")?;
                return Ok(AgentStepClaim::Replay(Box::new(result)));
            }
            // Someone still holds a live lease: let the caller wait instead of running twice.
            "running" if lease_expires_at.is_some_and(|lease| lease > input.now) => {
                tx.commit().context("failed to commit wait transaction")?;
                return Ok(AgentStepClaim::Wait);
            }
            // Expired/missing lease or a non-terminal/unsuccessful state: take it over.
            "running" | "failed" | "cancelled" | "pending" => {
                tx.execute(
                    r#"
                    UPDATE sw_workflow_steps
                    SET state = 'running',
                        worker_id = ?1,
                        lease_expires_at = ?2,
                        attempts = attempts + 1,
                        last_attempt_at = ?3,
                        updated_at = ?3,
                        failure_reason_json = NULL
                    WHERE step_id = ?4
                    "#,
                    rusqlite::params![
                        input.worker_id,
                        input.lease_expires_at,
                        input.now,
                        step_id
                    ],
                )
                .context("failed to reclaim durable agent step")?;
                tx.commit()
                    .context("failed to commit durable agent reclaim transaction")?;
                return Ok(AgentStepClaim::Run { step_id });
            }
            other => bail!("unknown durable step state: {other}"),
        }
    }

    // First time this checkpoint is seen for the run: record it as running for this worker.
    let step_id = new_id("step");
    tx.execute(
        r#"
        INSERT INTO sw_workflow_steps (
            step_id,
            run_id,
            root_run_id,
            step_kind,
            checkpoint_name,
            input_signature_hash,
            input_signature_json,
            state,
            input_json,
            worker_id,
            lease_expires_at,
            attempts,
            last_attempt_at,
            created_at,
            updated_at
        )
        VALUES (?1, ?2, ?3, 'agent', ?4, ?5, ?6, 'running', ?7, ?8, ?9, 1, ?10, ?10, ?10)
        "#,
        rusqlite::params![
            step_id,
            input.run_id,
            input.root_run_id,
            input.checkpoint_name,
            input.input_signature_hash,
            input.input_signature_json,
            serde_json::to_string(input.input_json)?,
            input.worker_id,
            input.lease_expires_at,
            input.now,
        ],
    )
    .context("failed to insert durable agent step")?;
    tx.commit()
        .context("failed to commit durable agent step insert")?;
    Ok(AgentStepClaim::Run { step_id })
}
993
994 pub fn complete_agent_step(&mut self, input: AgentStepCompleteInput<'_>) -> anyhow::Result<()> {
995 let result_json = serde_json::to_string(&compact_agent_result_for_replay(input.result))?;
996 let output_tokens = input
997 .result
998 .usage
999 .as_ref()
1000 .and_then(|usage| usage.output_tokens)
1001 .unwrap_or(0);
1002 let usage_json = input
1003 .result
1004 .usage
1005 .as_ref()
1006 .map(serde_json::to_string)
1007 .transpose()?;
1008 let budget_entry_id = new_id("budget");
1009 let tx = self
1010 .connection_mut()
1011 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1012 .context("failed to begin durable agent step completion transaction")?;
1013 tx.execute(
1014 r#"
1015 UPDATE sw_workflow_steps
1016 SET state = 'completed',
1017 result_json = ?1,
1018 lease_expires_at = NULL,
1019 updated_at = ?2
1020 WHERE step_id = ?3
1021 "#,
1022 rusqlite::params![result_json, input.now, input.step_id],
1023 )
1024 .context("failed to mark durable agent step completed")?;
1025 tx.execute(
1026 r#"
1027 INSERT OR IGNORE INTO sw_budget_ledger (
1028 budget_entry_id,
1029 run_id,
1030 root_run_id,
1031 step_id,
1032 source_type,
1033 source_id,
1034 output_tokens,
1035 usage_json,
1036 created_at
1037 )
1038 VALUES (?1, ?2, ?3, ?4, 'agent_step', ?4, ?5, ?6, ?7)
1039 "#,
1040 rusqlite::params![
1041 budget_entry_id,
1042 input.run_id,
1043 input.root_run_id,
1044 input.step_id,
1045 output_tokens as i64,
1046 usage_json,
1047 input.now,
1048 ],
1049 )
1050 .context("failed to insert durable budget ledger entry")?;
1051 tx.commit()
1052 .context("failed to commit durable agent step completion")
1053 }
1054
/// Claims or replays the sleep step `input.checkpoint_name` of run `input.run_id`.
///
/// Runs in one immediate transaction:
/// - No step with that name exists: computes `wakeAt = now + duration_ms` (saturating)
///   and stores `durationMs`/`wakeAt` in the step's `input_json`. A `running` sleep step
///   leased to `input.worker_id` is inserted, and [`SleepStepClaim::WaitUntil`] is
///   returned.
/// - The step is `completed`: returns [`SleepStepClaim::Replay`].
/// - The step is `running`, its lease is later than `input.now`, and its stored `wakeAt`
///   is still in the future: returns `WaitUntil` without changing the row.
/// - Any other `running`, `pending`, `failed` or `cancelled` step: the step is re-leased
///   to this worker, `attempts` is incremented, the failure reason is cleared, and
///   `WaitUntil` is returned with the originally stored `wakeAt`.
///
/// # Errors
/// Fails if the stored input signature differs, if the stored input lacks an integer
/// `wakeAt`, if `duration_ms` does not fit in `i64`, if the stored state is unrecognised,
/// or on any database/serialization error.
pub fn claim_or_replay_sleep_step(
    &mut self,
    input: SleepStepClaimInput<'_>,
) -> anyhow::Result<SleepStepClaim> {
    // IMMEDIATE takes SQLite's write lock when the transaction begins, so the
    // read-then-write below is not interleaved with another writer.
    let tx = self
        .connection_mut()
        .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
        .context("failed to begin durable sleep step claim transaction")?;

    let existing = tx
        .query_row(
            r#"
            SELECT step_id, state, input_signature_json, input_json, lease_expires_at
            FROM sw_workflow_steps
            WHERE run_id = ?1 AND checkpoint_name = ?2
            "#,
            rusqlite::params![input.run_id, input.checkpoint_name],
            |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, String>(2)?,
                    row.get::<_, String>(3)?,
                    row.get::<_, Option<i64>>(4)?,
                ))
            },
        )
        .optional()
        .context("failed to query durable sleep step")?;

    if let Some((step_id, state, stored_signature, input_json, lease_expires_at)) = existing {
        if stored_signature != input.input_signature_json {
            bail!(
                "durable sleep step input signature mismatch for {}",
                input.checkpoint_name
            );
        }
        match state.as_str() {
            "completed" => {
                tx.commit()
                    .context("failed to commit sleep replay transaction")?;
                return Ok(SleepStepClaim::Replay);
            }
            "running" | "pending" | "failed" | "cancelled" => {
                // The wake time persisted on first claim is authoritative, so a resumed
                // sleep only waits for what is left of the original duration.
                let input_value = serde_json::from_str::<Value>(&input_json)
                    .context("failed to deserialize durable sleep input")?;
                let wake_at = input_value
                    .get("wakeAt")
                    .and_then(Value::as_i64)
                    .ok_or_else(|| anyhow::anyhow!("durable sleep step missing wakeAt"))?;
                // Only logged; the negative value is still returned to the caller as-is.
                if wake_at < 0 {
                    log::warn!(
                        "durable sleep step {} has negative wakeAt {}; resolving immediately",
                        step_id,
                        wake_at
                    );
                }
                // Live lease on an unfinished sleep: wait alongside it without taking over.
                if state == "running"
                    && lease_expires_at.is_some_and(|lease| lease > input.now)
                    && wake_at > input.now
                {
                    tx.commit()
                        .context("failed to commit sleep wait transaction")?;
                    return Ok(SleepStepClaim::WaitUntil { step_id, wake_at });
                }
                tx.execute(
                    r#"
                    UPDATE sw_workflow_steps
                    SET state = 'running',
                        worker_id = ?1,
                        lease_expires_at = ?2,
                        attempts = attempts + 1,
                        last_attempt_at = ?3,
                        updated_at = ?3,
                        failure_reason_json = NULL
                    WHERE step_id = ?4
                    "#,
                    rusqlite::params![
                        input.worker_id,
                        input.lease_expires_at,
                        input.now,
                        step_id
                    ],
                )
                .context("failed to reclaim durable sleep step")?;
                tx.commit()
                    .context("failed to commit durable sleep reclaim transaction")?;
                return Ok(SleepStepClaim::WaitUntil { step_id, wake_at });
            }
            other => bail!("unknown durable step state: {other}"),
        }
    }

    // First time this sleep is seen for the run: fix its wake time and record it.
    let step_id = new_id("step");
    let duration_ms_i64 =
        i64::try_from(input.duration_ms).context("sleep duration is too large to persist")?;
    let wake_at = input.now.saturating_add(duration_ms_i64);
    let input_json = serde_json::json!({
        "durationMs": input.duration_ms,
        "wakeAt": wake_at,
    });
    tx.execute(
        r#"
        INSERT INTO sw_workflow_steps (
            step_id,
            run_id,
            root_run_id,
            step_kind,
            checkpoint_name,
            input_signature_hash,
            input_signature_json,
            state,
            input_json,
            worker_id,
            lease_expires_at,
            attempts,
            last_attempt_at,
            created_at,
            updated_at
        )
        VALUES (?1, ?2, ?3, 'sleep', ?4, ?5, ?6, 'running', ?7, ?8, ?9, 1, ?10, ?10, ?10)
        "#,
        rusqlite::params![
            step_id,
            input.run_id,
            input.root_run_id,
            input.checkpoint_name,
            input.input_signature_hash,
            input.input_signature_json,
            serde_json::to_string(&input_json)?,
            input.worker_id,
            input.lease_expires_at,
            input.now,
        ],
    )
    .context("failed to insert durable sleep step")?;
    tx.commit()
        .context("failed to commit durable sleep step insert")?;
    Ok(SleepStepClaim::WaitUntil { step_id, wake_at })
}
1195
1196 pub fn complete_sleep_step(&mut self, input: SleepStepCompleteInput<'_>) -> anyhow::Result<()> {
1197 let result_json = serde_json::to_string(&serde_json::json!({
1198 "ok": true,
1199 "durationMs": input.duration_ms,
1200 "wakeAt": input.wake_at,
1201 "completedAt": input.now,
1202 }))?;
1203 let updated = self
1204 .connection_mut()
1205 .execute(
1206 r#"
1207 UPDATE sw_workflow_steps
1208 SET state = 'completed',
1209 result_json = ?1,
1210 lease_expires_at = NULL,
1211 updated_at = ?2
1212 WHERE step_id = ?3
1213 AND state = 'running'
1214 "#,
1215 rusqlite::params![result_json, input.now, input.step_id],
1216 )
1217 .context("failed to mark durable sleep step completed")?;
1218 if updated == 1 {
1219 return Ok(());
1220 }
1221
1222 let state = self
1223 .connection()
1224 .query_row(
1225 "SELECT state FROM sw_workflow_steps WHERE step_id = ?1",
1226 rusqlite::params![input.step_id],
1227 |row| row.get::<_, String>(0),
1228 )
1229 .optional()
1230 .context("failed to inspect durable sleep step after completion race")?;
1231 match state.as_deref() {
1232 Some("completed") => Ok(()),
1233 Some(state) => bail!("cannot complete durable sleep step in state {state}"),
1234 None => bail!(
1235 "cannot complete missing durable sleep step {}",
1236 input.step_id
1237 ),
1238 }
1239 }
1240
1241 pub fn fail_agent_step(&mut self, input: AgentStepFailInput<'_>) -> anyhow::Result<()> {
1242 let failure = serde_json::to_string(input.failure_reason)?;
1243 self.connection_mut()
1244 .execute(
1245 r#"
1246 UPDATE sw_workflow_steps
1247 SET state = 'failed',
1248 failure_reason_json = ?1,
1249 lease_expires_at = NULL,
1250 updated_at = ?2
1251 WHERE step_id = ?3
1252 "#,
1253 rusqlite::params![failure, input.now, input.step_id],
1254 )
1255 .context("failed to mark durable agent step failed")?;
1256 Ok(())
1257 }
1258}
1259
/// Outcome of attempting to claim a durable agent step.
#[derive(Debug)]
pub enum AgentStepClaim {
    /// The step already has a stored result, which is returned instead of re-running the
    /// provider. Presumably this is the compacted form without `raw` (TODO confirm).
    Replay(Box<AgentProviderResult>),
    /// The caller now holds the step. It should run the provider and then record the
    /// outcome against `step_id`. A re-claim after an expired lease returns the same id.
    Run { step_id: String },
    /// Another worker holds an unexpired lease on the step, so the caller must not run it now.
    Wait,
}
1266
/// Outcome of claiming a durable sleep step.
///
/// Derives `Clone`/`PartialEq`/`Eq` (all payload fields are plain values) so claims can be
/// compared directly, e.g. with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SleepStepClaim {
    /// The sleep step is already `completed`, so there is nothing left to wait for.
    Replay,
    /// Wait until `wake_at`, then complete `step_id`.
    ///
    /// `wake_at` is the claim-time `now` plus the sleep duration in milliseconds, fixed when
    /// the step was first created.
    ///
    /// This variant is also returned when another worker still holds an unexpired lease and
    /// the wake time has not passed. It therefore does not by itself mean the caller owns
    /// the step.
    WaitUntil { step_id: String, wake_at: i64 },
}
1272
1273fn compact_agent_result_for_replay(result: &AgentProviderResult) -> AgentProviderResult {
1274 AgentProviderResult {
1275 output: result.output.clone(),
1276 session_id: result.session_id.clone(),
1277 model: result.model.clone(),
1278 usage: result.usage.clone(),
1279 isolation: result.isolation.clone(),
1280 raw: None,
1281 }
1282}
1283
1284pub struct AgentStepClaimInput<'a> {
1285 pub run_id: &'a str,
1286 pub root_run_id: &'a str,
1287 pub checkpoint_name: &'a str,
1288 pub input_signature_hash: &'a str,
1289 pub input_signature_json: &'a str,
1290 pub input_json: &'a Value,
1291 pub worker_id: &'a str,
1292 pub lease_expires_at: i64,
1293 pub now: i64,
1294}
1295
1296pub struct AgentStepCompleteInput<'a> {
1297 pub step_id: &'a str,
1298 pub run_id: &'a str,
1299 pub root_run_id: &'a str,
1300 pub result: &'a AgentProviderResult,
1301 pub now: i64,
1302}
1303
1304pub struct AgentStepFailInput<'a> {
1305 pub step_id: &'a str,
1306 pub failure_reason: &'a Value,
1307 pub now: i64,
1308}
1309
/// Borrowed arguments for claiming (or replaying) a durable sleep step.
///
/// Every field is a reference or an integer, so the bundle is cheap to `Copy`. `Debug` makes
/// it loggable.
#[derive(Debug, Clone, Copy)]
pub struct SleepStepClaimInput<'a> {
    /// Run the step belongs to. Together with `checkpoint_name` it locates an existing step.
    pub run_id: &'a str,
    /// Root run id recorded when the step row is first inserted.
    pub root_run_id: &'a str,
    /// Name identifying the sleep within its run.
    pub checkpoint_name: &'a str,
    /// Hash of the input signature, stored when the step row is first inserted.
    pub input_signature_hash: &'a str,
    /// Serialized input signature. It must equal the stored signature of an existing step,
    /// otherwise the claim fails.
    pub input_signature_json: &'a str,
    /// Sleep length in milliseconds. On first claim it becomes `wakeAt = now + duration_ms`,
    /// so it must fit in an `i64`.
    pub duration_ms: u64,
    /// Identifier of the worker taking (or re-taking) the claim.
    pub worker_id: &'a str,
    /// Lease expiry written when the step is inserted or reclaimed.
    pub lease_expires_at: i64,
    /// Current time in milliseconds, used for lease checks and the wake-time calculation.
    pub now: i64,
}
1321
/// Borrowed arguments for completing a durable sleep step.
///
/// Every field is a reference or an integer, so the bundle is cheap to `Copy`. `Debug` makes
/// it loggable.
#[derive(Debug, Clone, Copy)]
pub struct SleepStepCompleteInput<'a> {
    /// Step to transition from `running` to `completed`.
    pub step_id: &'a str,
    /// Sleep length, recorded as `durationMs` in the stored result.
    pub duration_ms: u64,
    /// Wake time, recorded as `wakeAt` in the stored result.
    pub wake_at: i64,
    /// Completion time, recorded as `completedAt` and written to `updated_at`.
    pub now: i64,
}
1328
1329async fn run_durable_agent_provider(
1330 default_provider: Arc<dyn AgentProvider>,
1331 provider_override: Option<String>,
1332 input: AgentProviderRunInput,
1333 cancel_rx: Option<watch::Receiver<bool>>,
1334) -> anyhow::Result<AgentProviderResult> {
1335 run_agent_provider_with_retry(default_provider, provider_override, input, cancel_rx).await
1336}
1337
1338fn agent_input_signature(provider_name: &str, input: &AgentProviderRunInput) -> Value {
1339 serde_json::json!({
1340 "signatureVersion": 2,
1341 "kind": "agent",
1342 "workflowScope": "root",
1343 "provider": provider_name,
1344 "prompt": input.prompt,
1345 "options": input.options,
1346 "context": {
1347 "phase": input.context.phase,
1348 "cwd": input.context.cwd.as_ref().map(|path| path.to_string_lossy().into_owned()),
1349 }
1350 })
1351}
1352
1353fn sleep_input_signature(duration_ms: u64) -> Value {
1354 serde_json::json!({
1355 "signatureVersion": 1,
1356 "kind": "sleep",
1357 "workflowScope": "root",
1358 "durationMs": duration_ms,
1359 })
1360}
1361
1362fn short_blake3_hex(input: &str) -> String {
1363 let hash = blake3::hash(input.as_bytes());
1364 hash.as_bytes()[..12]
1365 .iter()
1366 .map(|byte| format!("{byte:02x}"))
1367 .collect()
1368}
1369
1370fn canonical_json_string(value: &Value) -> anyhow::Result<String> {
1371 let canonical = canonical_json_value(value);
1372 serde_json::to_string(&canonical).context("failed to serialize canonical JSON")
1373}
1374
1375fn canonical_json_value(value: &Value) -> Value {
1376 match value {
1377 Value::Object(object) => {
1378 let mut sorted = BTreeMap::new();
1379 for (key, value) in object {
1380 sorted.insert(key.clone(), canonical_json_value(value));
1381 }
1382 Value::Object(sorted.into_iter().collect())
1383 }
1384 Value::Array(array) => Value::Array(array.iter().map(canonical_json_value).collect()),
1385 value => value.clone(),
1386 }
1387}
1388
1389#[cfg(test)]
1390mod tests {
1391 use super::*;
1392 use crate::agent_providers::{
1393 AgentProvider, AgentProviderResult, AgentProviderRunInput, AgentProviderSchemaMode,
1394 AgentProviderUsageMode,
1395 };
1396 use serde_json::json;
1397 use std::fs;
1398 use std::sync::{
1399 atomic::{AtomicUsize, Ordering},
1400 Arc, Barrier, Mutex,
1401 };
1402 use std::thread;
1403 use std::time::{Duration, Instant};
1404
1405 struct CountingProvider {
1406 calls: AtomicUsize,
1407 }
1408
1409 struct FlakyOnceProvider {
1410 calls: AtomicUsize,
1411 }
1412
1413 struct TimedProvider {
1414 events: Mutex<Vec<TimedProviderEvent>>,
1415 }
1416
1417 struct SessionProvider {
1418 session_id: &'static str,
1419 cancel_on_run: Option<watch::Sender<bool>>,
1420 delay_ms: u64,
1421 }
1422
1423 #[derive(Debug, Clone)]
1424 struct TimedProviderEvent {
1425 prompt: String,
1426 kind: &'static str,
1427 at: Instant,
1428 }
1429
1430 #[async_trait::async_trait]
1431 impl AgentProvider for CountingProvider {
1432 fn name(&self) -> &str {
1433 "counting"
1434 }
1435 fn schema_mode(&self) -> AgentProviderSchemaMode {
1436 AgentProviderSchemaMode::Builtin
1437 }
1438 fn usage_mode(&self) -> AgentProviderUsageMode {
1439 AgentProviderUsageMode::Builtin
1440 }
1441 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1442 let count = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
1443 Ok(AgentProviderResult {
1444 output: json!(format!("{}:{count}", input.prompt)),
1445 session_id: None,
1446 model: None,
1447 usage: None,
1448 isolation: None,
1449 raw: None,
1450 })
1451 }
1452 }
1453
1454 #[async_trait::async_trait]
1455 impl AgentProvider for FlakyOnceProvider {
1456 fn name(&self) -> &str {
1457 "flaky-once"
1458 }
1459 fn schema_mode(&self) -> AgentProviderSchemaMode {
1460 AgentProviderSchemaMode::Builtin
1461 }
1462 fn usage_mode(&self) -> AgentProviderUsageMode {
1463 AgentProviderUsageMode::Builtin
1464 }
1465 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1466 let call = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
1467 if call == 1 {
1468 bail!("temporary provider failure")
1469 }
1470 Ok(AgentProviderResult {
1471 output: json!(format!("recovered: {}", input.prompt)),
1472 session_id: None,
1473 model: None,
1474 usage: None,
1475 isolation: None,
1476 raw: None,
1477 })
1478 }
1479 }
1480
1481 #[async_trait::async_trait]
1482 impl AgentProvider for SessionProvider {
1483 fn name(&self) -> &str {
1484 "session"
1485 }
1486 fn schema_mode(&self) -> AgentProviderSchemaMode {
1487 AgentProviderSchemaMode::Builtin
1488 }
1489 fn usage_mode(&self) -> AgentProviderUsageMode {
1490 AgentProviderUsageMode::Builtin
1491 }
1492 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1493 if let Some(cancel_tx) = &self.cancel_on_run {
1494 let _ = cancel_tx.send(true);
1495 }
1496 if self.delay_ms > 0 {
1497 tokio::time::sleep(Duration::from_millis(self.delay_ms)).await;
1498 }
1499 Ok(AgentProviderResult {
1500 output: json!(format!("session: {}", input.prompt)),
1501 session_id: Some(self.session_id.to_string()),
1502 model: Some("session-model".to_string()),
1503 usage: Some(crate::agent_providers::AgentUsage {
1504 output_tokens: Some(1),
1505 ..Default::default()
1506 }),
1507 isolation: None,
1508 raw: Some(json!({
1509 "events": [
1510 { "sessionId": self.session_id, "prompt": input.prompt }
1511 ]
1512 })),
1513 })
1514 }
1515 }
1516
1517 #[async_trait::async_trait]
1518 impl AgentProvider for TimedProvider {
1519 fn name(&self) -> &str {
1520 "timed"
1521 }
1522 fn schema_mode(&self) -> AgentProviderSchemaMode {
1523 AgentProviderSchemaMode::Builtin
1524 }
1525 fn usage_mode(&self) -> AgentProviderUsageMode {
1526 AgentProviderUsageMode::Builtin
1527 }
1528 async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1529 let delay_ms = input
1530 .prompt
1531 .split(":delay:")
1532 .nth(1)
1533 .and_then(|suffix| suffix.split(':').next())
1534 .and_then(|value| value.parse::<u64>().ok())
1535 .unwrap_or(0);
1536 self.events.lock().unwrap().push(TimedProviderEvent {
1537 prompt: input.prompt.clone(),
1538 kind: "start",
1539 at: Instant::now(),
1540 });
1541 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1542 self.events.lock().unwrap().push(TimedProviderEvent {
1543 prompt: input.prompt.clone(),
1544 kind: "end",
1545 at: Instant::now(),
1546 });
1547 Ok(AgentProviderResult {
1548 output: json!(format!("{}:done", input.prompt)),
1549 session_id: None,
1550 model: None,
1551 usage: Some(crate::agent_providers::AgentUsage {
1552 output_tokens: Some(1),
1553 ..Default::default()
1554 }),
1555 isolation: None,
1556 raw: None,
1557 })
1558 }
1559 }
1560
1561 struct TestSessionLogSink {
1562 root: std::path::PathBuf,
1563 saved_tx: Option<watch::Sender<bool>>,
1564 }
1565
1566 #[async_trait::async_trait]
1567 impl crate::workflow::AgentSessionLogSink for TestSessionLogSink {
1568 async fn write_agent_result(
1569 &self,
1570 provider: &str,
1571 result: &AgentProviderResult,
1572 ) -> anyhow::Result<()> {
1573 write_test_raw_session(&self.root, provider, result)?;
1574 if let Some(saved_tx) = &self.saved_tx {
1575 let _ = saved_tx.send(true);
1576 }
1577 Ok(())
1578 }
1579 }
1580
1581 fn write_test_raw_session(
1582 root: &std::path::Path,
1583 provider: &str,
1584 result: &AgentProviderResult,
1585 ) -> anyhow::Result<()> {
1586 let session_id = result
1587 .session_id
1588 .as_deref()
1589 .ok_or_else(|| anyhow::anyhow!("session id missing"))?;
1590 let events = result
1591 .raw
1592 .as_ref()
1593 .and_then(|raw| raw.get("events"))
1594 .and_then(Value::as_array)
1595 .ok_or_else(|| anyhow::anyhow!("raw events missing"))?;
1596 let dir = root.join(provider);
1597 fs::create_dir_all(&dir)?;
1598 let mut lines = String::new();
1599 for event in events {
1600 lines.push_str(&serde_json::to_string(event)?);
1601 lines.push('\n');
1602 }
1603 fs::write(dir.join(format!("{session_id}.jsonl")), lines)?;
1604 Ok(())
1605 }
1606
1607 fn seed_durable_run(db_path: &std::path::Path) -> (String, String) {
1608 let mut store = SqliteDurableStore::open(db_path).expect("store should open");
1609 store.init().expect("schema should initialize");
1610 let task_id = new_id("task");
1611 let run_id = new_id("run");
1612 store
1613 .insert_local_task_and_run(LocalTaskAndRunInsert {
1614 task_id: &task_id,
1615 run_id: &run_id,
1616 owner_id: "owner",
1617 params_json: &json!({ "scriptPath": "workflow.mjs" }),
1618 workflow_run_json: &json!({ "scriptPath": "workflow.mjs" }),
1619 args_json: &json!({}),
1620 budget_total: Some(100),
1621 max_attempts: 3,
1622 now: 1,
1623 })
1624 .expect("run should be inserted");
1625 (task_id, run_id)
1626 }
1627
1628 fn claim_input<'a>(
1629 run_id: &'a str,
1630 checkpoint_name: &'a str,
1631 worker_id: &'a str,
1632 lease_expires_at: i64,
1633 now: i64,
1634 input_json: &'a Value,
1635 ) -> AgentStepClaimInput<'a> {
1636 AgentStepClaimInput {
1637 run_id,
1638 root_run_id: run_id,
1639 checkpoint_name,
1640 input_signature_hash: "sig",
1641 input_signature_json: r#"{"prompt":"hello"}"#,
1642 input_json,
1643 worker_id,
1644 lease_expires_at,
1645 now,
1646 }
1647 }
1648
1649 #[test]
1650 fn durable_agent_retry_backoff_extends_step_lease_deadline() {
1651 let no_retry = crate::workflow::AgentRetryPolicy {
1652 max_attempts: 1,
1653 backoff_ms: 0,
1654 };
1655 let long_backoff = crate::workflow::AgentRetryPolicy {
1656 max_attempts: 3,
1657 backoff_ms: 120_000,
1658 };
1659 assert_eq!(durable_agent_step_lease_expires_at(1_000, no_retry), 61_000);
1660 assert_eq!(
1661 durable_agent_step_lease_expires_at(1_000, long_backoff),
1662 301_000
1663 );
1664 }
1665
1666 #[test]
1667 fn concurrent_agent_step_claims_allow_only_one_runner() {
1668 let dir = tempfile::tempdir().unwrap();
1669 let db_path = dir.path().join("durable.db");
1670 let (_task_id, run_id) = seed_durable_run(&db_path);
1671 let workers = 8;
1672 let barrier = Arc::new(Barrier::new(workers));
1673 let outcomes = Arc::new(Mutex::new(Vec::new()));
1674
1675 thread::scope(|scope| {
1676 for worker in 0..workers {
1677 let barrier = Arc::clone(&barrier);
1678 let outcomes = Arc::clone(&outcomes);
1679 let db_path = db_path.clone();
1680 let run_id = run_id.clone();
1681 scope.spawn(move || {
1682 let input_json = json!({ "prompt": "hello" });
1683 barrier.wait();
1684 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1685 let claim = store
1686 .claim_or_replay_agent_step(claim_input(
1687 &run_id,
1688 "agent:shared",
1689 &format!("worker-{worker}"),
1690 10_000,
1691 100,
1692 &input_json,
1693 ))
1694 .expect("claim should succeed");
1695 let label = match claim {
1696 AgentStepClaim::Run { .. } => "run",
1697 AgentStepClaim::Wait => "wait",
1698 AgentStepClaim::Replay(_) => "replay",
1699 };
1700 outcomes.lock().unwrap().push(label);
1701 });
1702 }
1703 });
1704
1705 let outcomes = outcomes.lock().unwrap();
1706 assert_eq!(
1707 outcomes.iter().filter(|&&outcome| outcome == "run").count(),
1708 1
1709 );
1710 assert_eq!(
1711 outcomes
1712 .iter()
1713 .filter(|&&outcome| outcome == "wait")
1714 .count(),
1715 workers - 1
1716 );
1717 assert_eq!(
1718 outcomes
1719 .iter()
1720 .filter(|&&outcome| outcome == "replay")
1721 .count(),
1722 0
1723 );
1724
1725 let store = SqliteDurableStore::open(&db_path).unwrap();
1726 let (steps, attempts): (i64, i64) = store
1727 .connection()
1728 .query_row(
1729 "SELECT COUNT(*), COALESCE(SUM(attempts), 0) FROM sw_workflow_steps",
1730 [],
1731 |row| Ok((row.get(0)?, row.get(1)?)),
1732 )
1733 .unwrap();
1734 assert_eq!(steps, 1);
1735 assert_eq!(attempts, 1);
1736 }
1737
1738 #[test]
1739 fn concurrent_agent_step_completion_writes_one_budget_ledger_entry() {
1740 let dir = tempfile::tempdir().unwrap();
1741 let db_path = dir.path().join("durable.db");
1742 let (_task_id, run_id) = seed_durable_run(&db_path);
1743 let input_json = json!({ "prompt": "hello" });
1744 let step_id = {
1745 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1746 match store
1747 .claim_or_replay_agent_step(claim_input(
1748 &run_id,
1749 "agent:complete",
1750 "worker-0",
1751 10_000,
1752 100,
1753 &input_json,
1754 ))
1755 .expect("claim should succeed")
1756 {
1757 AgentStepClaim::Run { step_id } => step_id,
1758 other => panic!("expected run claim, got {other:?}"),
1759 }
1760 };
1761 let workers = 4;
1762 let barrier = Arc::new(Barrier::new(workers));
1763
1764 thread::scope(|scope| {
1765 for _ in 0..workers {
1766 let barrier = Arc::clone(&barrier);
1767 let db_path = db_path.clone();
1768 let run_id = run_id.clone();
1769 let step_id = step_id.clone();
1770 scope.spawn(move || {
1771 let result = AgentProviderResult {
1772 output: json!("done"),
1773 session_id: None,
1774 model: None,
1775 usage: Some(crate::agent_providers::AgentUsage {
1776 output_tokens: Some(7),
1777 ..Default::default()
1778 }),
1779 isolation: None,
1780 raw: None,
1781 };
1782 barrier.wait();
1783 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1784 store
1785 .complete_agent_step(AgentStepCompleteInput {
1786 step_id: &step_id,
1787 run_id: &run_id,
1788 root_run_id: &run_id,
1789 result: &result,
1790 now: 200,
1791 })
1792 .expect("completion should be idempotent");
1793 });
1794 }
1795 });
1796
1797 let store = SqliteDurableStore::open(&db_path).unwrap();
1798 let (entries, tokens): (i64, i64) = store
1799 .connection()
1800 .query_row(
1801 "SELECT COUNT(*), COALESCE(SUM(output_tokens), 0) FROM sw_budget_ledger",
1802 [],
1803 |row| Ok((row.get(0)?, row.get(1)?)),
1804 )
1805 .unwrap();
1806 assert_eq!(entries, 1);
1807 assert_eq!(tokens, 7);
1808 }
1809
1810 #[test]
1811 fn completed_agent_step_persists_compact_replay_result_without_raw() {
1812 let dir = tempfile::tempdir().unwrap();
1813 let db_path = dir.path().join("durable.db");
1814 let (_task_id, run_id) = seed_durable_run(&db_path);
1815 let input_json = json!({ "prompt": "hello" });
1816 let step_id = {
1817 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1818 match store
1819 .claim_or_replay_agent_step(claim_input(
1820 &run_id,
1821 "agent:compact",
1822 "worker-0",
1823 10_000,
1824 100,
1825 &input_json,
1826 ))
1827 .expect("claim should succeed")
1828 {
1829 AgentStepClaim::Run { step_id } => step_id,
1830 other => panic!("expected run claim, got {other:?}"),
1831 }
1832 };
1833 let result = AgentProviderResult {
1834 output: json!("done"),
1835 session_id: Some("provider-session-1".into()),
1836 model: Some("provider/model-from-result".into()),
1837 usage: Some(crate::agent_providers::AgentUsage {
1838 output_tokens: Some(7),
1839 ..Default::default()
1840 }),
1841 isolation: None,
1842 raw: Some(json!({ "events": ["large provider transcript"] })),
1843 };
1844
1845 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1846 store
1847 .complete_agent_step(AgentStepCompleteInput {
1848 step_id: &step_id,
1849 run_id: &run_id,
1850 root_run_id: &run_id,
1851 result: &result,
1852 now: 200,
1853 })
1854 .expect("completion should succeed");
1855 let stored: String = store
1856 .connection()
1857 .query_row(
1858 "SELECT result_json FROM sw_workflow_steps WHERE step_id = ?1",
1859 [&step_id],
1860 |row| row.get(0),
1861 )
1862 .unwrap();
1863 let stored: Value = serde_json::from_str(&stored).unwrap();
1864
1865 assert_eq!(stored["output"], json!("done"));
1866 assert_eq!(stored["sessionId"], json!("provider-session-1"));
1867 assert_eq!(stored["model"], json!("provider/model-from-result"));
1868 assert_eq!(stored["usage"]["outputTokens"], json!(7));
1869 assert!(stored.get("raw").is_none());
1870 }
1871
1872 #[test]
1873 fn expired_agent_step_lease_can_be_reclaimed_from_another_connection() {
1874 let dir = tempfile::tempdir().unwrap();
1875 let db_path = dir.path().join("durable.db");
1876 let (_task_id, run_id) = seed_durable_run(&db_path);
1877 let input_json = json!({ "prompt": "hello" });
1878 let first_step_id = {
1879 let mut store = SqliteDurableStore::open(&db_path).unwrap();
1880 match store
1881 .claim_or_replay_agent_step(claim_input(
1882 &run_id,
1883 "agent:lease",
1884 "worker-1",
1885 100,
1886 0,
1887 &input_json,
1888 ))
1889 .unwrap()
1890 {
1891 AgentStepClaim::Run { step_id } => step_id,
1892 other => panic!("expected run claim, got {other:?}"),
1893 }
1894 };
1895
1896 let mut waiting_store = SqliteDurableStore::open(&db_path).unwrap();
1897 assert!(matches!(
1898 waiting_store
1899 .claim_or_replay_agent_step(claim_input(
1900 &run_id,
1901 "agent:lease",
1902 "worker-2",
1903 200,
1904 50,
1905 &input_json,
1906 ))
1907 .unwrap(),
1908 AgentStepClaim::Wait
1909 ));
1910
1911 let mut reclaiming_store = SqliteDurableStore::open(&db_path).unwrap();
1912 let reclaimed_step_id = match reclaiming_store
1913 .claim_or_replay_agent_step(claim_input(
1914 &run_id,
1915 "agent:lease",
1916 "worker-3",
1917 300,
1918 101,
1919 &input_json,
1920 ))
1921 .unwrap()
1922 {
1923 AgentStepClaim::Run { step_id } => step_id,
1924 other => panic!("expected run claim, got {other:?}"),
1925 };
1926 assert_eq!(reclaimed_step_id, first_step_id);
1927
1928 let attempts: i64 = reclaiming_store
1929 .connection()
1930 .query_row(
1931 "SELECT attempts FROM sw_workflow_steps WHERE step_id = ?1",
1932 rusqlite::params![reclaimed_step_id],
1933 |row| row.get(0),
1934 )
1935 .unwrap();
1936 assert_eq!(attempts, 2);
1937 }
1938
1939 #[tokio::test]
1940 async fn more_than_five_durable_workflows_share_db_while_steps_have_varied_durations() {
1941 async fn run_one(
1942 db_path: std::path::PathBuf,
1943 script_path: std::path::PathBuf,
1944 provider: Arc<TimedProvider>,
1945 id: usize,
1946 base_delay_ms: u64,
1947 ) -> anyhow::Result<LocalDurableRunResult> {
1948 let mut store = SqliteDurableStore::open(db_path)?;
1949 run_local_durable_workflow(
1950 &mut store,
1951 LocalDurableRunOptions::new(
1952 script_path,
1953 json!({ "id": id, "baseDelay": base_delay_ms }),
1954 provider,
1955 ),
1956 )
1957 .await
1958 }
1959
1960 let dir = tempfile::tempdir().unwrap();
1961 let db_path = dir.path().join("durable.db");
1962 let script_path = dir.path().join("workflow.mjs");
1963 fs::write(
1964 &script_path,
1965 r#"
1966export const meta = { name: "durable-concurrent", description: "durable concurrent" };
1967const id = args.id;
1968const baseDelay = args.baseDelay;
1969const first = await agent(`wf:${id}:delay:${baseDelay}:first`);
1970const second = await agent(`wf:${id}:delay:${baseDelay + 30}:second`);
1971const third = await agent(`wf:${id}:delay:${baseDelay + 10}:third`);
1972export default { id, first, second, third };
1973"#,
1974 )
1975 .unwrap();
1976 let provider = Arc::new(TimedProvider {
1977 events: Mutex::new(Vec::new()),
1978 });
1979 let started = Instant::now();
1980 let (r0, r1, r2, r3, r4, r5) = tokio::join!(
1981 run_one(
1982 db_path.clone(),
1983 script_path.clone(),
1984 Arc::clone(&provider),
1985 0,
1986 40
1987 ),
1988 run_one(
1989 db_path.clone(),
1990 script_path.clone(),
1991 Arc::clone(&provider),
1992 1,
1993 55
1994 ),
1995 run_one(
1996 db_path.clone(),
1997 script_path.clone(),
1998 Arc::clone(&provider),
1999 2,
2000 70
2001 ),
2002 run_one(
2003 db_path.clone(),
2004 script_path.clone(),
2005 Arc::clone(&provider),
2006 3,
2007 85
2008 ),
2009 run_one(
2010 db_path.clone(),
2011 script_path.clone(),
2012 Arc::clone(&provider),
2013 4,
2014 100
2015 ),
2016 run_one(
2017 db_path.clone(),
2018 script_path.clone(),
2019 Arc::clone(&provider),
2020 5,
2021 115
2022 ),
2023 );
2024 let elapsed = started.elapsed();
2025 let results = [r0, r1, r2, r3, r4, r5]
2026 .into_iter()
2027 .collect::<Result<Vec<_>, _>>()
2028 .unwrap();
2029
2030 assert_eq!(results.len(), 6);
2031 for (id, result) in results.iter().enumerate() {
2032 assert_eq!(result.workflow.output.result["id"], json!(id));
2033 assert_eq!(result.attempts, 1);
2034 }
2035
2036 let events = provider.events.lock().unwrap().clone();
2037 assert_eq!(
2038 events.iter().filter(|event| event.kind == "start").count(),
2039 18
2040 );
2041 assert_eq!(
2042 events.iter().filter(|event| event.kind == "end").count(),
2043 18
2044 );
2045 assert!(events
2046 .iter()
2047 .any(|event| event.prompt == "wf:5:delay:145:second"));
2048 assert!(
2049 max_in_flight(&events) > 1,
2050 "expected overlapping provider steps, got events: {events:?}"
2051 );
2052 assert!(
2053 elapsed < Duration::from_millis(900),
2054 "runs should overlap on shared DB instead of serializing all varied sleeps; elapsed={elapsed:?}"
2055 );
2056
2057 let store = SqliteDurableStore::open(&db_path).unwrap();
2058 let (completed_runs, completed_tasks, completed_steps, budget_entries, output_tokens): (
2059 i64,
2060 i64,
2061 i64,
2062 i64,
2063 i64,
2064 ) = store
2065 .connection()
2066 .query_row(
2067 r#"
2068 SELECT
2069 (SELECT COUNT(*) FROM sw_workflow_runs WHERE state = 'completed'),
2070 (SELECT COUNT(*) FROM sw_workflow_tasks WHERE state = 'completed'),
2071 (SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'),
2072 (SELECT COUNT(*) FROM sw_budget_ledger),
2073 (SELECT COALESCE(SUM(output_tokens), 0) FROM sw_budget_ledger)
2074 "#,
2075 [],
2076 |row| {
2077 Ok((
2078 row.get(0)?,
2079 row.get(1)?,
2080 row.get(2)?,
2081 row.get(3)?,
2082 row.get(4)?,
2083 ))
2084 },
2085 )
2086 .unwrap();
2087 assert_eq!(completed_runs, 6);
2088 assert_eq!(completed_tasks, 6);
2089 assert_eq!(completed_steps, 18);
2090 assert_eq!(budget_entries, 18);
2091 assert_eq!(output_tokens, 18);
2092 }
2093
2094 fn max_in_flight(events: &[TimedProviderEvent]) -> usize {
2095 let mut events = events.to_vec();
2096 events.sort_by_key(|event| (event.at, if event.kind == "start" { 0 } else { 1 }));
2097 let mut current = 0usize;
2098 let mut max = 0usize;
2099 for event in events {
2100 if event.kind == "start" {
2101 current += 1;
2102 max = max.max(current);
2103 } else {
2104 current = current.saturating_sub(1);
2105 }
2106 }
2107 max
2108 }
2109
2110 #[tokio::test]
2111 async fn local_durable_run_persists_sleep_step_without_budget_entry() {
2112 let dir = tempfile::tempdir().unwrap();
2113 let db_path = dir.path().join("durable.db");
2114 let script_path = dir.path().join("sleep.workflow.js");
2115 fs::write(
2116 &script_path,
2117 r#"
2118import { sleep } from "workflow:extra";
2119export const meta = { name: "durable-sleep", description: "durable sleep" };
2120await sleep(5);
2121export default { slept: true };
2122"#,
2123 )
2124 .unwrap();
2125
2126 let mut store = SqliteDurableStore::open(&db_path).unwrap();
2127 let result = run_local_durable_workflow(
2128 &mut store,
2129 LocalDurableRunOptions::new(
2130 script_path,
2131 json!({}),
2132 Arc::new(CountingProvider {
2133 calls: AtomicUsize::new(0),
2134 }),
2135 ),
2136 )
2137 .await
2138 .expect("durable workflow should run");
2139
2140 assert_eq!(result.workflow.output.result, json!({ "slept": true }));
2141 let (sleep_steps, budget_entries): (i64, i64) = store
2142 .connection()
2143 .query_row(
2144 r#"
2145 SELECT
2146 (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'sleep' AND state = 'completed'),
2147 (SELECT COUNT(*) FROM sw_budget_ledger)
2148 "#,
2149 [],
2150 |row| Ok((row.get(0)?, row.get(1)?)),
2151 )
2152 .unwrap();
2153 assert_eq!(sleep_steps, 1);
2154 assert_eq!(budget_entries, 0);
2155 }
2156
2157 #[tokio::test]
2158 async fn local_durable_run_marks_cancelled_when_cancel_requested() {
2159 let dir = tempfile::tempdir().unwrap();
2160 let db_path = dir.path().join("durable.db");
2161 let script_path = dir.path().join("workflow.mjs");
2162 fs::write(
2163 &script_path,
2164 r#"
2165export const meta = { name: "durable-cancel", description: "durable cancel" };
2166export default { result: await agent("hello") };
2167"#,
2168 )
2169 .unwrap();
2170 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2171 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(true);
2172 let mut options = LocalDurableRunOptions::new(
2173 script_path,
2174 json!({}),
2175 Arc::new(CountingProvider {
2176 calls: AtomicUsize::new(0),
2177 }),
2178 );
2179 options.cancel_rx = Some(cancel_rx);
2180
2181 let error = run_local_durable_workflow(&mut store, options)
2182 .await
2183 .expect_err("cancelled durable workflow should return an error");
2184 assert_eq!(error.to_string(), "workflow cancelled");
2185
2186 let (run_state, task_state, attempt_state): (String, String, String) = store
2187 .connection()
2188 .query_row(
2189 r#"
2190 SELECT r.state, t.state, a.state
2191 FROM sw_workflow_runs r
2192 JOIN sw_workflow_tasks t ON t.task_id = r.task_id
2193 JOIN sw_workflow_attempts a ON a.run_id = r.run_id
2194 "#,
2195 [],
2196 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2197 )
2198 .unwrap();
2199 assert_eq!(run_state, "cancelled");
2200 assert_eq!(task_state, "cancelled");
2201 assert_eq!(attempt_state, "cancelled");
2202 }
2203
2204 #[tokio::test]
2205 async fn local_durable_run_exports_raw_session_before_workflow_completes() {
2206 let dir = tempfile::tempdir().unwrap();
2207 let db_path = dir.path().join("durable.db");
2208 let raw_dir = dir.path().join("raw");
2209 let script_path = dir.path().join("workflow.mjs");
2210 fs::write(
2211 &script_path,
2212 r#"
2213import { sleep } from "workflow:extra";
2214export const meta = { name: "durable-early-session", description: "durable early session" };
2215await agent("before long sleep");
2216await sleep(100);
2217export default { ok: true };
2218"#,
2219 )
2220 .unwrap();
2221 let provider = Arc::new(SessionProvider {
2222 session_id: "early-session-1",
2223 cancel_on_run: None,
2224 delay_ms: 0,
2225 });
2226 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2227 let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2228 let (saved_tx, mut saved_rx) = tokio::sync::watch::channel(false);
2229 options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2230 root: raw_dir.clone(),
2231 saved_tx: Some(saved_tx),
2232 }));
2233
2234 let run = run_local_durable_workflow(&mut store, options);
2235 tokio::pin!(run);
2236 tokio::select! {
2237 changed = saved_rx.changed() => {
2238 changed.expect("raw session notification should be sent");
2239 assert!(*saved_rx.borrow());
2240 assert!(
2241 raw_dir.join("session/early-session-1.jsonl").exists(),
2242 "raw session file should be written before workflow completion"
2243 );
2244 }
2245 result = &mut run => {
2246 panic!("workflow completed before raw session export notification: {result:?}");
2247 }
2248 }
2249
2250 let result = run.await.expect("workflow should complete");
2251 assert_eq!(result.workflow.output.result, json!({ "ok": true }));
2252 }
2253
2254 #[tokio::test]
2255 async fn local_durable_run_exports_raw_session_before_failed_terminal_state() {
2256 let dir = tempfile::tempdir().unwrap();
2257 let db_path = dir.path().join("durable.db");
2258 let raw_dir = dir.path().join("raw");
2259 let script_path = dir.path().join("workflow.mjs");
2260 fs::write(
2261 &script_path,
2262 r#"
2263export const meta = { name: "durable-fail-session", description: "durable fail session" };
2264await agent("before failure");
2265throw new Error("middle failure");
2266"#,
2267 )
2268 .unwrap();
2269 let provider = Arc::new(SessionProvider {
2270 session_id: "failed-session-1",
2271 cancel_on_run: None,
2272 delay_ms: 0,
2273 });
2274 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2275 let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2276 options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2277 root: raw_dir.clone(),
2278 saved_tx: None,
2279 }));
2280
2281 let error = run_local_durable_workflow(&mut store, options)
2282 .await
2283 .expect_err("workflow should fail after exporting agent session");
2284 assert!(
2285 error.to_string().contains("middle failure"),
2286 "unexpected error: {error:#}"
2287 );
2288
2289 let raw_file = raw_dir.join("session/failed-session-1.jsonl");
2290 assert!(raw_file.exists(), "raw session file should be written");
2291 let raw_line = fs::read_to_string(raw_file).unwrap();
2292 assert!(raw_line.contains("before failure"));
2293 let run_state: String = store
2294 .connection()
2295 .query_row("SELECT state FROM sw_workflow_runs", [], |row| row.get(0))
2296 .unwrap();
2297 assert_eq!(run_state, "failed");
2298 }
2299
2300 #[tokio::test]
2301 async fn local_durable_run_exports_raw_session_before_cancelled_state() {
2302 let dir = tempfile::tempdir().unwrap();
2303 let db_path = dir.path().join("durable.db");
2304 let raw_dir = dir.path().join("raw");
2305 let script_path = dir.path().join("workflow.mjs");
2306 fs::write(
2307 &script_path,
2308 r#"
2309export const meta = { name: "durable-cancel-session", description: "durable cancel session" };
2310export default { result: await agent("before cancel") };
2311"#,
2312 )
2313 .unwrap();
2314 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2315 let provider = Arc::new(SessionProvider {
2316 session_id: "cancelled-session-1",
2317 cancel_on_run: Some(cancel_tx),
2318 delay_ms: 10,
2319 });
2320 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2321 let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2322 options.cancel_rx = Some(cancel_rx);
2323 options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2324 root: raw_dir.clone(),
2325 saved_tx: None,
2326 }));
2327
2328 let error = run_local_durable_workflow(&mut store, options)
2329 .await
2330 .expect_err("workflow should be cancelled after exporting agent session");
2331 assert_eq!(error.to_string(), "workflow cancelled");
2332
2333 let raw_file = raw_dir.join("session/cancelled-session-1.jsonl");
2334 assert!(raw_file.exists(), "raw session file should be written");
2335 let raw_line = fs::read_to_string(raw_file).unwrap();
2336 assert!(raw_line.contains("before cancel"));
2337 let run_state: String = store
2338 .connection()
2339 .query_row("SELECT state FROM sw_workflow_runs", [], |row| row.get(0))
2340 .unwrap();
2341 assert_eq!(run_state, "cancelled");
2342 }
2343
2344 #[tokio::test]
2345 async fn local_durable_run_persists_successful_task_and_run() {
2346 let dir = tempfile::tempdir().unwrap();
2347 let db_path = dir.path().join("durable.db");
2348 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2349 let script_path = dir.path().join("workflow.mjs");
2350 fs::write(
2351 &script_path,
2352 r#"
2353export const meta = { name: "durable-local", description: "durable local" };
2354export default { result: await agent("hello") };
2355"#,
2356 )
2357 .unwrap();
2358 let provider = Arc::new(CountingProvider {
2359 calls: AtomicUsize::new(0),
2360 });
2361 let result = run_local_durable_workflow(
2362 &mut store,
2363 LocalDurableRunOptions::new(script_path, json!({}), provider.clone()),
2364 )
2365 .await
2366 .unwrap();
2367 assert_eq!(
2368 result.workflow.output.result,
2369 json!({ "result": "hello:1" })
2370 );
2371 assert_eq!(result.attempts, 1);
2372 assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2373
2374 let state: String = store
2375 .connection()
2376 .query_row(
2377 "SELECT state FROM sw_workflow_tasks WHERE task_id = ?1",
2378 rusqlite::params![result.task_id],
2379 |row| row.get(0),
2380 )
2381 .unwrap();
2382 assert_eq!(state, "completed");
2383 let run_state: String = store
2384 .connection()
2385 .query_row(
2386 "SELECT state FROM sw_workflow_runs WHERE run_id = ?1",
2387 rusqlite::params![result.run_id],
2388 |row| row.get(0),
2389 )
2390 .unwrap();
2391 assert_eq!(run_state, "completed");
2392 let completed_steps: i64 = store
2393 .connection()
2394 .query_row(
2395 "SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'",
2396 [],
2397 |row| row.get(0),
2398 )
2399 .unwrap();
2400 assert_eq!(completed_steps, 1);
2401 }
2402
2403 #[tokio::test]
2404 async fn local_durable_run_uses_runtime_agent_retry_without_global_retry() {
2405 let dir = tempfile::tempdir().unwrap();
2406 let db_path = dir.path().join("durable.db");
2407 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2408 let script_path = dir.path().join("workflow.mjs");
2409 fs::write(
2410 &script_path,
2411 r#"
2412export const meta = { name: "durable-runtime-retry", description: "durable runtime retry" };
2413export default { result: await agent("hello", { retry: { maxAttempts: 2, backoffMs: 0 } }) };
2414"#,
2415 )
2416 .unwrap();
2417 let provider = Arc::new(FlakyOnceProvider {
2418 calls: AtomicUsize::new(0),
2419 });
2420
2421 let result = run_local_durable_workflow(
2422 &mut store,
2423 LocalDurableRunOptions::new(script_path, json!({}), provider.clone()),
2424 )
2425 .await
2426 .unwrap();
2427 assert_eq!(
2428 result.workflow.output.result,
2429 json!({ "result": "recovered: hello" })
2430 );
2431 assert_eq!(provider.calls.load(Ordering::SeqCst), 2);
2432
2433 let (workflow_attempts, completed_steps, failed_steps): (i64, i64, i64) = store
2434 .connection()
2435 .query_row(
2436 r#"
2437 SELECT
2438 (SELECT COUNT(*) FROM sw_workflow_attempts),
2439 (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'agent' AND state = 'completed'),
2440 (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'agent' AND state = 'failed')
2441 "#,
2442 [],
2443 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2444 )
2445 .unwrap();
2446 assert_eq!(workflow_attempts, 1);
2447 assert_eq!(completed_steps, 1);
2448 assert_eq!(failed_steps, 0);
2449 }
2450
2451 #[tokio::test]
2452 async fn local_durable_run_records_one_attempt_without_global_retry() {
2453 let dir = tempfile::tempdir().unwrap();
2454 let db_path = dir.path().join("durable.db");
2455 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2456 let script_path = dir.path().join("workflow.mjs");
2457 fs::write(
2458 &script_path,
2459 r#"
2460export const meta = { name: "durable-no-global-retry", description: "durable no global retry" };
2461await agent("hello");
2462throw new Error("boom");
2463export default { unreachable: true };
2464"#,
2465 )
2466 .unwrap();
2467 let provider = Arc::new(CountingProvider {
2468 calls: AtomicUsize::new(0),
2469 });
2470 let options = LocalDurableRunOptions::new(script_path, json!({}), provider.clone());
2471
2472 let error = run_local_durable_workflow(&mut store, options)
2473 .await
2474 .unwrap_err();
2475 assert!(error.to_string().contains("boom"));
2476 assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2477
2478 let attempts: i64 = store
2479 .connection()
2480 .query_row("SELECT COUNT(*) FROM sw_workflow_attempts", [], |row| {
2481 row.get(0)
2482 })
2483 .unwrap();
2484 assert_eq!(attempts, 1);
2485 let completed_steps: i64 = store
2486 .connection()
2487 .query_row(
2488 "SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'",
2489 [],
2490 |row| row.get(0),
2491 )
2492 .unwrap();
2493 assert_eq!(completed_steps, 1);
2494 }
2495
2496 #[test]
2497 fn prepare_resume_run_reports_missing_run_id_and_database() {
2498 let dir = tempfile::tempdir().unwrap();
2499 let db_path = dir.path().join("durable.db");
2500 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2501 store.init().expect("schema should initialize");
2502
2503 let error = store
2504 .prepare_resume_run("run_missing", "owner", 1)
2505 .unwrap_err()
2506 .to_string();
2507
2508 assert!(error.contains("workflow run run_missing was not found"));
2509 assert!(error.contains(&db_path.display().to_string()));
2510 assert!(error.contains("check --db"));
2511 }
2512
2513 #[tokio::test]
2514 async fn local_durable_run_resumes_existing_run_and_replays_steps() {
2515 let dir = tempfile::tempdir().unwrap();
2516 let db_path = dir.path().join("durable.db");
2517 let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2518 let script_path = dir.path().join("workflow.mjs");
2519 fs::write(
2520 &script_path,
2521 r#"
2522export const meta = { name: "durable-resume", description: "durable resume" };
2523const value = await agent("hello");
2524throw new Error("boom");
2525export default { value };
2526"#,
2527 )
2528 .unwrap();
2529 let provider = Arc::new(CountingProvider {
2530 calls: AtomicUsize::new(0),
2531 });
2532 let first_options =
2533 LocalDurableRunOptions::new(script_path.clone(), json!({}), provider.clone());
2534 let first_error = run_local_durable_workflow(&mut store, first_options)
2535 .await
2536 .unwrap_err();
2537 assert!(first_error.to_string().contains("boom"));
2538 assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2539
2540 let run_id: String = store
2541 .connection()
2542 .query_row("SELECT run_id FROM sw_workflow_runs", [], |row| row.get(0))
2543 .unwrap();
2544
2545 fs::write(
2546 &script_path,
2547 r#"
2548export const meta = { name: "durable-resume", description: "durable resume" };
2549const value = await agent("hello");
2550export default { value };
2551"#,
2552 )
2553 .unwrap();
2554 let mut resume_options =
2555 LocalDurableRunOptions::new(script_path.clone(), json!({}), provider.clone());
2556 resume_options.resume_run_id = Some(run_id);
2557 let resumed = run_local_durable_workflow(&mut store, resume_options)
2558 .await
2559 .unwrap();
2560 assert_eq!(
2561 resumed.workflow.output.result,
2562 json!({ "value": "hello:1" })
2563 );
2564 assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2565 }
2566}