Skip to main content

smol_workflow_engine/durable/
runner.rs

1//! Minimal local durable workflow runner.
2//!
3//! This module implements the first local-only durable flow: create a local task
4//! and root run, execute the existing workflow engine in-process, and persist the
5//! terminal task/run state. Durable retryable steps are introduced separately.
6
7use crate::agent_providers::{AgentProvider, AgentProviderResult, AgentProviderRunInput};
8use crate::durable::json::{
9    DurableRunMode, FailureReasonJSON, LocalTaskParamsJSON, WorkflowRunJSON,
10};
11use crate::events::{WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink};
12use crate::metadata::read_workflow_metadata;
13use crate::workflow::{
14    run_agent_provider_with_retry, run_workflow, RunWorkflowOptions, RunWorkflowResult,
15    WorkflowAgentRunner,
16};
17use anyhow::{bail, Context};
18use rusqlite::OptionalExtension;
19use serde_json::Value;
20use std::collections::{BTreeMap, HashMap};
21use std::path::PathBuf;
22use std::sync::{Arc, Mutex};
23use std::time::{Duration, Instant};
24use tokio::sync::watch;
25
26use super::sqlite::{new_id, now_ms, SqliteDurableStore};
27
/// Value written to the `task_name` column of every task inserted by this module.
const WORKFLOW_TASK_NAME: &str = "workflow.run";
/// Value written to the `claim_scope` column of tasks inserted by this module.
const LOCAL_CLAIM_SCOPE: &str = "local";
/// Base lease length, in milliseconds, used when computing a durable agent
/// step's lease expiry (retry backoff is added on top of it).
const DEFAULT_STEP_LEASE_MS: u64 = 60_000;
31
/// Options for a local durable workflow run.
///
/// Most fields are forwarded unchanged to the in-process workflow engine;
/// `resume_run_id` selects between creating a new task/run and resuming an
/// existing one.
pub struct LocalDurableRunOptions {
    /// Path of the workflow script to execute.
    pub script_path: PathBuf,
    /// Arguments handed to the workflow and stored with a newly created run.
    pub args: Value,
    /// Agent provider passed to the workflow engine.
    pub agent_provider: Arc<dyn AgentProvider>,
    /// Model map forwarded to the workflow engine.
    pub model_map: BTreeMap<String, String>,
    /// Optional total budget, stored with a new run and forwarded to the engine.
    pub budget_total: Option<u64>,
    /// Optional parallelism limit forwarded to the workflow engine.
    pub max_parallel_agent_requests: Option<usize>,
    /// When set, the run with this id is resumed instead of inserting a new task and run.
    pub resume_run_id: Option<String>,
    /// Optional cancellation signal; a `true` value marks a failed run as cancelled.
    pub cancel_rx: Option<watch::Receiver<bool>>,
    /// Optional sink that receives run-scoped workflow events.
    pub event_sink: Option<Arc<dyn crate::workflow::WorkflowEventSink>>,
    /// Optional sink for agent session logs, forwarded to the workflow engine.
    pub session_log_sink: Option<Arc<dyn crate::workflow::AgentSessionLogSink>>,
}
45
46impl LocalDurableRunOptions {
47    pub fn new(script_path: PathBuf, args: Value, agent_provider: Arc<dyn AgentProvider>) -> Self {
48        Self {
49            script_path,
50            args,
51            agent_provider,
52            model_map: BTreeMap::new(),
53            budget_total: None,
54            max_parallel_agent_requests: None,
55            resume_run_id: None,
56            cancel_rx: None,
57            event_sink: None,
58            session_log_sink: None,
59        }
60    }
61}
62
/// Event sink wrapper that stamps events with a run id before forwarding
/// them to an inner sink.
struct RunScopedWorkflowEventSink {
    /// Sink that ultimately receives the events.
    inner: Arc<dyn WorkflowEventSink>,
    /// Run id filled into event metadata that does not already carry one.
    run_id: String,
    /// Instant the wrapper was created; used as the origin for `elapsed_nanos`.
    start: Instant,
}
68
69impl RunScopedWorkflowEventSink {
70    fn new(inner: Arc<dyn WorkflowEventSink>, run_id: String) -> Self {
71        Self {
72            inner,
73            run_id,
74            start: Instant::now(),
75        }
76    }
77}
78
79#[async_trait::async_trait]
80impl WorkflowEventSink for RunScopedWorkflowEventSink {
81    async fn emit(&self, event: WorkflowEvent) -> anyhow::Result<()> {
82        let workflow_depth = event
83            .metadata
84            .as_ref()
85            .and_then(|metadata| metadata.workflow_depth)
86            .unwrap_or(0);
87        if workflow_depth == 0
88            && matches!(
89                event.event_type.as_str(),
90                "workflow.started" | "workflow.result" | "workflow.error"
91            )
92        {
93            return Ok(());
94        }
95        self.emit_scoped(event).await
96    }
97}
98
99impl RunScopedWorkflowEventSink {
100    async fn emit_scoped(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
101        let metadata = event
102            .metadata
103            .get_or_insert_with(WorkflowEventMetadata::default);
104        if metadata.run_id.is_none() {
105            metadata.run_id = Some(self.run_id.clone());
106        }
107        if metadata.workflow_depth.is_none() {
108            metadata.workflow_depth = Some(0);
109        }
110        if event.event_type.as_str() != "workflow.started" && event.elapsed_nanos.is_none() {
111            event.elapsed_nanos = Some(elapsed_nanos(self.start));
112        }
113        self.inner.emit(event).await
114    }
115}
116
/// Returns the nanoseconds elapsed since `start`, clamped to `u64::MAX`
/// when the 128-bit count does not fit in a `u64`.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos: u128 = start.elapsed().as_nanos();
    match u64::try_from(nanos) {
        Ok(value) => value,
        Err(_) => u64::MAX,
    }
}
120
121fn durable_agent_step_lease_expires_at(
122    now: i64,
123    retry_policy: crate::workflow::AgentRetryPolicy,
124) -> i64 {
125    let retry_backoff_budget = retry_policy
126        .backoff_ms
127        .saturating_mul(u64::from(retry_policy.max_attempts.saturating_sub(1)));
128    let lease_ms = DEFAULT_STEP_LEASE_MS.saturating_add(retry_backoff_budget);
129    let lease_ms = i64::try_from(lease_ms).unwrap_or(i64::MAX);
130    now.saturating_add(lease_ms)
131}
132
133fn rfc3339_now() -> anyhow::Result<String> {
134    Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
135}
136
/// Result of a local durable workflow run.
#[derive(Debug)]
pub struct LocalDurableRunResult {
    /// Id of the durable task the run belongs to.
    pub task_id: String,
    /// Id of the root run that was executed.
    pub run_id: String,
    /// Attempt number of the attempt that produced this result.
    pub attempts: u32,
    /// Result returned by the in-process workflow engine.
    pub workflow: RunWorkflowResult,
}
145
/// Agent runner that checkpoints agent and sleep steps in a SQLite durable store.
#[derive(Debug)]
pub struct SqliteDurableAgentRunner {
    /// Path of the SQLite database; a store is opened from it for each operation.
    db_path: PathBuf,
    /// Run whose steps are being recorded.
    run_id: String,
    /// Root run id recorded alongside each step.
    root_run_id: String,
    /// Worker id recorded when claiming steps.
    worker_id: String,
    /// Optional cancellation signal passed to the agent provider call.
    cancel_rx: Option<watch::Receiver<bool>>,
    /// Number of times each base checkpoint name has been requested so far.
    occurrences: Mutex<HashMap<String, u64>>,
}
155
156impl SqliteDurableAgentRunner {
157    pub fn new(
158        db_path: PathBuf,
159        run_id: String,
160        root_run_id: String,
161        worker_id: String,
162        cancel_rx: Option<watch::Receiver<bool>>,
163    ) -> Self {
164        Self {
165            db_path,
166            run_id,
167            root_run_id,
168            worker_id,
169            cancel_rx,
170            occurrences: Mutex::new(HashMap::new()),
171        }
172    }
173
174    fn next_checkpoint_name(&self, base_checkpoint_name: String) -> anyhow::Result<String> {
175        let mut occurrences = self
176            .occurrences
177            .lock()
178            .map_err(|_| anyhow::anyhow!("durable occurrence counter lock was poisoned"))?;
179        let count = occurrences.entry(base_checkpoint_name.clone()).or_insert(0);
180        *count += 1;
181        if *count == 1 {
182            Ok(base_checkpoint_name)
183        } else {
184            Ok(format!("{base_checkpoint_name}#{count}"))
185        }
186    }
187}
188
189#[async_trait::async_trait]
190impl WorkflowAgentRunner for SqliteDurableAgentRunner {
191    fn retry_in_runtime(&self) -> bool {
192        false
193    }
194
195    async fn run_agent(
196        &self,
197        default_provider: Arc<dyn AgentProvider>,
198        provider_override: Option<String>,
199        input: AgentProviderRunInput,
200    ) -> anyhow::Result<AgentProviderResult> {
201        let provider_name = provider_override
202            .as_deref()
203            .unwrap_or_else(|| default_provider.name())
204            .to_string();
205        let input_signature = agent_input_signature(&provider_name, &input);
206        let input_signature_json = canonical_json_string(&input_signature)?;
207        let input_signature_hash = short_blake3_hex(&input_signature_json);
208        let base_checkpoint_name = format!("step:sig_{input_signature_hash}");
209        let checkpoint_name = self.next_checkpoint_name(base_checkpoint_name)?;
210        let input_json = serde_json::to_value(&input_signature)
211            .context("failed to serialize durable agent input")?;
212        let retry_policy = crate::workflow::agent_retry_policy(&input.options)?;
213
214        loop {
215            let claim = {
216                let mut store = SqliteDurableStore::open(&self.db_path)?;
217                store.claim_or_replay_agent_step(AgentStepClaimInput {
218                    run_id: &self.run_id,
219                    root_run_id: &self.root_run_id,
220                    checkpoint_name: &checkpoint_name,
221                    input_signature_hash: &input_signature_hash,
222                    input_signature_json: &input_signature_json,
223                    input_json: &input_json,
224                    worker_id: &self.worker_id,
225                    lease_expires_at: durable_agent_step_lease_expires_at(now_ms(), retry_policy),
226                    now: now_ms(),
227                })?
228            };
229
230            match claim {
231                AgentStepClaim::Replay(result) => return Ok(*result),
232                AgentStepClaim::Run { step_id } => {
233                    let provider_result = run_durable_agent_provider(
234                        default_provider,
235                        provider_override,
236                        input,
237                        self.cancel_rx.clone(),
238                    )
239                    .await;
240                    let mut store = SqliteDurableStore::open(&self.db_path)?;
241                    match provider_result {
242                        Ok(result) => {
243                            store.complete_agent_step(AgentStepCompleteInput {
244                                step_id: &step_id,
245                                run_id: &self.run_id,
246                                root_run_id: &self.root_run_id,
247                                result: &result,
248                                now: now_ms(),
249                            })?;
250                            return Ok(result);
251                        }
252                        Err(error) => {
253                            let failure_reason = serde_json::to_value(FailureReasonJSON {
254                                message: error.to_string(),
255                            })?;
256                            store.fail_agent_step(AgentStepFailInput {
257                                step_id: &step_id,
258                                failure_reason: &failure_reason,
259                                now: now_ms(),
260                            })?;
261                            return Err(error);
262                        }
263                    }
264                }
265                AgentStepClaim::Wait => {
266                    tokio::time::sleep(Duration::from_millis(50)).await;
267                }
268            }
269        }
270    }
271
272    async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
273        let input_signature = sleep_input_signature(duration_ms);
274        let input_signature_json = canonical_json_string(&input_signature)?;
275        let input_signature_hash = short_blake3_hex(&input_signature_json);
276        let base_checkpoint_name = format!("step:sig_{input_signature_hash}");
277        let checkpoint_name = self.next_checkpoint_name(base_checkpoint_name)?;
278
279        let now = now_ms();
280        let claim = {
281            let mut store = SqliteDurableStore::open(&self.db_path)?;
282            store.claim_or_replay_sleep_step(SleepStepClaimInput {
283                run_id: &self.run_id,
284                root_run_id: &self.root_run_id,
285                checkpoint_name: &checkpoint_name,
286                input_signature_hash: &input_signature_hash,
287                input_signature_json: &input_signature_json,
288                duration_ms,
289                worker_id: &self.worker_id,
290                lease_expires_at: now + 60_000,
291                now,
292            })?
293        };
294
295        match claim {
296            SleepStepClaim::Replay => Ok(()),
297            SleepStepClaim::WaitUntil { step_id, wake_at } => {
298                let now = now_ms();
299                if wake_at > now {
300                    tokio::time::sleep(Duration::from_millis((wake_at - now) as u64)).await;
301                }
302                let mut store = SqliteDurableStore::open(&self.db_path)?;
303                store.complete_sleep_step(SleepStepCompleteInput {
304                    step_id: &step_id,
305                    duration_ms,
306                    wake_at,
307                    now: now_ms(),
308                })?;
309                Ok(())
310            }
311        }
312    }
313}
314
315/// Execute a workflow locally while persisting durable task/run/attempt state.
316pub async fn run_local_durable_workflow(
317    store: &mut SqliteDurableStore,
318    options: LocalDurableRunOptions,
319) -> anyhow::Result<LocalDurableRunResult> {
320    store.init()?;
321
322    let owner_id = new_id("owner");
323    let now = now_ms();
324    let params_json = serde_json::to_value(LocalTaskParamsJSON {
325        mode: DurableRunMode::Local,
326        script_path: options.script_path.clone(),
327        args: options.args.clone(),
328        budget_total: options.budget_total,
329    })?;
330    let workflow_metadata = read_workflow_metadata(&options.script_path).ok().flatten();
331    let workflow_run_json = serde_json::to_value(WorkflowRunJSON {
332        mode: DurableRunMode::Local,
333        script_path: options.script_path.clone(),
334        metadata: workflow_metadata,
335    })?;
336
337    let (task_id, run_id, first_attempt) = if let Some(run_id) = options.resume_run_id.clone() {
338        let (task_id, current_attempts) = store.prepare_resume_run(&run_id, &owner_id, now)?;
339        (task_id, run_id, current_attempts + 1)
340    } else {
341        let task_id = new_id("task");
342        let run_id = new_id("run");
343        store.insert_local_task_and_run(LocalTaskAndRunInsert {
344            task_id: &task_id,
345            run_id: &run_id,
346            owner_id: &owner_id,
347            params_json: &params_json,
348            workflow_run_json: &workflow_run_json,
349            args_json: &options.args,
350            budget_total: options.budget_total,
351            max_attempts: 1,
352            now,
353        })?;
354        (task_id, run_id, 1)
355    };
356
357    let workflow_event_sink = options.event_sink.as_ref().map(|sink| {
358        Arc::new(RunScopedWorkflowEventSink::new(
359            Arc::clone(sink),
360            run_id.clone(),
361        ))
362    });
363    if let Some(event_sink) = workflow_event_sink.as_ref() {
364        event_sink
365            .emit_scoped(WorkflowEvent::started(rfc3339_now()?))
366            .await
367            .context("failed to emit workflow started event")?;
368    }
369
370    let attempt = first_attempt;
371    let attempt_id = new_id("attempt");
372    store.start_attempt(LocalAttemptStart {
373        task_id: &task_id,
374        run_id: &run_id,
375        attempt_id: &attempt_id,
376        owner_id: &owner_id,
377        attempt,
378        lease_expires_at: now_ms() + 60_000,
379        now: now_ms(),
380    })?;
381
382    let agent_runner = store.path().map(|db_path| {
383        Arc::new(SqliteDurableAgentRunner::new(
384            db_path.to_path_buf(),
385            run_id.clone(),
386            run_id.clone(),
387            owner_id.clone(),
388            options.cancel_rx.clone(),
389        )) as Arc<dyn WorkflowAgentRunner>
390    });
391
392    let result = run_workflow(RunWorkflowOptions {
393        script_path: options.script_path.clone(),
394        args: options.args.clone(),
395        agent_provider: Arc::clone(&options.agent_provider),
396        model_map: options.model_map.clone(),
397        budget_total: options.budget_total,
398        budget_spent: 0,
399        nesting_depth: 0,
400        max_parallel_agent_requests: options.max_parallel_agent_requests,
401        agent_runner,
402        cancel_rx: options.cancel_rx.clone(),
403        event_sink: workflow_event_sink
404            .as_ref()
405            .map(|sink| Arc::clone(sink) as Arc<dyn WorkflowEventSink>),
406        event_parent_step_id: None,
407        event_stream_start: workflow_event_sink.as_ref().map(|sink| sink.start),
408        session_log_sink: options.session_log_sink.clone(),
409    })
410    .await;
411
412    match result {
413        Ok(workflow) => {
414            let completed_payload = serde_json::to_value(&workflow.output)
415                .context("failed to serialize durable workflow output")?;
416            store.complete_attempt_and_task(LocalAttemptComplete {
417                task_id: &task_id,
418                run_id: &run_id,
419                attempt_id: &attempt_id,
420                completed_payload: &completed_payload,
421                budget_spent: workflow.budget.spent,
422                now: now_ms(),
423            })?;
424            if let Some(event_sink) = workflow_event_sink.as_ref() {
425                event_sink
426                    .emit_scoped(WorkflowEvent::result(
427                        workflow.token_usage.input_tokens,
428                        workflow.token_usage.output_tokens,
429                        workflow.token_usage.total_tokens,
430                        workflow.output.result.clone(),
431                    ))
432                    .await
433                    .context("failed to emit workflow result event")?;
434            }
435            Ok(LocalDurableRunResult {
436                task_id,
437                run_id,
438                attempts: attempt,
439                workflow,
440            })
441        }
442        Err(error) => {
443            let failure_reason = serde_json::to_value(FailureReasonJSON {
444                message: error.to_string(),
445            })?;
446            if cancellation_requested(&options.cancel_rx) {
447                store.cancel_attempt_and_task(LocalAttemptCancel {
448                    task_id: &task_id,
449                    run_id: &run_id,
450                    attempt_id: &attempt_id,
451                    failure_reason: &failure_reason,
452                    now: now_ms(),
453                })?;
454            } else {
455                store.fail_attempt(LocalAttemptFail {
456                    task_id: &task_id,
457                    run_id: &run_id,
458                    attempt_id: &attempt_id,
459                    failure_reason: &failure_reason,
460                    terminal: true,
461                    now: now_ms(),
462                })?;
463            }
464            if let Some(event_sink) = workflow_event_sink.as_ref() {
465                event_sink
466                    .emit_scoped(WorkflowEvent::error(error.to_string(), None))
467                    .await
468                    .context("failed to emit workflow error event")?;
469            }
470            Err(error)
471        }
472    }
473}
474
/// Input for `SqliteDurableStore::insert_local_task_and_run`.
pub struct LocalTaskAndRunInsert<'a> {
    /// Id of the task row to insert.
    pub task_id: &'a str,
    /// Id of the run row to insert; also stored as the run's `root_run_id`.
    pub run_id: &'a str,
    /// Owner stored as the task's `submitted_by_owner_id`.
    pub owner_id: &'a str,
    /// Task parameters, serialized into `params_json`.
    pub params_json: &'a Value,
    /// Run description, serialized into `workflow_run_json`.
    pub workflow_run_json: &'a Value,
    /// Workflow arguments, serialized into `args_json`.
    pub args_json: &'a Value,
    /// Optional total budget stored on the run.
    pub budget_total: Option<u64>,
    /// Value stored in the task's `max_attempts` column.
    pub max_attempts: u32,
    /// Timestamp used for both `created_at` and `updated_at`.
    pub now: i64,
}
486
/// Input for `SqliteDurableStore::start_attempt`.
pub struct LocalAttemptStart<'a> {
    /// Task being claimed.
    pub task_id: &'a str,
    /// Run being marked as running.
    pub run_id: &'a str,
    /// Id of the attempt row to insert.
    pub attempt_id: &'a str,
    /// Owner claiming the task; also stored as the attempt's `worker_id`.
    pub owner_id: &'a str,
    /// Attempt number stored on the attempt row.
    pub attempt: u32,
    /// Lease expiry stored on both the task and the attempt.
    pub lease_expires_at: i64,
    /// Timestamp used for `updated_at` and the attempt's `started_at`.
    pub now: i64,
}
496
/// Input for `SqliteDurableStore::complete_attempt_and_task`.
pub struct LocalAttemptComplete<'a> {
    /// Task to mark completed.
    pub task_id: &'a str,
    /// Run to mark completed.
    pub run_id: &'a str,
    /// Attempt to mark completed.
    pub attempt_id: &'a str,
    /// Payload serialized into `completed_payload_json` on the run and the task.
    pub completed_payload: &'a Value,
    /// Budget spent, stored on the run.
    pub budget_spent: u64,
    /// Timestamp used for `completed_at` and `updated_at`.
    pub now: i64,
}
505
/// Input for `SqliteDurableStore::fail_attempt`.
pub struct LocalAttemptFail<'a> {
    /// Task whose state is updated.
    pub task_id: &'a str,
    /// Run whose state is updated.
    pub run_id: &'a str,
    /// Attempt to mark failed.
    pub attempt_id: &'a str,
    /// Failure description serialized into `failure_reason_json`.
    pub failure_reason: &'a Value,
    /// When `true` the run and task become `failed`; otherwise they return to `pending`.
    pub terminal: bool,
    /// Timestamp used for `completed_at` and `updated_at`.
    pub now: i64,
}
514
/// Input for `SqliteDurableStore::cancel_attempt_and_task`.
pub struct LocalAttemptCancel<'a> {
    /// Task to mark cancelled.
    pub task_id: &'a str,
    /// Run to mark cancelled.
    pub run_id: &'a str,
    /// Attempt to mark cancelled.
    pub attempt_id: &'a str,
    /// Failure description serialized into `failure_reason_json`.
    pub failure_reason: &'a Value,
    /// Timestamp used for `completed_at` and `updated_at`.
    pub now: i64,
}
522
523fn cancellation_requested(cancel_rx: &Option<watch::Receiver<bool>>) -> bool {
524    cancel_rx
525        .as_ref()
526        .is_some_and(|cancel_rx| *cancel_rx.borrow())
527}
528
529impl SqliteDurableStore {
530    pub fn insert_local_task_and_run(
531        &mut self,
532        input: LocalTaskAndRunInsert<'_>,
533    ) -> anyhow::Result<()> {
534        let tx = self
535            .connection_mut()
536            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
537            .context("failed to begin local durable task transaction")?;
538        tx.execute(
539            r#"
540            INSERT INTO sw_workflow_tasks (
541                task_id,
542                task_name,
543                state,
544                params_json,
545                submitted_by_owner_id,
546                claimed_by_owner_id,
547                claim_scope,
548                created_at,
549                updated_at,
550                max_attempts
551            )
552            VALUES (?1, ?2, 'pending', ?3, ?4, NULL, ?5, ?6, ?6, ?7)
553            "#,
554            rusqlite::params![
555                input.task_id,
556                WORKFLOW_TASK_NAME,
557                serde_json::to_string(input.params_json)?,
558                input.owner_id,
559                LOCAL_CLAIM_SCOPE,
560                input.now,
561                input.max_attempts,
562            ],
563        )
564        .context("failed to insert durable workflow task")?;
565        tx.execute(
566            r#"
567            INSERT INTO sw_workflow_runs (
568                run_id,
569                task_id,
570                root_run_id,
571                state,
572                workflow_run_json,
573                args_json,
574                budget_total,
575                budget_spent,
576                created_at,
577                updated_at
578            )
579            VALUES (?1, ?2, ?1, 'pending', ?3, ?4, ?5, 0, ?6, ?6)
580            "#,
581            rusqlite::params![
582                input.run_id,
583                input.task_id,
584                serde_json::to_string(input.workflow_run_json)?,
585                serde_json::to_string(input.args_json)?,
586                input.budget_total.map(|value| value as i64),
587                input.now,
588            ],
589        )
590        .context("failed to insert durable workflow run")?;
591        tx.commit()
592            .context("failed to commit local durable task transaction")
593    }
594
595    pub fn prepare_resume_run(
596        &mut self,
597        run_id: &str,
598        owner_id: &str,
599        now: i64,
600    ) -> anyhow::Result<(String, u32)> {
601        let db_label = self
602            .path()
603            .map(|path| path.display().to_string())
604            .unwrap_or_else(|| "<in-memory>".to_string());
605        let tx = self
606            .connection_mut()
607            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
608            .context("failed to begin durable resume transaction")?;
609        let task_id: String = tx
610            .query_row(
611                r#"
612                SELECT task_id
613                FROM sw_workflow_runs
614                WHERE run_id = ?1
615                "#,
616                rusqlite::params![run_id],
617                |row| row.get(0),
618            )
619            .optional()
620            .context("failed to query durable run to resume")?
621            .ok_or_else(|| {
622                anyhow::anyhow!("workflow run {run_id} was not found in {db_label}; check --db")
623            })?;
624        let current_attempts: u32 =
625            tx.query_row(
626                r#"
627                SELECT COUNT(*)
628                FROM sw_workflow_attempts
629                WHERE run_id = ?1
630                "#,
631                rusqlite::params![run_id],
632                |row| row.get::<_, i64>(0),
633            )
634            .context("failed to count durable run attempts")? as u32;
635        tx.execute(
636            r#"
637            UPDATE sw_workflow_tasks
638            SET state = 'pending',
639                claimed_by_owner_id = NULL,
640                lease_expires_at = NULL,
641                updated_at = ?1
642            WHERE task_id = ?2
643              AND state IN ('pending', 'running', 'failed')
644            "#,
645            rusqlite::params![now, task_id],
646        )
647        .context("failed to prepare durable task for resume")?;
648        tx.execute(
649            r#"
650            UPDATE sw_workflow_runs
651            SET state = 'pending',
652                updated_at = ?1
653            WHERE run_id = ?2
654              AND state IN ('pending', 'running', 'failed')
655            "#,
656            rusqlite::params![now, run_id],
657        )
658        .context("failed to prepare durable run for resume")?;
659        tx.execute(
660            r#"
661            UPDATE sw_workflow_tasks
662            SET submitted_by_owner_id = ?1
663            WHERE task_id = ?2
664              AND claim_scope = 'local'
665            "#,
666            rusqlite::params![owner_id, task_id],
667        )
668        .context("failed to adopt durable task owner")?;
669        tx.commit().context("failed to commit durable resume")?;
670        Ok((task_id, current_attempts))
671    }
672
673    pub fn start_attempt(&mut self, input: LocalAttemptStart<'_>) -> anyhow::Result<()> {
674        let tx = self
675            .connection_mut()
676            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
677            .context("failed to begin durable attempt start transaction")?;
678        tx.execute(
679            r#"
680            UPDATE sw_workflow_tasks
681            SET state = 'running',
682                claimed_by_owner_id = ?1,
683                lease_expires_at = ?2,
684                updated_at = ?3
685            WHERE task_id = ?4
686              AND claim_scope = 'local'
687              AND submitted_by_owner_id = ?1
688              AND state IN ('pending', 'running')
689            "#,
690            rusqlite::params![
691                input.owner_id,
692                input.lease_expires_at,
693                input.now,
694                input.task_id
695            ],
696        )
697        .context("failed to claim local durable task")?;
698        tx.execute(
699            r#"
700            UPDATE sw_workflow_runs
701            SET state = 'running',
702                updated_at = ?1
703            WHERE run_id = ?2
704            "#,
705            rusqlite::params![input.now, input.run_id],
706        )
707        .context("failed to mark durable workflow run running")?;
708        tx.execute(
709            r#"
710            INSERT INTO sw_workflow_attempts (
711                attempt_id,
712                run_id,
713                task_id,
714                attempt,
715                worker_id,
716                state,
717                lease_expires_at,
718                started_at
719            )
720            VALUES (?1, ?2, ?3, ?4, ?5, 'running', ?6, ?7)
721            "#,
722            rusqlite::params![
723                input.attempt_id,
724                input.run_id,
725                input.task_id,
726                input.attempt,
727                input.owner_id,
728                input.lease_expires_at,
729                input.now,
730            ],
731        )
732        .context("failed to insert durable workflow attempt")?;
733        tx.commit()
734            .context("failed to commit durable attempt start transaction")
735    }
736
737    pub fn complete_attempt_and_task(
738        &mut self,
739        input: LocalAttemptComplete<'_>,
740    ) -> anyhow::Result<()> {
741        let payload = serde_json::to_string(input.completed_payload)?;
742        let tx = self
743            .connection_mut()
744            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
745            .context("failed to begin durable completion transaction")?;
746        tx.execute(
747            r#"
748            UPDATE sw_workflow_attempts
749            SET state = 'completed',
750                completed_at = ?1
751            WHERE attempt_id = ?2
752            "#,
753            rusqlite::params![input.now, input.attempt_id],
754        )
755        .context("failed to mark durable attempt completed")?;
756        tx.execute(
757            r#"
758            UPDATE sw_workflow_runs
759            SET state = 'completed',
760                budget_spent = ?1,
761                completed_payload_json = ?2,
762                updated_at = ?3
763            WHERE run_id = ?4
764            "#,
765            rusqlite::params![input.budget_spent as i64, payload, input.now, input.run_id],
766        )
767        .context("failed to mark durable run completed")?;
768        tx.execute(
769            r#"
770            UPDATE sw_workflow_tasks
771            SET state = 'completed',
772                completed_payload_json = ?1,
773                lease_expires_at = NULL,
774                updated_at = ?2
775            WHERE task_id = ?3
776            "#,
777            rusqlite::params![payload, input.now, input.task_id],
778        )
779        .context("failed to mark durable task completed")?;
780        tx.commit()
781            .context("failed to commit durable completion transaction")
782    }
783
784    pub fn fail_attempt(&mut self, input: LocalAttemptFail<'_>) -> anyhow::Result<()> {
785        let failure = serde_json::to_string(input.failure_reason)?;
786        let tx = self
787            .connection_mut()
788            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
789            .context("failed to begin durable failure transaction")?;
790        tx.execute(
791            r#"
792            UPDATE sw_workflow_attempts
793            SET state = 'failed',
794                completed_at = ?1,
795                failure_reason_json = ?2
796            WHERE attempt_id = ?3
797            "#,
798            rusqlite::params![input.now, failure, input.attempt_id],
799        )
800        .context("failed to mark durable attempt failed")?;
801        let next_state = if input.terminal { "failed" } else { "pending" };
802        tx.execute(
803            r#"
804            UPDATE sw_workflow_runs
805            SET state = ?1,
806                failure_reason_json = ?2,
807                updated_at = ?3
808            WHERE run_id = ?4
809            "#,
810            rusqlite::params![next_state, failure, input.now, input.run_id],
811        )
812        .context("failed to update durable run failure state")?;
813        tx.execute(
814            r#"
815            UPDATE sw_workflow_tasks
816            SET state = ?1,
817                failure_reason_json = ?2,
818                lease_expires_at = NULL,
819                updated_at = ?3
820            WHERE task_id = ?4
821            "#,
822            rusqlite::params![next_state, failure, input.now, input.task_id],
823        )
824        .context("failed to update durable task failure state")?;
825        tx.commit()
826            .context("failed to commit durable failure transaction")
827    }
828
829    pub fn cancel_attempt_and_task(&mut self, input: LocalAttemptCancel<'_>) -> anyhow::Result<()> {
830        let failure = serde_json::to_string(input.failure_reason)?;
831        let tx = self
832            .connection_mut()
833            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
834            .context("failed to begin durable cancellation transaction")?;
835        tx.execute(
836            r#"
837            UPDATE sw_workflow_attempts
838            SET state = 'cancelled',
839                completed_at = ?1,
840                failure_reason_json = ?2
841            WHERE attempt_id = ?3
842            "#,
843            rusqlite::params![input.now, failure, input.attempt_id],
844        )
845        .context("failed to mark durable attempt cancelled")?;
846        tx.execute(
847            r#"
848            UPDATE sw_workflow_runs
849            SET state = 'cancelled',
850                failure_reason_json = ?1,
851                updated_at = ?2
852            WHERE run_id = ?3
853            "#,
854            rusqlite::params![failure, input.now, input.run_id],
855        )
856        .context("failed to mark durable run cancelled")?;
857        tx.execute(
858            r#"
859            UPDATE sw_workflow_tasks
860            SET state = 'cancelled',
861                failure_reason_json = ?1,
862                lease_expires_at = NULL,
863                updated_at = ?2
864            WHERE task_id = ?3
865            "#,
866            rusqlite::params![failure, input.now, input.task_id],
867        )
868        .context("failed to mark durable task cancelled")?;
869        tx.commit()
870            .context("failed to commit durable cancellation transaction")
871    }
872
873    pub fn claim_or_replay_agent_step(
874        &mut self,
875        input: AgentStepClaimInput<'_>,
876    ) -> anyhow::Result<AgentStepClaim> {
877        let tx = self
878            .connection_mut()
879            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
880            .context("failed to begin durable agent step claim transaction")?;
881
882        let existing = tx
883            .query_row(
884                r#"
885                SELECT step_id, state, input_signature_json, result_json, lease_expires_at
886                FROM sw_workflow_steps
887                WHERE run_id = ?1 AND checkpoint_name = ?2
888                "#,
889                rusqlite::params![input.run_id, input.checkpoint_name],
890                |row| {
891                    Ok((
892                        row.get::<_, String>(0)?,
893                        row.get::<_, String>(1)?,
894                        row.get::<_, String>(2)?,
895                        row.get::<_, Option<String>>(3)?,
896                        row.get::<_, Option<i64>>(4)?,
897                    ))
898                },
899            )
900            .optional()
901            .context("failed to query durable agent step")?;
902
903        if let Some((step_id, state, stored_signature, result_json, lease_expires_at)) = existing {
904            if stored_signature != input.input_signature_json {
905                bail!(
906                    "durable step input signature mismatch for {}",
907                    input.checkpoint_name
908                );
909            }
910            match state.as_str() {
911                "completed" => {
912                    let result_json = result_json.ok_or_else(|| {
913                        anyhow::anyhow!("completed durable agent step missing result_json")
914                    })?;
915                    let result = serde_json::from_str::<AgentProviderResult>(&result_json)
916                        .context("failed to deserialize durable agent result")?;
917                    tx.commit().context("failed to commit replay transaction")?;
918                    return Ok(AgentStepClaim::Replay(Box::new(result)));
919                }
920                "running" if lease_expires_at.is_some_and(|lease| lease > input.now) => {
921                    tx.commit().context("failed to commit wait transaction")?;
922                    return Ok(AgentStepClaim::Wait);
923                }
924                "running" | "failed" | "cancelled" | "pending" => {
925                    tx.execute(
926                        r#"
927                        UPDATE sw_workflow_steps
928                        SET state = 'running',
929                            worker_id = ?1,
930                            lease_expires_at = ?2,
931                            attempts = attempts + 1,
932                            last_attempt_at = ?3,
933                            updated_at = ?3,
934                            failure_reason_json = NULL
935                        WHERE step_id = ?4
936                        "#,
937                        rusqlite::params![
938                            input.worker_id,
939                            input.lease_expires_at,
940                            input.now,
941                            step_id
942                        ],
943                    )
944                    .context("failed to reclaim durable agent step")?;
945                    tx.commit()
946                        .context("failed to commit durable agent reclaim transaction")?;
947                    return Ok(AgentStepClaim::Run { step_id });
948                }
949                other => bail!("unknown durable step state: {other}"),
950            }
951        }
952
953        let step_id = new_id("step");
954        tx.execute(
955            r#"
956            INSERT INTO sw_workflow_steps (
957                step_id,
958                run_id,
959                root_run_id,
960                step_kind,
961                checkpoint_name,
962                input_signature_hash,
963                input_signature_json,
964                state,
965                input_json,
966                worker_id,
967                lease_expires_at,
968                attempts,
969                last_attempt_at,
970                created_at,
971                updated_at
972            )
973            VALUES (?1, ?2, ?3, 'agent', ?4, ?5, ?6, 'running', ?7, ?8, ?9, 1, ?10, ?10, ?10)
974            "#,
975            rusqlite::params![
976                step_id,
977                input.run_id,
978                input.root_run_id,
979                input.checkpoint_name,
980                input.input_signature_hash,
981                input.input_signature_json,
982                serde_json::to_string(input.input_json)?,
983                input.worker_id,
984                input.lease_expires_at,
985                input.now,
986            ],
987        )
988        .context("failed to insert durable agent step")?;
989        tx.commit()
990            .context("failed to commit durable agent step insert")?;
991        Ok(AgentStepClaim::Run { step_id })
992    }
993
994    pub fn complete_agent_step(&mut self, input: AgentStepCompleteInput<'_>) -> anyhow::Result<()> {
995        let result_json = serde_json::to_string(&compact_agent_result_for_replay(input.result))?;
996        let output_tokens = input
997            .result
998            .usage
999            .as_ref()
1000            .and_then(|usage| usage.output_tokens)
1001            .unwrap_or(0);
1002        let usage_json = input
1003            .result
1004            .usage
1005            .as_ref()
1006            .map(serde_json::to_string)
1007            .transpose()?;
1008        let budget_entry_id = new_id("budget");
1009        let tx = self
1010            .connection_mut()
1011            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1012            .context("failed to begin durable agent step completion transaction")?;
1013        tx.execute(
1014            r#"
1015            UPDATE sw_workflow_steps
1016            SET state = 'completed',
1017                result_json = ?1,
1018                lease_expires_at = NULL,
1019                updated_at = ?2
1020            WHERE step_id = ?3
1021            "#,
1022            rusqlite::params![result_json, input.now, input.step_id],
1023        )
1024        .context("failed to mark durable agent step completed")?;
1025        tx.execute(
1026            r#"
1027            INSERT OR IGNORE INTO sw_budget_ledger (
1028                budget_entry_id,
1029                run_id,
1030                root_run_id,
1031                step_id,
1032                source_type,
1033                source_id,
1034                output_tokens,
1035                usage_json,
1036                created_at
1037            )
1038            VALUES (?1, ?2, ?3, ?4, 'agent_step', ?4, ?5, ?6, ?7)
1039            "#,
1040            rusqlite::params![
1041                budget_entry_id,
1042                input.run_id,
1043                input.root_run_id,
1044                input.step_id,
1045                output_tokens as i64,
1046                usage_json,
1047                input.now,
1048            ],
1049        )
1050        .context("failed to insert durable budget ledger entry")?;
1051        tx.commit()
1052            .context("failed to commit durable agent step completion")
1053    }
1054
1055    pub fn claim_or_replay_sleep_step(
1056        &mut self,
1057        input: SleepStepClaimInput<'_>,
1058    ) -> anyhow::Result<SleepStepClaim> {
1059        let tx = self
1060            .connection_mut()
1061            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1062            .context("failed to begin durable sleep step claim transaction")?;
1063
1064        let existing = tx
1065            .query_row(
1066                r#"
1067                SELECT step_id, state, input_signature_json, input_json, lease_expires_at
1068                FROM sw_workflow_steps
1069                WHERE run_id = ?1 AND checkpoint_name = ?2
1070                "#,
1071                rusqlite::params![input.run_id, input.checkpoint_name],
1072                |row| {
1073                    Ok((
1074                        row.get::<_, String>(0)?,
1075                        row.get::<_, String>(1)?,
1076                        row.get::<_, String>(2)?,
1077                        row.get::<_, String>(3)?,
1078                        row.get::<_, Option<i64>>(4)?,
1079                    ))
1080                },
1081            )
1082            .optional()
1083            .context("failed to query durable sleep step")?;
1084
1085        if let Some((step_id, state, stored_signature, input_json, lease_expires_at)) = existing {
1086            if stored_signature != input.input_signature_json {
1087                bail!(
1088                    "durable sleep step input signature mismatch for {}",
1089                    input.checkpoint_name
1090                );
1091            }
1092            match state.as_str() {
1093                "completed" => {
1094                    tx.commit()
1095                        .context("failed to commit sleep replay transaction")?;
1096                    return Ok(SleepStepClaim::Replay);
1097                }
1098                "running" | "pending" | "failed" | "cancelled" => {
1099                    let input_value = serde_json::from_str::<Value>(&input_json)
1100                        .context("failed to deserialize durable sleep input")?;
1101                    let wake_at = input_value
1102                        .get("wakeAt")
1103                        .and_then(Value::as_i64)
1104                        .ok_or_else(|| anyhow::anyhow!("durable sleep step missing wakeAt"))?;
1105                    if wake_at < 0 {
1106                        log::warn!(
1107                            "durable sleep step {} has negative wakeAt {}; resolving immediately",
1108                            step_id,
1109                            wake_at
1110                        );
1111                    }
1112                    if state == "running"
1113                        && lease_expires_at.is_some_and(|lease| lease > input.now)
1114                        && wake_at > input.now
1115                    {
1116                        tx.commit()
1117                            .context("failed to commit sleep wait transaction")?;
1118                        return Ok(SleepStepClaim::WaitUntil { step_id, wake_at });
1119                    }
1120                    tx.execute(
1121                        r#"
1122                        UPDATE sw_workflow_steps
1123                        SET state = 'running',
1124                            worker_id = ?1,
1125                            lease_expires_at = ?2,
1126                            attempts = attempts + 1,
1127                            last_attempt_at = ?3,
1128                            updated_at = ?3,
1129                            failure_reason_json = NULL
1130                        WHERE step_id = ?4
1131                        "#,
1132                        rusqlite::params![
1133                            input.worker_id,
1134                            input.lease_expires_at,
1135                            input.now,
1136                            step_id
1137                        ],
1138                    )
1139                    .context("failed to reclaim durable sleep step")?;
1140                    tx.commit()
1141                        .context("failed to commit durable sleep reclaim transaction")?;
1142                    return Ok(SleepStepClaim::WaitUntil { step_id, wake_at });
1143                }
1144                other => bail!("unknown durable step state: {other}"),
1145            }
1146        }
1147
1148        let step_id = new_id("step");
1149        let duration_ms_i64 =
1150            i64::try_from(input.duration_ms).context("sleep duration is too large to persist")?;
1151        let wake_at = input.now.saturating_add(duration_ms_i64);
1152        let input_json = serde_json::json!({
1153            "durationMs": input.duration_ms,
1154            "wakeAt": wake_at,
1155        });
1156        tx.execute(
1157            r#"
1158            INSERT INTO sw_workflow_steps (
1159                step_id,
1160                run_id,
1161                root_run_id,
1162                step_kind,
1163                checkpoint_name,
1164                input_signature_hash,
1165                input_signature_json,
1166                state,
1167                input_json,
1168                worker_id,
1169                lease_expires_at,
1170                attempts,
1171                last_attempt_at,
1172                created_at,
1173                updated_at
1174            )
1175            VALUES (?1, ?2, ?3, 'sleep', ?4, ?5, ?6, 'running', ?7, ?8, ?9, 1, ?10, ?10, ?10)
1176            "#,
1177            rusqlite::params![
1178                step_id,
1179                input.run_id,
1180                input.root_run_id,
1181                input.checkpoint_name,
1182                input.input_signature_hash,
1183                input.input_signature_json,
1184                serde_json::to_string(&input_json)?,
1185                input.worker_id,
1186                input.lease_expires_at,
1187                input.now,
1188            ],
1189        )
1190        .context("failed to insert durable sleep step")?;
1191        tx.commit()
1192            .context("failed to commit durable sleep step insert")?;
1193        Ok(SleepStepClaim::WaitUntil { step_id, wake_at })
1194    }
1195
1196    pub fn complete_sleep_step(&mut self, input: SleepStepCompleteInput<'_>) -> anyhow::Result<()> {
1197        let result_json = serde_json::to_string(&serde_json::json!({
1198            "ok": true,
1199            "durationMs": input.duration_ms,
1200            "wakeAt": input.wake_at,
1201            "completedAt": input.now,
1202        }))?;
1203        let updated = self
1204            .connection_mut()
1205            .execute(
1206                r#"
1207                UPDATE sw_workflow_steps
1208                SET state = 'completed',
1209                    result_json = ?1,
1210                    lease_expires_at = NULL,
1211                    updated_at = ?2
1212                WHERE step_id = ?3
1213                  AND state = 'running'
1214                "#,
1215                rusqlite::params![result_json, input.now, input.step_id],
1216            )
1217            .context("failed to mark durable sleep step completed")?;
1218        if updated == 1 {
1219            return Ok(());
1220        }
1221
1222        let state = self
1223            .connection()
1224            .query_row(
1225                "SELECT state FROM sw_workflow_steps WHERE step_id = ?1",
1226                rusqlite::params![input.step_id],
1227                |row| row.get::<_, String>(0),
1228            )
1229            .optional()
1230            .context("failed to inspect durable sleep step after completion race")?;
1231        match state.as_deref() {
1232            Some("completed") => Ok(()),
1233            Some(state) => bail!("cannot complete durable sleep step in state {state}"),
1234            None => bail!(
1235                "cannot complete missing durable sleep step {}",
1236                input.step_id
1237            ),
1238        }
1239    }
1240
1241    pub fn fail_agent_step(&mut self, input: AgentStepFailInput<'_>) -> anyhow::Result<()> {
1242        let failure = serde_json::to_string(input.failure_reason)?;
1243        self.connection_mut()
1244            .execute(
1245                r#"
1246                UPDATE sw_workflow_steps
1247                SET state = 'failed',
1248                    failure_reason_json = ?1,
1249                    lease_expires_at = NULL,
1250                    updated_at = ?2
1251                WHERE step_id = ?3
1252                "#,
1253                rusqlite::params![failure, input.now, input.step_id],
1254            )
1255            .context("failed to mark durable agent step failed")?;
1256        Ok(())
1257    }
1258}
1259
/// Outcome of `claim_or_replay_agent_step`.
#[derive(Debug)]
pub enum AgentStepClaim {
    /// The step was already `completed`; carries the result deserialized from
    /// the stored `result_json`.
    Replay(Box<AgentProviderResult>),
    /// The step row was inserted or reclaimed as `running`; `step_id` is the
    /// id of that row.
    Run { step_id: String },
    /// The step is `running` and its lease has not expired yet.
    Wait,
}
1266
/// Outcome of `claim_or_replay_sleep_step`.
#[derive(Debug)]
pub enum SleepStepClaim {
    /// The sleep step was already `completed`.
    Replay,
    /// The sleep step is outstanding; `wake_at` is the persisted `wakeAt` value
    /// and `step_id` identifies the step row.
    WaitUntil { step_id: String, wake_at: i64 },
}
1272
1273fn compact_agent_result_for_replay(result: &AgentProviderResult) -> AgentProviderResult {
1274    AgentProviderResult {
1275        output: result.output.clone(),
1276        session_id: result.session_id.clone(),
1277        model: result.model.clone(),
1278        usage: result.usage.clone(),
1279        isolation: result.isolation.clone(),
1280        raw: None,
1281    }
1282}
1283
/// Arguments for `claim_or_replay_agent_step`.
pub struct AgentStepClaimInput<'a> {
    /// Run the step belongs to; part of the lookup key.
    pub run_id: &'a str,
    /// Root run id stored on a newly inserted step row.
    pub root_run_id: &'a str,
    /// Checkpoint name; together with `run_id` it identifies the step.
    pub checkpoint_name: &'a str,
    /// Signature hash stored on a newly inserted step row.
    pub input_signature_hash: &'a str,
    /// Signature JSON; compared against the stored value of an existing step.
    pub input_signature_json: &'a str,
    /// Step input, serialized to JSON when a new row is inserted.
    pub input_json: &'a Value,
    /// Worker recorded on the row when the step is inserted or reclaimed.
    pub worker_id: &'a str,
    /// Lease expiry written when the step is inserted or reclaimed.
    pub lease_expires_at: i64,
    /// Current time: compared against the stored lease and written to the
    /// row's timestamp columns.
    pub now: i64,
}
1295
/// Arguments for `complete_agent_step`.
pub struct AgentStepCompleteInput<'a> {
    /// Step row to mark as completed.
    pub step_id: &'a str,
    /// Run id recorded on the budget ledger entry.
    pub run_id: &'a str,
    /// Root run id recorded on the budget ledger entry.
    pub root_run_id: &'a str,
    /// Provider result; stored on the step without `raw`, and its usage is
    /// recorded in the budget ledger.
    pub result: &'a AgentProviderResult,
    /// Timestamp written as `updated_at` on the step and `created_at` on the
    /// ledger entry.
    pub now: i64,
}
1303
/// Arguments for `fail_agent_step`.
pub struct AgentStepFailInput<'a> {
    /// Step row to mark as failed.
    pub step_id: &'a str,
    /// Failure reason, serialized into `failure_reason_json`.
    pub failure_reason: &'a Value,
    /// Timestamp written as `updated_at`.
    pub now: i64,
}
1309
/// Arguments for `claim_or_replay_sleep_step`.
pub struct SleepStepClaimInput<'a> {
    /// Run the step belongs to; part of the lookup key.
    pub run_id: &'a str,
    /// Root run id stored on a newly inserted step row.
    pub root_run_id: &'a str,
    /// Checkpoint name; together with `run_id` it identifies the step.
    pub checkpoint_name: &'a str,
    /// Signature hash stored on a newly inserted step row.
    pub input_signature_hash: &'a str,
    /// Signature JSON; compared against the stored value of an existing step.
    pub input_signature_json: &'a str,
    /// Sleep length; added to `now` to compute `wakeAt` when the step is first
    /// inserted.
    pub duration_ms: u64,
    /// Worker recorded on the row when the step is inserted or reclaimed.
    pub worker_id: &'a str,
    /// Lease expiry written when the step is inserted or reclaimed.
    pub lease_expires_at: i64,
    /// Current time: compared against the stored lease and `wakeAt`, and
    /// written to the row's timestamp columns.
    pub now: i64,
}
1321
/// Arguments for `complete_sleep_step`.
pub struct SleepStepCompleteInput<'a> {
    /// Step row to mark as completed.
    pub step_id: &'a str,
    /// Sleep length, echoed into the stored result as `durationMs`.
    pub duration_ms: u64,
    /// Wake-up time, echoed into the stored result as `wakeAt`.
    pub wake_at: i64,
    /// Timestamp written as `updated_at` and as `completedAt` in the result.
    pub now: i64,
}
1328
/// Runs an agent provider call for a durable step.
///
/// This is a thin wrapper: all four arguments are forwarded unchanged to
/// `run_agent_provider_with_retry`, and its result is returned as is.
async fn run_durable_agent_provider(
    default_provider: Arc<dyn AgentProvider>,
    provider_override: Option<String>,
    input: AgentProviderRunInput,
    cancel_rx: Option<watch::Receiver<bool>>,
) -> anyhow::Result<AgentProviderResult> {
    run_agent_provider_with_retry(default_provider, provider_override, input, cancel_rx).await
}
1337
1338fn agent_input_signature(provider_name: &str, input: &AgentProviderRunInput) -> Value {
1339    serde_json::json!({
1340        "signatureVersion": 2,
1341        "kind": "agent",
1342        "workflowScope": "root",
1343        "provider": provider_name,
1344        "prompt": input.prompt,
1345        "options": input.options,
1346        "context": {
1347            "phase": input.context.phase,
1348            "cwd": input.context.cwd.as_ref().map(|path| path.to_string_lossy().into_owned()),
1349        }
1350    })
1351}
1352
1353fn sleep_input_signature(duration_ms: u64) -> Value {
1354    serde_json::json!({
1355        "signatureVersion": 1,
1356        "kind": "sleep",
1357        "workflowScope": "root",
1358        "durationMs": duration_ms,
1359    })
1360}
1361
1362fn short_blake3_hex(input: &str) -> String {
1363    let hash = blake3::hash(input.as_bytes());
1364    hash.as_bytes()[..12]
1365        .iter()
1366        .map(|byte| format!("{byte:02x}"))
1367        .collect()
1368}
1369
1370fn canonical_json_string(value: &Value) -> anyhow::Result<String> {
1371    let canonical = canonical_json_value(value);
1372    serde_json::to_string(&canonical).context("failed to serialize canonical JSON")
1373}
1374
1375fn canonical_json_value(value: &Value) -> Value {
1376    match value {
1377        Value::Object(object) => {
1378            let mut sorted = BTreeMap::new();
1379            for (key, value) in object {
1380                sorted.insert(key.clone(), canonical_json_value(value));
1381            }
1382            Value::Object(sorted.into_iter().collect())
1383        }
1384        Value::Array(array) => Value::Array(array.iter().map(canonical_json_value).collect()),
1385        value => value.clone(),
1386    }
1387}
1388
1389#[cfg(test)]
1390mod tests {
1391    use super::*;
1392    use crate::agent_providers::{
1393        AgentProvider, AgentProviderResult, AgentProviderRunInput, AgentProviderSchemaMode,
1394        AgentProviderUsageMode,
1395    };
1396    use serde_json::json;
1397    use std::fs;
1398    use std::sync::{
1399        atomic::{AtomicUsize, Ordering},
1400        Arc, Barrier, Mutex,
1401    };
1402    use std::thread;
1403    use std::time::{Duration, Instant};
1404
    /// Test provider that counts how many times `run` has been called.
    struct CountingProvider {
        /// Number of `run` calls made so far.
        calls: AtomicUsize,
    }
1408
    /// Test provider whose first `run` call fails and later calls succeed.
    struct FlakyOnceProvider {
        /// Number of `run` calls made so far.
        calls: AtomicUsize,
    }
1412
    /// Test provider that records a timestamped event at the start and end of
    /// every `run` call.
    struct TimedProvider {
        /// Recorded events, in the order they were pushed.
        events: Mutex<Vec<TimedProviderEvent>>,
    }
1416
    /// Test provider that returns a result with a session id, usage, and raw
    /// events.
    struct SessionProvider {
        /// Session id reported in every result.
        session_id: &'static str,
        /// When set, `true` is sent on this channel at the start of `run`.
        cancel_on_run: Option<watch::Sender<bool>>,
        /// Delay in milliseconds before `run` returns; `0` disables the sleep.
        delay_ms: u64,
    }
1422
    /// One event recorded by `TimedProvider`.
    #[derive(Debug, Clone)]
    struct TimedProviderEvent {
        /// Prompt of the call that produced the event.
        prompt: String,
        /// `"start"` or `"end"`.
        kind: &'static str,
        /// Time at which the event was recorded.
        at: Instant,
    }
1429
1430    #[async_trait::async_trait]
1431    impl AgentProvider for CountingProvider {
1432        fn name(&self) -> &str {
1433            "counting"
1434        }
1435        fn schema_mode(&self) -> AgentProviderSchemaMode {
1436            AgentProviderSchemaMode::Builtin
1437        }
1438        fn usage_mode(&self) -> AgentProviderUsageMode {
1439            AgentProviderUsageMode::Builtin
1440        }
1441        async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1442            let count = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
1443            Ok(AgentProviderResult {
1444                output: json!(format!("{}:{count}", input.prompt)),
1445                session_id: None,
1446                model: None,
1447                usage: None,
1448                isolation: None,
1449                raw: None,
1450            })
1451        }
1452    }
1453
1454    #[async_trait::async_trait]
1455    impl AgentProvider for FlakyOnceProvider {
1456        fn name(&self) -> &str {
1457            "flaky-once"
1458        }
1459        fn schema_mode(&self) -> AgentProviderSchemaMode {
1460            AgentProviderSchemaMode::Builtin
1461        }
1462        fn usage_mode(&self) -> AgentProviderUsageMode {
1463            AgentProviderUsageMode::Builtin
1464        }
1465        async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1466            let call = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
1467            if call == 1 {
1468                bail!("temporary provider failure")
1469            }
1470            Ok(AgentProviderResult {
1471                output: json!(format!("recovered: {}", input.prompt)),
1472                session_id: None,
1473                model: None,
1474                usage: None,
1475                isolation: None,
1476                raw: None,
1477            })
1478        }
1479    }
1480
1481    #[async_trait::async_trait]
1482    impl AgentProvider for SessionProvider {
1483        fn name(&self) -> &str {
1484            "session"
1485        }
1486        fn schema_mode(&self) -> AgentProviderSchemaMode {
1487            AgentProviderSchemaMode::Builtin
1488        }
1489        fn usage_mode(&self) -> AgentProviderUsageMode {
1490            AgentProviderUsageMode::Builtin
1491        }
1492        async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1493            if let Some(cancel_tx) = &self.cancel_on_run {
1494                let _ = cancel_tx.send(true);
1495            }
1496            if self.delay_ms > 0 {
1497                tokio::time::sleep(Duration::from_millis(self.delay_ms)).await;
1498            }
1499            Ok(AgentProviderResult {
1500                output: json!(format!("session: {}", input.prompt)),
1501                session_id: Some(self.session_id.to_string()),
1502                model: Some("session-model".to_string()),
1503                usage: Some(crate::agent_providers::AgentUsage {
1504                    output_tokens: Some(1),
1505                    ..Default::default()
1506                }),
1507                isolation: None,
1508                raw: Some(json!({
1509                    "events": [
1510                        { "sessionId": self.session_id, "prompt": input.prompt }
1511                    ]
1512                })),
1513            })
1514        }
1515    }
1516
1517    #[async_trait::async_trait]
1518    impl AgentProvider for TimedProvider {
1519        fn name(&self) -> &str {
1520            "timed"
1521        }
1522        fn schema_mode(&self) -> AgentProviderSchemaMode {
1523            AgentProviderSchemaMode::Builtin
1524        }
1525        fn usage_mode(&self) -> AgentProviderUsageMode {
1526            AgentProviderUsageMode::Builtin
1527        }
1528        async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1529            let delay_ms = input
1530                .prompt
1531                .split(":delay:")
1532                .nth(1)
1533                .and_then(|suffix| suffix.split(':').next())
1534                .and_then(|value| value.parse::<u64>().ok())
1535                .unwrap_or(0);
1536            self.events.lock().unwrap().push(TimedProviderEvent {
1537                prompt: input.prompt.clone(),
1538                kind: "start",
1539                at: Instant::now(),
1540            });
1541            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1542            self.events.lock().unwrap().push(TimedProviderEvent {
1543                prompt: input.prompt.clone(),
1544                kind: "end",
1545                at: Instant::now(),
1546            });
1547            Ok(AgentProviderResult {
1548                output: json!(format!("{}:done", input.prompt)),
1549                session_id: None,
1550                model: None,
1551                usage: Some(crate::agent_providers::AgentUsage {
1552                    output_tokens: Some(1),
1553                    ..Default::default()
1554                }),
1555                isolation: None,
1556                raw: None,
1557            })
1558        }
1559    }
1560
    /// Test session log sink that writes raw session events under `root`.
    struct TestSessionLogSink {
        /// Directory under which per-provider session files are written.
        root: std::path::PathBuf,
        /// When set, `true` is sent on this channel after a result is written.
        saved_tx: Option<watch::Sender<bool>>,
    }
