//! Workflow engine (assay_workflow/engine.rs).

1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio::task::JoinHandle;
5use tracing::info;
6
7use crate::dispatch_recovery;
8use crate::health;
9use crate::scheduler;
10use crate::store::WorkflowStore;
11use crate::timers;
12use crate::types::*;
13
/// The workflow engine. Owns the store and manages background tasks
/// (scheduler, timer poller, health monitor, dispatch recovery, and —
/// behind the `s3-archival` feature — an optional archival task).
///
/// The API layer holds an `Arc<Engine<S>>` and delegates all operations here.
pub struct Engine<S: WorkflowStore> {
    /// Shared persistence backend; each background task holds its own `Arc` clone.
    store: Arc<S>,
    // Task handles are stored but never awaited or aborted here.
    // NOTE(review): dropping a tokio `JoinHandle` detaches the task rather
    // than aborting it, so dropping the Engine does not stop these loops —
    // confirm that is the intended lifecycle.
    _scheduler: JoinHandle<()>,
    _timer_poller: JoinHandle<()>,
    _health_monitor: JoinHandle<()>,
    _dispatch_recovery: JoinHandle<()>,
    /// `Some` only when archival was configured from the environment (see `start`).
    #[cfg(feature = "s3-archival")]
    _archival: Option<JoinHandle<()>>,
}
27
28impl<S: WorkflowStore> Engine<S> {
    /// Start the engine with all background tasks.
    ///
    /// Wraps `store` in an `Arc` and spawns the scheduler, timer-poller,
    /// health-monitor and dispatch-recovery loops, each with its own `Arc`
    /// clone of the store. Uses `tokio::spawn`, so this must be called from
    /// within a tokio runtime.
    ///
    /// With the `s3-archival` feature enabled, an archival task is also
    /// spawned iff `ArchivalConfig::from_env()` yields a config.
    pub fn start(store: S) -> Self {
        let store = Arc::new(store);

        let _scheduler = tokio::spawn(scheduler::run_scheduler(Arc::clone(&store)));
        let _timer_poller = tokio::spawn(timers::run_timer_poller(Arc::clone(&store)));
        let _health_monitor = tokio::spawn(health::run_health_monitor(Arc::clone(&store)));
        let _dispatch_recovery = tokio::spawn(dispatch_recovery::run_dispatch_recovery(
            Arc::clone(&store),
        ));

        // Optional: archival only runs when the feature is compiled in AND
        // the environment provides a configuration.
        #[cfg(feature = "s3-archival")]
        let _archival = crate::archival::ArchivalConfig::from_env().map(|cfg| {
            tokio::spawn(crate::archival::run_archival(Arc::clone(&store), cfg))
        });

        info!("Workflow engine started");

        Self {
            store,
            _scheduler,
            _timer_poller,
            _health_monitor,
            _dispatch_recovery,
            #[cfg(feature = "s3-archival")]
            _archival,
        }
    }
57
    /// Access the underlying store (for the API layer).
    ///
    /// Borrows through the internal `Arc` — callers get `&S`, not a clone.
    pub fn store(&self) -> &S {
        &self.store
    }
62
63    // ── Workflow Operations ─────────────────────────────────
64
65    pub async fn start_workflow(
66        &self,
67        namespace: &str,
68        workflow_type: &str,
69        workflow_id: &str,
70        input: Option<&str>,
71        task_queue: &str,
72        search_attributes: Option<&str>,
73    ) -> Result<WorkflowRecord> {
74        let now = timestamp_now();
75        let run_id = format!("run-{workflow_id}-{}", now as u64);
76
77        let wf = WorkflowRecord {
78            id: workflow_id.to_string(),
79            namespace: namespace.to_string(),
80            run_id,
81            workflow_type: workflow_type.to_string(),
82            task_queue: task_queue.to_string(),
83            status: "PENDING".to_string(),
84            input: input.map(String::from),
85            result: None,
86            error: None,
87            parent_id: None,
88            claimed_by: None,
89            search_attributes: search_attributes.map(String::from),
90            archived_at: None,
91            archive_uri: None,
92            created_at: now,
93            updated_at: now,
94            completed_at: None,
95        };
96
97        self.store.create_workflow(&wf).await?;
98
99        self.store
100            .append_event(&WorkflowEvent {
101                id: None,
102                workflow_id: workflow_id.to_string(),
103                seq: 1,
104                event_type: "WorkflowStarted".to_string(),
105                payload: input.map(String::from),
106                timestamp: now,
107            })
108            .await?;
109
110        // Phase 9: a freshly-started workflow has new events (WorkflowStarted)
111        // that need a worker to replay against — make it dispatchable.
112        self.store.mark_workflow_dispatchable(workflow_id).await?;
113
114        Ok(wf)
115    }
116
    /// Fetch a workflow record by id; `None` if it doesn't exist.
    pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
        self.store.get_workflow(id).await
    }
120
    /// List workflows in a namespace, with optional status / type /
    /// search-attribute filters and `limit`/`offset` pagination.
    ///
    /// Pure delegation to the store; filter semantics (e.g. how
    /// `search_attrs_filter` is matched) are defined by the store impl.
    pub async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        self.store
            .list_workflows(
                namespace,
                status,
                workflow_type,
                search_attrs_filter,
                limit,
                offset,
            )
            .await
    }
