Skip to main content

smol_workflow_engine/durable/
runner.rs

//! Minimal local durable workflow runner.
//!
//! This module implements the first local-only durable flow: create a local task
//! and root run, execute the existing workflow engine in-process, and persist the
//! terminal task/run state. Durable retryable steps are introduced separately.
6
7use crate::agent_providers::{AgentProvider, AgentProviderResult, AgentProviderRunInput};
8use crate::durable::json::{
9    DurableRunMode, FailureReasonJSON, LocalTaskParamsJSON, WorkflowRunJSON,
10};
11use crate::events::{WorkflowEvent, WorkflowEventMetadata, WorkflowEventSink};
12use crate::metadata::read_workflow_metadata;
13use crate::workflow::{
14    run_agent_provider_with_retry, run_workflow, RunWorkflowOptions, RunWorkflowResult,
15    WorkflowAgentRunner,
16};
17use anyhow::{bail, Context};
18use rusqlite::OptionalExtension;
19use serde_json::Value;
20use std::collections::{BTreeMap, HashMap};
21use std::path::PathBuf;
22use std::sync::{Arc, Mutex};
23use std::time::{Duration, Instant};
24use tokio::sync::watch;
25
26use super::sqlite::{new_id, now_ms, SqliteDurableStore};
27
28const WORKFLOW_TASK_NAME: &str = "workflow.run";
29const LOCAL_CLAIM_SCOPE: &str = "local";
30const DEFAULT_STEP_LEASE_MS: u64 = 60_000;
31
32/// Options for a local durable workflow run.
33pub struct LocalDurableRunOptions {
34    pub script_path: PathBuf,
35    pub workflow_cwd: Option<PathBuf>,
36    pub args: Value,
37    pub agent_provider: Arc<dyn AgentProvider>,
38    pub model_map: BTreeMap<String, String>,
39    pub budget_total: Option<u64>,
40    pub max_parallel_agent_requests: Option<usize>,
41    pub resume_run_id: Option<String>,
42    pub cancel_rx: Option<watch::Receiver<bool>>,
43    pub event_sink: Option<Arc<dyn crate::workflow::WorkflowEventSink>>,
44    pub session_log_sink: Option<Arc<dyn crate::workflow::AgentSessionLogSink>>,
45}
46
47impl LocalDurableRunOptions {
48    pub fn new(script_path: PathBuf, args: Value, agent_provider: Arc<dyn AgentProvider>) -> Self {
49        Self {
50            script_path,
51            workflow_cwd: None,
52            args,
53            agent_provider,
54            model_map: BTreeMap::new(),
55            budget_total: None,
56            max_parallel_agent_requests: None,
57            resume_run_id: None,
58            cancel_rx: None,
59            event_sink: None,
60            session_log_sink: None,
61        }
62    }
63}
64
65struct RunScopedWorkflowEventSink {
66    inner: Arc<dyn WorkflowEventSink>,
67    run_id: String,
68    start: Instant,
69}
70
71impl RunScopedWorkflowEventSink {
72    fn new(inner: Arc<dyn WorkflowEventSink>, run_id: String) -> Self {
73        Self {
74            inner,
75            run_id,
76            start: Instant::now(),
77        }
78    }
79}
80
81#[async_trait::async_trait]
82impl WorkflowEventSink for RunScopedWorkflowEventSink {
83    async fn emit(&self, event: WorkflowEvent) -> anyhow::Result<()> {
84        let workflow_depth = event
85            .metadata
86            .as_ref()
87            .and_then(|metadata| metadata.workflow_depth)
88            .unwrap_or(0);
89        if workflow_depth == 0
90            && matches!(
91                event.event_type.as_str(),
92                "workflow.started" | "workflow.result" | "workflow.error"
93            )
94        {
95            return Ok(());
96        }
97        self.emit_scoped(event).await
98    }
99}
100
101impl RunScopedWorkflowEventSink {
102    async fn emit_scoped(&self, mut event: WorkflowEvent) -> anyhow::Result<()> {
103        let metadata = event
104            .metadata
105            .get_or_insert_with(WorkflowEventMetadata::default);
106        if metadata.run_id.is_none() {
107            metadata.run_id = Some(self.run_id.clone());
108        }
109        if metadata.workflow_depth.is_none() {
110            metadata.workflow_depth = Some(0);
111        }
112        if event.event_type.as_str() != "workflow.started" && event.elapsed_nanos.is_none() {
113            event.elapsed_nanos = Some(elapsed_nanos(self.start));
114        }
115        self.inner.emit(event).await
116    }
117}
118
/// Nanoseconds elapsed since `start`, clamped to `u64::MAX` if the value does
/// not fit in 64 bits.
fn elapsed_nanos(start: Instant) -> u64 {
    let nanos: u128 = start.elapsed().as_nanos();
    match u64::try_from(nanos) {
        Ok(value) => value,
        Err(_) => u64::MAX,
    }
}
122
123fn durable_agent_step_lease_expires_at(
124    now: i64,
125    retry_policy: crate::workflow::AgentRetryPolicy,
126) -> i64 {
127    let retry_backoff_budget = retry_policy
128        .backoff_ms
129        .saturating_mul(u64::from(retry_policy.max_attempts.saturating_sub(1)));
130    let lease_ms = DEFAULT_STEP_LEASE_MS.saturating_add(retry_backoff_budget);
131    let lease_ms = i64::try_from(lease_ms).unwrap_or(i64::MAX);
132    now.saturating_add(lease_ms)
133}
134
135fn rfc3339_now() -> anyhow::Result<String> {
136    Ok(time::OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)?)
137}
138
139/// Result of a local durable workflow run.
140#[derive(Debug)]
141pub struct LocalDurableRunResult {
142    pub task_id: String,
143    pub run_id: String,
144    pub attempts: u32,
145    pub workflow: RunWorkflowResult,
146}
147
148#[derive(Debug)]
149pub struct SqliteDurableAgentRunner {
150    db_path: PathBuf,
151    run_id: String,
152    root_run_id: String,
153    worker_id: String,
154    cancel_rx: Option<watch::Receiver<bool>>,
155    occurrences: Mutex<HashMap<String, u64>>,
156}
157
158impl SqliteDurableAgentRunner {
159    pub fn new(
160        db_path: PathBuf,
161        run_id: String,
162        root_run_id: String,
163        worker_id: String,
164        cancel_rx: Option<watch::Receiver<bool>>,
165    ) -> Self {
166        Self {
167            db_path,
168            run_id,
169            root_run_id,
170            worker_id,
171            cancel_rx,
172            occurrences: Mutex::new(HashMap::new()),
173        }
174    }
175
176    fn next_checkpoint_name(&self, base_checkpoint_name: String) -> anyhow::Result<String> {
177        let mut occurrences = self
178            .occurrences
179            .lock()
180            .map_err(|_| anyhow::anyhow!("durable occurrence counter lock was poisoned"))?;
181        let count = occurrences.entry(base_checkpoint_name.clone()).or_insert(0);
182        *count += 1;
183        if *count == 1 {
184            Ok(base_checkpoint_name)
185        } else {
186            Ok(format!("{base_checkpoint_name}#{count}"))
187        }
188    }
189}
190
191#[async_trait::async_trait]
192impl WorkflowAgentRunner for SqliteDurableAgentRunner {
193    fn retry_in_runtime(&self) -> bool {
194        false
195    }
196
197    async fn run_agent(
198        &self,
199        default_provider: Arc<dyn AgentProvider>,
200        provider_override: Option<String>,
201        input: AgentProviderRunInput,
202    ) -> anyhow::Result<AgentProviderResult> {
203        let provider_name = provider_override
204            .as_deref()
205            .unwrap_or_else(|| default_provider.name())
206            .to_string();
207        let input_signature = agent_input_signature(&provider_name, &input);
208        let input_signature_json = canonical_json_string(&input_signature)?;
209        let input_signature_hash = short_blake3_hex(&input_signature_json);
210        let base_checkpoint_name = format!("step:sig_{input_signature_hash}");
211        let checkpoint_name = self.next_checkpoint_name(base_checkpoint_name)?;
212        let input_json = serde_json::to_value(&input_signature)
213            .context("failed to serialize durable agent input")?;
214        let retry_policy = crate::workflow::agent_retry_policy(&input.options)?;
215
216        loop {
217            let claim = {
218                let mut store = SqliteDurableStore::open(&self.db_path)?;
219                store.claim_or_replay_agent_step(AgentStepClaimInput {
220                    run_id: &self.run_id,
221                    root_run_id: &self.root_run_id,
222                    checkpoint_name: &checkpoint_name,
223                    input_signature_hash: &input_signature_hash,
224                    input_signature_json: &input_signature_json,
225                    input_json: &input_json,
226                    worker_id: &self.worker_id,
227                    lease_expires_at: durable_agent_step_lease_expires_at(now_ms(), retry_policy),
228                    now: now_ms(),
229                })?
230            };
231
232            match claim {
233                AgentStepClaim::Replay(result) => return Ok(*result),
234                AgentStepClaim::Run { step_id } => {
235                    let provider_result = run_durable_agent_provider(
236                        default_provider,
237                        provider_override,
238                        input,
239                        self.cancel_rx.clone(),
240                    )
241                    .await;
242                    let mut store = SqliteDurableStore::open(&self.db_path)?;
243                    match provider_result {
244                        Ok(result) => {
245                            store.complete_agent_step(AgentStepCompleteInput {
246                                step_id: &step_id,
247                                run_id: &self.run_id,
248                                root_run_id: &self.root_run_id,
249                                result: &result,
250                                now: now_ms(),
251                            })?;
252                            return Ok(result);
253                        }
254                        Err(error) => {
255                            let failure_reason = serde_json::to_value(FailureReasonJSON {
256                                message: error.to_string(),
257                            })?;
258                            store.fail_agent_step(AgentStepFailInput {
259                                step_id: &step_id,
260                                failure_reason: &failure_reason,
261                                now: now_ms(),
262                            })?;
263                            return Err(error);
264                        }
265                    }
266                }
267                AgentStepClaim::Wait => {
268                    tokio::time::sleep(Duration::from_millis(50)).await;
269                }
270            }
271        }
272    }
273
274    async fn sleep(&self, duration_ms: u64) -> anyhow::Result<()> {
275        let input_signature = sleep_input_signature(duration_ms);
276        let input_signature_json = canonical_json_string(&input_signature)?;
277        let input_signature_hash = short_blake3_hex(&input_signature_json);
278        let base_checkpoint_name = format!("step:sig_{input_signature_hash}");
279        let checkpoint_name = self.next_checkpoint_name(base_checkpoint_name)?;
280
281        let now = now_ms();
282        let claim = {
283            let mut store = SqliteDurableStore::open(&self.db_path)?;
284            store.claim_or_replay_sleep_step(SleepStepClaimInput {
285                run_id: &self.run_id,
286                root_run_id: &self.root_run_id,
287                checkpoint_name: &checkpoint_name,
288                input_signature_hash: &input_signature_hash,
289                input_signature_json: &input_signature_json,
290                duration_ms,
291                worker_id: &self.worker_id,
292                lease_expires_at: now + 60_000,
293                now,
294            })?
295        };
296
297        match claim {
298            SleepStepClaim::Replay => Ok(()),
299            SleepStepClaim::WaitUntil { step_id, wake_at } => {
300                let now = now_ms();
301                if wake_at > now {
302                    tokio::time::sleep(Duration::from_millis((wake_at - now) as u64)).await;
303                }
304                let mut store = SqliteDurableStore::open(&self.db_path)?;
305                store.complete_sleep_step(SleepStepCompleteInput {
306                    step_id: &step_id,
307                    duration_ms,
308                    wake_at,
309                    now: now_ms(),
310                })?;
311                Ok(())
312            }
313        }
314    }
315}
316
317/// Execute a workflow locally while persisting durable task/run/attempt state.
318pub async fn run_local_durable_workflow(
319    store: &mut SqliteDurableStore,
320    options: LocalDurableRunOptions,
321) -> anyhow::Result<LocalDurableRunResult> {
322    store.init()?;
323
324    let owner_id = new_id("owner");
325    let now = now_ms();
326    let params_json = serde_json::to_value(LocalTaskParamsJSON {
327        mode: DurableRunMode::Local,
328        script_path: options.script_path.clone(),
329        args: options.args.clone(),
330        budget_total: options.budget_total,
331    })?;
332    let workflow_metadata = read_workflow_metadata(&options.script_path).ok().flatten();
333    let workflow_run_json = serde_json::to_value(WorkflowRunJSON {
334        mode: DurableRunMode::Local,
335        script_path: options.script_path.clone(),
336        metadata: workflow_metadata,
337    })?;
338
339    let (task_id, run_id, first_attempt) = if let Some(run_id) = options.resume_run_id.clone() {
340        let (task_id, current_attempts) = store.prepare_resume_run(&run_id, &owner_id, now)?;
341        (task_id, run_id, current_attempts + 1)
342    } else {
343        let task_id = new_id("task");
344        let run_id = new_id("run");
345        store.insert_local_task_and_run(LocalTaskAndRunInsert {
346            task_id: &task_id,
347            run_id: &run_id,
348            owner_id: &owner_id,
349            params_json: &params_json,
350            workflow_run_json: &workflow_run_json,
351            args_json: &options.args,
352            budget_total: options.budget_total,
353            max_attempts: 1,
354            now,
355        })?;
356        (task_id, run_id, 1)
357    };
358
359    let workflow_event_sink = options.event_sink.as_ref().map(|sink| {
360        Arc::new(RunScopedWorkflowEventSink::new(
361            Arc::clone(sink),
362            run_id.clone(),
363        ))
364    });
365    if let Some(event_sink) = workflow_event_sink.as_ref() {
366        event_sink
367            .emit_scoped(WorkflowEvent::started(rfc3339_now()?))
368            .await
369            .context("failed to emit workflow started event")?;
370    }
371
372    let attempt = first_attempt;
373    let attempt_id = new_id("attempt");
374    store.start_attempt(LocalAttemptStart {
375        task_id: &task_id,
376        run_id: &run_id,
377        attempt_id: &attempt_id,
378        owner_id: &owner_id,
379        attempt,
380        lease_expires_at: now_ms() + 60_000,
381        now: now_ms(),
382    })?;
383
384    let agent_runner = store.path().map(|db_path| {
385        Arc::new(SqliteDurableAgentRunner::new(
386            db_path.to_path_buf(),
387            run_id.clone(),
388            run_id.clone(),
389            owner_id.clone(),
390            options.cancel_rx.clone(),
391        )) as Arc<dyn WorkflowAgentRunner>
392    });
393
394    let result = run_workflow(RunWorkflowOptions {
395        script_path: options.script_path.clone(),
396        workflow_cwd: options.workflow_cwd.clone(),
397        args: options.args.clone(),
398        agent_provider: Arc::clone(&options.agent_provider),
399        model_map: options.model_map.clone(),
400        budget_total: options.budget_total,
401        budget_spent: 0,
402        nesting_depth: 0,
403        max_parallel_agent_requests: options.max_parallel_agent_requests,
404        agent_runner,
405        cancel_rx: options.cancel_rx.clone(),
406        event_sink: workflow_event_sink
407            .as_ref()
408            .map(|sink| Arc::clone(sink) as Arc<dyn WorkflowEventSink>),
409        event_parent_step_id: None,
410        event_stream_start: workflow_event_sink.as_ref().map(|sink| sink.start),
411        session_log_sink: options.session_log_sink.clone(),
412    })
413    .await;
414
415    match result {
416        Ok(workflow) => {
417            let completed_payload = serde_json::to_value(&workflow.output)
418                .context("failed to serialize durable workflow output")?;
419            store.complete_attempt_and_task(LocalAttemptComplete {
420                task_id: &task_id,
421                run_id: &run_id,
422                attempt_id: &attempt_id,
423                completed_payload: &completed_payload,
424                budget_spent: workflow.budget.spent,
425                now: now_ms(),
426            })?;
427            if let Some(event_sink) = workflow_event_sink.as_ref() {
428                event_sink
429                    .emit_scoped(WorkflowEvent::result(
430                        workflow.token_usage.input_tokens,
431                        workflow.token_usage.output_tokens,
432                        workflow.token_usage.total_tokens,
433                        workflow.output.result.clone(),
434                    ))
435                    .await
436                    .context("failed to emit workflow result event")?;
437            }
438            Ok(LocalDurableRunResult {
439                task_id,
440                run_id,
441                attempts: attempt,
442                workflow,
443            })
444        }
445        Err(error) => {
446            let failure_reason = serde_json::to_value(FailureReasonJSON {
447                message: error.to_string(),
448            })?;
449            if cancellation_requested(&options.cancel_rx) {
450                store.cancel_attempt_and_task(LocalAttemptCancel {
451                    task_id: &task_id,
452                    run_id: &run_id,
453                    attempt_id: &attempt_id,
454                    failure_reason: &failure_reason,
455                    now: now_ms(),
456                })?;
457            } else {
458                store.fail_attempt(LocalAttemptFail {
459                    task_id: &task_id,
460                    run_id: &run_id,
461                    attempt_id: &attempt_id,
462                    failure_reason: &failure_reason,
463                    terminal: true,
464                    now: now_ms(),
465                })?;
466            }
467            if let Some(event_sink) = workflow_event_sink.as_ref() {
468                event_sink
469                    .emit_scoped(WorkflowEvent::error(error.to_string(), None))
470                    .await
471                    .context("failed to emit workflow error event")?;
472            }
473            Err(error)
474        }
475    }
476}
477
478pub struct LocalTaskAndRunInsert<'a> {
479    pub task_id: &'a str,
480    pub run_id: &'a str,
481    pub owner_id: &'a str,
482    pub params_json: &'a Value,
483    pub workflow_run_json: &'a Value,
484    pub args_json: &'a Value,
485    pub budget_total: Option<u64>,
486    pub max_attempts: u32,
487    pub now: i64,
488}
489
/// Borrowed inputs for claiming a local task and recording a running attempt.
pub struct LocalAttemptStart<'a> {
    /// Task to claim.
    pub task_id: &'a str,
    /// Run to mark as running.
    pub run_id: &'a str,
    /// Id of the attempt row to insert.
    pub attempt_id: &'a str,
    /// Owner claiming the task; also recorded as the attempt's worker id.
    pub owner_id: &'a str,
    /// Attempt number stored on the attempt row.
    pub attempt: u32,
    /// Lease expiry stored on both the task and the attempt.
    pub lease_expires_at: i64,
    /// Timestamp used for `updated_at` and the attempt's `started_at`.
    pub now: i64,
}
499
500pub struct LocalAttemptComplete<'a> {
501    pub task_id: &'a str,
502    pub run_id: &'a str,
503    pub attempt_id: &'a str,
504    pub completed_payload: &'a Value,
505    pub budget_spent: u64,
506    pub now: i64,
507}
508
509pub struct LocalAttemptFail<'a> {
510    pub task_id: &'a str,
511    pub run_id: &'a str,
512    pub attempt_id: &'a str,
513    pub failure_reason: &'a Value,
514    pub terminal: bool,
515    pub now: i64,
516}
517
518pub struct LocalAttemptCancel<'a> {
519    pub task_id: &'a str,
520    pub run_id: &'a str,
521    pub attempt_id: &'a str,
522    pub failure_reason: &'a Value,
523    pub now: i64,
524}
525
526fn cancellation_requested(cancel_rx: &Option<watch::Receiver<bool>>) -> bool {
527    cancel_rx
528        .as_ref()
529        .is_some_and(|cancel_rx| *cancel_rx.borrow())
530}
531
532impl SqliteDurableStore {
533    pub fn insert_local_task_and_run(
534        &mut self,
535        input: LocalTaskAndRunInsert<'_>,
536    ) -> anyhow::Result<()> {
537        let tx = self
538            .connection_mut()
539            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
540            .context("failed to begin local durable task transaction")?;
541        tx.execute(
542            r#"
543            INSERT INTO sw_workflow_tasks (
544                task_id,
545                task_name,
546                state,
547                params_json,
548                submitted_by_owner_id,
549                claimed_by_owner_id,
550                claim_scope,
551                created_at,
552                updated_at,
553                max_attempts
554            )
555            VALUES (?1, ?2, 'pending', ?3, ?4, NULL, ?5, ?6, ?6, ?7)
556            "#,
557            rusqlite::params![
558                input.task_id,
559                WORKFLOW_TASK_NAME,
560                serde_json::to_string(input.params_json)?,
561                input.owner_id,
562                LOCAL_CLAIM_SCOPE,
563                input.now,
564                input.max_attempts,
565            ],
566        )
567        .context("failed to insert durable workflow task")?;
568        tx.execute(
569            r#"
570            INSERT INTO sw_workflow_runs (
571                run_id,
572                task_id,
573                root_run_id,
574                state,
575                workflow_run_json,
576                args_json,
577                budget_total,
578                budget_spent,
579                created_at,
580                updated_at
581            )
582            VALUES (?1, ?2, ?1, 'pending', ?3, ?4, ?5, 0, ?6, ?6)
583            "#,
584            rusqlite::params![
585                input.run_id,
586                input.task_id,
587                serde_json::to_string(input.workflow_run_json)?,
588                serde_json::to_string(input.args_json)?,
589                input.budget_total.map(|value| value as i64),
590                input.now,
591            ],
592        )
593        .context("failed to insert durable workflow run")?;
594        tx.commit()
595            .context("failed to commit local durable task transaction")
596    }
597
598    pub fn prepare_resume_run(
599        &mut self,
600        run_id: &str,
601        owner_id: &str,
602        now: i64,
603    ) -> anyhow::Result<(String, u32)> {
604        let db_label = self
605            .path()
606            .map(|path| path.display().to_string())
607            .unwrap_or_else(|| "<in-memory>".to_string());
608        let tx = self
609            .connection_mut()
610            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
611            .context("failed to begin durable resume transaction")?;
612        let task_id: String = tx
613            .query_row(
614                r#"
615                SELECT task_id
616                FROM sw_workflow_runs
617                WHERE run_id = ?1
618                "#,
619                rusqlite::params![run_id],
620                |row| row.get(0),
621            )
622            .optional()
623            .context("failed to query durable run to resume")?
624            .ok_or_else(|| {
625                anyhow::anyhow!("workflow run {run_id} was not found in {db_label}; check --db")
626            })?;
627        let current_attempts: u32 =
628            tx.query_row(
629                r#"
630                SELECT COUNT(*)
631                FROM sw_workflow_attempts
632                WHERE run_id = ?1
633                "#,
634                rusqlite::params![run_id],
635                |row| row.get::<_, i64>(0),
636            )
637            .context("failed to count durable run attempts")? as u32;
638        tx.execute(
639            r#"
640            UPDATE sw_workflow_tasks
641            SET state = 'pending',
642                claimed_by_owner_id = NULL,
643                lease_expires_at = NULL,
644                updated_at = ?1
645            WHERE task_id = ?2
646              AND state IN ('pending', 'running', 'failed')
647            "#,
648            rusqlite::params![now, task_id],
649        )
650        .context("failed to prepare durable task for resume")?;
651        tx.execute(
652            r#"
653            UPDATE sw_workflow_runs
654            SET state = 'pending',
655                updated_at = ?1
656            WHERE run_id = ?2
657              AND state IN ('pending', 'running', 'failed')
658            "#,
659            rusqlite::params![now, run_id],
660        )
661        .context("failed to prepare durable run for resume")?;
662        tx.execute(
663            r#"
664            UPDATE sw_workflow_tasks
665            SET submitted_by_owner_id = ?1
666            WHERE task_id = ?2
667              AND claim_scope = 'local'
668            "#,
669            rusqlite::params![owner_id, task_id],
670        )
671        .context("failed to adopt durable task owner")?;
672        tx.commit().context("failed to commit durable resume")?;
673        Ok((task_id, current_attempts))
674    }
675
676    pub fn start_attempt(&mut self, input: LocalAttemptStart<'_>) -> anyhow::Result<()> {
677        let tx = self
678            .connection_mut()
679            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
680            .context("failed to begin durable attempt start transaction")?;
681        tx.execute(
682            r#"
683            UPDATE sw_workflow_tasks
684            SET state = 'running',
685                claimed_by_owner_id = ?1,
686                lease_expires_at = ?2,
687                updated_at = ?3
688            WHERE task_id = ?4
689              AND claim_scope = 'local'
690              AND submitted_by_owner_id = ?1
691              AND state IN ('pending', 'running')
692            "#,
693            rusqlite::params![
694                input.owner_id,
695                input.lease_expires_at,
696                input.now,
697                input.task_id
698            ],
699        )
700        .context("failed to claim local durable task")?;
701        tx.execute(
702            r#"
703            UPDATE sw_workflow_runs
704            SET state = 'running',
705                updated_at = ?1
706            WHERE run_id = ?2
707            "#,
708            rusqlite::params![input.now, input.run_id],
709        )
710        .context("failed to mark durable workflow run running")?;
711        tx.execute(
712            r#"
713            INSERT INTO sw_workflow_attempts (
714                attempt_id,
715                run_id,
716                task_id,
717                attempt,
718                worker_id,
719                state,
720                lease_expires_at,
721                started_at
722            )
723            VALUES (?1, ?2, ?3, ?4, ?5, 'running', ?6, ?7)
724            "#,
725            rusqlite::params![
726                input.attempt_id,
727                input.run_id,
728                input.task_id,
729                input.attempt,
730                input.owner_id,
731                input.lease_expires_at,
732                input.now,
733            ],
734        )
735        .context("failed to insert durable workflow attempt")?;
736        tx.commit()
737            .context("failed to commit durable attempt start transaction")
738    }
739
740    pub fn complete_attempt_and_task(
741        &mut self,
742        input: LocalAttemptComplete<'_>,
743    ) -> anyhow::Result<()> {
744        let payload = serde_json::to_string(input.completed_payload)?;
745        let tx = self
746            .connection_mut()
747            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
748            .context("failed to begin durable completion transaction")?;
749        tx.execute(
750            r#"
751            UPDATE sw_workflow_attempts
752            SET state = 'completed',
753                completed_at = ?1
754            WHERE attempt_id = ?2
755            "#,
756            rusqlite::params![input.now, input.attempt_id],
757        )
758        .context("failed to mark durable attempt completed")?;
759        tx.execute(
760            r#"
761            UPDATE sw_workflow_runs
762            SET state = 'completed',
763                budget_spent = ?1,
764                completed_payload_json = ?2,
765                updated_at = ?3
766            WHERE run_id = ?4
767            "#,
768            rusqlite::params![input.budget_spent as i64, payload, input.now, input.run_id],
769        )
770        .context("failed to mark durable run completed")?;
771        tx.execute(
772            r#"
773            UPDATE sw_workflow_tasks
774            SET state = 'completed',
775                completed_payload_json = ?1,
776                lease_expires_at = NULL,
777                updated_at = ?2
778            WHERE task_id = ?3
779            "#,
780            rusqlite::params![payload, input.now, input.task_id],
781        )
782        .context("failed to mark durable task completed")?;
783        tx.commit()
784            .context("failed to commit durable completion transaction")
785    }
786
787    pub fn fail_attempt(&mut self, input: LocalAttemptFail<'_>) -> anyhow::Result<()> {
788        let failure = serde_json::to_string(input.failure_reason)?;
789        let tx = self
790            .connection_mut()
791            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
792            .context("failed to begin durable failure transaction")?;
793        tx.execute(
794            r#"
795            UPDATE sw_workflow_attempts
796            SET state = 'failed',
797                completed_at = ?1,
798                failure_reason_json = ?2
799            WHERE attempt_id = ?3
800            "#,
801            rusqlite::params![input.now, failure, input.attempt_id],
802        )
803        .context("failed to mark durable attempt failed")?;
804        let next_state = if input.terminal { "failed" } else { "pending" };
805        tx.execute(
806            r#"
807            UPDATE sw_workflow_runs
808            SET state = ?1,
809                failure_reason_json = ?2,
810                updated_at = ?3
811            WHERE run_id = ?4
812            "#,
813            rusqlite::params![next_state, failure, input.now, input.run_id],
814        )
815        .context("failed to update durable run failure state")?;
816        tx.execute(
817            r#"
818            UPDATE sw_workflow_tasks
819            SET state = ?1,
820                failure_reason_json = ?2,
821                lease_expires_at = NULL,
822                updated_at = ?3
823            WHERE task_id = ?4
824            "#,
825            rusqlite::params![next_state, failure, input.now, input.task_id],
826        )
827        .context("failed to update durable task failure state")?;
828        tx.commit()
829            .context("failed to commit durable failure transaction")
830    }
831
832    pub fn cancel_attempt_and_task(&mut self, input: LocalAttemptCancel<'_>) -> anyhow::Result<()> {
833        let failure = serde_json::to_string(input.failure_reason)?;
834        let tx = self
835            .connection_mut()
836            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
837            .context("failed to begin durable cancellation transaction")?;
838        tx.execute(
839            r#"
840            UPDATE sw_workflow_attempts
841            SET state = 'cancelled',
842                completed_at = ?1,
843                failure_reason_json = ?2
844            WHERE attempt_id = ?3
845            "#,
846            rusqlite::params![input.now, failure, input.attempt_id],
847        )
848        .context("failed to mark durable attempt cancelled")?;
849        tx.execute(
850            r#"
851            UPDATE sw_workflow_runs
852            SET state = 'cancelled',
853                failure_reason_json = ?1,
854                updated_at = ?2
855            WHERE run_id = ?3
856            "#,
857            rusqlite::params![failure, input.now, input.run_id],
858        )
859        .context("failed to mark durable run cancelled")?;
860        tx.execute(
861            r#"
862            UPDATE sw_workflow_tasks
863            SET state = 'cancelled',
864                failure_reason_json = ?1,
865                lease_expires_at = NULL,
866                updated_at = ?2
867            WHERE task_id = ?3
868            "#,
869            rusqlite::params![failure, input.now, input.task_id],
870        )
871        .context("failed to mark durable task cancelled")?;
872        tx.commit()
873            .context("failed to commit durable cancellation transaction")
874    }
875
876    pub fn claim_or_replay_agent_step(
877        &mut self,
878        input: AgentStepClaimInput<'_>,
879    ) -> anyhow::Result<AgentStepClaim> {
880        let tx = self
881            .connection_mut()
882            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
883            .context("failed to begin durable agent step claim transaction")?;
884
885        let existing = tx
886            .query_row(
887                r#"
888                SELECT step_id, state, input_signature_json, result_json, lease_expires_at
889                FROM sw_workflow_steps
890                WHERE run_id = ?1 AND checkpoint_name = ?2
891                "#,
892                rusqlite::params![input.run_id, input.checkpoint_name],
893                |row| {
894                    Ok((
895                        row.get::<_, String>(0)?,
896                        row.get::<_, String>(1)?,
897                        row.get::<_, String>(2)?,
898                        row.get::<_, Option<String>>(3)?,
899                        row.get::<_, Option<i64>>(4)?,
900                    ))
901                },
902            )
903            .optional()
904            .context("failed to query durable agent step")?;
905
906        if let Some((step_id, state, stored_signature, result_json, lease_expires_at)) = existing {
907            if stored_signature != input.input_signature_json {
908                bail!(
909                    "durable step input signature mismatch for {}",
910                    input.checkpoint_name
911                );
912            }
913            match state.as_str() {
914                "completed" => {
915                    let result_json = result_json.ok_or_else(|| {
916                        anyhow::anyhow!("completed durable agent step missing result_json")
917                    })?;
918                    let result = serde_json::from_str::<AgentProviderResult>(&result_json)
919                        .context("failed to deserialize durable agent result")?;
920                    tx.commit().context("failed to commit replay transaction")?;
921                    return Ok(AgentStepClaim::Replay(Box::new(result)));
922                }
923                "running" if lease_expires_at.is_some_and(|lease| lease > input.now) => {
924                    tx.commit().context("failed to commit wait transaction")?;
925                    return Ok(AgentStepClaim::Wait);
926                }
927                "running" | "failed" | "cancelled" | "pending" => {
928                    tx.execute(
929                        r#"
930                        UPDATE sw_workflow_steps
931                        SET state = 'running',
932                            worker_id = ?1,
933                            lease_expires_at = ?2,
934                            attempts = attempts + 1,
935                            last_attempt_at = ?3,
936                            updated_at = ?3,
937                            failure_reason_json = NULL
938                        WHERE step_id = ?4
939                        "#,
940                        rusqlite::params![
941                            input.worker_id,
942                            input.lease_expires_at,
943                            input.now,
944                            step_id
945                        ],
946                    )
947                    .context("failed to reclaim durable agent step")?;
948                    tx.commit()
949                        .context("failed to commit durable agent reclaim transaction")?;
950                    return Ok(AgentStepClaim::Run { step_id });
951                }
952                other => bail!("unknown durable step state: {other}"),
953            }
954        }
955
956        let step_id = new_id("step");
957        tx.execute(
958            r#"
959            INSERT INTO sw_workflow_steps (
960                step_id,
961                run_id,
962                root_run_id,
963                step_kind,
964                checkpoint_name,
965                input_signature_hash,
966                input_signature_json,
967                state,
968                input_json,
969                worker_id,
970                lease_expires_at,
971                attempts,
972                last_attempt_at,
973                created_at,
974                updated_at
975            )
976            VALUES (?1, ?2, ?3, 'agent', ?4, ?5, ?6, 'running', ?7, ?8, ?9, 1, ?10, ?10, ?10)
977            "#,
978            rusqlite::params![
979                step_id,
980                input.run_id,
981                input.root_run_id,
982                input.checkpoint_name,
983                input.input_signature_hash,
984                input.input_signature_json,
985                serde_json::to_string(input.input_json)?,
986                input.worker_id,
987                input.lease_expires_at,
988                input.now,
989            ],
990        )
991        .context("failed to insert durable agent step")?;
992        tx.commit()
993            .context("failed to commit durable agent step insert")?;
994        Ok(AgentStepClaim::Run { step_id })
995    }
996
997    pub fn complete_agent_step(&mut self, input: AgentStepCompleteInput<'_>) -> anyhow::Result<()> {
998        let result_json = serde_json::to_string(&compact_agent_result_for_replay(input.result))?;
999        let output_tokens = input
1000            .result
1001            .usage
1002            .as_ref()
1003            .and_then(|usage| usage.output_tokens)
1004            .unwrap_or(0);
1005        let usage_json = input
1006            .result
1007            .usage
1008            .as_ref()
1009            .map(serde_json::to_string)
1010            .transpose()?;
1011        let budget_entry_id = new_id("budget");
1012        let tx = self
1013            .connection_mut()
1014            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1015            .context("failed to begin durable agent step completion transaction")?;
1016        tx.execute(
1017            r#"
1018            UPDATE sw_workflow_steps
1019            SET state = 'completed',
1020                result_json = ?1,
1021                lease_expires_at = NULL,
1022                updated_at = ?2
1023            WHERE step_id = ?3
1024            "#,
1025            rusqlite::params![result_json, input.now, input.step_id],
1026        )
1027        .context("failed to mark durable agent step completed")?;
1028        tx.execute(
1029            r#"
1030            INSERT OR IGNORE INTO sw_budget_ledger (
1031                budget_entry_id,
1032                run_id,
1033                root_run_id,
1034                step_id,
1035                source_type,
1036                source_id,
1037                output_tokens,
1038                usage_json,
1039                created_at
1040            )
1041            VALUES (?1, ?2, ?3, ?4, 'agent_step', ?4, ?5, ?6, ?7)
1042            "#,
1043            rusqlite::params![
1044                budget_entry_id,
1045                input.run_id,
1046                input.root_run_id,
1047                input.step_id,
1048                output_tokens as i64,
1049                usage_json,
1050                input.now,
1051            ],
1052        )
1053        .context("failed to insert durable budget ledger entry")?;
1054        tx.commit()
1055            .context("failed to commit durable agent step completion")
1056    }
1057
1058    pub fn claim_or_replay_sleep_step(
1059        &mut self,
1060        input: SleepStepClaimInput<'_>,
1061    ) -> anyhow::Result<SleepStepClaim> {
1062        let tx = self
1063            .connection_mut()
1064            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
1065            .context("failed to begin durable sleep step claim transaction")?;
1066
1067        let existing = tx
1068            .query_row(
1069                r#"
1070                SELECT step_id, state, input_signature_json, input_json, lease_expires_at
1071                FROM sw_workflow_steps
1072                WHERE run_id = ?1 AND checkpoint_name = ?2
1073                "#,
1074                rusqlite::params![input.run_id, input.checkpoint_name],
1075                |row| {
1076                    Ok((
1077                        row.get::<_, String>(0)?,
1078                        row.get::<_, String>(1)?,
1079                        row.get::<_, String>(2)?,
1080                        row.get::<_, String>(3)?,
1081                        row.get::<_, Option<i64>>(4)?,
1082                    ))
1083                },
1084            )
1085            .optional()
1086            .context("failed to query durable sleep step")?;
1087
1088        if let Some((step_id, state, stored_signature, input_json, lease_expires_at)) = existing {
1089            if stored_signature != input.input_signature_json {
1090                bail!(
1091                    "durable sleep step input signature mismatch for {}",
1092                    input.checkpoint_name
1093                );
1094            }
1095            match state.as_str() {
1096                "completed" => {
1097                    tx.commit()
1098                        .context("failed to commit sleep replay transaction")?;
1099                    return Ok(SleepStepClaim::Replay);
1100                }
1101                "running" | "pending" | "failed" | "cancelled" => {
1102                    let input_value = serde_json::from_str::<Value>(&input_json)
1103                        .context("failed to deserialize durable sleep input")?;
1104                    let wake_at = input_value
1105                        .get("wakeAt")
1106                        .and_then(Value::as_i64)
1107                        .ok_or_else(|| anyhow::anyhow!("durable sleep step missing wakeAt"))?;
1108                    if wake_at < 0 {
1109                        log::warn!(
1110                            "durable sleep step {} has negative wakeAt {}; resolving immediately",
1111                            step_id,
1112                            wake_at
1113                        );
1114                    }
1115                    if state == "running"
1116                        && lease_expires_at.is_some_and(|lease| lease > input.now)
1117                        && wake_at > input.now
1118                    {
1119                        tx.commit()
1120                            .context("failed to commit sleep wait transaction")?;
1121                        return Ok(SleepStepClaim::WaitUntil { step_id, wake_at });
1122                    }
1123                    tx.execute(
1124                        r#"
1125                        UPDATE sw_workflow_steps
1126                        SET state = 'running',
1127                            worker_id = ?1,
1128                            lease_expires_at = ?2,
1129                            attempts = attempts + 1,
1130                            last_attempt_at = ?3,
1131                            updated_at = ?3,
1132                            failure_reason_json = NULL
1133                        WHERE step_id = ?4
1134                        "#,
1135                        rusqlite::params![
1136                            input.worker_id,
1137                            input.lease_expires_at,
1138                            input.now,
1139                            step_id
1140                        ],
1141                    )
1142                    .context("failed to reclaim durable sleep step")?;
1143                    tx.commit()
1144                        .context("failed to commit durable sleep reclaim transaction")?;
1145                    return Ok(SleepStepClaim::WaitUntil { step_id, wake_at });
1146                }
1147                other => bail!("unknown durable step state: {other}"),
1148            }
1149        }
1150
1151        let step_id = new_id("step");
1152        let duration_ms_i64 =
1153            i64::try_from(input.duration_ms).context("sleep duration is too large to persist")?;
1154        let wake_at = input.now.saturating_add(duration_ms_i64);
1155        let input_json = serde_json::json!({
1156            "durationMs": input.duration_ms,
1157            "wakeAt": wake_at,
1158        });
1159        tx.execute(
1160            r#"
1161            INSERT INTO sw_workflow_steps (
1162                step_id,
1163                run_id,
1164                root_run_id,
1165                step_kind,
1166                checkpoint_name,
1167                input_signature_hash,
1168                input_signature_json,
1169                state,
1170                input_json,
1171                worker_id,
1172                lease_expires_at,
1173                attempts,
1174                last_attempt_at,
1175                created_at,
1176                updated_at
1177            )
1178            VALUES (?1, ?2, ?3, 'sleep', ?4, ?5, ?6, 'running', ?7, ?8, ?9, 1, ?10, ?10, ?10)
1179            "#,
1180            rusqlite::params![
1181                step_id,
1182                input.run_id,
1183                input.root_run_id,
1184                input.checkpoint_name,
1185                input.input_signature_hash,
1186                input.input_signature_json,
1187                serde_json::to_string(&input_json)?,
1188                input.worker_id,
1189                input.lease_expires_at,
1190                input.now,
1191            ],
1192        )
1193        .context("failed to insert durable sleep step")?;
1194        tx.commit()
1195            .context("failed to commit durable sleep step insert")?;
1196        Ok(SleepStepClaim::WaitUntil { step_id, wake_at })
1197    }
1198
1199    pub fn complete_sleep_step(&mut self, input: SleepStepCompleteInput<'_>) -> anyhow::Result<()> {
1200        let result_json = serde_json::to_string(&serde_json::json!({
1201            "ok": true,
1202            "durationMs": input.duration_ms,
1203            "wakeAt": input.wake_at,
1204            "completedAt": input.now,
1205        }))?;
1206        let updated = self
1207            .connection_mut()
1208            .execute(
1209                r#"
1210                UPDATE sw_workflow_steps
1211                SET state = 'completed',
1212                    result_json = ?1,
1213                    lease_expires_at = NULL,
1214                    updated_at = ?2
1215                WHERE step_id = ?3
1216                  AND state = 'running'
1217                "#,
1218                rusqlite::params![result_json, input.now, input.step_id],
1219            )
1220            .context("failed to mark durable sleep step completed")?;
1221        if updated == 1 {
1222            return Ok(());
1223        }
1224
1225        let state = self
1226            .connection()
1227            .query_row(
1228                "SELECT state FROM sw_workflow_steps WHERE step_id = ?1",
1229                rusqlite::params![input.step_id],
1230                |row| row.get::<_, String>(0),
1231            )
1232            .optional()
1233            .context("failed to inspect durable sleep step after completion race")?;
1234        match state.as_deref() {
1235            Some("completed") => Ok(()),
1236            Some(state) => bail!("cannot complete durable sleep step in state {state}"),
1237            None => bail!(
1238                "cannot complete missing durable sleep step {}",
1239                input.step_id
1240            ),
1241        }
1242    }
1243
1244    pub fn fail_agent_step(&mut self, input: AgentStepFailInput<'_>) -> anyhow::Result<()> {
1245        let failure = serde_json::to_string(input.failure_reason)?;
1246        self.connection_mut()
1247            .execute(
1248                r#"
1249                UPDATE sw_workflow_steps
1250                SET state = 'failed',
1251                    failure_reason_json = ?1,
1252                    lease_expires_at = NULL,
1253                    updated_at = ?2
1254                WHERE step_id = ?3
1255                "#,
1256                rusqlite::params![failure, input.now, input.step_id],
1257            )
1258            .context("failed to mark durable agent step failed")?;
1259        Ok(())
1260    }
1261}
1262
/// Outcome of `claim_or_replay_agent_step`.
#[derive(Debug)]
pub enum AgentStepClaim {
    /// The step was already completed; carries the result deserialized from
    /// its stored `result_json`.
    Replay(Box<AgentProviderResult>),
    /// The caller now holds the step (newly inserted or reclaimed) and is
    /// expected to run it.
    Run { step_id: String },
    /// The step is `running` under a lease that has not yet expired.
    Wait,
}
1269
/// Outcome of `claim_or_replay_sleep_step`.
#[derive(Debug)]
pub enum SleepStepClaim {
    /// The sleep step was already completed.
    Replay,
    /// The sleep is still outstanding; `wake_at` is the `wakeAt` value
    /// persisted in the step's stored input.
    WaitUntil { step_id: String, wake_at: i64 },
}
1275
1276fn compact_agent_result_for_replay(result: &AgentProviderResult) -> AgentProviderResult {
1277    AgentProviderResult {
1278        output: result.output.clone(),
1279        session_id: result.session_id.clone(),
1280        model: result.model.clone(),
1281        usage: result.usage.clone(),
1282        isolation: result.isolation.clone(),
1283        raw: None,
1284    }
1285}
1286
/// Arguments for `claim_or_replay_agent_step`.
pub struct AgentStepClaimInput<'a> {
    /// Run the step belongs to; together with `checkpoint_name` it identifies
    /// the step row.
    pub run_id: &'a str,
    /// Stored in the step's `root_run_id` column when the step is inserted.
    pub root_run_id: &'a str,
    /// Checkpoint name used, with `run_id`, to look the step up.
    pub checkpoint_name: &'a str,
    /// Stored alongside the signature JSON when the step is inserted.
    pub input_signature_hash: &'a str,
    /// Compared against the stored signature; a mismatch is an error.
    pub input_signature_json: &'a str,
    /// Serialized and stored as the step's `input_json` on insert.
    pub input_json: &'a Value,
    /// Recorded as the step's worker when it is claimed or reclaimed.
    pub worker_id: &'a str,
    /// Lease deadline written when the step is claimed or reclaimed.
    pub lease_expires_at: i64,
    /// Current time: written to the timestamp columns and compared against an
    /// existing lease to decide whether it is still live.
    pub now: i64,
}
1298
/// Arguments for `complete_agent_step`.
pub struct AgentStepCompleteInput<'a> {
    /// Step to mark completed; also used as the ledger entry's source id.
    pub step_id: &'a str,
    /// Run id recorded on the budget ledger entry.
    pub run_id: &'a str,
    /// Root run id recorded on the budget ledger entry.
    pub root_run_id: &'a str,
    /// Agent result to persist (stored without its `raw` payload); its usage
    /// supplies the ledger's output token count.
    pub result: &'a AgentProviderResult,
    /// Timestamp written to the step's `updated_at` and the ledger's
    /// `created_at`.
    pub now: i64,
}
1306
/// Arguments for `fail_agent_step`.
pub struct AgentStepFailInput<'a> {
    /// Step to mark failed.
    pub step_id: &'a str,
    /// Serialized into the step's `failure_reason_json`.
    pub failure_reason: &'a Value,
    /// Timestamp written to the step's `updated_at`.
    pub now: i64,
}
1312
/// Arguments for `claim_or_replay_sleep_step`.
pub struct SleepStepClaimInput<'a> {
    /// Run the step belongs to; together with `checkpoint_name` it identifies
    /// the step row.
    pub run_id: &'a str,
    /// Stored in the step's `root_run_id` column when the step is inserted.
    pub root_run_id: &'a str,
    /// Checkpoint name used, with `run_id`, to look the step up.
    pub checkpoint_name: &'a str,
    /// Stored alongside the signature JSON when the step is inserted.
    pub input_signature_hash: &'a str,
    /// Compared against the stored signature; a mismatch is an error.
    pub input_signature_json: &'a str,
    /// Sleep length; added to `now` to compute the stored `wakeAt` when the
    /// step is first inserted.
    pub duration_ms: u64,
    /// Recorded as the step's worker when it is claimed or reclaimed.
    pub worker_id: &'a str,
    /// Lease deadline written when the step is claimed or reclaimed.
    pub lease_expires_at: i64,
    /// Current time: written to the timestamp columns and compared against the
    /// existing lease and the stored `wakeAt`.
    pub now: i64,
}
1324
/// Arguments for `complete_sleep_step`.
pub struct SleepStepCompleteInput<'a> {
    /// Step to mark completed.
    pub step_id: &'a str,
    /// Recorded as `durationMs` in the step's result JSON.
    pub duration_ms: u64,
    /// Recorded as `wakeAt` in the step's result JSON.
    pub wake_at: i64,
    /// Recorded as `completedAt` in the result JSON and written to
    /// `updated_at`.
    pub now: i64,
}
1331
/// Runs an agent request for a durable step.
///
/// Forwards all arguments unchanged to `run_agent_provider_with_retry` and
/// returns its result.
async fn run_durable_agent_provider(
    default_provider: Arc<dyn AgentProvider>,
    provider_override: Option<String>,
    input: AgentProviderRunInput,
    cancel_rx: Option<watch::Receiver<bool>>,
) -> anyhow::Result<AgentProviderResult> {
    run_agent_provider_with_retry(default_provider, provider_override, input, cancel_rx).await
}
1340
1341fn agent_input_signature(provider_name: &str, input: &AgentProviderRunInput) -> Value {
1342    serde_json::json!({
1343        "signatureVersion": 2,
1344        "kind": "agent",
1345        "workflowScope": "root",
1346        "provider": provider_name,
1347        "prompt": input.prompt,
1348        "options": input.options,
1349        "context": {
1350            "phase": input.context.phase,
1351            "cwd": input.context.cwd.as_ref().map(|path| path.to_string_lossy().into_owned()),
1352        }
1353    })
1354}
1355
/// Builds the signature value describing a sleep step's input.
///
/// The signature records only the requested duration, tagged with signature
/// version 1.
fn sleep_input_signature(duration_ms: u64) -> Value {
    serde_json::json!({
        "signatureVersion": 1,
        "kind": "sleep",
        "workflowScope": "root",
        "durationMs": duration_ms,
    })
}
1364
1365fn short_blake3_hex(input: &str) -> String {
1366    let hash = blake3::hash(input.as_bytes());
1367    hash.as_bytes()[..12]
1368        .iter()
1369        .map(|byte| format!("{byte:02x}"))
1370        .collect()
1371}
1372
1373fn canonical_json_string(value: &Value) -> anyhow::Result<String> {
1374    let canonical = canonical_json_value(value);
1375    serde_json::to_string(&canonical).context("failed to serialize canonical JSON")
1376}
1377
1378fn canonical_json_value(value: &Value) -> Value {
1379    match value {
1380        Value::Object(object) => {
1381            let mut sorted = BTreeMap::new();
1382            for (key, value) in object {
1383                sorted.insert(key.clone(), canonical_json_value(value));
1384            }
1385            Value::Object(sorted.into_iter().collect())
1386        }
1387        Value::Array(array) => Value::Array(array.iter().map(canonical_json_value).collect()),
1388        value => value.clone(),
1389    }
1390}
1391
1392#[cfg(test)]
1393mod tests {
1394    use super::*;
1395    use crate::agent_providers::{
1396        AgentProvider, AgentProviderResult, AgentProviderRunInput, AgentProviderSchemaMode,
1397        AgentProviderUsageMode,
1398    };
1399    use serde_json::json;
1400    use std::fs;
1401    use std::sync::{
1402        atomic::{AtomicUsize, Ordering},
1403        Arc, Barrier, Mutex,
1404    };
1405    use std::thread;
1406    use std::time::{Duration, Instant};
1407
    /// Test provider that counts how many times it has been run.
    struct CountingProvider {
        /// Number of `run` calls made so far.
        calls: AtomicUsize,
    }
1411
    /// Test provider whose first `run` call fails and later calls succeed.
    struct FlakyOnceProvider {
        /// Number of `run` calls made so far.
        calls: AtomicUsize,
    }
1415
    /// Test provider that records when each run starts and ends.
    struct TimedProvider {
        /// Start and end events, in the order they were recorded.
        events: Mutex<Vec<TimedProviderEvent>>,
    }
1419
    /// Test provider that returns results carrying a fixed session id.
    struct SessionProvider {
        /// Session id reported in every result.
        session_id: &'static str,
        /// When set, `true` is sent on this channel at the start of each run.
        cancel_on_run: Option<watch::Sender<bool>>,
        /// Delay applied before returning a result; zero means no delay.
        delay_ms: u64,
    }
1425
    /// A single start or end marker recorded by `TimedProvider`.
    #[derive(Debug, Clone)]
    struct TimedProviderEvent {
        /// Prompt of the run the event belongs to.
        prompt: String,
        /// Either `"start"` or `"end"`.
        kind: &'static str,
        /// Moment the event was recorded.
        at: Instant,
    }
1432
1433    #[async_trait::async_trait]
1434    impl AgentProvider for CountingProvider {
1435        fn name(&self) -> &str {
1436            "counting"
1437        }
1438        fn schema_mode(&self) -> AgentProviderSchemaMode {
1439            AgentProviderSchemaMode::Builtin
1440        }
1441        fn usage_mode(&self) -> AgentProviderUsageMode {
1442            AgentProviderUsageMode::Builtin
1443        }
1444        async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1445            let count = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
1446            Ok(AgentProviderResult {
1447                output: json!(format!("{}:{count}", input.prompt)),
1448                session_id: None,
1449                model: None,
1450                usage: None,
1451                isolation: None,
1452                raw: None,
1453            })
1454        }
1455    }
1456
1457    #[async_trait::async_trait]
1458    impl AgentProvider for FlakyOnceProvider {
1459        fn name(&self) -> &str {
1460            "flaky-once"
1461        }
1462        fn schema_mode(&self) -> AgentProviderSchemaMode {
1463            AgentProviderSchemaMode::Builtin
1464        }
1465        fn usage_mode(&self) -> AgentProviderUsageMode {
1466            AgentProviderUsageMode::Builtin
1467        }
1468        async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1469            let call = self.calls.fetch_add(1, Ordering::SeqCst) + 1;
1470            if call == 1 {
1471                bail!("temporary provider failure")
1472            }
1473            Ok(AgentProviderResult {
1474                output: json!(format!("recovered: {}", input.prompt)),
1475                session_id: None,
1476                model: None,
1477                usage: None,
1478                isolation: None,
1479                raw: None,
1480            })
1481        }
1482    }
1483
1484    #[async_trait::async_trait]
1485    impl AgentProvider for SessionProvider {
1486        fn name(&self) -> &str {
1487            "session"
1488        }
1489        fn schema_mode(&self) -> AgentProviderSchemaMode {
1490            AgentProviderSchemaMode::Builtin
1491        }
1492        fn usage_mode(&self) -> AgentProviderUsageMode {
1493            AgentProviderUsageMode::Builtin
1494        }
1495        async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1496            if let Some(cancel_tx) = &self.cancel_on_run {
1497                let _ = cancel_tx.send(true);
1498            }
1499            if self.delay_ms > 0 {
1500                tokio::time::sleep(Duration::from_millis(self.delay_ms)).await;
1501            }
1502            Ok(AgentProviderResult {
1503                output: json!(format!("session: {}", input.prompt)),
1504                session_id: Some(self.session_id.to_string()),
1505                model: Some("session-model".to_string()),
1506                usage: Some(crate::agent_providers::AgentUsage {
1507                    output_tokens: Some(1),
1508                    ..Default::default()
1509                }),
1510                isolation: None,
1511                raw: Some(json!({
1512                    "events": [
1513                        { "sessionId": self.session_id, "prompt": input.prompt }
1514                    ]
1515                })),
1516            })
1517        }
1518    }
1519
1520    #[async_trait::async_trait]
1521    impl AgentProvider for TimedProvider {
1522        fn name(&self) -> &str {
1523            "timed"
1524        }
1525        fn schema_mode(&self) -> AgentProviderSchemaMode {
1526            AgentProviderSchemaMode::Builtin
1527        }
1528        fn usage_mode(&self) -> AgentProviderUsageMode {
1529            AgentProviderUsageMode::Builtin
1530        }
1531        async fn run(&self, input: AgentProviderRunInput) -> anyhow::Result<AgentProviderResult> {
1532            let delay_ms = input
1533                .prompt
1534                .split(":delay:")
1535                .nth(1)
1536                .and_then(|suffix| suffix.split(':').next())
1537                .and_then(|value| value.parse::<u64>().ok())
1538                .unwrap_or(0);
1539            self.events.lock().unwrap().push(TimedProviderEvent {
1540                prompt: input.prompt.clone(),
1541                kind: "start",
1542                at: Instant::now(),
1543            });
1544            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1545            self.events.lock().unwrap().push(TimedProviderEvent {
1546                prompt: input.prompt.clone(),
1547                kind: "end",
1548                at: Instant::now(),
1549            });
1550            Ok(AgentProviderResult {
1551                output: json!(format!("{}:done", input.prompt)),
1552                session_id: None,
1553                model: None,
1554                usage: Some(crate::agent_providers::AgentUsage {
1555                    output_tokens: Some(1),
1556                    ..Default::default()
1557                }),
1558                isolation: None,
1559                raw: None,
1560            })
1561        }
1562    }
1563
    /// Session log sink for tests that writes raw session events to disk.
    struct TestSessionLogSink {
        /// Directory under which per-provider session files are written.
        root: std::path::PathBuf,
        /// When set, `true` is sent on this channel after a result is written.
        saved_tx: Option<watch::Sender<bool>>,
    }