1565
1566    #[async_trait::async_trait]
1567    impl crate::workflow::AgentSessionLogSink for TestSessionLogSink {
1568        async fn write_agent_result(
1569            &self,
1570            provider: &str,
1571            result: &AgentProviderResult,
1572        ) -> anyhow::Result<()> {
1573            write_test_raw_session(&self.root, provider, result)?;
1574            if let Some(saved_tx) = &self.saved_tx {
1575                let _ = saved_tx.send(true);
1576            }
1577            Ok(())
1578        }
1579    }
1580
1581    fn write_test_raw_session(
1582        root: &std::path::Path,
1583        provider: &str,
1584        result: &AgentProviderResult,
1585    ) -> anyhow::Result<()> {
1586        let session_id = result
1587            .session_id
1588            .as_deref()
1589            .ok_or_else(|| anyhow::anyhow!("session id missing"))?;
1590        let events = result
1591            .raw
1592            .as_ref()
1593            .and_then(|raw| raw.get("events"))
1594            .and_then(Value::as_array)
1595            .ok_or_else(|| anyhow::anyhow!("raw events missing"))?;
1596        let dir = root.join(provider);
1597        fs::create_dir_all(&dir)?;
1598        let mut lines = String::new();
1599        for event in events {
1600            lines.push_str(&serde_json::to_string(event)?);
1601            lines.push('\n');
1602        }
1603        fs::write(dir.join(format!("{session_id}.jsonl")), lines)?;
1604        Ok(())
1605    }
1606
1607    fn seed_durable_run(db_path: &std::path::Path) -> (String, String) {
1608        let mut store = SqliteDurableStore::open(db_path).expect("store should open");
1609        store.init().expect("schema should initialize");
1610        let task_id = new_id("task");
1611        let run_id = new_id("run");
1612        store
1613            .insert_local_task_and_run(LocalTaskAndRunInsert {
1614                task_id: &task_id,
1615                run_id: &run_id,
1616                owner_id: "owner",
1617                params_json: &json!({ "scriptPath": "workflow.mjs" }),
1618                workflow_run_json: &json!({ "scriptPath": "workflow.mjs" }),
1619                args_json: &json!({}),
1620                budget_total: Some(100),
1621                max_attempts: 3,
1622                now: 1,
1623            })
1624            .expect("run should be inserted");
1625        (task_id, run_id)
1626    }
1627
    /// Builds an `AgentStepClaimInput` for tests.
    ///
    /// `root_run_id` is set to `run_id`, the signature hash is fixed to `"sig"`,
    /// and the signature JSON is fixed to `{"prompt":"hello"}`. The remaining
    /// fields are taken from the arguments.
    fn claim_input<'a>(
        run_id: &'a str,
        checkpoint_name: &'a str,
        worker_id: &'a str,
        lease_expires_at: i64,
        now: i64,
        input_json: &'a Value,
    ) -> AgentStepClaimInput<'a> {
        AgentStepClaimInput {
            run_id,
            root_run_id: run_id,
            checkpoint_name,
            input_signature_hash: "sig",
            input_signature_json: r#"{"prompt":"hello"}"#,
            input_json,
            worker_id,
            lease_expires_at,
            now,
        }
    }