141
    /// Merge a JSON patch into a workflow's stored search attributes.
    ///
    /// `patch_json` is passed through verbatim; merge semantics live in
    /// the store implementation.
    pub async fn upsert_search_attributes(
        &self,
        workflow_id: &str,
        patch_json: &str,
    ) -> Result<()> {
        self.store
            .upsert_search_attributes(workflow_id, patch_json)
            .await
    }
151
    /// Request cancellation of a workflow, propagating to its children.
    ///
    /// Returns `Ok(false)` if the workflow doesn't exist or is already in
    /// a terminal state; `Ok(true)` once the cancel request is durably
    /// recorded.
    pub async fn cancel_workflow(&self, id: &str) -> Result<bool> {
        let wf = self.store.get_workflow(id).await?;
        match wf {
            None => Ok(false),
            Some(wf) => {
                let status = WorkflowStatus::from_str(&wf.status)
                    .map_err(|e| anyhow::anyhow!(e))?;
                if status.is_terminal() {
                    return Ok(false);
                }

                // Two-phase cancel:
                //   1. Append WorkflowCancelRequested + mark dispatchable.
                //      The next worker replay sees the request, raises a
                //      cancellation error inside the handler, and submits
                //      a CancelWorkflow command.
                //   2. CancelWorkflow command flips status to CANCELLED,
                //      cancels pending activities/timers, appends
                //      WorkflowCancelled.
                //
                // We cancel pending activities + timers up-front too so a
                // worker that's about to claim them sees CANCELLED instead.
                self.store.cancel_pending_activities(id).await?;
                self.store.cancel_pending_timers(id).await?;

                // NOTE(review): read-then-append of the event seq is not
                // atomic; presumably concurrent appends for one workflow are
                // excluded by the dispatch lease — confirm.
                let seq = self.store.get_event_count(id).await? as i32 + 1;
                self.store
                    .append_event(&WorkflowEvent {
                        id: None,
                        workflow_id: id.to_string(),
                        seq,
                        event_type: "WorkflowCancelRequested".to_string(),
                        payload: None,
                        timestamp: timestamp_now(),
                    })
                    .await?;

                self.store.mark_workflow_dispatchable(id).await?;

                // Propagate cancellation to all child workflows. Box::pin
                // breaks the infinitely-sized future of direct async
                // recursion.
                let children = self.store.list_child_workflows(id).await?;
                for child in children {
                    Box::pin(self.cancel_workflow(&child.id)).await?;
                }

                // Fallback for workflows with no worker registered (or no
                // handler running), whose cancellation would otherwise never
                // complete: if the workflow is still PENDING (no worker has
                // claimed it, so the handler hasn't run), finalise the
                // cancellation immediately.
                if matches!(status, WorkflowStatus::Pending) {
                    self.finalise_cancellation(id).await?;
                }

                Ok(true)
            }
        }
    }
210
211    pub async fn terminate_workflow(&self, id: &str, reason: Option<&str>) -> Result<bool> {
212        let wf = self.store.get_workflow(id).await?;
213        match wf {
214            None => Ok(false),
215            Some(wf) => {
216                let status = WorkflowStatus::from_str(&wf.status)
217                    .map_err(|e| anyhow::anyhow!(e))?;
218                if status.is_terminal() {
219                    return Ok(false);
220                }
221
222                self.store
223                    .update_workflow_status(
224                        id,
225                        WorkflowStatus::Failed,
226                        None,
227                        Some(reason.unwrap_or("terminated")),
228                    )
229                    .await?;
230
231                Ok(true)
232            }
233        }
234    }
235
236    // ── Signal Operations ───────────────────────────────────
237
238    pub async fn send_signal(
239        &self,
240        workflow_id: &str,
241        name: &str,
242        payload: Option<&str>,
243    ) -> Result<()> {
244        let now = timestamp_now();
245
246        self.store
247            .send_signal(&WorkflowSignal {
248                id: None,
249                workflow_id: workflow_id.to_string(),
250                name: name.to_string(),
251                payload: payload.map(String::from),
252                consumed: false,
253                received_at: now,
254            })
255            .await?;
256
257        let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
258        // Parse the incoming payload string back to a JSON value so the
259        // event payload nests cleanly (otherwise the recorded payload is
260        // a stringified JSON-inside-JSON and Lua workers would have to
261        // double-decode).
262        let payload_value: serde_json::Value = payload
263            .and_then(|s| serde_json::from_str(s).ok())
264            .unwrap_or(serde_json::Value::Null);
265        self.store
266            .append_event(&WorkflowEvent {
267                id: None,
268                workflow_id: workflow_id.to_string(),
269                seq,
270                event_type: "SignalReceived".to_string(),
271                payload: Some(
272                    serde_json::json!({ "signal": name, "payload": payload_value }).to_string(),
273                ),
274                timestamp: now,
275            })
276            .await?;
277
278        // Phase 9: a workflow waiting on this signal needs to be re-dispatched
279        // so the worker can replay and notice the signal in history.
280        self.store.mark_workflow_dispatchable(workflow_id).await?;
281
282        Ok(())
283    }
284
285    // ── Event History ───────────────────────────────────────
286
    /// Fetch the full event history for a workflow.
    pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
        self.store.list_events(workflow_id).await
    }