1568
1569    #[async_trait::async_trait]
1570    impl crate::workflow::AgentSessionLogSink for TestSessionLogSink {
1571        async fn write_agent_result(
1572            &self,
1573            provider: &str,
1574            result: &AgentProviderResult,
1575        ) -> anyhow::Result<()> {
1576            write_test_raw_session(&self.root, provider, result)?;
1577            if let Some(saved_tx) = &self.saved_tx {
1578                let _ = saved_tx.send(true);
1579            }
1580            Ok(())
1581        }
1582    }
1583
1584    fn write_test_raw_session(
1585        root: &std::path::Path,
1586        provider: &str,
1587        result: &AgentProviderResult,
1588    ) -> anyhow::Result<()> {
1589        let session_id = result
1590            .session_id
1591            .as_deref()
1592            .ok_or_else(|| anyhow::anyhow!("session id missing"))?;
1593        let events = result
1594            .raw
1595            .as_ref()
1596            .and_then(|raw| raw.get("events"))
1597            .and_then(Value::as_array)
1598            .ok_or_else(|| anyhow::anyhow!("raw events missing"))?;
1599        let dir = root.join(provider);
1600        fs::create_dir_all(&dir)?;
1601        let mut lines = String::new();
1602        for event in events {
1603            lines.push_str(&serde_json::to_string(event)?);
1604            lines.push('\n');
1605        }
1606        fs::write(dir.join(format!("{session_id}.jsonl")), lines)?;
1607        Ok(())
1608    }
1609
1610    fn seed_durable_run(db_path: &std::path::Path) -> (String, String) {
1611        let mut store = SqliteDurableStore::open(db_path).expect("store should open");
1612        store.init().expect("schema should initialize");
1613        let task_id = new_id("task");
1614        let run_id = new_id("run");
1615        store
1616            .insert_local_task_and_run(LocalTaskAndRunInsert {
1617                task_id: &task_id,
1618                run_id: &run_id,
1619                owner_id: "owner",
1620                params_json: &json!({ "scriptPath": "workflow.mjs" }),
1621                workflow_run_json: &json!({ "scriptPath": "workflow.mjs" }),
1622                args_json: &json!({}),
1623                budget_total: Some(100),
1624                max_attempts: 3,
1625                now: 1,
1626            })
1627            .expect("run should be inserted");
1628        (task_id, run_id)
1629    }
1630
    /// Builds an `AgentStepClaimInput` for tests.
    ///
    /// The run id doubles as the root run id, and the signature hash and
    /// signature JSON are fixed placeholder values.
    fn claim_input<'a>(
        run_id: &'a str,
        checkpoint_name: &'a str,
        worker_id: &'a str,
        lease_expires_at: i64,
        now: i64,
        input_json: &'a Value,
    ) -> AgentStepClaimInput<'a> {
        AgentStepClaimInput {
            run_id,
            root_run_id: run_id,
            checkpoint_name,
            input_signature_hash: "sig",
            input_signature_json: r#"{"prompt":"hello"}"#,
            input_json,
            worker_id,
            lease_expires_at,
            now,
        }
    }