1648
1649    #[test]
1650    fn durable_agent_retry_backoff_extends_step_lease_deadline() {
1651        let no_retry = crate::workflow::AgentRetryPolicy {
1652            max_attempts: 1,
1653            backoff_ms: 0,
1654        };
1655        let long_backoff = crate::workflow::AgentRetryPolicy {
1656            max_attempts: 3,
1657            backoff_ms: 120_000,
1658        };
1659        assert_eq!(durable_agent_step_lease_expires_at(1_000, no_retry), 61_000);
1660        assert_eq!(
1661            durable_agent_step_lease_expires_at(1_000, long_backoff),
1662            301_000
1663        );
1664    }
1665
1666    #[test]
1667    fn concurrent_agent_step_claims_allow_only_one_runner() {
1668        let dir = tempfile::tempdir().unwrap();
1669        let db_path = dir.path().join("durable.db");
1670        let (_task_id, run_id) = seed_durable_run(&db_path);
1671        let workers = 8;
1672        let barrier = Arc::new(Barrier::new(workers));
1673        let outcomes = Arc::new(Mutex::new(Vec::new()));
1674
1675        thread::scope(|scope| {
1676            for worker in 0..workers {
1677                let barrier = Arc::clone(&barrier);
1678                let outcomes = Arc::clone(&outcomes);
1679                let db_path = db_path.clone();
1680                let run_id = run_id.clone();
1681                scope.spawn(move || {
1682                    let input_json = json!({ "prompt": "hello" });
1683                    barrier.wait();
1684                    let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1685                    let claim = store
1686                        .claim_or_replay_agent_step(claim_input(
1687                            &run_id,
1688                            "agent:shared",
1689                            &format!("worker-{worker}"),
1690                            10_000,
1691                            100,
1692                            &input_json,
1693                        ))
1694                        .expect("claim should succeed");
1695                    let label = match claim {
1696                        AgentStepClaim::Run { .. } => "run",
1697                        AgentStepClaim::Wait => "wait",
1698                        AgentStepClaim::Replay(_) => "replay",
1699                    };
1700                    outcomes.lock().unwrap().push(label);
1701                });
1702            }
1703        });
1704
1705        let outcomes = outcomes.lock().unwrap();
1706        assert_eq!(
1707            outcomes.iter().filter(|&&outcome| outcome == "run").count(),
1708            1
1709        );
1710        assert_eq!(
1711            outcomes
1712                .iter()
1713                .filter(|&&outcome| outcome == "wait")
1714                .count(),
1715            workers - 1
1716        );
1717        assert_eq!(
1718            outcomes
1719                .iter()
1720                .filter(|&&outcome| outcome == "replay")
1721                .count(),
1722            0
1723        );
1724
1725        let store = SqliteDurableStore::open(&db_path).unwrap();
1726        let (steps, attempts): (i64, i64) = store
1727            .connection()
1728            .query_row(
1729                "SELECT COUNT(*), COALESCE(SUM(attempts), 0) FROM sw_workflow_steps",
1730                [],
1731                |row| Ok((row.get(0)?, row.get(1)?)),
1732            )
1733            .unwrap();
1734        assert_eq!(steps, 1);
1735        assert_eq!(attempts, 1);
1736    }
1737
1738    #[test]
1739    fn concurrent_agent_step_completion_writes_one_budget_ledger_entry() {
1740        let dir = tempfile::tempdir().unwrap();
1741        let db_path = dir.path().join("durable.db");
1742        let (_task_id, run_id) = seed_durable_run(&db_path);
1743        let input_json = json!({ "prompt": "hello" });
1744        let step_id = {
1745            let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1746            match store
1747                .claim_or_replay_agent_step(claim_input(
1748                    &run_id,
1749                    "agent:complete",
1750                    "worker-0",
1751                    10_000,
1752                    100,
1753                    &input_json,
1754                ))
1755                .expect("claim should succeed")
1756            {
1757                AgentStepClaim::Run { step_id } => step_id,
1758                other => panic!("expected run claim, got {other:?}"),
1759            }
1760        };
1761        let workers = 4;
1762        let barrier = Arc::new(Barrier::new(workers));
1763
1764        thread::scope(|scope| {
1765            for _ in 0..workers {
1766                let barrier = Arc::clone(&barrier);
1767                let db_path = db_path.clone();
1768                let run_id = run_id.clone();
1769                let step_id = step_id.clone();
1770                scope.spawn(move || {
1771                    let result = AgentProviderResult {
1772                        output: json!("done"),
1773                        session_id: None,
1774                        model: None,
1775                        usage: Some(crate::agent_providers::AgentUsage {
1776                            output_tokens: Some(7),
1777                            ..Default::default()
1778                        }),
1779                        isolation: None,
1780                        raw: None,
1781                    };
1782                    barrier.wait();
1783                    let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1784                    store
1785                        .complete_agent_step(AgentStepCompleteInput {
1786                            step_id: &step_id,
1787                            run_id: &run_id,
1788                            root_run_id: &run_id,
1789                            result: &result,
1790                            now: 200,
1791                        })
1792                        .expect("completion should be idempotent");
1793                });
1794            }
1795        });
1796
1797        let store = SqliteDurableStore::open(&db_path).unwrap();
1798        let (entries, tokens): (i64, i64) = store
1799            .connection()
1800            .query_row(
1801                "SELECT COUNT(*), COALESCE(SUM(output_tokens), 0) FROM sw_budget_ledger",
1802                [],
1803                |row| Ok((row.get(0)?, row.get(1)?)),
1804            )
1805            .unwrap();
1806        assert_eq!(entries, 1);
1807        assert_eq!(tokens, 7);
1808    }
1809
1810    #[test]
1811    fn completed_agent_step_persists_compact_replay_result_without_raw() {
1812        let dir = tempfile::tempdir().unwrap();
1813        let db_path = dir.path().join("durable.db");
1814        let (_task_id, run_id) = seed_durable_run(&db_path);
1815        let input_json = json!({ "prompt": "hello" });
1816        let step_id = {
1817            let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1818            match store
1819                .claim_or_replay_agent_step(claim_input(
1820                    &run_id,
1821                    "agent:compact",
1822                    "worker-0",
1823                    10_000,
1824                    100,
1825                    &input_json,
1826                ))
1827                .expect("claim should succeed")
1828            {
1829                AgentStepClaim::Run { step_id } => step_id,
1830                other => panic!("expected run claim, got {other:?}"),
1831            }
1832        };
1833        let result = AgentProviderResult {
1834            output: json!("done"),
1835            session_id: Some("provider-session-1".into()),
1836            model: Some("provider/model-from-result".into()),
1837            usage: Some(crate::agent_providers::AgentUsage {
1838                output_tokens: Some(7),
1839                ..Default::default()
1840            }),
1841            isolation: None,
1842            raw: Some(json!({ "events": ["large provider transcript"] })),
1843        };
1844
1845        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1846        store
1847            .complete_agent_step(AgentStepCompleteInput {
1848                step_id: &step_id,
1849                run_id: &run_id,
1850                root_run_id: &run_id,
1851                result: &result,
1852                now: 200,
1853            })
1854            .expect("completion should succeed");
1855        let stored: String = store
1856            .connection()
1857            .query_row(
1858                "SELECT result_json FROM sw_workflow_steps WHERE step_id = ?1",
1859                [&step_id],
1860                |row| row.get(0),
1861            )
1862            .unwrap();
1863        let stored: Value = serde_json::from_str(&stored).unwrap();
1864
1865        assert_eq!(stored["output"], json!("done"));
1866        assert_eq!(stored["sessionId"], json!("provider-session-1"));
1867        assert_eq!(stored["model"], json!("provider/model-from-result"));
1868        assert_eq!(stored["usage"]["outputTokens"], json!(7));
1869        assert!(stored.get("raw").is_none());
1870    }
1871
1872    #[test]
1873    fn expired_agent_step_lease_can_be_reclaimed_from_another_connection() {
1874        let dir = tempfile::tempdir().unwrap();
1875        let db_path = dir.path().join("durable.db");
1876        let (_task_id, run_id) = seed_durable_run(&db_path);
1877        let input_json = json!({ "prompt": "hello" });
1878        let first_step_id = {
1879            let mut store = SqliteDurableStore::open(&db_path).unwrap();
1880            match store
1881                .claim_or_replay_agent_step(claim_input(
1882                    &run_id,
1883                    "agent:lease",
1884                    "worker-1",
1885                    100,
1886                    0,
1887                    &input_json,
1888                ))
1889                .unwrap()
1890            {
1891                AgentStepClaim::Run { step_id } => step_id,
1892                other => panic!("expected run claim, got {other:?}"),
1893            }
1894        };
1895
1896        let mut waiting_store = SqliteDurableStore::open(&db_path).unwrap();
1897        assert!(matches!(
1898            waiting_store
1899                .claim_or_replay_agent_step(claim_input(
1900                    &run_id,
1901                    "agent:lease",
1902                    "worker-2",
1903                    200,
1904                    50,
1905                    &input_json,
1906                ))
1907                .unwrap(),
1908            AgentStepClaim::Wait
1909        ));
1910
1911        let mut reclaiming_store = SqliteDurableStore::open(&db_path).unwrap();
1912        let reclaimed_step_id = match reclaiming_store
1913            .claim_or_replay_agent_step(claim_input(
1914                &run_id,
1915                "agent:lease",
1916                "worker-3",
1917                300,
1918                101,
1919                &input_json,
1920            ))
1921            .unwrap()
1922        {
1923            AgentStepClaim::Run { step_id } => step_id,
1924            other => panic!("expected run claim, got {other:?}"),
1925        };
1926        assert_eq!(reclaimed_step_id, first_step_id);
1927
1928        let attempts: i64 = reclaiming_store
1929            .connection()
1930            .query_row(
1931                "SELECT attempts FROM sw_workflow_steps WHERE step_id = ?1",
1932                rusqlite::params![reclaimed_step_id],
1933                |row| row.get(0),
1934            )
1935            .unwrap();
1936        assert_eq!(attempts, 2);
1937    }
1938
1939    #[tokio::test]
1940    async fn more_than_five_durable_workflows_share_db_while_steps_have_varied_durations() {
1941        async fn run_one(
1942            db_path: std::path::PathBuf,
1943            script_path: std::path::PathBuf,
1944            provider: Arc<TimedProvider>,
1945            id: usize,
1946            base_delay_ms: u64,
1947        ) -> anyhow::Result<LocalDurableRunResult> {
1948            let mut store = SqliteDurableStore::open(db_path)?;
1949            run_local_durable_workflow(
1950                &mut store,
1951                LocalDurableRunOptions::new(
1952                    script_path,
1953                    json!({ "id": id, "baseDelay": base_delay_ms }),
1954                    provider,
1955                ),
1956            )
1957            .await
1958        }
1959
1960        let dir = tempfile::tempdir().unwrap();
1961        let db_path = dir.path().join("durable.db");
1962        let script_path = dir.path().join("workflow.mjs");
1963        fs::write(
1964            &script_path,
1965            r#"
1966export const meta = { name: "durable-concurrent", description: "durable concurrent" };
1967const id = args.id;
1968const baseDelay = args.baseDelay;
1969const first = await agent(`wf:${id}:delay:${baseDelay}:first`);
1970const second = await agent(`wf:${id}:delay:${baseDelay + 30}:second`);
1971const third = await agent(`wf:${id}:delay:${baseDelay + 10}:third`);
1972export default { id, first, second, third };
1973"#,
1974        )
1975        .unwrap();
1976        let provider = Arc::new(TimedProvider {
1977            events: Mutex::new(Vec::new()),
1978        });
1979        let started = Instant::now();
1980        let (r0, r1, r2, r3, r4, r5) = tokio::join!(
1981            run_one(
1982                db_path.clone(),
1983                script_path.clone(),
1984                Arc::clone(&provider),
1985                0,
1986                40
1987            ),
1988            run_one(
1989                db_path.clone(),
1990                script_path.clone(),
1991                Arc::clone(&provider),
1992                1,
1993                55
1994            ),
1995            run_one(
1996                db_path.clone(),
1997                script_path.clone(),
1998                Arc::clone(&provider),
1999                2,
2000                70
2001            ),
2002            run_one(
2003                db_path.clone(),
2004                script_path.clone(),
2005                Arc::clone(&provider),
2006                3,
2007                85
2008            ),
2009            run_one(
2010                db_path.clone(),
2011                script_path.clone(),
2012                Arc::clone(&provider),
2013                4,
2014                100
2015            ),
2016            run_one(
2017                db_path.clone(),
2018                script_path.clone(),
2019                Arc::clone(&provider),
2020                5,
2021                115
2022            ),
2023        );
2024        let elapsed = started.elapsed();
2025        let results = [r0, r1, r2, r3, r4, r5]
2026            .into_iter()
2027            .collect::<Result<Vec<_>, _>>()
2028            .unwrap();
2029
2030        assert_eq!(results.len(), 6);
2031        for (id, result) in results.iter().enumerate() {
2032            assert_eq!(result.workflow.output.result["id"], json!(id));
2033            assert_eq!(result.attempts, 1);
2034        }
2035
2036        let events = provider.events.lock().unwrap().clone();
2037        assert_eq!(
2038            events.iter().filter(|event| event.kind == "start").count(),
2039            18
2040        );
2041        assert_eq!(
2042            events.iter().filter(|event| event.kind == "end").count(),
2043            18
2044        );
2045        assert!(events
2046            .iter()
2047            .any(|event| event.prompt == "wf:5:delay:145:second"));
2048        assert!(
2049            max_in_flight(&events) > 1,
2050            "expected overlapping provider steps, got events: {events:?}"
2051        );
2052        assert!(
2053            elapsed < Duration::from_millis(900),
2054            "runs should overlap on shared DB instead of serializing all varied sleeps; elapsed={elapsed:?}"
2055        );
2056
2057        let store = SqliteDurableStore::open(&db_path).unwrap();
2058        let (completed_runs, completed_tasks, completed_steps, budget_entries, output_tokens): (
2059            i64,
2060            i64,
2061            i64,
2062            i64,
2063            i64,
2064        ) = store
2065            .connection()
2066            .query_row(
2067                r#"
2068                SELECT
2069                  (SELECT COUNT(*) FROM sw_workflow_runs WHERE state = 'completed'),
2070                  (SELECT COUNT(*) FROM sw_workflow_tasks WHERE state = 'completed'),
2071                  (SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'),
2072                  (SELECT COUNT(*) FROM sw_budget_ledger),
2073                  (SELECT COALESCE(SUM(output_tokens), 0) FROM sw_budget_ledger)
2074                "#,
2075                [],
2076                |row| {
2077                    Ok((
2078                        row.get(0)?,
2079                        row.get(1)?,
2080                        row.get(2)?,
2081                        row.get(3)?,
2082                        row.get(4)?,
2083                    ))
2084                },
2085            )
2086            .unwrap();
2087        assert_eq!(completed_runs, 6);
2088        assert_eq!(completed_tasks, 6);
2089        assert_eq!(completed_steps, 18);
2090        assert_eq!(budget_entries, 18);
2091        assert_eq!(output_tokens, 18);
2092    }
2093
2094    fn max_in_flight(events: &[TimedProviderEvent]) -> usize {
2095        let mut events = events.to_vec();
2096        events.sort_by_key(|event| (event.at, if event.kind == "start" { 0 } else { 1 }));
2097        let mut current = 0usize;
2098        let mut max = 0usize;
2099        for event in events {
2100            if event.kind == "start" {
2101                current += 1;
2102                max = max.max(current);
2103            } else {
2104                current = current.saturating_sub(1);
2105            }
2106        }
2107        max
2108    }
2109
2110    #[tokio::test]
2111    async fn local_durable_run_persists_sleep_step_without_budget_entry() {
2112        let dir = tempfile::tempdir().unwrap();
2113        let db_path = dir.path().join("durable.db");
2114        let script_path = dir.path().join("sleep.workflow.js");
2115        fs::write(
2116            &script_path,
2117            r#"
2118import { sleep } from "workflow:extra";
2119export const meta = { name: "durable-sleep", description: "durable sleep" };
2120await sleep(5);
2121export default { slept: true };
2122"#,
2123        )
2124        .unwrap();
2125
2126        let mut store = SqliteDurableStore::open(&db_path).unwrap();
2127        let result = run_local_durable_workflow(
2128            &mut store,
2129            LocalDurableRunOptions::new(
2130                script_path,
2131                json!({}),
2132                Arc::new(CountingProvider {
2133                    calls: AtomicUsize::new(0),
2134                }),
2135            ),
2136        )
2137        .await
2138        .expect("durable workflow should run");
2139
2140        assert_eq!(result.workflow.output.result, json!({ "slept": true }));
2141        let (sleep_steps, budget_entries): (i64, i64) = store
2142            .connection()
2143            .query_row(
2144                r#"
2145                SELECT
2146                  (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'sleep' AND state = 'completed'),
2147                  (SELECT COUNT(*) FROM sw_budget_ledger)
2148                "#,
2149                [],
2150                |row| Ok((row.get(0)?, row.get(1)?)),
2151            )
2152            .unwrap();
2153        assert_eq!(sleep_steps, 1);
2154        assert_eq!(budget_entries, 0);
2155    }
2156
2157    #[tokio::test]
2158    async fn local_durable_run_marks_cancelled_when_cancel_requested() {
2159        let dir = tempfile::tempdir().unwrap();
2160        let db_path = dir.path().join("durable.db");
2161        let script_path = dir.path().join("workflow.mjs");
2162        fs::write(
2163            &script_path,
2164            r#"
2165export const meta = { name: "durable-cancel", description: "durable cancel" };
2166export default { result: await agent("hello") };
2167"#,
2168        )
2169        .unwrap();
2170        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2171        let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(true);
2172        let mut options = LocalDurableRunOptions::new(
2173            script_path,
2174            json!({}),
2175            Arc::new(CountingProvider {
2176                calls: AtomicUsize::new(0),
2177            }),
2178        );
2179        options.cancel_rx = Some(cancel_rx);
2180
2181        let error = run_local_durable_workflow(&mut store, options)
2182            .await
2183            .expect_err("cancelled durable workflow should return an error");
2184        assert_eq!(error.to_string(), "workflow cancelled");
2185
2186        let (run_state, task_state, attempt_state): (String, String, String) = store
2187            .connection()
2188            .query_row(
2189                r#"
2190                SELECT r.state, t.state, a.state
2191                FROM sw_workflow_runs r
2192                JOIN sw_workflow_tasks t ON t.task_id = r.task_id
2193                JOIN sw_workflow_attempts a ON a.run_id = r.run_id
2194                "#,
2195                [],
2196                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2197            )
2198            .unwrap();
2199        assert_eq!(run_state, "cancelled");
2200        assert_eq!(task_state, "cancelled");
2201        assert_eq!(attempt_state, "cancelled");
2202    }
2203
2204    #[tokio::test]
2205    async fn local_durable_run_exports_raw_session_before_workflow_completes() {
2206        let dir = tempfile::tempdir().unwrap();
2207        let db_path = dir.path().join("durable.db");
2208        let raw_dir = dir.path().join("raw");
2209        let script_path = dir.path().join("workflow.mjs");
2210        fs::write(
2211            &script_path,
2212            r#"
2213import { sleep } from "workflow:extra";
2214export const meta = { name: "durable-early-session", description: "durable early session" };
2215await agent("before long sleep");
2216await sleep(100);
2217export default { ok: true };
2218"#,
2219        )
2220        .unwrap();
2221        let provider = Arc::new(SessionProvider {
2222            session_id: "early-session-1",
2223            cancel_on_run: None,
2224            delay_ms: 0,
2225        });
2226        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2227        let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2228        let (saved_tx, mut saved_rx) = tokio::sync::watch::channel(false);
2229        options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2230            root: raw_dir.clone(),
2231            saved_tx: Some(saved_tx),
2232        }));
2233
2234        let run = run_local_durable_workflow(&mut store, options);
2235        tokio::pin!(run);
2236        tokio::select! {
2237            changed = saved_rx.changed() => {
2238                changed.expect("raw session notification should be sent");
2239                assert!(*saved_rx.borrow());
2240                assert!(
2241                    raw_dir.join("session/early-session-1.jsonl").exists(),
2242                    "raw session file should be written before workflow completion"
2243                );
2244            }
2245            result = &mut run => {
2246                panic!("workflow completed before raw session export notification: {result:?}");
2247            }
2248        }
2249
2250        let result = run.await.expect("workflow should complete");
2251        assert_eq!(result.workflow.output.result, json!({ "ok": true }));
2252    }
2253
2254    #[tokio::test]
2255    async fn local_durable_run_exports_raw_session_before_failed_terminal_state() {
2256        let dir = tempfile::tempdir().unwrap();
2257        let db_path = dir.path().join("durable.db");
2258        let raw_dir = dir.path().join("raw");
2259        let script_path = dir.path().join("workflow.mjs");
2260        fs::write(
2261            &script_path,
2262            r#"
2263export const meta = { name: "durable-fail-session", description: "durable fail session" };
2264await agent("before failure");
2265throw new Error("middle failure");
2266"#,
2267        )
2268        .unwrap();
2269        let provider = Arc::new(SessionProvider {
2270            session_id: "failed-session-1",
2271            cancel_on_run: None,
2272            delay_ms: 0,
2273        });
2274        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2275        let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2276        options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2277            root: raw_dir.clone(),
2278            saved_tx: None,
2279        }));
2280
2281        let error = run_local_durable_workflow(&mut store, options)
2282            .await
2283            .expect_err("workflow should fail after exporting agent session");
2284        assert!(
2285            error.to_string().contains("middle failure"),
2286            "unexpected error: {error:#}"
2287        );
2288
2289        let raw_file = raw_dir.join("session/failed-session-1.jsonl");
2290        assert!(raw_file.exists(), "raw session file should be written");
2291        let raw_line = fs::read_to_string(raw_file).unwrap();
2292        assert!(raw_line.contains("before failure"));
2293        let run_state: String = store
2294            .connection()
2295            .query_row("SELECT state FROM sw_workflow_runs", [], |row| row.get(0))
2296            .unwrap();
2297        assert_eq!(run_state, "failed");
2298    }
2299
2300    #[tokio::test]
2301    async fn local_durable_run_exports_raw_session_before_cancelled_state() {
2302        let dir = tempfile::tempdir().unwrap();
2303        let db_path = dir.path().join("durable.db");
2304        let raw_dir = dir.path().join("raw");
2305        let script_path = dir.path().join("workflow.mjs");
2306        fs::write(
2307            &script_path,
2308            r#"
2309export const meta = { name: "durable-cancel-session", description: "durable cancel session" };
2310export default { result: await agent("before cancel") };
2311"#,
2312        )
2313        .unwrap();
2314        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2315        let provider = Arc::new(SessionProvider {
2316            session_id: "cancelled-session-1",
2317            cancel_on_run: Some(cancel_tx),
2318            delay_ms: 10,
2319        });
2320        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2321        let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2322        options.cancel_rx = Some(cancel_rx);
2323        options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2324            root: raw_dir.clone(),
2325            saved_tx: None,
2326        }));
2327
2328        let error = run_local_durable_workflow(&mut store, options)
2329            .await
2330            .expect_err("workflow should be cancelled after exporting agent session");
2331        assert_eq!(error.to_string(), "workflow cancelled");
2332
2333        let raw_file = raw_dir.join("session/cancelled-session-1.jsonl");
2334        assert!(raw_file.exists(), "raw session file should be written");
2335        let raw_line = fs::read_to_string(raw_file).unwrap();
2336        assert!(raw_line.contains("before cancel"));
2337        let run_state: String = store
2338            .connection()
2339            .query_row("SELECT state FROM sw_workflow_runs", [], |row| row.get(0))
2340            .unwrap();
2341        assert_eq!(run_state, "cancelled");
2342    }
2343
2344    #[tokio::test]
2345    async fn local_durable_run_persists_successful_task_and_run() {
2346        let dir = tempfile::tempdir().unwrap();
2347        let db_path = dir.path().join("durable.db");
2348        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2349        let script_path = dir.path().join("workflow.mjs");
2350        fs::write(
2351            &script_path,
2352            r#"
2353export const meta = { name: "durable-local", description: "durable local" };
2354export default { result: await agent("hello") };
2355"#,
2356        )
2357        .unwrap();
2358        let provider = Arc::new(CountingProvider {
2359            calls: AtomicUsize::new(0),
2360        });
2361        let result = run_local_durable_workflow(
2362            &mut store,
2363            LocalDurableRunOptions::new(script_path, json!({}), provider.clone()),
2364        )
2365        .await
2366        .unwrap();
2367        assert_eq!(
2368            result.workflow.output.result,
2369            json!({ "result": "hello:1" })
2370        );
2371        assert_eq!(result.attempts, 1);
2372        assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2373
2374        let state: String = store
2375            .connection()
2376            .query_row(
2377                "SELECT state FROM sw_workflow_tasks WHERE task_id = ?1",
2378                rusqlite::params![result.task_id],
2379                |row| row.get(0),
2380            )
2381            .unwrap();
2382        assert_eq!(state, "completed");
2383        let run_state: String = store
2384            .connection()
2385            .query_row(
2386                "SELECT state FROM sw_workflow_runs WHERE run_id = ?1",
2387                rusqlite::params![result.run_id],
2388                |row| row.get(0),
2389            )
2390            .unwrap();
2391        assert_eq!(run_state, "completed");
2392        let completed_steps: i64 = store
2393            .connection()
2394            .query_row(
2395                "SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'",
2396                [],
2397                |row| row.get(0),
2398            )
2399            .unwrap();
2400        assert_eq!(completed_steps, 1);
2401    }
2402
2403    #[tokio::test]
2404    async fn local_durable_run_uses_runtime_agent_retry_without_global_retry() {
2405        let dir = tempfile::tempdir().unwrap();
2406        let db_path = dir.path().join("durable.db");
2407        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2408        let script_path = dir.path().join("workflow.mjs");
2409        fs::write(
2410            &script_path,
2411            r#"
2412export const meta = { name: "durable-runtime-retry", description: "durable runtime retry" };
2413export default { result: await agent("hello", { retry: { maxAttempts: 2, backoffMs: 0 } }) };
2414"#,
2415        )
2416        .unwrap();
2417        let provider = Arc::new(FlakyOnceProvider {
2418            calls: AtomicUsize::new(0),
2419        });
2420
2421        let result = run_local_durable_workflow(
2422            &mut store,
2423            LocalDurableRunOptions::new(script_path, json!({}), provider.clone()),
2424        )
2425        .await
2426        .unwrap();
2427        assert_eq!(
2428            result.workflow.output.result,
2429            json!({ "result": "recovered: hello" })
2430        );
2431        assert_eq!(provider.calls.load(Ordering::SeqCst), 2);
2432
2433        let (workflow_attempts, completed_steps, failed_steps): (i64, i64, i64) = store
2434            .connection()
2435            .query_row(
2436                r#"
2437                SELECT
2438                  (SELECT COUNT(*) FROM sw_workflow_attempts),
2439                  (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'agent' AND state = 'completed'),
2440                  (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'agent' AND state = 'failed')
2441                "#,
2442                [],
2443                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2444            )
2445            .unwrap();
2446        assert_eq!(workflow_attempts, 1);
2447        assert_eq!(completed_steps, 1);
2448        assert_eq!(failed_steps, 0);
2449    }
2450
2451    #[tokio::test]
2452    async fn local_durable_run_records_one_attempt_without_global_retry() {
2453        let dir = tempfile::tempdir().unwrap();
2454        let db_path = dir.path().join("durable.db");
2455        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2456        let script_path = dir.path().join("workflow.mjs");
2457        fs::write(
2458            &script_path,
2459            r#"
2460export const meta = { name: "durable-no-global-retry", description: "durable no global retry" };
2461await agent("hello");
2462throw new Error("boom");
2463export default { unreachable: true };
2464"#,
2465        )
2466        .unwrap();
2467        let provider = Arc::new(CountingProvider {
2468            calls: AtomicUsize::new(0),
2469        });
2470        let options = LocalDurableRunOptions::new(script_path, json!({}), provider.clone());
2471
2472        let error = run_local_durable_workflow(&mut store, options)
2473            .await
2474            .unwrap_err();
2475        assert!(error.to_string().contains("boom"));
2476        assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2477
2478        let attempts: i64 = store
2479            .connection()
2480            .query_row("SELECT COUNT(*) FROM sw_workflow_attempts", [], |row| {
2481                row.get(0)
2482            })
2483            .unwrap();
2484        assert_eq!(attempts, 1);
2485        let completed_steps: i64 = store
2486            .connection()
2487            .query_row(
2488                "SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'",
2489                [],
2490                |row| row.get(0),
2491            )
2492            .unwrap();
2493        assert_eq!(completed_steps, 1);
2494    }
2495
2496    #[test]
2497    fn prepare_resume_run_reports_missing_run_id_and_database() {
2498        let dir = tempfile::tempdir().unwrap();
2499        let db_path = dir.path().join("durable.db");
2500        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2501        store.init().expect("schema should initialize");
2502
2503        let error = store
2504            .prepare_resume_run("run_missing", "owner", 1)
2505            .unwrap_err()
2506            .to_string();
2507
2508        assert!(error.contains("workflow run run_missing was not found"));
2509        assert!(error.contains(&db_path.display().to_string()));
2510        assert!(error.contains("check --db"));
2511    }
2512
2513    #[tokio::test]
2514    async fn local_durable_run_resumes_existing_run_and_replays_steps() {
2515        let dir = tempfile::tempdir().unwrap();
2516        let db_path = dir.path().join("durable.db");
2517        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2518        let script_path = dir.path().join("workflow.mjs");
2519        fs::write(
2520            &script_path,
2521            r#"
2522export const meta = { name: "durable-resume", description: "durable resume" };
2523const value = await agent("hello");
2524throw new Error("boom");
2525export default { value };
2526"#,
2527        )
2528        .unwrap();
2529        let provider = Arc::new(CountingProvider {
2530            calls: AtomicUsize::new(0),
2531        });
2532        let first_options =
2533            LocalDurableRunOptions::new(script_path.clone(), json!({}), provider.clone());
2534        let first_error = run_local_durable_workflow(&mut store, first_options)
2535            .await
2536            .unwrap_err();
2537        assert!(first_error.to_string().contains("boom"));
2538        assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2539
2540        let run_id: String = store
2541            .connection()
2542            .query_row("SELECT run_id FROM sw_workflow_runs", [], |row| row.get(0))
2543            .unwrap();
2544
2545        fs::write(
2546            &script_path,
2547            r#"
2548export const meta = { name: "durable-resume", description: "durable resume" };
2549const value = await agent("hello");
2550export default { value };
2551"#,
2552        )
2553        .unwrap();
2554        let mut resume_options =
2555            LocalDurableRunOptions::new(script_path.clone(), json!({}), provider.clone());
2556        resume_options.resume_run_id = Some(run_id);
2557        let resumed = run_local_durable_workflow(&mut store, resume_options)
2558            .await
2559            .unwrap();
2560        assert_eq!(
2561            resumed.workflow.output.result,
2562            json!({ "value": "hello:1" })
2563        );
2564        assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2565    }
2566}