290
291    // ── Worker Operations ───────────────────────────────────
292
    /// Register (or re-register) a worker with the store.
    pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()> {
        self.store.register_worker(worker).await
    }
296
    /// Record a liveness heartbeat for worker `id` at the current time.
    pub async fn heartbeat_worker(&self, id: &str) -> Result<()> {
        self.store.heartbeat_worker(id, timestamp_now()).await
    }
300
    /// List workers registered in a namespace.
    pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
        self.store.list_workers(namespace).await
    }
304
305    // ── Task Operations (for worker polling) ────────────────
306
    /// Schedule an activity within a workflow.
    ///
    /// Idempotent on `(workflow_id, seq)` — if an activity with this sequence
    /// number already exists for the workflow, returns its id without
    /// creating a duplicate row or duplicate `ActivityScheduled` event. This
    /// is essential for deterministic replay: a worker can re-run the
    /// workflow function and call `schedule_activity(seq=1, ...)` repeatedly
    /// without producing side effects.
    ///
    /// On the first call for a `seq`:
    /// - inserts a row in `workflow_activities` with status `PENDING`
    /// - appends an `ActivityScheduled` event to the workflow event log
    /// - if the workflow is still `PENDING`, transitions it to `RUNNING`
    ///
    /// Retry-policy defaults when `opts` leaves them unset: 3 attempts,
    /// 1s initial interval, 2.0 backoff coefficient, 300s start-to-close.
    pub async fn schedule_activity(
        &self,
        workflow_id: &str,
        seq: i32,
        name: &str,
        input: Option<&str>,
        task_queue: &str,
        opts: ScheduleActivityOpts,
    ) -> Result<WorkflowActivity> {
        // Idempotency: short-circuit if (workflow_id, seq) already exists.
        // NOTE(review): this check-then-insert is not atomic; presumably the
        // dispatch lease serialises schedulers for one workflow, or the store
        // enforces uniqueness on (workflow_id, seq) — confirm.
        if let Some(existing) = self
            .store
            .get_activity_by_workflow_seq(workflow_id, seq)
            .await?
        {
            return Ok(existing);
        }

        let now = timestamp_now();
        let mut act = WorkflowActivity {
            id: None,
            workflow_id: workflow_id.to_string(),
            seq,
            name: name.to_string(),
            task_queue: task_queue.to_string(),
            input: input.map(String::from),
            status: "PENDING".to_string(),
            result: None,
            error: None,
            attempt: 1,
            max_attempts: opts.max_attempts.unwrap_or(3),
            initial_interval_secs: opts.initial_interval_secs.unwrap_or(1.0),
            backoff_coefficient: opts.backoff_coefficient.unwrap_or(2.0),
            start_to_close_secs: opts.start_to_close_secs.unwrap_or(300.0),
            heartbeat_timeout_secs: opts.heartbeat_timeout_secs,
            claimed_by: None,
            scheduled_at: now,
            started_at: None,
            completed_at: None,
            last_heartbeat: None,
        };

        let id = self.store.create_activity(&act).await?;
        act.id = Some(id);

        // Append ActivityScheduled event with the activity's seq
        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
        self.store
            .append_event(&WorkflowEvent {
                id: None,
                workflow_id: workflow_id.to_string(),
                seq: event_seq,
                event_type: "ActivityScheduled".to_string(),
                payload: Some(
                    serde_json::json!({
                        "activity_id": id,
                        "activity_seq": seq,
                        "name": name,
                        "task_queue": task_queue,
                        "input": input,
                    })
                    .to_string(),
                ),
                timestamp: now,
            })
            .await?;

        // Transition workflow from PENDING to RUNNING on first scheduled activity
        if let Some(wf) = self.store.get_workflow(workflow_id).await?
            && wf.status == "PENDING"
        {
            self.store
                .update_workflow_status(workflow_id, WorkflowStatus::Running, None, None)
                .await?;
        }

        Ok(act)
    }
398
    /// Atomically claim one pending activity on `task_queue` for `worker_id`;
    /// `None` when nothing is claimable. Claim semantics live in the store.
    pub async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        self.store.claim_activity(task_queue, worker_id).await
    }
406
    /// Fetch an activity by its row id; `None` if it doesn't exist.
    pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
        self.store.get_activity(id).await
    }
410
    /// Mark a successfully-executed activity complete and append an
    /// `ActivityCompleted` event to the workflow event log so a replaying
    /// workflow can pick up the cached result.
    ///
    /// `failed=true` is preserved for legacy callers that go straight
    /// through complete with a non-retry path (it records an
    /// `ActivityFailed` event instead); new code should call
    /// [`Engine::fail_activity`] instead so retry policy is honored.
    pub async fn complete_activity(
        &self,
        id: i64,
        result: Option<&str>,
        error: Option<&str>,
        failed: bool,
    ) -> Result<()> {
        self.store.complete_activity(id, result, error, failed).await?;

        // Look up the activity so we can attribute the event correctly.
        // A missing row after completion is treated as a no-op, not an error.
        let act = match self.store.get_activity(id).await? {
            Some(a) => a,
            None => return Ok(()),
        };

        let event_type = if failed {
            "ActivityFailed"
        } else {
            "ActivityCompleted"
        };
        // `result` is re-parsed so the event payload nests real JSON;
        // unparseable results are recorded as null.
        let payload = serde_json::json!({
            "activity_id": id,
            "activity_seq": act.seq,
            "name": act.name,
            "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
            "error": error,
        });
        let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
        // Clone before `act.workflow_id` is moved into the event below.
        let workflow_id = act.workflow_id.clone();
        self.store
            .append_event(&WorkflowEvent {
                id: None,
                workflow_id: act.workflow_id,
                seq: event_seq,
                event_type: event_type.to_string(),
                payload: Some(payload.to_string()),
                timestamp: timestamp_now(),
            })
            .await?;
        // Phase 9: the workflow has a new event the handler needs to see —
        // wake the workflow task back up.
        self.store.mark_workflow_dispatchable(&workflow_id).await?;
        Ok(())
    }
