Skip to main content

assay_workflow/
engine.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio::task::JoinHandle;
5use tracing::info;
6
7use crate::dispatch_recovery;
8use crate::health;
9use crate::scheduler;
10use crate::store::WorkflowStore;
11use crate::timers;
12use crate::types::*;
13
14/// The workflow engine. Owns the store and manages background tasks
15/// (scheduler, timer poller, health monitor).
16///
17/// The API layer holds an `Arc<Engine<S>>` and delegates all operations here.
18pub struct Engine<S: WorkflowStore> {
19    store: Arc<S>,
20    _scheduler: JoinHandle<()>,
21    _timer_poller: JoinHandle<()>,
22    _health_monitor: JoinHandle<()>,
23    _dispatch_recovery: JoinHandle<()>,
24}
25
26impl<S: WorkflowStore> Engine<S> {
27    /// Start the engine with all background tasks.
28    pub fn start(store: S) -> Self {
29        let store = Arc::new(store);
30
31        let _scheduler = tokio::spawn(scheduler::run_scheduler(Arc::clone(&store)));
32        let _timer_poller = tokio::spawn(timers::run_timer_poller(Arc::clone(&store)));
33        let _health_monitor = tokio::spawn(health::run_health_monitor(Arc::clone(&store)));
34        let _dispatch_recovery = tokio::spawn(dispatch_recovery::run_dispatch_recovery(
35            Arc::clone(&store),
36        ));
37
38        info!("Workflow engine started");
39
40        Self {
41            store,
42            _scheduler,
43            _timer_poller,
44            _health_monitor,
45            _dispatch_recovery,
46        }
47    }
48
49    /// Access the underlying store (for the API layer).
50    pub fn store(&self) -> &S {
51        &self.store
52    }
53
54    // ── Workflow Operations ─────────────────────────────────
55
56    pub async fn start_workflow(
57        &self,
58        namespace: &str,
59        workflow_type: &str,
60        workflow_id: &str,
61        input: Option<&str>,
62        task_queue: &str,
63    ) -> Result<WorkflowRecord> {
64        let now = timestamp_now();
65        let run_id = format!("run-{workflow_id}-{}", now as u64);
66
67        let wf = WorkflowRecord {
68            id: workflow_id.to_string(),
69            namespace: namespace.to_string(),
70            run_id,
71            workflow_type: workflow_type.to_string(),
72            task_queue: task_queue.to_string(),
73            status: "PENDING".to_string(),
74            input: input.map(String::from),
75            result: None,
76            error: None,
77            parent_id: None,
78            claimed_by: None,
79            created_at: now,
80            updated_at: now,
81            completed_at: None,
82        };
83
84        self.store.create_workflow(&wf).await?;
85
86        self.store
87            .append_event(&WorkflowEvent {
88                id: None,
89                workflow_id: workflow_id.to_string(),
90                seq: 1,
91                event_type: "WorkflowStarted".to_string(),
92                payload: input.map(String::from),
93                timestamp: now,
94            })
95            .await?;
96
97        // Phase 9: a freshly-started workflow has new events (WorkflowStarted)
98        // that need a worker to replay against — make it dispatchable.
99        self.store.mark_workflow_dispatchable(workflow_id).await?;
100
101        Ok(wf)
102    }
103
104    pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
105        self.store.get_workflow(id).await
106    }
107
108    pub async fn list_workflows(
109        &self,
110        namespace: &str,
111        status: Option<WorkflowStatus>,
112        workflow_type: Option<&str>,
113        limit: i64,
114        offset: i64,
115    ) -> Result<Vec<WorkflowRecord>> {
116        self.store
117            .list_workflows(namespace, status, workflow_type, limit, offset)
118            .await
119    }
120
121    pub async fn cancel_workflow(&self, id: &str) -> Result<bool> {
122        let wf = self.store.get_workflow(id).await?;
123        match wf {
124            None => Ok(false),
125            Some(wf) => {
126                let status = WorkflowStatus::from_str(&wf.status)
127                    .map_err(|e| anyhow::anyhow!(e))?;
128                if status.is_terminal() {
129                    return Ok(false);
130                }
131
132                // Two-phase cancel:
133                //   1. Append WorkflowCancelRequested + mark dispatchable.
134                //      The next worker replay sees the request, raises a
135                //      cancellation error inside the handler, and submits
136                //      a CancelWorkflow command.
137                //   2. CancelWorkflow command flips status to CANCELLED,
138                //      cancels pending activities/timers, appends
139                //      WorkflowCancelled.
140                //
141                // We cancel pending activities + timers up-front too so a
142                // worker that's about to claim them sees CANCELLED instead.
143                self.store.cancel_pending_activities(id).await?;
144                self.store.cancel_pending_timers(id).await?;
145
146                let seq = self.store.get_event_count(id).await? as i32 + 1;
147                self.store
148                    .append_event(&WorkflowEvent {
149                        id: None,
150                        workflow_id: id.to_string(),
151                        seq,
152                        event_type: "WorkflowCancelRequested".to_string(),
153                        payload: None,
154                        timestamp: timestamp_now(),
155                    })
156                    .await?;
157
158                self.store.mark_workflow_dispatchable(id).await?;
159
160                // Propagate cancellation to all child workflows
161                let children = self.store.list_child_workflows(id).await?;
162                for child in children {
163                    Box::pin(self.cancel_workflow(&child.id)).await?;
164                }
165
166                // For workflows that have NO worker registered (or no
167                // handler running), cancellation would never complete.
168                // Fall back: if the workflow has no events past
169                // WorkflowStarted (handler hasn't actually run yet, e.g.
170                // PENDING with no claim), finalise immediately.
171                if matches!(status, WorkflowStatus::Pending) {
172                    self.finalise_cancellation(id).await?;
173                }
174
175                Ok(true)
176            }
177        }
178    }
179
180    pub async fn terminate_workflow(&self, id: &str, reason: Option<&str>) -> Result<bool> {
181        let wf = self.store.get_workflow(id).await?;
182        match wf {
183            None => Ok(false),
184            Some(wf) => {
185                let status = WorkflowStatus::from_str(&wf.status)
186                    .map_err(|e| anyhow::anyhow!(e))?;
187                if status.is_terminal() {
188                    return Ok(false);
189                }
190
191                self.store
192                    .update_workflow_status(
193                        id,
194                        WorkflowStatus::Failed,
195                        None,
196                        Some(reason.unwrap_or("terminated")),
197                    )
198                    .await?;
199
200                Ok(true)
201            }
202        }
203    }
204
205    // ── Signal Operations ───────────────────────────────────
206
207    pub async fn send_signal(
208        &self,
209        workflow_id: &str,
210        name: &str,
211        payload: Option<&str>,
212    ) -> Result<()> {
213        let now = timestamp_now();
214
215        self.store
216            .send_signal(&WorkflowSignal {
217                id: None,
218                workflow_id: workflow_id.to_string(),
219                name: name.to_string(),
220                payload: payload.map(String::from),
221                consumed: false,
222                received_at: now,
223            })
224            .await?;
225
226        let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
227        // Parse the incoming payload string back to a JSON value so the
228        // event payload nests cleanly (otherwise the recorded payload is
229        // a stringified JSON-inside-JSON and Lua workers would have to
230        // double-decode).
231        let payload_value: serde_json::Value = payload
232            .and_then(|s| serde_json::from_str(s).ok())
233            .unwrap_or(serde_json::Value::Null);
234        self.store
235            .append_event(&WorkflowEvent {
236                id: None,
237                workflow_id: workflow_id.to_string(),
238                seq,
239                event_type: "SignalReceived".to_string(),
240                payload: Some(
241                    serde_json::json!({ "signal": name, "payload": payload_value }).to_string(),
242                ),
243                timestamp: now,
244            })
245            .await?;
246
247        // Phase 9: a workflow waiting on this signal needs to be re-dispatched
248        // so the worker can replay and notice the signal in history.
249        self.store.mark_workflow_dispatchable(workflow_id).await?;
250
251        Ok(())
252    }
253
254    // ── Event History ───────────────────────────────────────
255
256    pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
257        self.store.list_events(workflow_id).await
258    }
259
260    // ── Worker Operations ───────────────────────────────────
261
262    pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()> {
263        self.store.register_worker(worker).await
264    }
265
266    pub async fn heartbeat_worker(&self, id: &str) -> Result<()> {
267        self.store.heartbeat_worker(id, timestamp_now()).await
268    }
269
270    pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
271        self.store.list_workers(namespace).await
272    }
273
274    // ── Task Operations (for worker polling) ────────────────
275
276    /// Schedule an activity within a workflow.
277    ///
278    /// Idempotent on `(workflow_id, seq)` — if an activity with this sequence
279    /// number already exists for the workflow, returns its id without
280    /// creating a duplicate row or duplicate `ActivityScheduled` event. This
281    /// is essential for deterministic replay: a worker can re-run the
282    /// workflow function and call `schedule_activity(seq=1, ...)` repeatedly
283    /// without producing side effects.
284    ///
285    /// On the first call for a `seq`:
286    /// - inserts a row in `workflow_activities` with status `PENDING`
287    /// - appends an `ActivityScheduled` event to the workflow event log
288    /// - if the workflow is still `PENDING`, transitions it to `RUNNING`
289    pub async fn schedule_activity(
290        &self,
291        workflow_id: &str,
292        seq: i32,
293        name: &str,
294        input: Option<&str>,
295        task_queue: &str,
296        opts: ScheduleActivityOpts,
297    ) -> Result<WorkflowActivity> {
298        // Idempotency: short-circuit if (workflow_id, seq) already exists.
299        if let Some(existing) = self
300            .store
301            .get_activity_by_workflow_seq(workflow_id, seq)
302            .await?
303        {
304            return Ok(existing);
305        }
306
307        let now = timestamp_now();
308        let mut act = WorkflowActivity {
309            id: None,
310            workflow_id: workflow_id.to_string(),
311            seq,
312            name: name.to_string(),
313            task_queue: task_queue.to_string(),
314            input: input.map(String::from),
315            status: "PENDING".to_string(),
316            result: None,
317            error: None,
318            attempt: 1,
319            max_attempts: opts.max_attempts.unwrap_or(3),
320            initial_interval_secs: opts.initial_interval_secs.unwrap_or(1.0),
321            backoff_coefficient: opts.backoff_coefficient.unwrap_or(2.0),
322            start_to_close_secs: opts.start_to_close_secs.unwrap_or(300.0),
323            heartbeat_timeout_secs: opts.heartbeat_timeout_secs,
324            claimed_by: None,
325            scheduled_at: now,
326            started_at: None,
327            completed_at: None,
328            last_heartbeat: None,
329        };
330
331        let id = self.store.create_activity(&act).await?;
332        act.id = Some(id);
333
334        // Append ActivityScheduled event with the activity's seq
335        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
336        self.store
337            .append_event(&WorkflowEvent {
338                id: None,
339                workflow_id: workflow_id.to_string(),
340                seq: event_seq,
341                event_type: "ActivityScheduled".to_string(),
342                payload: Some(
343                    serde_json::json!({
344                        "activity_id": id,
345                        "activity_seq": seq,
346                        "name": name,
347                        "task_queue": task_queue,
348                        "input": input,
349                    })
350                    .to_string(),
351                ),
352                timestamp: now,
353            })
354            .await?;
355
356        // Transition workflow from PENDING to RUNNING on first scheduled activity
357        if let Some(wf) = self.store.get_workflow(workflow_id).await?
358            && wf.status == "PENDING"
359        {
360            self.store
361                .update_workflow_status(workflow_id, WorkflowStatus::Running, None, None)
362                .await?;
363        }
364
365        Ok(act)
366    }
367
368    pub async fn claim_activity(
369        &self,
370        task_queue: &str,
371        worker_id: &str,
372    ) -> Result<Option<WorkflowActivity>> {
373        self.store.claim_activity(task_queue, worker_id).await
374    }
375
376    pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
377        self.store.get_activity(id).await
378    }
379
380    /// Mark a successfully-executed activity complete and append an
381    /// `ActivityCompleted` event to the workflow event log so a replaying
382    /// workflow can pick up the cached result.
383    ///
384    /// `failed=true` is preserved for legacy callers that go straight
385    /// through complete with a non-retry path; new code should call
386    /// [`Engine::fail_activity`] instead so retry policy is honored.
387    pub async fn complete_activity(
388        &self,
389        id: i64,
390        result: Option<&str>,
391        error: Option<&str>,
392        failed: bool,
393    ) -> Result<()> {
394        self.store.complete_activity(id, result, error, failed).await?;
395
396        // Look up the activity so we can attribute the event correctly
397        let act = match self.store.get_activity(id).await? {
398            Some(a) => a,
399            None => return Ok(()),
400        };
401
402        let event_type = if failed {
403            "ActivityFailed"
404        } else {
405            "ActivityCompleted"
406        };
407        let payload = serde_json::json!({
408            "activity_id": id,
409            "activity_seq": act.seq,
410            "name": act.name,
411            "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
412            "error": error,
413        });
414        let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
415        let workflow_id = act.workflow_id.clone();
416        self.store
417            .append_event(&WorkflowEvent {
418                id: None,
419                workflow_id: act.workflow_id,
420                seq: event_seq,
421                event_type: event_type.to_string(),
422                payload: Some(payload.to_string()),
423                timestamp: timestamp_now(),
424            })
425            .await?;
426        // Phase 9: the workflow has a new event the handler needs to see —
427        // wake the workflow task back up.
428        self.store.mark_workflow_dispatchable(&workflow_id).await?;
429        Ok(())
430    }
431
432    /// Fail an activity, honoring its retry policy.
433    ///
434    /// If `attempt < max_attempts`, the activity is re-queued with
435    /// exponential backoff (`initial_interval_secs * backoff_coefficient^(attempt-1)`)
436    /// and `attempt` is incremented. **No event is appended** — retries
437    /// are an internal-engine concern, not workflow-visible.
438    ///
439    /// If `attempt >= max_attempts`, the activity is permanently FAILED
440    /// and an `ActivityFailed` event is appended so the workflow can react.
441    pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()> {
442        let act = match self.store.get_activity(id).await? {
443            Some(a) => a,
444            None => return Ok(()),
445        };
446
447        if act.attempt < act.max_attempts {
448            // Compute exponential backoff: interval * coefficient^(attempt-1)
449            let backoff = act.initial_interval_secs
450                * act.backoff_coefficient.powi(act.attempt - 1);
451            let next_scheduled_at = timestamp_now() + backoff;
452            self.store
453                .requeue_activity_for_retry(id, act.attempt + 1, next_scheduled_at)
454                .await?;
455            return Ok(());
456        }
457
458        // Out of retries — mark FAILED and surface to the workflow
459        self.store
460            .complete_activity(id, None, Some(error), true)
461            .await?;
462
463        let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
464        let workflow_id = act.workflow_id.clone();
465        self.store
466            .append_event(&WorkflowEvent {
467                id: None,
468                workflow_id: act.workflow_id,
469                seq: event_seq,
470                event_type: "ActivityFailed".to_string(),
471                payload: Some(
472                    serde_json::json!({
473                        "activity_id": id,
474                        "activity_seq": act.seq,
475                        "name": act.name,
476                        "error": error,
477                        "final_attempt": act.attempt,
478                    })
479                    .to_string(),
480                ),
481                timestamp: timestamp_now(),
482            })
483            .await?;
484        // Wake the workflow task — handler needs to see the failure.
485        self.store.mark_workflow_dispatchable(&workflow_id).await?;
486        Ok(())
487    }
488
489    // ── Workflow-task dispatch (Phase 9) ────────────────────
490
491    /// Claim a dispatchable workflow task on a queue. Returns the workflow
492    /// record + full event history so the worker can replay the handler
493    /// deterministically. Atomic — multiple workers polling the same queue
494    /// will each get a different task or None.
495    pub async fn claim_workflow_task(
496        &self,
497        task_queue: &str,
498        worker_id: &str,
499    ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>> {
500        let Some(mut wf) = self
501            .store
502            .claim_workflow_task(task_queue, worker_id)
503            .await?
504        else {
505            return Ok(None);
506        };
507        // Once a worker is processing the workflow it's RUNNING — even if
508        // it ultimately just yields and pauses on a signal/timer. PENDING
509        // means "no worker has touched this yet."
510        if wf.status == "PENDING" {
511            self.store
512                .update_workflow_status(&wf.id, WorkflowStatus::Running, None, None)
513                .await?;
514            wf.status = "RUNNING".to_string();
515        }
516        let history = self.store.list_events(&wf.id).await?;
517        Ok(Some((wf, history)))
518    }
519
520    /// Submit a worker's batch of commands for a workflow it claimed.
521    /// Each command produces durable events / rows transactionally and
522    /// the dispatch lease is released on return.
523    ///
524    /// Supported command types:
525    /// - `ScheduleActivity` { seq, name, task_queue, input?, max_attempts?, ... }
526    /// - `CompleteWorkflow` { result }
527    /// - `FailWorkflow`     { error }
528    pub async fn submit_workflow_commands(
529        &self,
530        workflow_id: &str,
531        worker_id: &str,
532        commands: &[serde_json::Value],
533    ) -> Result<()> {
534        for cmd in commands {
535            let cmd_type = cmd.get("type").and_then(|v| v.as_str()).unwrap_or("");
536            match cmd_type {
537                "ScheduleActivity" => {
538                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
539                    let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
540                    let queue = cmd
541                        .get("task_queue")
542                        .and_then(|v| v.as_str())
543                        .unwrap_or("default");
544                    let input = cmd.get("input").map(|v| v.to_string());
545                    let opts = ScheduleActivityOpts {
546                        max_attempts: cmd
547                            .get("max_attempts")
548                            .and_then(|v| v.as_i64())
549                            .map(|n| n as i32),
550                        initial_interval_secs: cmd
551                            .get("initial_interval_secs")
552                            .and_then(|v| v.as_f64()),
553                        backoff_coefficient: cmd
554                            .get("backoff_coefficient")
555                            .and_then(|v| v.as_f64()),
556                        start_to_close_secs: cmd
557                            .get("start_to_close_secs")
558                            .and_then(|v| v.as_f64()),
559                        heartbeat_timeout_secs: cmd
560                            .get("heartbeat_timeout_secs")
561                            .and_then(|v| v.as_f64()),
562                    };
563                    self.schedule_activity(
564                        workflow_id,
565                        seq,
566                        name,
567                        input.as_deref(),
568                        queue,
569                        opts,
570                    )
571                    .await?;
572                }
573                "CancelWorkflow" => {
574                    // Worker acknowledged a cancellation — finalise.
575                    self.finalise_cancellation(workflow_id).await?;
576                }
577                "WaitForSignal" => {
578                    // No engine-side state to write — the workflow has paused
579                    // and will be re-dispatched when a matching signal arrives.
580                    // Releasing the lease (below) is enough; record the wait
581                    // intent for the dashboard / debugging.
582                    let signal_name =
583                        cmd.get("name").and_then(|v| v.as_str()).unwrap_or("?");
584                    let event_seq =
585                        self.store.get_event_count(workflow_id).await? as i32 + 1;
586                    self.store
587                        .append_event(&WorkflowEvent {
588                            id: None,
589                            workflow_id: workflow_id.to_string(),
590                            seq: event_seq,
591                            event_type: "WorkflowAwaitingSignal".to_string(),
592                            payload: Some(
593                                serde_json::json!({ "signal": signal_name }).to_string(),
594                            ),
595                            timestamp: timestamp_now(),
596                        })
597                        .await?;
598                }
599                "StartChildWorkflow" => {
600                    let workflow_type = cmd
601                        .get("workflow_type")
602                        .and_then(|v| v.as_str())
603                        .unwrap_or("");
604                    let child_id =
605                        cmd.get("workflow_id").and_then(|v| v.as_str()).unwrap_or("");
606                    let task_queue = cmd
607                        .get("task_queue")
608                        .and_then(|v| v.as_str())
609                        .unwrap_or("default");
610                    let input = cmd.get("input").map(|v| v.to_string());
611                    // Determine the namespace from the parent workflow
612                    let namespace = self
613                        .store
614                        .get_workflow(workflow_id)
615                        .await?
616                        .map(|wf| wf.namespace)
617                        .unwrap_or_else(|| "main".to_string());
618
619                    // Idempotent: if a workflow with this id already exists,
620                    // skip creation (deterministic replay calls this command
621                    // for the same child id on every re-run until the parent
622                    // has the ChildWorkflowCompleted event).
623                    if self.store.get_workflow(child_id).await?.is_none() {
624                        self.start_child_workflow(
625                            &namespace,
626                            workflow_id,
627                            workflow_type,
628                            child_id,
629                            input.as_deref(),
630                            task_queue,
631                        )
632                        .await?;
633                        // Make the child immediately dispatchable so a worker
634                        // picks it up.
635                        self.store.mark_workflow_dispatchable(child_id).await?;
636                    }
637                }
638                "RecordSideEffect" => {
639                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
640                    let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
641                    let value =
642                        cmd.get("value").cloned().unwrap_or(serde_json::Value::Null);
643                    let event_seq =
644                        self.store.get_event_count(workflow_id).await? as i32 + 1;
645                    self.store
646                        .append_event(&WorkflowEvent {
647                            id: None,
648                            workflow_id: workflow_id.to_string(),
649                            seq: event_seq,
650                            event_type: "SideEffectRecorded".to_string(),
651                            payload: Some(
652                                serde_json::json!({
653                                    "side_effect_seq": seq,
654                                    "name": name,
655                                    "value": value,
656                                })
657                                .to_string(),
658                            ),
659                            timestamp: timestamp_now(),
660                        })
661                        .await?;
662                    // Side effects don't trigger anything external — the
663                    // workflow needs to immediately continue so it picks
664                    // up the cached value on next replay.
665                    self.store.mark_workflow_dispatchable(workflow_id).await?;
666                }
667                "ScheduleTimer" => {
668                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
669                    let duration = cmd
670                        .get("duration_secs")
671                        .and_then(|v| v.as_f64())
672                        .unwrap_or(0.0);
673                    self.schedule_timer(workflow_id, seq, duration).await?;
674                }
675                "CompleteWorkflow" => {
676                    let result = cmd.get("result").map(|v| v.to_string());
677                    self.complete_workflow(workflow_id, result.as_deref()).await?;
678                }
679                "FailWorkflow" => {
680                    let error = cmd
681                        .get("error")
682                        .and_then(|v| v.as_str())
683                        .unwrap_or("workflow handler raised an error");
684                    self.fail_workflow(workflow_id, error).await?;
685                }
686                other => {
687                    tracing::warn!("submit_workflow_commands: unknown command type {other:?}");
688                }
689            }
690        }
691
692        self.store
693            .release_workflow_task(workflow_id, worker_id)
694            .await?;
695        Ok(())
696    }
697
698    /// Schedule a durable timer for a workflow.
699    ///
700    /// Idempotent on `(workflow_id, seq)` — a workflow that yields the same
701    /// `ScheduleTimer{seq=N}` on retry will reuse the existing timer, not
702    /// schedule a second one. This is the timer counterpart to
703    /// `schedule_activity`'s replay-safe behaviour.
704    ///
705    /// On the first call:
706    /// - inserts a row in `workflow_timers` with `fire_at = now + duration`
707    /// - appends a `TimerScheduled` event so the worker can replay and
708    ///   know it's been scheduled (otherwise replays would yield it again)
709    pub async fn schedule_timer(
710        &self,
711        workflow_id: &str,
712        seq: i32,
713        duration_secs: f64,
714    ) -> Result<WorkflowTimer> {
715        if let Some(existing) = self
716            .store
717            .get_timer_by_workflow_seq(workflow_id, seq)
718            .await?
719        {
720            return Ok(existing);
721        }
722
723        let now = timestamp_now();
724        let mut timer = WorkflowTimer {
725            id: None,
726            workflow_id: workflow_id.to_string(),
727            seq,
728            fire_at: now + duration_secs,
729            fired: false,
730        };
731        let id = self.store.create_timer(&timer).await?;
732        timer.id = Some(id);
733
734        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
735        self.store
736            .append_event(&WorkflowEvent {
737                id: None,
738                workflow_id: workflow_id.to_string(),
739                seq: event_seq,
740                event_type: "TimerScheduled".to_string(),
741                payload: Some(
742                    serde_json::json!({
743                        "timer_id": id,
744                        "timer_seq": seq,
745                        "fire_at": timer.fire_at,
746                        "duration_secs": duration_secs,
747                    })
748                    .to_string(),
749                ),
750                timestamp: now,
751            })
752            .await?;
753
754        Ok(timer)
755    }
756
757    /// Finalise a cancellation: flips status to CANCELLED and appends the
758    /// terminal WorkflowCancelled event. Called by the CancelWorkflow
759    /// command handler (worker acknowledged cancel) and by cancel_workflow
760    /// directly when the workflow has no worker yet.
761    pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()> {
762        // Avoid double-finalising
763        if let Some(wf) = self.store.get_workflow(workflow_id).await?
764            && wf.status == "CANCELLED"
765        {
766            return Ok(());
767        }
768        self.store
769            .update_workflow_status(workflow_id, WorkflowStatus::Cancelled, None, None)
770            .await?;
771        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
772        self.store
773            .append_event(&WorkflowEvent {
774                id: None,
775                workflow_id: workflow_id.to_string(),
776                seq: event_seq,
777                event_type: "WorkflowCancelled".to_string(),
778                payload: None,
779                timestamp: timestamp_now(),
780            })
781            .await?;
782        Ok(())
783    }
784
785    /// Mark a workflow COMPLETED with a result + append WorkflowCompleted event.
786    /// If the workflow has a parent, also notifies the parent with a
787    /// ChildWorkflowCompleted event and marks it dispatchable so it can
788    /// replay past `ctx:start_child_workflow` and pick up the child's result.
789    pub async fn complete_workflow(&self, workflow_id: &str, result: Option<&str>) -> Result<()> {
790        self.store
791            .update_workflow_status(workflow_id, WorkflowStatus::Completed, result, None)
792            .await?;
793        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
794        self.store
795            .append_event(&WorkflowEvent {
796                id: None,
797                workflow_id: workflow_id.to_string(),
798                seq: event_seq,
799                event_type: "WorkflowCompleted".to_string(),
800                payload: result.map(String::from),
801                timestamp: timestamp_now(),
802            })
803            .await?;
804        self.notify_parent_of_child_outcome(
805            workflow_id,
806            "ChildWorkflowCompleted",
807            serde_json::json!({
808                "child_workflow_id": workflow_id,
809                "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
810            }),
811        )
812        .await?;
813        Ok(())
814    }
815
816    /// Mark a workflow FAILED with an error + append WorkflowFailed event.
817    /// Notifies the parent if any (ChildWorkflowFailed).
818    pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()> {
819        self.store
820            .update_workflow_status(workflow_id, WorkflowStatus::Failed, None, Some(error))
821            .await?;
822        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
823        self.store
824            .append_event(&WorkflowEvent {
825                id: None,
826                workflow_id: workflow_id.to_string(),
827                seq: event_seq,
828                event_type: "WorkflowFailed".to_string(),
829                payload: Some(serde_json::json!({"error": error}).to_string()),
830                timestamp: timestamp_now(),
831            })
832            .await?;
833        self.notify_parent_of_child_outcome(
834            workflow_id,
835            "ChildWorkflowFailed",
836            serde_json::json!({
837                "child_workflow_id": workflow_id,
838                "error": error,
839            }),
840        )
841        .await?;
842        Ok(())
843    }
844
845    /// Append a parent-side event when a child reaches a terminal state and
846    /// re-dispatch the parent so it can replay past its `start_child_workflow`
847    /// call. No-op for top-level workflows (no parent_id).
848    async fn notify_parent_of_child_outcome(
849        &self,
850        child_workflow_id: &str,
851        event_type: &str,
852        payload: serde_json::Value,
853    ) -> Result<()> {
854        let Some(child) = self.store.get_workflow(child_workflow_id).await? else {
855            return Ok(());
856        };
857        let Some(parent_id) = child.parent_id else {
858            return Ok(());
859        };
860        let event_seq = self.store.get_event_count(&parent_id).await? as i32 + 1;
861        self.store
862            .append_event(&WorkflowEvent {
863                id: None,
864                workflow_id: parent_id.clone(),
865                seq: event_seq,
866                event_type: event_type.to_string(),
867                payload: Some(payload.to_string()),
868                timestamp: timestamp_now(),
869            })
870            .await?;
871        self.store.mark_workflow_dispatchable(&parent_id).await?;
872        Ok(())
873    }
874
    /// Forward a heartbeat for activity `id`, with optional `details`, to the
    /// store's `heartbeat_activity`. The store's result, including any error,
    /// is returned unchanged.
    pub async fn heartbeat_activity(&self, id: i64, details: Option<&str>) -> Result<()> {
        self.store.heartbeat_activity(id, details).await
    }
878
879    // ── Schedule Operations ─────────────────────────────────
880
    /// Persist `schedule` via the store's `create_schedule`. Store errors are
    /// returned unchanged.
    pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()> {
        self.store.create_schedule(schedule).await
    }

    /// List the schedules of `namespace`, exactly as returned by the store.
    pub async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        self.store.list_schedules(namespace).await
    }

    /// Look up schedule `name` in `namespace`. The store's `Option` is passed
    /// through unchanged (presumably `None` when no such schedule exists).
    pub async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        self.store.get_schedule(namespace, name).await
    }

    /// Delete schedule `name` from `namespace` via the store. The returned
    /// `bool` comes straight from the store (presumably `true` when a schedule
    /// was actually removed; confirm against the store implementation).
    pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
        self.store.delete_schedule(namespace, name).await
    }