1651
1652    #[test]
1653    fn durable_agent_retry_backoff_extends_step_lease_deadline() {
1654        let no_retry = crate::workflow::AgentRetryPolicy {
1655            max_attempts: 1,
1656            backoff_ms: 0,
1657        };
1658        let long_backoff = crate::workflow::AgentRetryPolicy {
1659            max_attempts: 3,
1660            backoff_ms: 120_000,
1661        };
1662        assert_eq!(durable_agent_step_lease_expires_at(1_000, no_retry), 61_000);
1663        assert_eq!(
1664            durable_agent_step_lease_expires_at(1_000, long_backoff),
1665            301_000
1666        );
1667    }
1668
1669    #[test]
1670    fn concurrent_agent_step_claims_allow_only_one_runner() {
1671        let dir = tempfile::tempdir().unwrap();
1672        let db_path = dir.path().join("durable.db");
1673        let (_task_id, run_id) = seed_durable_run(&db_path);
1674        let workers = 8;
1675        let barrier = Arc::new(Barrier::new(workers));
1676        let outcomes = Arc::new(Mutex::new(Vec::new()));
1677
1678        thread::scope(|scope| {
1679            for worker in 0..workers {
1680                let barrier = Arc::clone(&barrier);
1681                let outcomes = Arc::clone(&outcomes);
1682                let db_path = db_path.clone();
1683                let run_id = run_id.clone();
1684                scope.spawn(move || {
1685                    let input_json = json!({ "prompt": "hello" });
1686                    barrier.wait();
1687                    let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1688                    let claim = store
1689                        .claim_or_replay_agent_step(claim_input(
1690                            &run_id,
1691                            "agent:shared",
1692                            &format!("worker-{worker}"),
1693                            10_000,
1694                            100,
1695                            &input_json,
1696                        ))
1697                        .expect("claim should succeed");
1698                    let label = match claim {
1699                        AgentStepClaim::Run { .. } => "run",
1700                        AgentStepClaim::Wait => "wait",
1701                        AgentStepClaim::Replay(_) => "replay",
1702                    };
1703                    outcomes.lock().unwrap().push(label);
1704                });
1705            }
1706        });
1707
1708        let outcomes = outcomes.lock().unwrap();
1709        assert_eq!(
1710            outcomes.iter().filter(|&&outcome| outcome == "run").count(),
1711            1
1712        );
1713        assert_eq!(
1714            outcomes
1715                .iter()
1716                .filter(|&&outcome| outcome == "wait")
1717                .count(),
1718            workers - 1
1719        );
1720        assert_eq!(
1721            outcomes
1722                .iter()
1723                .filter(|&&outcome| outcome == "replay")
1724                .count(),
1725            0
1726        );
1727
1728        let store = SqliteDurableStore::open(&db_path).unwrap();
1729        let (steps, attempts): (i64, i64) = store
1730            .connection()
1731            .query_row(
1732                "SELECT COUNT(*), COALESCE(SUM(attempts), 0) FROM sw_workflow_steps",
1733                [],
1734                |row| Ok((row.get(0)?, row.get(1)?)),
1735            )
1736            .unwrap();
1737        assert_eq!(steps, 1);
1738        assert_eq!(attempts, 1);
1739    }
1740
1741    #[test]
1742    fn concurrent_agent_step_completion_writes_one_budget_ledger_entry() {
1743        let dir = tempfile::tempdir().unwrap();
1744        let db_path = dir.path().join("durable.db");
1745        let (_task_id, run_id) = seed_durable_run(&db_path);
1746        let input_json = json!({ "prompt": "hello" });
1747        let step_id = {
1748            let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1749            match store
1750                .claim_or_replay_agent_step(claim_input(
1751                    &run_id,
1752                    "agent:complete",
1753                    "worker-0",
1754                    10_000,
1755                    100,
1756                    &input_json,
1757                ))
1758                .expect("claim should succeed")
1759            {
1760                AgentStepClaim::Run { step_id } => step_id,
1761                other => panic!("expected run claim, got {other:?}"),
1762            }
1763        };
1764        let workers = 4;
1765        let barrier = Arc::new(Barrier::new(workers));
1766
1767        thread::scope(|scope| {
1768            for _ in 0..workers {
1769                let barrier = Arc::clone(&barrier);
1770                let db_path = db_path.clone();
1771                let run_id = run_id.clone();
1772                let step_id = step_id.clone();
1773                scope.spawn(move || {
1774                    let result = AgentProviderResult {
1775                        output: json!("done"),
1776                        session_id: None,
1777                        model: None,
1778                        usage: Some(crate::agent_providers::AgentUsage {
1779                            output_tokens: Some(7),
1780                            ..Default::default()
1781                        }),
1782                        isolation: None,
1783                        raw: None,
1784                    };
1785                    barrier.wait();
1786                    let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1787                    store
1788                        .complete_agent_step(AgentStepCompleteInput {
1789                            step_id: &step_id,
1790                            run_id: &run_id,
1791                            root_run_id: &run_id,
1792                            result: &result,
1793                            now: 200,
1794                        })
1795                        .expect("completion should be idempotent");
1796                });
1797            }
1798        });
1799
1800        let store = SqliteDurableStore::open(&db_path).unwrap();
1801        let (entries, tokens): (i64, i64) = store
1802            .connection()
1803            .query_row(
1804                "SELECT COUNT(*), COALESCE(SUM(output_tokens), 0) FROM sw_budget_ledger",
1805                [],
1806                |row| Ok((row.get(0)?, row.get(1)?)),
1807            )
1808            .unwrap();
1809        assert_eq!(entries, 1);
1810        assert_eq!(tokens, 7);
1811    }
1812
1813    #[test]
1814    fn completed_agent_step_persists_compact_replay_result_without_raw() {
1815        let dir = tempfile::tempdir().unwrap();
1816        let db_path = dir.path().join("durable.db");
1817        let (_task_id, run_id) = seed_durable_run(&db_path);
1818        let input_json = json!({ "prompt": "hello" });
1819        let step_id = {
1820            let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1821            match store
1822                .claim_or_replay_agent_step(claim_input(
1823                    &run_id,
1824                    "agent:compact",
1825                    "worker-0",
1826                    10_000,
1827                    100,
1828                    &input_json,
1829                ))
1830                .expect("claim should succeed")
1831            {
1832                AgentStepClaim::Run { step_id } => step_id,
1833                other => panic!("expected run claim, got {other:?}"),
1834            }
1835        };
1836        let result = AgentProviderResult {
1837            output: json!("done"),
1838            session_id: Some("provider-session-1".into()),
1839            model: Some("provider/model-from-result".into()),
1840            usage: Some(crate::agent_providers::AgentUsage {
1841                output_tokens: Some(7),
1842                ..Default::default()
1843            }),
1844            isolation: None,
1845            raw: Some(json!({ "events": ["large provider transcript"] })),
1846        };
1847
1848        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
1849        store
1850            .complete_agent_step(AgentStepCompleteInput {
1851                step_id: &step_id,
1852                run_id: &run_id,
1853                root_run_id: &run_id,
1854                result: &result,
1855                now: 200,
1856            })
1857            .expect("completion should succeed");
1858        let stored: String = store
1859            .connection()
1860            .query_row(
1861                "SELECT result_json FROM sw_workflow_steps WHERE step_id = ?1",
1862                [&step_id],
1863                |row| row.get(0),
1864            )
1865            .unwrap();
1866        let stored: Value = serde_json::from_str(&stored).unwrap();
1867
1868        assert_eq!(stored["output"], json!("done"));
1869        assert_eq!(stored["sessionId"], json!("provider-session-1"));
1870        assert_eq!(stored["model"], json!("provider/model-from-result"));
1871        assert_eq!(stored["usage"]["outputTokens"], json!(7));
1872        assert!(stored.get("raw").is_none());
1873    }
1874
1875    #[test]
1876    fn expired_agent_step_lease_can_be_reclaimed_from_another_connection() {
1877        let dir = tempfile::tempdir().unwrap();
1878        let db_path = dir.path().join("durable.db");
1879        let (_task_id, run_id) = seed_durable_run(&db_path);
1880        let input_json = json!({ "prompt": "hello" });
1881        let first_step_id = {
1882            let mut store = SqliteDurableStore::open(&db_path).unwrap();
1883            match store
1884                .claim_or_replay_agent_step(claim_input(
1885                    &run_id,
1886                    "agent:lease",
1887                    "worker-1",
1888                    100,
1889                    0,
1890                    &input_json,
1891                ))
1892                .unwrap()
1893            {
1894                AgentStepClaim::Run { step_id } => step_id,
1895                other => panic!("expected run claim, got {other:?}"),
1896            }
1897        };
1898
1899        let mut waiting_store = SqliteDurableStore::open(&db_path).unwrap();
1900        assert!(matches!(
1901            waiting_store
1902                .claim_or_replay_agent_step(claim_input(
1903                    &run_id,
1904                    "agent:lease",
1905                    "worker-2",
1906                    200,
1907                    50,
1908                    &input_json,
1909                ))
1910                .unwrap(),
1911            AgentStepClaim::Wait
1912        ));
1913
1914        let mut reclaiming_store = SqliteDurableStore::open(&db_path).unwrap();
1915        let reclaimed_step_id = match reclaiming_store
1916            .claim_or_replay_agent_step(claim_input(
1917                &run_id,
1918                "agent:lease",
1919                "worker-3",
1920                300,
1921                101,
1922                &input_json,
1923            ))
1924            .unwrap()
1925        {
1926            AgentStepClaim::Run { step_id } => step_id,
1927            other => panic!("expected run claim, got {other:?}"),
1928        };
1929        assert_eq!(reclaimed_step_id, first_step_id);
1930
1931        let attempts: i64 = reclaiming_store
1932            .connection()
1933            .query_row(
1934                "SELECT attempts FROM sw_workflow_steps WHERE step_id = ?1",
1935                rusqlite::params![reclaimed_step_id],
1936                |row| row.get(0),
1937            )
1938            .unwrap();
1939        assert_eq!(attempts, 2);
1940    }
1941
1942    #[tokio::test]
1943    async fn more_than_five_durable_workflows_share_db_while_steps_have_varied_durations() {
1944        async fn run_one(
1945            db_path: std::path::PathBuf,
1946            script_path: std::path::PathBuf,
1947            provider: Arc<TimedProvider>,
1948            id: usize,
1949            base_delay_ms: u64,
1950        ) -> anyhow::Result<LocalDurableRunResult> {
1951            let mut store = SqliteDurableStore::open(db_path)?;
1952            run_local_durable_workflow(
1953                &mut store,
1954                LocalDurableRunOptions::new(
1955                    script_path,
1956                    json!({ "id": id, "baseDelay": base_delay_ms }),
1957                    provider,
1958                ),
1959            )
1960            .await
1961        }
1962
1963        let dir = tempfile::tempdir().unwrap();
1964        let db_path = dir.path().join("durable.db");
1965        let script_path = dir.path().join("workflow.mjs");
1966        fs::write(
1967            &script_path,
1968            r#"
1969export const meta = { name: "durable-concurrent", description: "durable concurrent" };
1970const id = args.id;
1971const baseDelay = args.baseDelay;
1972const first = await agent(`wf:${id}:delay:${baseDelay}:first`);
1973const second = await agent(`wf:${id}:delay:${baseDelay + 30}:second`);
1974const third = await agent(`wf:${id}:delay:${baseDelay + 10}:third`);
1975export default { id, first, second, third };
1976"#,
1977        )
1978        .unwrap();
1979        let provider = Arc::new(TimedProvider {
1980            events: Mutex::new(Vec::new()),
1981        });
1982        let started = Instant::now();
1983        let (r0, r1, r2, r3, r4, r5) = tokio::join!(
1984            run_one(
1985                db_path.clone(),
1986                script_path.clone(),
1987                Arc::clone(&provider),
1988                0,
1989                40
1990            ),
1991            run_one(
1992                db_path.clone(),
1993                script_path.clone(),
1994                Arc::clone(&provider),
1995                1,
1996                55
1997            ),
1998            run_one(
1999                db_path.clone(),
2000                script_path.clone(),
2001                Arc::clone(&provider),
2002                2,
2003                70
2004            ),
2005            run_one(
2006                db_path.clone(),
2007                script_path.clone(),
2008                Arc::clone(&provider),
2009                3,
2010                85
2011            ),
2012            run_one(
2013                db_path.clone(),
2014                script_path.clone(),
2015                Arc::clone(&provider),
2016                4,
2017                100
2018            ),
2019            run_one(
2020                db_path.clone(),
2021                script_path.clone(),
2022                Arc::clone(&provider),
2023                5,
2024                115
2025            ),
2026        );
2027        let elapsed = started.elapsed();
2028        let results = [r0, r1, r2, r3, r4, r5]
2029            .into_iter()
2030            .collect::<Result<Vec<_>, _>>()
2031            .unwrap();
2032
2033        assert_eq!(results.len(), 6);
2034        for (id, result) in results.iter().enumerate() {
2035            assert_eq!(result.workflow.output.result["id"], json!(id));
2036            assert_eq!(result.attempts, 1);
2037        }
2038
2039        let events = provider.events.lock().unwrap().clone();
2040        assert_eq!(
2041            events.iter().filter(|event| event.kind == "start").count(),
2042            18
2043        );
2044        assert_eq!(
2045            events.iter().filter(|event| event.kind == "end").count(),
2046            18
2047        );
2048        assert!(events
2049            .iter()
2050            .any(|event| event.prompt == "wf:5:delay:145:second"));
2051        assert!(
2052            max_in_flight(&events) > 1,
2053            "expected overlapping provider steps, got events: {events:?}"
2054        );
2055        assert!(
2056            elapsed < Duration::from_millis(900),
2057            "runs should overlap on shared DB instead of serializing all varied sleeps; elapsed={elapsed:?}"
2058        );
2059
2060        let store = SqliteDurableStore::open(&db_path).unwrap();
2061        let (completed_runs, completed_tasks, completed_steps, budget_entries, output_tokens): (
2062            i64,
2063            i64,
2064            i64,
2065            i64,
2066            i64,
2067        ) = store
2068            .connection()
2069            .query_row(
2070                r#"
2071                SELECT
2072                  (SELECT COUNT(*) FROM sw_workflow_runs WHERE state = 'completed'),
2073                  (SELECT COUNT(*) FROM sw_workflow_tasks WHERE state = 'completed'),
2074                  (SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'),
2075                  (SELECT COUNT(*) FROM sw_budget_ledger),
2076                  (SELECT COALESCE(SUM(output_tokens), 0) FROM sw_budget_ledger)
2077                "#,
2078                [],
2079                |row| {
2080                    Ok((
2081                        row.get(0)?,
2082                        row.get(1)?,
2083                        row.get(2)?,
2084                        row.get(3)?,
2085                        row.get(4)?,
2086                    ))
2087                },
2088            )
2089            .unwrap();
2090        assert_eq!(completed_runs, 6);
2091        assert_eq!(completed_tasks, 6);
2092        assert_eq!(completed_steps, 18);
2093        assert_eq!(budget_entries, 18);
2094        assert_eq!(output_tokens, 18);
2095    }
2096
2097    fn max_in_flight(events: &[TimedProviderEvent]) -> usize {
2098        let mut events = events.to_vec();
2099        events.sort_by_key(|event| (event.at, if event.kind == "start" { 0 } else { 1 }));
2100        let mut current = 0usize;
2101        let mut max = 0usize;
2102        for event in events {
2103            if event.kind == "start" {
2104                current += 1;
2105                max = max.max(current);
2106            } else {
2107                current = current.saturating_sub(1);
2108            }
2109        }
2110        max
2111    }
2112
2113    #[tokio::test]
2114    async fn local_durable_run_persists_sleep_step_without_budget_entry() {
2115        let dir = tempfile::tempdir().unwrap();
2116        let db_path = dir.path().join("durable.db");
2117        let script_path = dir.path().join("sleep.workflow.js");
2118        fs::write(
2119            &script_path,
2120            r#"
2121import { sleep } from "workflow:extra";
2122export const meta = { name: "durable-sleep", description: "durable sleep" };
2123await sleep(5);
2124export default { slept: true };
2125"#,
2126        )
2127        .unwrap();
2128
2129        let mut store = SqliteDurableStore::open(&db_path).unwrap();
2130        let result = run_local_durable_workflow(
2131            &mut store,
2132            LocalDurableRunOptions::new(
2133                script_path,
2134                json!({}),
2135                Arc::new(CountingProvider {
2136                    calls: AtomicUsize::new(0),
2137                }),
2138            ),
2139        )
2140        .await
2141        .expect("durable workflow should run");
2142
2143        assert_eq!(result.workflow.output.result, json!({ "slept": true }));
2144        let (sleep_steps, budget_entries): (i64, i64) = store
2145            .connection()
2146            .query_row(
2147                r#"
2148                SELECT
2149                  (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'sleep' AND state = 'completed'),
2150                  (SELECT COUNT(*) FROM sw_budget_ledger)
2151                "#,
2152                [],
2153                |row| Ok((row.get(0)?, row.get(1)?)),
2154            )
2155            .unwrap();
2156        assert_eq!(sleep_steps, 1);
2157        assert_eq!(budget_entries, 0);
2158    }
2159
2160    #[tokio::test]
2161    async fn local_durable_run_marks_cancelled_when_cancel_requested() {
2162        let dir = tempfile::tempdir().unwrap();
2163        let db_path = dir.path().join("durable.db");
2164        let script_path = dir.path().join("workflow.mjs");
2165        fs::write(
2166            &script_path,
2167            r#"
2168export const meta = { name: "durable-cancel", description: "durable cancel" };
2169export default { result: await agent("hello") };
2170"#,
2171        )
2172        .unwrap();
2173        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2174        let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(true);
2175        let mut options = LocalDurableRunOptions::new(
2176            script_path,
2177            json!({}),
2178            Arc::new(CountingProvider {
2179                calls: AtomicUsize::new(0),
2180            }),
2181        );
2182        options.cancel_rx = Some(cancel_rx);
2183
2184        let error = run_local_durable_workflow(&mut store, options)
2185            .await
2186            .expect_err("cancelled durable workflow should return an error");
2187        assert_eq!(error.to_string(), "workflow cancelled");
2188
2189        let (run_state, task_state, attempt_state): (String, String, String) = store
2190            .connection()
2191            .query_row(
2192                r#"
2193                SELECT r.state, t.state, a.state
2194                FROM sw_workflow_runs r
2195                JOIN sw_workflow_tasks t ON t.task_id = r.task_id
2196                JOIN sw_workflow_attempts a ON a.run_id = r.run_id
2197                "#,
2198                [],
2199                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2200            )
2201            .unwrap();
2202        assert_eq!(run_state, "cancelled");
2203        assert_eq!(task_state, "cancelled");
2204        assert_eq!(attempt_state, "cancelled");
2205    }
2206
2207    #[tokio::test]
2208    async fn local_durable_run_exports_raw_session_before_workflow_completes() {
2209        let dir = tempfile::tempdir().unwrap();
2210        let db_path = dir.path().join("durable.db");
2211        let raw_dir = dir.path().join("raw");
2212        let script_path = dir.path().join("workflow.mjs");
2213        fs::write(
2214            &script_path,
2215            r#"
2216import { sleep } from "workflow:extra";
2217export const meta = { name: "durable-early-session", description: "durable early session" };
2218await agent("before long sleep");
2219await sleep(100);
2220export default { ok: true };
2221"#,
2222        )
2223        .unwrap();
2224        let provider = Arc::new(SessionProvider {
2225            session_id: "early-session-1",
2226            cancel_on_run: None,
2227            delay_ms: 0,
2228        });
2229        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2230        let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2231        let (saved_tx, mut saved_rx) = tokio::sync::watch::channel(false);
2232        options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2233            root: raw_dir.clone(),
2234            saved_tx: Some(saved_tx),
2235        }));
2236
2237        let run = run_local_durable_workflow(&mut store, options);
2238        tokio::pin!(run);
2239        tokio::select! {
2240            changed = saved_rx.changed() => {
2241                changed.expect("raw session notification should be sent");
2242                assert!(*saved_rx.borrow());
2243                assert!(
2244                    raw_dir.join("session/early-session-1.jsonl").exists(),
2245                    "raw session file should be written before workflow completion"
2246                );
2247            }
2248            result = &mut run => {
2249                panic!("workflow completed before raw session export notification: {result:?}");
2250            }
2251        }
2252
2253        let result = run.await.expect("workflow should complete");
2254        assert_eq!(result.workflow.output.result, json!({ "ok": true }));
2255    }
2256
2257    #[tokio::test]
2258    async fn local_durable_run_exports_raw_session_before_failed_terminal_state() {
2259        let dir = tempfile::tempdir().unwrap();
2260        let db_path = dir.path().join("durable.db");
2261        let raw_dir = dir.path().join("raw");
2262        let script_path = dir.path().join("workflow.mjs");
2263        fs::write(
2264            &script_path,
2265            r#"
2266export const meta = { name: "durable-fail-session", description: "durable fail session" };
2267await agent("before failure");
2268throw new Error("middle failure");
2269"#,
2270        )
2271        .unwrap();
2272        let provider = Arc::new(SessionProvider {
2273            session_id: "failed-session-1",
2274            cancel_on_run: None,
2275            delay_ms: 0,
2276        });
2277        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2278        let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2279        options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2280            root: raw_dir.clone(),
2281            saved_tx: None,
2282        }));
2283
2284        let error = run_local_durable_workflow(&mut store, options)
2285            .await
2286            .expect_err("workflow should fail after exporting agent session");
2287        assert!(
2288            error.to_string().contains("middle failure"),
2289            "unexpected error: {error:#}"
2290        );
2291
2292        let raw_file = raw_dir.join("session/failed-session-1.jsonl");
2293        assert!(raw_file.exists(), "raw session file should be written");
2294        let raw_line = fs::read_to_string(raw_file).unwrap();
2295        assert!(raw_line.contains("before failure"));
2296        let run_state: String = store
2297            .connection()
2298            .query_row("SELECT state FROM sw_workflow_runs", [], |row| row.get(0))
2299            .unwrap();
2300        assert_eq!(run_state, "failed");
2301    }
2302
2303    #[tokio::test]
2304    async fn local_durable_run_exports_raw_session_before_cancelled_state() {
2305        let dir = tempfile::tempdir().unwrap();
2306        let db_path = dir.path().join("durable.db");
2307        let raw_dir = dir.path().join("raw");
2308        let script_path = dir.path().join("workflow.mjs");
2309        fs::write(
2310            &script_path,
2311            r#"
2312export const meta = { name: "durable-cancel-session", description: "durable cancel session" };
2313export default { result: await agent("before cancel") };
2314"#,
2315        )
2316        .unwrap();
2317        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2318        let provider = Arc::new(SessionProvider {
2319            session_id: "cancelled-session-1",
2320            cancel_on_run: Some(cancel_tx),
2321            delay_ms: 10,
2322        });
2323        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2324        let mut options = LocalDurableRunOptions::new(script_path, json!({}), provider);
2325        options.cancel_rx = Some(cancel_rx);
2326        options.session_log_sink = Some(Arc::new(TestSessionLogSink {
2327            root: raw_dir.clone(),
2328            saved_tx: None,
2329        }));
2330
2331        let error = run_local_durable_workflow(&mut store, options)
2332            .await
2333            .expect_err("workflow should be cancelled after exporting agent session");
2334        assert_eq!(error.to_string(), "workflow cancelled");
2335
2336        let raw_file = raw_dir.join("session/cancelled-session-1.jsonl");
2337        assert!(raw_file.exists(), "raw session file should be written");
2338        let raw_line = fs::read_to_string(raw_file).unwrap();
2339        assert!(raw_line.contains("before cancel"));
2340        let run_state: String = store
2341            .connection()
2342            .query_row("SELECT state FROM sw_workflow_runs", [], |row| row.get(0))
2343            .unwrap();
2344        assert_eq!(run_state, "cancelled");
2345    }
2346
2347    #[tokio::test]
2348    async fn local_durable_run_persists_successful_task_and_run() {
2349        let dir = tempfile::tempdir().unwrap();
2350        let db_path = dir.path().join("durable.db");
2351        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2352        let script_path = dir.path().join("workflow.mjs");
2353        fs::write(
2354            &script_path,
2355            r#"
2356export const meta = { name: "durable-local", description: "durable local" };
2357export default { result: await agent("hello") };
2358"#,
2359        )
2360        .unwrap();
2361        let provider = Arc::new(CountingProvider {
2362            calls: AtomicUsize::new(0),
2363        });
2364        let result = run_local_durable_workflow(
2365            &mut store,
2366            LocalDurableRunOptions::new(script_path, json!({}), provider.clone()),
2367        )
2368        .await
2369        .unwrap();
2370        assert_eq!(
2371            result.workflow.output.result,
2372            json!({ "result": "hello:1" })
2373        );
2374        assert_eq!(result.attempts, 1);
2375        assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2376
2377        let state: String = store
2378            .connection()
2379            .query_row(
2380                "SELECT state FROM sw_workflow_tasks WHERE task_id = ?1",
2381                rusqlite::params![result.task_id],
2382                |row| row.get(0),
2383            )
2384            .unwrap();
2385        assert_eq!(state, "completed");
2386        let run_state: String = store
2387            .connection()
2388            .query_row(
2389                "SELECT state FROM sw_workflow_runs WHERE run_id = ?1",
2390                rusqlite::params![result.run_id],
2391                |row| row.get(0),
2392            )
2393            .unwrap();
2394        assert_eq!(run_state, "completed");
2395        let completed_steps: i64 = store
2396            .connection()
2397            .query_row(
2398                "SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'",
2399                [],
2400                |row| row.get(0),
2401            )
2402            .unwrap();
2403        assert_eq!(completed_steps, 1);
2404    }
2405
2406    #[tokio::test]
2407    async fn local_durable_run_uses_runtime_agent_retry_without_global_retry() {
2408        let dir = tempfile::tempdir().unwrap();
2409        let db_path = dir.path().join("durable.db");
2410        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2411        let script_path = dir.path().join("workflow.mjs");
2412        fs::write(
2413            &script_path,
2414            r#"
2415export const meta = { name: "durable-runtime-retry", description: "durable runtime retry" };
2416export default { result: await agent("hello", { retry: { maxAttempts: 2, backoffMs: 0 } }) };
2417"#,
2418        )
2419        .unwrap();
2420        let provider = Arc::new(FlakyOnceProvider {
2421            calls: AtomicUsize::new(0),
2422        });
2423
2424        let result = run_local_durable_workflow(
2425            &mut store,
2426            LocalDurableRunOptions::new(script_path, json!({}), provider.clone()),
2427        )
2428        .await
2429        .unwrap();
2430        assert_eq!(
2431            result.workflow.output.result,
2432            json!({ "result": "recovered: hello" })
2433        );
2434        assert_eq!(provider.calls.load(Ordering::SeqCst), 2);
2435
2436        let (workflow_attempts, completed_steps, failed_steps): (i64, i64, i64) = store
2437            .connection()
2438            .query_row(
2439                r#"
2440                SELECT
2441                  (SELECT COUNT(*) FROM sw_workflow_attempts),
2442                  (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'agent' AND state = 'completed'),
2443                  (SELECT COUNT(*) FROM sw_workflow_steps WHERE step_kind = 'agent' AND state = 'failed')
2444                "#,
2445                [],
2446                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2447            )
2448            .unwrap();
2449        assert_eq!(workflow_attempts, 1);
2450        assert_eq!(completed_steps, 1);
2451        assert_eq!(failed_steps, 0);
2452    }
2453
2454    #[tokio::test]
2455    async fn local_durable_run_records_one_attempt_without_global_retry() {
2456        let dir = tempfile::tempdir().unwrap();
2457        let db_path = dir.path().join("durable.db");
2458        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2459        let script_path = dir.path().join("workflow.mjs");
2460        fs::write(
2461            &script_path,
2462            r#"
2463export const meta = { name: "durable-no-global-retry", description: "durable no global retry" };
2464await agent("hello");
2465throw new Error("boom");
2466export default { unreachable: true };
2467"#,
2468        )
2469        .unwrap();
2470        let provider = Arc::new(CountingProvider {
2471            calls: AtomicUsize::new(0),
2472        });
2473        let options = LocalDurableRunOptions::new(script_path, json!({}), provider.clone());
2474
2475        let error = run_local_durable_workflow(&mut store, options)
2476            .await
2477            .unwrap_err();
2478        assert!(error.to_string().contains("boom"));
2479        assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2480
2481        let attempts: i64 = store
2482            .connection()
2483            .query_row("SELECT COUNT(*) FROM sw_workflow_attempts", [], |row| {
2484                row.get(0)
2485            })
2486            .unwrap();
2487        assert_eq!(attempts, 1);
2488        let completed_steps: i64 = store
2489            .connection()
2490            .query_row(
2491                "SELECT COUNT(*) FROM sw_workflow_steps WHERE state = 'completed'",
2492                [],
2493                |row| row.get(0),
2494            )
2495            .unwrap();
2496        assert_eq!(completed_steps, 1);
2497    }
2498
/// Preparing a resume for a run id that does not exist in an initialized
/// store must fail with a message that names the missing run, includes the
/// database path, and contains the `check --db` hint.
#[test]
fn prepare_resume_run_reports_missing_run_id_and_database() {
    let workspace = tempfile::tempdir().unwrap();
    let database_file = workspace.path().join("durable.db");
    let mut store = SqliteDurableStore::open(&database_file).expect("store should open");
    store.init().expect("schema should initialize");

    let lookup = store.prepare_resume_run("run_missing", "owner", 1);
    let message = lookup.unwrap_err().to_string();

    // The path is rendered the same way the assertion expects to find it in
    // the error text.
    let database_display = database_file.display().to_string();
    assert!(message.contains("workflow run run_missing was not found"));
    assert!(message.contains(&database_display));
    assert!(message.contains("check --db"));
}
2515
2516    #[tokio::test]
2517    async fn local_durable_run_resumes_existing_run_and_replays_steps() {
2518        let dir = tempfile::tempdir().unwrap();
2519        let db_path = dir.path().join("durable.db");
2520        let mut store = SqliteDurableStore::open(&db_path).expect("store should open");
2521        let script_path = dir.path().join("workflow.mjs");
2522        fs::write(
2523            &script_path,
2524            r#"
2525export const meta = { name: "durable-resume", description: "durable resume" };
2526const value = await agent("hello");
2527throw new Error("boom");
2528export default { value };
2529"#,
2530        )
2531        .unwrap();
2532        let provider = Arc::new(CountingProvider {
2533            calls: AtomicUsize::new(0),
2534        });
2535        let first_options =
2536            LocalDurableRunOptions::new(script_path.clone(), json!({}), provider.clone());
2537        let first_error = run_local_durable_workflow(&mut store, first_options)
2538            .await
2539            .unwrap_err();
2540        assert!(first_error.to_string().contains("boom"));
2541        assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2542
2543        let run_id: String = store
2544            .connection()
2545            .query_row("SELECT run_id FROM sw_workflow_runs", [], |row| row.get(0))
2546            .unwrap();
2547
2548        fs::write(
2549            &script_path,
2550            r#"
2551export const meta = { name: "durable-resume", description: "durable resume" };
2552const value = await agent("hello");
2553export default { value };
2554"#,
2555        )
2556        .unwrap();
2557        let mut resume_options =
2558            LocalDurableRunOptions::new(script_path.clone(), json!({}), provider.clone());
2559        resume_options.resume_run_id = Some(run_id);
2560        let resumed = run_local_durable_workflow(&mut store, resume_options)
2561            .await
2562            .unwrap();
2563        assert_eq!(
2564            resumed.workflow.output.result,
2565            json!({ "value": "hello:1" })
2566        );
2567        assert_eq!(provider.calls.load(Ordering::SeqCst), 1);
2568    }
2569}