462
    /// Fail an activity, honoring its retry policy.
    ///
    /// If `attempt < max_attempts`, the activity is re-queued with
    /// exponential backoff (`initial_interval_secs * backoff_coefficient^(attempt-1)`)
    /// and `attempt` is incremented. **No event is appended** — retries
    /// are an internal-engine concern, not workflow-visible.
    ///
    /// If `attempt >= max_attempts`, the activity is permanently FAILED
    /// and an `ActivityFailed` event is appended so the workflow can react.
    ///
    /// A missing activity id is a no-op (`Ok(())`).
    pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()> {
        let act = match self.store.get_activity(id).await? {
            Some(a) => a,
            None => return Ok(()),
        };

        if act.attempt < act.max_attempts {
            // Compute exponential backoff: interval * coefficient^(attempt-1).
            // attempt starts at 1, so the first retry waits exactly
            // initial_interval_secs.
            let backoff = act.initial_interval_secs
                * act.backoff_coefficient.powi(act.attempt - 1);
            let next_scheduled_at = timestamp_now() + backoff;
            self.store
                .requeue_activity_for_retry(id, act.attempt + 1, next_scheduled_at)
                .await?;
            return Ok(());
        }

        // Out of retries — mark FAILED and surface to the workflow
        self.store
            .complete_activity(id, None, Some(error), true)
            .await?;

        let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
        // Clone before `act.workflow_id` is moved into the event below.
        let workflow_id = act.workflow_id.clone();
        self.store
            .append_event(&WorkflowEvent {
                id: None,
                workflow_id: act.workflow_id,
                seq: event_seq,
                event_type: "ActivityFailed".to_string(),
                payload: Some(
                    serde_json::json!({
                        "activity_id": id,
                        "activity_seq": act.seq,
                        "name": act.name,
                        "error": error,
                        "final_attempt": act.attempt,
                    })
                    .to_string(),
                ),
                timestamp: timestamp_now(),
            })
            .await?;
        // Wake the workflow task — handler needs to see the failure.
        self.store.mark_workflow_dispatchable(&workflow_id).await?;
        Ok(())
    }