896
897    // ── Namespace Operations ────────────────────────────────
898
    /// Create namespace `name` via the store. Store errors are returned
    /// unchanged.
    pub async fn create_namespace(&self, name: &str) -> Result<()> {
        self.store.create_namespace(name).await
    }

    /// List all namespaces known to the store.
    pub async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
        self.store.list_namespaces().await
    }

    /// Delete namespace `name` via the store. The returned `bool` comes
    /// straight from the store (presumably `true` when a namespace was
    /// actually removed; confirm against the store implementation).
    pub async fn delete_namespace(&self, name: &str) -> Result<bool> {
        self.store.delete_namespace(name).await
    }

    /// Fetch aggregate statistics for `namespace` from the store.
    pub async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
        self.store.get_namespace_stats(namespace).await
    }

    /// Fetch per-queue statistics for `namespace` from the store.
    pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
        self.store.get_queue_stats(namespace).await
    }
918
919    // ── Child Workflow Operations ───────────────────────────
920
921    pub async fn start_child_workflow(
922        &self,
923        namespace: &str,
924        parent_id: &str,
925        workflow_type: &str,
926        workflow_id: &str,
927        input: Option<&str>,
928        task_queue: &str,
929    ) -> Result<WorkflowRecord> {
930        let now = timestamp_now();
931        let run_id = format!("run-{workflow_id}-{}", now as u64);
932
933        let wf = WorkflowRecord {
934            id: workflow_id.to_string(),
935            namespace: namespace.to_string(),
936            run_id,
937            workflow_type: workflow_type.to_string(),
938            task_queue: task_queue.to_string(),
939            status: "PENDING".to_string(),
940            input: input.map(String::from),
941            result: None,
942            error: None,
943            parent_id: Some(parent_id.to_string()),
944            claimed_by: None,
945            created_at: now,
946            updated_at: now,
947            completed_at: None,
948        };
949
950        self.store.create_workflow(&wf).await?;
951
952        // Record events on both parent and child
953        self.store
954            .append_event(&WorkflowEvent {
955                id: None,
956                workflow_id: workflow_id.to_string(),
957                seq: 1,
958                event_type: "WorkflowStarted".to_string(),
959                payload: input.map(String::from),
960                timestamp: now,
961            })
962            .await?;
963
964        let parent_seq = self.store.get_event_count(parent_id).await? as i32 + 1;
965        self.store
966            .append_event(&WorkflowEvent {
967                id: None,
968                workflow_id: parent_id.to_string(),
969                seq: parent_seq,
970                event_type: "ChildWorkflowStarted".to_string(),
971                payload: Some(
972                    serde_json::json!({
973                        "child_workflow_id": workflow_id,
974                        "workflow_type": workflow_type,
975                    })
976                    .to_string(),
977                ),
978                timestamp: now,
979            })
980            .await?;
981
982        Ok(wf)
983    }
984
    /// List the workflows whose parent is `parent_id`, exactly as returned by
    /// the store.
    pub async fn list_child_workflows(
        &self,
        parent_id: &str,
    ) -> Result<Vec<WorkflowRecord>> {
        self.store.list_child_workflows(parent_id).await
    }