519
520    // ── Workflow-task dispatch (Phase 9) ────────────────────
521
522    /// Claim a dispatchable workflow task on a queue. Returns the workflow
523    /// record + full event history so the worker can replay the handler
524    /// deterministically. Atomic — multiple workers polling the same queue
525    /// will each get a different task or None.
526    pub async fn claim_workflow_task(
527        &self,
528        task_queue: &str,
529        worker_id: &str,
530    ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>> {
531        let Some(mut wf) = self
532            .store
533            .claim_workflow_task(task_queue, worker_id)
534            .await?
535        else {
536            return Ok(None);
537        };
538        // Once a worker is processing the workflow it's RUNNING — even if
539        // it ultimately just yields and pauses on a signal/timer. PENDING
540        // means "no worker has touched this yet."
541        if wf.status == "PENDING" {
542            self.store
543                .update_workflow_status(&wf.id, WorkflowStatus::Running, None, None)
544                .await?;
545            wf.status = "RUNNING".to_string();
546        }
547        let history = self.store.list_events(&wf.id).await?;
548        Ok(Some((wf, history)))
549    }
550
    /// Submit a worker's batch of commands for a workflow it claimed.
    /// Each command produces durable events / rows and the dispatch lease
    /// is released on return.
    ///
    /// Supported command types:
    /// - `ScheduleActivity`       { seq, name, task_queue?, input?, retry opts… }
    /// - `CancelWorkflow`         — worker acknowledged cancellation; finalise
    /// - `WaitForSignal`          { name } — records wait intent only
    /// - `StartChildWorkflow`     { workflow_type, workflow_id, task_queue?, input? }
    /// - `RecordSideEffect`       { seq, name, value }
    /// - `ScheduleTimer`          { seq, duration_secs }
    /// - `UpsertSearchAttributes` { patch }
    /// - `ContinueAsNew`          { input? }
    /// - `RecordSnapshot`         { state }
    /// - `CompleteWorkflow`       { result }
    /// - `FailWorkflow`           { error }
    ///
    /// Unknown command types are logged (warn) and skipped.
    ///
    /// NOTE(review): if any command errors, `?` returns early and
    /// `release_workflow_task` is never called — presumably the
    /// dispatch-recovery task reclaims the stale lease; confirm.
    pub async fn submit_workflow_commands(
        &self,
        workflow_id: &str,
        worker_id: &str,
        commands: &[serde_json::Value],
    ) -> Result<()> {
        for cmd in commands {
            // Commands are loosely-typed JSON from the worker; missing or
            // mistyped fields fall back to defaults rather than erroring.
            let cmd_type = cmd.get("type").and_then(|v| v.as_str()).unwrap_or("");
            match cmd_type {
                "ScheduleActivity" => {
                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
                    let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
                    let queue = cmd
                        .get("task_queue")
                        .and_then(|v| v.as_str())
                        .unwrap_or("default");
                    let input = cmd.get("input").map(|v| v.to_string());
                    let opts = ScheduleActivityOpts {
                        max_attempts: cmd
                            .get("max_attempts")
                            .and_then(|v| v.as_i64())
                            .map(|n| n as i32),
                        initial_interval_secs: cmd
                            .get("initial_interval_secs")
                            .and_then(|v| v.as_f64()),
                        backoff_coefficient: cmd
                            .get("backoff_coefficient")
                            .and_then(|v| v.as_f64()),
                        start_to_close_secs: cmd
                            .get("start_to_close_secs")
                            .and_then(|v| v.as_f64()),
                        heartbeat_timeout_secs: cmd
                            .get("heartbeat_timeout_secs")
                            .and_then(|v| v.as_f64()),
                    };
                    // Idempotent on (workflow_id, seq) — safe under replay.
                    self.schedule_activity(
                        workflow_id,
                        seq,
                        name,
                        input.as_deref(),
                        queue,
                        opts,
                    )
                    .await?;
                }
                "CancelWorkflow" => {
                    // Worker acknowledged a cancellation — finalise.
                    self.finalise_cancellation(workflow_id).await?;
                }
                "WaitForSignal" => {
                    // No engine-side state to write — the workflow has paused
                    // and will be re-dispatched when a matching signal arrives.
                    // Releasing the lease (below) is enough; record the wait
                    // intent for the dashboard / debugging.
                    let signal_name =
                        cmd.get("name").and_then(|v| v.as_str()).unwrap_or("?");
                    let event_seq =
                        self.store.get_event_count(workflow_id).await? as i32 + 1;
                    self.store
                        .append_event(&WorkflowEvent {
                            id: None,
                            workflow_id: workflow_id.to_string(),
                            seq: event_seq,
                            event_type: "WorkflowAwaitingSignal".to_string(),
                            payload: Some(
                                serde_json::json!({ "signal": signal_name }).to_string(),
                            ),
                            timestamp: timestamp_now(),
                        })
                        .await?;
                }
                "StartChildWorkflow" => {
                    let workflow_type = cmd
                        .get("workflow_type")
                        .and_then(|v| v.as_str())
                        .unwrap_or("");
                    let child_id =
                        cmd.get("workflow_id").and_then(|v| v.as_str()).unwrap_or("");
                    let task_queue = cmd
                        .get("task_queue")
                        .and_then(|v| v.as_str())
                        .unwrap_or("default");
                    let input = cmd.get("input").map(|v| v.to_string());
                    // Determine the namespace from the parent workflow;
                    // a missing parent falls back to "main".
                    let namespace = self
                        .store
                        .get_workflow(workflow_id)
                        .await?
                        .map(|wf| wf.namespace)
                        .unwrap_or_else(|| "main".to_string());

                    // Idempotent: if a workflow with this id already exists,
                    // skip creation (deterministic replay calls this command
                    // for the same child id on every re-run until the parent
                    // has the ChildWorkflowCompleted event).
                    if self.store.get_workflow(child_id).await?.is_none() {
                        self.start_child_workflow(
                            &namespace,
                            workflow_id,
                            workflow_type,
                            child_id,
                            input.as_deref(),
                            task_queue,
                        )
                        .await?;
                        // Make the child immediately dispatchable so a worker
                        // picks it up.
                        self.store.mark_workflow_dispatchable(child_id).await?;
                    }
                }
                "RecordSideEffect" => {
                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
                    let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
                    let value =
                        cmd.get("value").cloned().unwrap_or(serde_json::Value::Null);
                    let event_seq =
                        self.store.get_event_count(workflow_id).await? as i32 + 1;
                    self.store
                        .append_event(&WorkflowEvent {
                            id: None,
                            workflow_id: workflow_id.to_string(),
                            seq: event_seq,
                            event_type: "SideEffectRecorded".to_string(),
                            payload: Some(
                                serde_json::json!({
                                    "side_effect_seq": seq,
                                    "name": name,
                                    "value": value,
                                })
                                .to_string(),
                            ),
                            timestamp: timestamp_now(),
                        })
                        .await?;
                    // Side effects don't trigger anything external — the
                    // workflow needs to immediately continue so it picks
                    // up the cached value on next replay.
                    self.store.mark_workflow_dispatchable(workflow_id).await?;
                }
                "ScheduleTimer" => {
                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
                    let duration = cmd
                        .get("duration_secs")
                        .and_then(|v| v.as_f64())
                        .unwrap_or(0.0);
                    // Idempotent on (workflow_id, seq), like activities.
                    self.schedule_timer(workflow_id, seq, duration).await?;
                }
                "UpsertSearchAttributes" => {
                    // Merge the patch object into the workflow's stored
                    // search_attributes. Workflow code can call this from
                    // `ctx:upsert_search_attributes(...)` to surface live
                    // progress / tenant / env tags that downstream callers
                    // can filter on via the list endpoint.
                    let patch = cmd
                        .get("patch")
                        .cloned()
                        .unwrap_or(serde_json::Value::Object(Default::default()));
                    self.store
                        .upsert_search_attributes(workflow_id, &patch.to_string())
                        .await?;
                }
                "ContinueAsNew" => {
                    // Close out the current run and start a new one with the
                    // same type / namespace / queue under a fresh id. Input
                    // may be any JSON value; it's serialised and becomes the
                    // new run's `input`. Called from workflow code via
                    // `ctx:continue_as_new(input)` to reset event history
                    // when a handler would otherwise loop forever.
                    let input = cmd.get("input").map(|v| v.to_string());
                    self.continue_as_new(workflow_id, input.as_deref())
                        .await?;
                }
                "RecordSnapshot" => {
                    // Persist the workflow's current query-handler state. Each
                    // snapshot is keyed by the current event seq so the latest
                    // is easy to retrieve via `get_latest_snapshot`. Runs on
                    // every worker replay, which is fine — `create_snapshot`
                    // is an insert, so each replay adds a new row reflecting
                    // the state at that point in history.
                    let state = cmd
                        .get("state")
                        .cloned()
                        .unwrap_or(serde_json::Value::Null);
                    let event_seq = self.store.get_event_count(workflow_id).await? as i32;
                    self.store
                        .create_snapshot(workflow_id, event_seq, &state.to_string())
                        .await?;
                }
                "CompleteWorkflow" => {
                    let result = cmd.get("result").map(|v| v.to_string());
                    self.complete_workflow(workflow_id, result.as_deref()).await?;
                }
                "FailWorkflow" => {
                    let error = cmd
                        .get("error")
                        .and_then(|v| v.as_str())
                        .unwrap_or("workflow handler raised an error");
                    self.fail_workflow(workflow_id, error).await?;
                }
                other => {
                    tracing::warn!("submit_workflow_commands: unknown command type {other:?}");
                }
            }
        }

        self.store
            .release_workflow_task(workflow_id, worker_id)
            .await?;
        Ok(())
    }
769
770    /// Schedule a durable timer for a workflow.
771    ///
772    /// Idempotent on `(workflow_id, seq)` — a workflow that yields the same
773    /// `ScheduleTimer{seq=N}` on retry will reuse the existing timer, not
774    /// schedule a second one. This is the timer counterpart to
775    /// `schedule_activity`'s replay-safe behaviour.
776    ///
777    /// On the first call:
778    /// - inserts a row in `workflow_timers` with `fire_at = now + duration`
779    /// - appends a `TimerScheduled` event so the worker can replay and
780    ///   know it's been scheduled (otherwise replays would yield it again)
781    pub async fn schedule_timer(
782        &self,
783        workflow_id: &str,
784        seq: i32,
785        duration_secs: f64,
786    ) -> Result<WorkflowTimer> {
787        if let Some(existing) = self
788            .store
789            .get_timer_by_workflow_seq(workflow_id, seq)
790            .await?
791        {
792            return Ok(existing);
793        }
794
795        let now = timestamp_now();
796        let mut timer = WorkflowTimer {
797            id: None,
798            workflow_id: workflow_id.to_string(),
799            seq,
800            fire_at: now + duration_secs,
801            fired: false,
802        };
803        let id = self.store.create_timer(&timer).await?;
804        timer.id = Some(id);
805
806        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
807        self.store
808            .append_event(&WorkflowEvent {
809                id: None,
810                workflow_id: workflow_id.to_string(),
811                seq: event_seq,
812                event_type: "TimerScheduled".to_string(),
813                payload: Some(
814                    serde_json::json!({
815                        "timer_id": id,
816                        "timer_seq": seq,
817                        "fire_at": timer.fire_at,
818                        "duration_secs": duration_secs,
819                    })
820                    .to_string(),
821                ),
822                timestamp: now,
823            })
824            .await?;
825
826        Ok(timer)
827    }
828
829    /// Finalise a cancellation: flips status to CANCELLED and appends the
830    /// terminal WorkflowCancelled event. Called by the CancelWorkflow
831    /// command handler (worker acknowledged cancel) and by cancel_workflow
832    /// directly when the workflow has no worker yet.
833    pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()> {
834        // Avoid double-finalising
835        if let Some(wf) = self.store.get_workflow(workflow_id).await?
836            && wf.status == "CANCELLED"
837        {
838            return Ok(());
839        }
840        self.store
841            .update_workflow_status(workflow_id, WorkflowStatus::Cancelled, None, None)
842            .await?;
843        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
844        self.store
845            .append_event(&WorkflowEvent {
846                id: None,
847                workflow_id: workflow_id.to_string(),
848                seq: event_seq,
849                event_type: "WorkflowCancelled".to_string(),
850                payload: None,
851                timestamp: timestamp_now(),
852            })
853            .await?;
854        Ok(())
855    }
856
857    /// Mark a workflow COMPLETED with a result + append WorkflowCompleted event.
858    /// If the workflow has a parent, also notifies the parent with a
859    /// ChildWorkflowCompleted event and marks it dispatchable so it can
860    /// replay past `ctx:start_child_workflow` and pick up the child's result.
861    pub async fn complete_workflow(&self, workflow_id: &str, result: Option<&str>) -> Result<()> {
862        self.store
863            .update_workflow_status(workflow_id, WorkflowStatus::Completed, result, None)
864            .await?;
865        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
866        self.store
867            .append_event(&WorkflowEvent {
868                id: None,
869                workflow_id: workflow_id.to_string(),
870                seq: event_seq,
871                event_type: "WorkflowCompleted".to_string(),
872                payload: result.map(String::from),
873                timestamp: timestamp_now(),
874            })
875            .await?;
876        self.notify_parent_of_child_outcome(
877            workflow_id,
878            "ChildWorkflowCompleted",
879            serde_json::json!({
880                "child_workflow_id": workflow_id,
881                "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
882            }),
883        )
884        .await?;
885        Ok(())
886    }
887
888    /// Mark a workflow FAILED with an error + append WorkflowFailed event.
889    /// Notifies the parent if any (ChildWorkflowFailed).
890    pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()> {
891        self.store
892            .update_workflow_status(workflow_id, WorkflowStatus::Failed, None, Some(error))
893            .await?;
894        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
895        self.store
896            .append_event(&WorkflowEvent {
897                id: None,
898                workflow_id: workflow_id.to_string(),
899                seq: event_seq,
900                event_type: "WorkflowFailed".to_string(),
901                payload: Some(serde_json::json!({"error": error}).to_string()),
902                timestamp: timestamp_now(),
903            })
904            .await?;
905        self.notify_parent_of_child_outcome(
906            workflow_id,
907            "ChildWorkflowFailed",
908            serde_json::json!({
909                "child_workflow_id": workflow_id,
910                "error": error,
911            }),
912        )
913        .await?;
914        Ok(())
915    }
916
917    /// Append a parent-side event when a child reaches a terminal state and
918    /// re-dispatch the parent so it can replay past its `start_child_workflow`
919    /// call. No-op for top-level workflows (no parent_id).
920    async fn notify_parent_of_child_outcome(
921        &self,
922        child_workflow_id: &str,
923        event_type: &str,
924        payload: serde_json::Value,
925    ) -> Result<()> {
926        let Some(child) = self.store.get_workflow(child_workflow_id).await? else {
927            return Ok(());
928        };
929        let Some(parent_id) = child.parent_id else {
930            return Ok(());
931        };
932        let event_seq = self.store.get_event_count(&parent_id).await? as i32 + 1;
933        self.store
934            .append_event(&WorkflowEvent {
935                id: None,
936                workflow_id: parent_id.clone(),
937                seq: event_seq,
938                event_type: event_type.to_string(),
939                payload: Some(payload.to_string()),
940                timestamp: timestamp_now(),
941            })
942            .await?;
943        self.store.mark_workflow_dispatchable(&parent_id).await?;
944        Ok(())
945    }
946
    /// Record a heartbeat for a running activity, optionally with progress
    /// `details` (delegates to the store). `id` is presumably the activity's
    /// store row id — confirm against the store implementation.
    pub async fn heartbeat_activity(&self, id: i64, details: Option<&str>) -> Result<()> {
        self.store.heartbeat_activity(id, details).await
    }
950
951    // ── Schedule Operations ─────────────────────────────────
952
    /// Create a new workflow schedule (delegates to the store).
    pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()> {
        self.store.create_schedule(schedule).await
    }

    /// List the schedules in `namespace` (delegates to the store).
    pub async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        self.store.list_schedules(namespace).await
    }

    /// Look up a schedule by `(namespace, name)`; presumably `None` when it
    /// does not exist — confirm against the store implementation.
    pub async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        self.store.get_schedule(namespace, name).await
    }

    /// Delete a schedule; the returned bool presumably indicates whether a
    /// schedule was actually removed — confirm against the store implementation.
    pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
        self.store.delete_schedule(namespace, name).await
    }

    /// Apply a partial update (`patch`) to a schedule (delegates to the store).
    pub async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
        self.store.update_schedule(namespace, name, patch).await
    }

    /// Pause (`paused = true`) or resume a schedule (delegates to the store).
    pub async fn set_schedule_paused(
        &self,
        namespace: &str,
        name: &str,
        paused: bool,
    ) -> Result<Option<WorkflowSchedule>> {
        self.store.set_schedule_paused(namespace, name, paused).await
    }