991
992    // ── Continue-as-New ─────────────────────────────────────
993
994    pub async fn continue_as_new(
995        &self,
996        workflow_id: &str,
997        input: Option<&str>,
998    ) -> Result<WorkflowRecord> {
999        let old_wf = self
1000            .store
1001            .get_workflow(workflow_id)
1002            .await?
1003            .ok_or_else(|| anyhow::anyhow!("workflow not found: {workflow_id}"))?;
1004
1005        // Complete the old workflow
1006        self.store
1007            .update_workflow_status(workflow_id, WorkflowStatus::Completed, None, None)
1008            .await?;
1009
1010        // Start a new run with the same type, namespace, and queue
1011        let new_id = format!("{workflow_id}-continued-{}", timestamp_now() as u64);
1012        self.start_workflow(
1013            &old_wf.namespace,
1014            &old_wf.workflow_type,
1015            &new_id,
1016            input,
1017            &old_wf.task_queue,
1018        )
1019        .await
1020    }
1021
1022    // ── Snapshots ───────────────────────────────────────────
1023
    /// Store `state_json` as a snapshot of `workflow_id` tagged with
    /// `event_seq`. NOTE(review): `event_seq` is presumably the last history
    /// event folded into the snapshot; confirm against the replay code.
    pub async fn create_snapshot(
        &self,
        workflow_id: &str,
        event_seq: i32,
        state_json: &str,
    ) -> Result<()> {
        self.store
            .create_snapshot(workflow_id, event_seq, state_json)
            .await
    }

    /// Fetch the store's latest snapshot for `workflow_id`, passing its
    /// `Option` through unchanged (presumably `None` when no snapshot exists).
    pub async fn get_latest_snapshot(
        &self,
        workflow_id: &str,
    ) -> Result<Option<WorkflowSnapshot>> {
        self.store.get_latest_snapshot(workflow_id).await
    }
1041
1042    // ── Side Effects ────────────────────────────────────────
1043
1044    pub async fn record_side_effect(
1045        &self,
1046        workflow_id: &str,
1047        value: &str,
1048    ) -> Result<()> {
1049        let now = timestamp_now();
1050        let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
1051        self.store
1052            .append_event(&WorkflowEvent {
1053                id: None,
1054                workflow_id: workflow_id.to_string(),
1055                seq,
1056                event_type: "SideEffectRecorded".to_string(),
1057                payload: Some(value.to_string()),
1058                timestamp: now,
1059            })
1060            .await?;
1061        Ok(())
1062    }
1063}
1064
/// Current wall-clock time as fractional seconds since the UNIX epoch.
///
/// # Panics
/// Panics if the system clock reports a time before 1970-01-01T00:00:00Z.
/// Every timestamp written by the engine would be meaningless in that case,
/// so this is treated as a broken environment invariant.
fn timestamp_now() -> f64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch")
        .as_secs_f64()
}
1071
// Bring the `FromStr` trait into scope so `WorkflowStatus::from_str` (which
// returns a `Result`) can be called here. This is a private `use`, not a re-export.
1073use std::str::FromStr;