986
987    // ── Namespace Operations ────────────────────────────────
988
    /// Create a namespace named `name` (delegates to the store).
    pub async fn create_namespace(&self, name: &str) -> Result<()> {
        self.store.create_namespace(name).await
    }

    /// List every namespace known to the store.
    pub async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
        self.store.list_namespaces().await
    }

    /// Delete a namespace; the returned bool presumably indicates whether a
    /// namespace was actually removed — confirm against the store implementation.
    pub async fn delete_namespace(&self, name: &str) -> Result<bool> {
        self.store.delete_namespace(name).await
    }

    /// Aggregate statistics for one namespace (delegates to the store).
    pub async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
        self.store.get_namespace_stats(namespace).await
    }

    /// Per-task-queue statistics within `namespace` (delegates to the store).
    pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
        self.store.get_queue_stats(namespace).await
    }
1008
1009    // ── Child Workflow Operations ───────────────────────────
1010
1011    pub async fn start_child_workflow(
1012        &self,
1013        namespace: &str,
1014        parent_id: &str,
1015        workflow_type: &str,
1016        workflow_id: &str,
1017        input: Option<&str>,
1018        task_queue: &str,
1019    ) -> Result<WorkflowRecord> {
1020        let now = timestamp_now();
1021        let run_id = format!("run-{workflow_id}-{}", now as u64);
1022
1023        let wf = WorkflowRecord {
1024            id: workflow_id.to_string(),
1025            namespace: namespace.to_string(),
1026            run_id,
1027            workflow_type: workflow_type.to_string(),
1028            task_queue: task_queue.to_string(),
1029            status: "PENDING".to_string(),
1030            input: input.map(String::from),
1031            result: None,
1032            error: None,
1033            parent_id: Some(parent_id.to_string()),
1034            claimed_by: None,
1035            search_attributes: None,
1036            archived_at: None,
1037            archive_uri: None,
1038            created_at: now,
1039            updated_at: now,
1040            completed_at: None,
1041        };
1042
1043        self.store.create_workflow(&wf).await?;
1044
1045        // Record events on both parent and child
1046        self.store
1047            .append_event(&WorkflowEvent {
1048                id: None,
1049                workflow_id: workflow_id.to_string(),
1050                seq: 1,
1051                event_type: "WorkflowStarted".to_string(),
1052                payload: input.map(String::from),
1053                timestamp: now,
1054            })
1055            .await?;
1056
1057        let parent_seq = self.store.get_event_count(parent_id).await? as i32 + 1;
1058        self.store
1059            .append_event(&WorkflowEvent {
1060                id: None,
1061                workflow_id: parent_id.to_string(),
1062                seq: parent_seq,
1063                event_type: "ChildWorkflowStarted".to_string(),
1064                payload: Some(
1065                    serde_json::json!({
1066                        "child_workflow_id": workflow_id,
1067                        "workflow_type": workflow_type,
1068                    })
1069                    .to_string(),
1070                ),
1071                timestamp: now,
1072            })
1073            .await?;
1074
1075        Ok(wf)
1076    }
1077
    /// List all workflows whose `parent_id` equals the given id
    /// (delegates to the store).
    pub async fn list_child_workflows(
        &self,
        parent_id: &str,
    ) -> Result<Vec<WorkflowRecord>> {
        self.store.list_child_workflows(parent_id).await
    }
1084
1085    // ── Continue-as-New ─────────────────────────────────────
1086
1087    pub async fn continue_as_new(
1088        &self,
1089        workflow_id: &str,
1090        input: Option<&str>,
1091    ) -> Result<WorkflowRecord> {
1092        let old_wf = self
1093            .store
1094            .get_workflow(workflow_id)
1095            .await?
1096            .ok_or_else(|| anyhow::anyhow!("workflow not found: {workflow_id}"))?;
1097
1098        // Complete the old workflow
1099        self.store
1100            .update_workflow_status(workflow_id, WorkflowStatus::Completed, None, None)
1101            .await?;
1102
1103        // Start a new run with the same type, namespace, and queue
1104        let new_id = format!("{workflow_id}-continued-{}", timestamp_now() as u64);
1105        self.start_workflow(
1106            &old_wf.namespace,
1107            &old_wf.workflow_type,
1108            &new_id,
1109            input,
1110            &old_wf.task_queue,
1111            old_wf.search_attributes.as_deref(),
1112        )
1113        .await
1114    }
1115
1116    // ── Snapshots ───────────────────────────────────────────
1117
    /// Persist a query-handler state snapshot for a workflow, keyed by
    /// `event_seq` (delegates to the store).
    pub async fn create_snapshot(
        &self,
        workflow_id: &str,
        event_seq: i32,
        state_json: &str,
    ) -> Result<()> {
        self.store
            .create_snapshot(workflow_id, event_seq, state_json)
            .await
    }

    /// Fetch the most recent snapshot for a workflow, or `None` when no
    /// snapshot has been recorded (delegates to the store).
    pub async fn get_latest_snapshot(
        &self,
        workflow_id: &str,
    ) -> Result<Option<WorkflowSnapshot>> {
        self.store.get_latest_snapshot(workflow_id).await
    }
1135
1136    // ── Side Effects ────────────────────────────────────────
1137
1138    pub async fn record_side_effect(
1139        &self,
1140        workflow_id: &str,
1141        value: &str,
1142    ) -> Result<()> {
1143        let now = timestamp_now();
1144        let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
1145        self.store
1146            .append_event(&WorkflowEvent {
1147                id: None,
1148                workflow_id: workflow_id.to_string(),
1149                seq,
1150                event_type: "SideEffectRecorded".to_string(),
1151                payload: Some(value.to_string()),
1152                timestamp: now,
1153            })
1154            .await?;
1155        Ok(())
1156    }
1157}
1158
/// Current wall-clock time as fractional seconds since the UNIX epoch.
///
/// # Panics
/// Panics only if the system clock reads earlier than the UNIX epoch,
/// which indicates a badly misconfigured host.
fn timestamp_now() -> f64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // Invariant: the clock is never set before 1970 on any supported host.
        .expect("system clock is set before the UNIX epoch")
        .as_secs_f64()
}
1165
// Bring the `FromStr` trait into scope so `WorkflowStatus::from_str` (which
// returns a `Result`) can be called in this module. (Note: a plain `use` is
// not a re-export; it would need `pub use` for that.)
1167use std::str::FromStr;