Skip to main content

assay_workflow/
engine.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio::task::JoinHandle;
5use tracing::info;
6
7use crate::dispatch_recovery;
8use crate::health;
9use crate::scheduler;
10use crate::store::WorkflowStore;
11use crate::timers;
12use crate::types::*;
13
/// The workflow engine. Owns the store and manages background tasks
/// (scheduler, timer poller, health monitor).
///
/// The API layer holds an `Arc<Engine<S>>` and delegates all operations here.
pub struct Engine<S: WorkflowStore> {
    // Shared persistence backend; each background task holds its own
    // `Arc` clone (see `Engine::start`).
    store: Arc<S>,
    // Handles to the spawned background tasks. They are never awaited
    // (hence the leading underscores); note that dropping a tokio
    // JoinHandle detaches the task rather than aborting it.
    _scheduler: JoinHandle<()>,
    _timer_poller: JoinHandle<()>,
    _health_monitor: JoinHandle<()>,
    _dispatch_recovery: JoinHandle<()>,
    // Present only when the feature is compiled in; `None` when archival
    // is not configured via the environment (see `Engine::start`).
    #[cfg(feature = "s3-archival")]
    _archival: Option<JoinHandle<()>>,
}
27
28impl<S: WorkflowStore> Engine<S> {
    /// Start the engine with all background tasks.
    ///
    /// Wraps the store in an `Arc` and spawns the scheduler, timer poller,
    /// health monitor and dispatch-recovery loops, each with its own clone
    /// of the store. Must be called from within a tokio runtime (uses
    /// `tokio::spawn`).
    pub fn start(store: S) -> Self {
        let store = Arc::new(store);

        let _scheduler = tokio::spawn(scheduler::run_scheduler(Arc::clone(&store)));
        let _timer_poller = tokio::spawn(timers::run_timer_poller(Arc::clone(&store)));
        let _health_monitor = tokio::spawn(health::run_health_monitor(Arc::clone(&store)));
        let _dispatch_recovery = tokio::spawn(dispatch_recovery::run_dispatch_recovery(
            Arc::clone(&store),
        ));

        // Archival runs only when the feature is compiled in AND the
        // environment yields a config; otherwise no task is spawned and
        // `_archival` stays `None`.
        #[cfg(feature = "s3-archival")]
        let _archival = crate::archival::ArchivalConfig::from_env().map(|cfg| {
            tokio::spawn(crate::archival::run_archival(Arc::clone(&store), cfg))
        });

        info!("Workflow engine started");

        Self {
            store,
            _scheduler,
            _timer_poller,
            _health_monitor,
            _dispatch_recovery,
            #[cfg(feature = "s3-archival")]
            _archival,
        }
    }
57
58    /// Access the underlying store (for the API layer).
59    pub fn store(&self) -> &S {
60        &self.store
61    }
62
63    // ── Workflow Operations ─────────────────────────────────
64
    /// Create a new workflow run.
    ///
    /// Inserts a `PENDING` workflow record, appends the `WorkflowStarted`
    /// event as seq 1, and marks the workflow dispatchable so a worker on
    /// `task_queue` can claim it. `input` and `search_attributes` are
    /// stored as opaque JSON strings. Returns the freshly created record.
    pub async fn start_workflow(
        &self,
        namespace: &str,
        workflow_type: &str,
        workflow_id: &str,
        input: Option<&str>,
        task_queue: &str,
        search_attributes: Option<&str>,
    ) -> Result<WorkflowRecord> {
        let now = timestamp_now();
        // Run id combines the workflow id with the (whole-second) start
        // timestamp so re-runs of the same id get distinct run ids.
        let run_id = format!("run-{workflow_id}-{}", now as u64);

        let wf = WorkflowRecord {
            id: workflow_id.to_string(),
            namespace: namespace.to_string(),
            run_id,
            workflow_type: workflow_type.to_string(),
            task_queue: task_queue.to_string(),
            status: "PENDING".to_string(),
            input: input.map(String::from),
            result: None,
            error: None,
            parent_id: None,
            claimed_by: None,
            search_attributes: search_attributes.map(String::from),
            archived_at: None,
            archive_uri: None,
            created_at: now,
            updated_at: now,
            completed_at: None,
        };

        self.store.create_workflow(&wf).await?;

        self.store
            .append_event(&WorkflowEvent {
                id: None,
                workflow_id: workflow_id.to_string(),
                seq: 1,
                event_type: "WorkflowStarted".to_string(),
                payload: input.map(String::from),
                timestamp: now,
            })
            .await?;

        // Phase 9: a freshly-started workflow has new events (WorkflowStarted)
        // that need a worker to replay against — make it dispatchable.
        self.store.mark_workflow_dispatchable(workflow_id).await?;

        Ok(wf)
    }
116
    /// Look up a workflow by id. Returns `None` if it doesn't exist.
    pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
        self.store.get_workflow(id).await
    }
120
    /// List workflows in a namespace with optional status, type and
    /// search-attribute filters, paginated via `limit`/`offset`.
    pub async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        self.store
            .list_workflows(
                namespace,
                status,
                workflow_type,
                search_attrs_filter,
                limit,
                offset,
            )
            .await
    }
141
    /// Merge a JSON patch into a workflow's stored search attributes
    /// (used by the API; workflow code goes through the
    /// `UpsertSearchAttributes` command instead).
    pub async fn upsert_search_attributes(
        &self,
        workflow_id: &str,
        patch_json: &str,
    ) -> Result<()> {
        self.store
            .upsert_search_attributes(workflow_id, patch_json)
            .await
    }
151
    /// Request cooperative cancellation of a workflow (and, recursively,
    /// its children).
    ///
    /// Returns `Ok(false)` when the workflow doesn't exist or is already
    /// in a terminal state; `Ok(true)` once cancellation was requested.
    pub async fn cancel_workflow(&self, id: &str) -> Result<bool> {
        let wf = self.store.get_workflow(id).await?;
        match wf {
            None => Ok(false),
            Some(wf) => {
                let status = WorkflowStatus::from_str(&wf.status)
                    .map_err(|e| anyhow::anyhow!(e))?;
                if status.is_terminal() {
                    return Ok(false);
                }

                // Two-phase cancel:
                //   1. Append WorkflowCancelRequested + mark dispatchable.
                //      The next worker replay sees the request, raises a
                //      cancellation error inside the handler, and submits
                //      a CancelWorkflow command.
                //   2. CancelWorkflow command flips status to CANCELLED,
                //      cancels pending activities/timers, appends
                //      WorkflowCancelled.
                //
                // We cancel pending activities + timers up-front too so a
                // worker that's about to claim them sees CANCELLED instead.
                self.store.cancel_pending_activities(id).await?;
                self.store.cancel_pending_timers(id).await?;

                let seq = self.store.get_event_count(id).await? as i32 + 1;
                self.store
                    .append_event(&WorkflowEvent {
                        id: None,
                        workflow_id: id.to_string(),
                        seq,
                        event_type: "WorkflowCancelRequested".to_string(),
                        payload: None,
                        timestamp: timestamp_now(),
                    })
                    .await?;

                self.store.mark_workflow_dispatchable(id).await?;

                // Propagate cancellation to all child workflows. Box::pin
                // because async recursion requires a boxed future.
                let children = self.store.list_child_workflows(id).await?;
                for child in children {
                    Box::pin(self.cancel_workflow(&child.id)).await?;
                }

                // For workflows that have NO worker registered (or no
                // handler running), cancellation would never complete.
                // Fall back: if the workflow is still PENDING (no worker
                // has touched it yet), finalise immediately instead of
                // waiting for a replay that may never come.
                if matches!(status, WorkflowStatus::Pending) {
                    self.finalise_cancellation(id).await?;
                }

                Ok(true)
            }
        }
    }
210
211    pub async fn terminate_workflow(&self, id: &str, reason: Option<&str>) -> Result<bool> {
212        let wf = self.store.get_workflow(id).await?;
213        match wf {
214            None => Ok(false),
215            Some(wf) => {
216                let status = WorkflowStatus::from_str(&wf.status)
217                    .map_err(|e| anyhow::anyhow!(e))?;
218                if status.is_terminal() {
219                    return Ok(false);
220                }
221
222                self.store
223                    .update_workflow_status(
224                        id,
225                        WorkflowStatus::Failed,
226                        None,
227                        Some(reason.unwrap_or("terminated")),
228                    )
229                    .await?;
230
231                Ok(true)
232            }
233        }
234    }
235
236    // ── Signal Operations ───────────────────────────────────
237
    /// Deliver a signal to a workflow.
    ///
    /// Persists the signal in the inbox (unconsumed), appends a
    /// `SignalReceived` event, and marks the workflow dispatchable so a
    /// handler paused on this signal gets re-dispatched and can observe
    /// it in history.
    pub async fn send_signal(
        &self,
        workflow_id: &str,
        name: &str,
        payload: Option<&str>,
    ) -> Result<()> {
        let now = timestamp_now();

        self.store
            .send_signal(&WorkflowSignal {
                id: None,
                workflow_id: workflow_id.to_string(),
                name: name.to_string(),
                payload: payload.map(String::from),
                consumed: false,
                received_at: now,
            })
            .await?;

        let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
        // Parse the incoming payload string back to a JSON value so the
        // event payload nests cleanly (otherwise the recorded payload is
        // a stringified JSON-inside-JSON and Lua workers would have to
        // double-decode). An unparseable payload degrades to JSON null.
        let payload_value: serde_json::Value = payload
            .and_then(|s| serde_json::from_str(s).ok())
            .unwrap_or(serde_json::Value::Null);
        self.store
            .append_event(&WorkflowEvent {
                id: None,
                workflow_id: workflow_id.to_string(),
                seq,
                event_type: "SignalReceived".to_string(),
                payload: Some(
                    serde_json::json!({ "signal": name, "payload": payload_value }).to_string(),
                ),
                timestamp: now,
            })
            .await?;

        // Phase 9: a workflow waiting on this signal needs to be re-dispatched
        // so the worker can replay and notice the signal in history.
        self.store.mark_workflow_dispatchable(workflow_id).await?;

        Ok(())
    }
284
285    // ── Event History ───────────────────────────────────────
286
    /// Fetch the full event history of a workflow.
    pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
        self.store.list_events(workflow_id).await
    }
290
291    // ── Worker Operations ───────────────────────────────────
292
    /// Register a worker with the engine.
    pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()> {
        self.store.register_worker(worker).await
    }
296
    /// Record a liveness heartbeat for worker `id` at the current time.
    pub async fn heartbeat_worker(&self, id: &str) -> Result<()> {
        self.store.heartbeat_worker(id, timestamp_now()).await
    }
300
    /// List the workers registered in a namespace.
    pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
        self.store.list_workers(namespace).await
    }
304
305    // ── Task Operations (for worker polling) ────────────────
306
    /// Schedule an activity within a workflow.
    ///
    /// Idempotent on `(workflow_id, seq)` — if an activity with this sequence
    /// number already exists for the workflow, returns its id without
    /// creating a duplicate row or duplicate `ActivityScheduled` event. This
    /// is essential for deterministic replay: a worker can re-run the
    /// workflow function and call `schedule_activity(seq=1, ...)` repeatedly
    /// without producing side effects.
    ///
    /// On the first call for a `seq`:
    /// - inserts a row in `workflow_activities` with status `PENDING`
    /// - appends an `ActivityScheduled` event to the workflow event log
    /// - if the workflow is still `PENDING`, transitions it to `RUNNING`
    pub async fn schedule_activity(
        &self,
        workflow_id: &str,
        seq: i32,
        name: &str,
        input: Option<&str>,
        task_queue: &str,
        opts: ScheduleActivityOpts,
    ) -> Result<WorkflowActivity> {
        // Idempotency: short-circuit if (workflow_id, seq) already exists.
        if let Some(existing) = self
            .store
            .get_activity_by_workflow_seq(workflow_id, seq)
            .await?
        {
            return Ok(existing);
        }

        let now = timestamp_now();
        // Retry-policy defaults: 3 attempts, 1s initial backoff doubling
        // each retry, 5-minute start-to-close timeout.
        let mut act = WorkflowActivity {
            id: None,
            workflow_id: workflow_id.to_string(),
            seq,
            name: name.to_string(),
            task_queue: task_queue.to_string(),
            input: input.map(String::from),
            status: "PENDING".to_string(),
            result: None,
            error: None,
            attempt: 1,
            max_attempts: opts.max_attempts.unwrap_or(3),
            initial_interval_secs: opts.initial_interval_secs.unwrap_or(1.0),
            backoff_coefficient: opts.backoff_coefficient.unwrap_or(2.0),
            start_to_close_secs: opts.start_to_close_secs.unwrap_or(300.0),
            heartbeat_timeout_secs: opts.heartbeat_timeout_secs,
            claimed_by: None,
            scheduled_at: now,
            started_at: None,
            completed_at: None,
            last_heartbeat: None,
        };

        let id = self.store.create_activity(&act).await?;
        act.id = Some(id);

        // Append ActivityScheduled event with the activity's seq.
        // NOTE(review): event seq is derived as count+1 in a separate call
        // from the append — assumes the store serializes per-workflow
        // appends; confirm against the store implementation.
        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
        self.store
            .append_event(&WorkflowEvent {
                id: None,
                workflow_id: workflow_id.to_string(),
                seq: event_seq,
                event_type: "ActivityScheduled".to_string(),
                payload: Some(
                    serde_json::json!({
                        "activity_id": id,
                        "activity_seq": seq,
                        "name": name,
                        "task_queue": task_queue,
                        "input": input,
                    })
                    .to_string(),
                ),
                timestamp: now,
            })
            .await?;

        // Transition workflow from PENDING to RUNNING on first scheduled activity
        if let Some(wf) = self.store.get_workflow(workflow_id).await?
            && wf.status == "PENDING"
        {
            self.store
                .update_workflow_status(workflow_id, WorkflowStatus::Running, None, None)
                .await?;
        }

        Ok(act)
    }
398
    /// Claim the next available activity on `task_queue` for `worker_id`.
    /// Returns `None` when no activity is ready to run.
    pub async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        self.store.claim_activity(task_queue, worker_id).await
    }
406
    /// Look up a single activity by its row id.
    pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
        self.store.get_activity(id).await
    }
410
411    /// Mark a successfully-executed activity complete and append an
412    /// `ActivityCompleted` event to the workflow event log so a replaying
413    /// workflow can pick up the cached result.
414    ///
415    /// `failed=true` is preserved for legacy callers that go straight
416    /// through complete with a non-retry path; new code should call
417    /// [`Engine::fail_activity`] instead so retry policy is honored.
418    pub async fn complete_activity(
419        &self,
420        id: i64,
421        result: Option<&str>,
422        error: Option<&str>,
423        failed: bool,
424    ) -> Result<()> {
425        self.store.complete_activity(id, result, error, failed).await?;
426
427        // Look up the activity so we can attribute the event correctly
428        let act = match self.store.get_activity(id).await? {
429            Some(a) => a,
430            None => return Ok(()),
431        };
432
433        let event_type = if failed {
434            "ActivityFailed"
435        } else {
436            "ActivityCompleted"
437        };
438        let payload = serde_json::json!({
439            "activity_id": id,
440            "activity_seq": act.seq,
441            "name": act.name,
442            "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
443            "error": error,
444        });
445        let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
446        let workflow_id = act.workflow_id.clone();
447        self.store
448            .append_event(&WorkflowEvent {
449                id: None,
450                workflow_id: act.workflow_id,
451                seq: event_seq,
452                event_type: event_type.to_string(),
453                payload: Some(payload.to_string()),
454                timestamp: timestamp_now(),
455            })
456            .await?;
457        // Phase 9: the workflow has a new event the handler needs to see —
458        // wake the workflow task back up.
459        self.store.mark_workflow_dispatchable(&workflow_id).await?;
460        Ok(())
461    }
462
463    /// Fail an activity, honoring its retry policy.
464    ///
465    /// If `attempt < max_attempts`, the activity is re-queued with
466    /// exponential backoff (`initial_interval_secs * backoff_coefficient^(attempt-1)`)
467    /// and `attempt` is incremented. **No event is appended** — retries
468    /// are an internal-engine concern, not workflow-visible.
469    ///
470    /// If `attempt >= max_attempts`, the activity is permanently FAILED
471    /// and an `ActivityFailed` event is appended so the workflow can react.
472    pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()> {
473        let act = match self.store.get_activity(id).await? {
474            Some(a) => a,
475            None => return Ok(()),
476        };
477
478        if act.attempt < act.max_attempts {
479            // Compute exponential backoff: interval * coefficient^(attempt-1)
480            let backoff = act.initial_interval_secs
481                * act.backoff_coefficient.powi(act.attempt - 1);
482            let next_scheduled_at = timestamp_now() + backoff;
483            self.store
484                .requeue_activity_for_retry(id, act.attempt + 1, next_scheduled_at)
485                .await?;
486            return Ok(());
487        }
488
489        // Out of retries — mark FAILED and surface to the workflow
490        self.store
491            .complete_activity(id, None, Some(error), true)
492            .await?;
493
494        let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
495        let workflow_id = act.workflow_id.clone();
496        self.store
497            .append_event(&WorkflowEvent {
498                id: None,
499                workflow_id: act.workflow_id,
500                seq: event_seq,
501                event_type: "ActivityFailed".to_string(),
502                payload: Some(
503                    serde_json::json!({
504                        "activity_id": id,
505                        "activity_seq": act.seq,
506                        "name": act.name,
507                        "error": error,
508                        "final_attempt": act.attempt,
509                    })
510                    .to_string(),
511                ),
512                timestamp: timestamp_now(),
513            })
514            .await?;
515        // Wake the workflow task — handler needs to see the failure.
516        self.store.mark_workflow_dispatchable(&workflow_id).await?;
517        Ok(())
518    }
519
520    // ── Workflow-task dispatch (Phase 9) ────────────────────
521
522    /// Claim a dispatchable workflow task on a queue. Returns the workflow
523    /// record + full event history so the worker can replay the handler
524    /// deterministically. Atomic — multiple workers polling the same queue
525    /// will each get a different task or None.
526    pub async fn claim_workflow_task(
527        &self,
528        task_queue: &str,
529        worker_id: &str,
530    ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>> {
531        let Some(mut wf) = self
532            .store
533            .claim_workflow_task(task_queue, worker_id)
534            .await?
535        else {
536            return Ok(None);
537        };
538        // Once a worker is processing the workflow it's RUNNING — even if
539        // it ultimately just yields and pauses on a signal/timer. PENDING
540        // means "no worker has touched this yet."
541        if wf.status == "PENDING" {
542            self.store
543                .update_workflow_status(&wf.id, WorkflowStatus::Running, None, None)
544                .await?;
545            wf.status = "RUNNING".to_string();
546        }
547        let history = self.store.list_events(&wf.id).await?;
548        Ok(Some((wf, history)))
549    }
550
551    /// Submit a worker's batch of commands for a workflow it claimed.
552    /// Each command produces durable events / rows transactionally and
553    /// the dispatch lease is released on return.
554    ///
555    /// Supported command types:
556    /// - `ScheduleActivity` { seq, name, task_queue, input?, max_attempts?, ... }
557    /// - `CompleteWorkflow` { result }
558    /// - `FailWorkflow`     { error }
559    pub async fn submit_workflow_commands(
560        &self,
561        workflow_id: &str,
562        worker_id: &str,
563        commands: &[serde_json::Value],
564    ) -> Result<()> {
565        for cmd in commands {
566            let cmd_type = cmd.get("type").and_then(|v| v.as_str()).unwrap_or("");
567            match cmd_type {
568                "ScheduleActivity" => {
569                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
570                    let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
571                    let queue = cmd
572                        .get("task_queue")
573                        .and_then(|v| v.as_str())
574                        .unwrap_or("default");
575                    let input = cmd.get("input").map(|v| v.to_string());
576                    let opts = ScheduleActivityOpts {
577                        max_attempts: cmd
578                            .get("max_attempts")
579                            .and_then(|v| v.as_i64())
580                            .map(|n| n as i32),
581                        initial_interval_secs: cmd
582                            .get("initial_interval_secs")
583                            .and_then(|v| v.as_f64()),
584                        backoff_coefficient: cmd
585                            .get("backoff_coefficient")
586                            .and_then(|v| v.as_f64()),
587                        start_to_close_secs: cmd
588                            .get("start_to_close_secs")
589                            .and_then(|v| v.as_f64()),
590                        heartbeat_timeout_secs: cmd
591                            .get("heartbeat_timeout_secs")
592                            .and_then(|v| v.as_f64()),
593                    };
594                    self.schedule_activity(
595                        workflow_id,
596                        seq,
597                        name,
598                        input.as_deref(),
599                        queue,
600                        opts,
601                    )
602                    .await?;
603                }
604                "CancelWorkflow" => {
605                    // Worker acknowledged a cancellation — finalise.
606                    self.finalise_cancellation(workflow_id).await?;
607                }
608                "WaitForSignal" => {
609                    // No engine-side state to write — the workflow has paused
610                    // and will be re-dispatched when a matching signal arrives.
611                    // Releasing the lease (below) is enough; record the wait
612                    // intent for the dashboard / debugging.
613                    //
614                    // When the command carries `timer_seq`, the wait is paired
615                    // with a `ScheduleTimer` yielded in the same batch — the
616                    // worker uses the timer_seq to pick the winner on replay
617                    // (signal vs timeout). The engine stores the pairing on
618                    // the event for observability only.
619                    let signal_name =
620                        cmd.get("name").and_then(|v| v.as_str()).unwrap_or("?");
621                    let timer_seq = cmd.get("timer_seq").and_then(|v| v.as_i64());
622                    let payload = match timer_seq {
623                        Some(ts) => serde_json::json!({
624                            "signal": signal_name,
625                            "timer_seq": ts,
626                        }),
627                        None => serde_json::json!({ "signal": signal_name }),
628                    };
629                    let event_seq =
630                        self.store.get_event_count(workflow_id).await? as i32 + 1;
631                    self.store
632                        .append_event(&WorkflowEvent {
633                            id: None,
634                            workflow_id: workflow_id.to_string(),
635                            seq: event_seq,
636                            event_type: "WorkflowAwaitingSignal".to_string(),
637                            payload: Some(payload.to_string()),
638                            timestamp: timestamp_now(),
639                        })
640                        .await?;
641                }
642                "StartChildWorkflow" => {
643                    let workflow_type = cmd
644                        .get("workflow_type")
645                        .and_then(|v| v.as_str())
646                        .unwrap_or("");
647                    let child_id =
648                        cmd.get("workflow_id").and_then(|v| v.as_str()).unwrap_or("");
649                    let task_queue = cmd
650                        .get("task_queue")
651                        .and_then(|v| v.as_str())
652                        .unwrap_or("default");
653                    let input = cmd.get("input").map(|v| v.to_string());
654                    // Determine the namespace from the parent workflow
655                    let namespace = self
656                        .store
657                        .get_workflow(workflow_id)
658                        .await?
659                        .map(|wf| wf.namespace)
660                        .unwrap_or_else(|| "main".to_string());
661
662                    // Idempotent: if a workflow with this id already exists,
663                    // skip creation (deterministic replay calls this command
664                    // for the same child id on every re-run until the parent
665                    // has the ChildWorkflowCompleted event).
666                    if self.store.get_workflow(child_id).await?.is_none() {
667                        self.start_child_workflow(
668                            &namespace,
669                            workflow_id,
670                            workflow_type,
671                            child_id,
672                            input.as_deref(),
673                            task_queue,
674                        )
675                        .await?;
676                        // Make the child immediately dispatchable so a worker
677                        // picks it up.
678                        self.store.mark_workflow_dispatchable(child_id).await?;
679                    }
680                }
681                "RecordSideEffect" => {
682                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
683                    let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
684                    let value =
685                        cmd.get("value").cloned().unwrap_or(serde_json::Value::Null);
686                    let event_seq =
687                        self.store.get_event_count(workflow_id).await? as i32 + 1;
688                    self.store
689                        .append_event(&WorkflowEvent {
690                            id: None,
691                            workflow_id: workflow_id.to_string(),
692                            seq: event_seq,
693                            event_type: "SideEffectRecorded".to_string(),
694                            payload: Some(
695                                serde_json::json!({
696                                    "side_effect_seq": seq,
697                                    "name": name,
698                                    "value": value,
699                                })
700                                .to_string(),
701                            ),
702                            timestamp: timestamp_now(),
703                        })
704                        .await?;
705                    // Side effects don't trigger anything external — the
706                    // workflow needs to immediately continue so it picks
707                    // up the cached value on next replay.
708                    self.store.mark_workflow_dispatchable(workflow_id).await?;
709                }
710                "ScheduleTimer" => {
711                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
712                    let duration = cmd
713                        .get("duration_secs")
714                        .and_then(|v| v.as_f64())
715                        .unwrap_or(0.0);
716                    self.schedule_timer(workflow_id, seq, duration).await?;
717                }
718                "UpsertSearchAttributes" => {
719                    // Merge the patch object into the workflow's stored
720                    // search_attributes. Workflow code can call this from
721                    // `ctx:upsert_search_attributes(...)` to surface live
722                    // progress / tenant / env tags that downstream callers
723                    // can filter on via the list endpoint.
724                    let patch = cmd
725                        .get("patch")
726                        .cloned()
727                        .unwrap_or(serde_json::Value::Object(Default::default()));
728                    self.store
729                        .upsert_search_attributes(workflow_id, &patch.to_string())
730                        .await?;
731                }
732                "ContinueAsNew" => {
733                    // Close out the current run and start a new one with the
734                    // same type / namespace / queue under a fresh id. Input
735                    // may be any JSON value; it's serialised and becomes the
736                    // new run's `input`. Called from workflow code via
737                    // `ctx:continue_as_new(input)` to reset event history
738                    // when a handler would otherwise loop forever.
739                    let input = cmd.get("input").map(|v| v.to_string());
740                    self.continue_as_new(workflow_id, input.as_deref())
741                        .await?;
742                }
743                "RecordSnapshot" => {
744                    // Persist the workflow's current query-handler state. Each
745                    // snapshot is keyed by the current event seq so the latest
746                    // is easy to retrieve via `get_latest_snapshot`. Runs on
747                    // every worker replay, which is fine — `create_snapshot`
748                    // is an insert, so each replay adds a new row reflecting
749                    // the state at that point in history.
750                    let state = cmd
751                        .get("state")
752                        .cloned()
753                        .unwrap_or(serde_json::Value::Null);
754                    let event_seq = self.store.get_event_count(workflow_id).await? as i32;
755                    self.store
756                        .create_snapshot(workflow_id, event_seq, &state.to_string())
757                        .await?;
758                }
759                "CompleteWorkflow" => {
760                    let result = cmd.get("result").map(|v| v.to_string());
761                    self.complete_workflow(workflow_id, result.as_deref()).await?;
762                }
763                "FailWorkflow" => {
764                    let error = cmd
765                        .get("error")
766                        .and_then(|v| v.as_str())
767                        .unwrap_or("workflow handler raised an error");
768                    self.fail_workflow(workflow_id, error).await?;
769                }
770                other => {
771                    tracing::warn!("submit_workflow_commands: unknown command type {other:?}");
772                }
773            }
774        }
775
776        self.store
777            .release_workflow_task(workflow_id, worker_id)
778            .await?;
779        Ok(())
780    }
781
782    /// Schedule a durable timer for a workflow.
783    ///
784    /// Idempotent on `(workflow_id, seq)` — a workflow that yields the same
785    /// `ScheduleTimer{seq=N}` on retry will reuse the existing timer, not
786    /// schedule a second one. This is the timer counterpart to
787    /// `schedule_activity`'s replay-safe behaviour.
788    ///
789    /// On the first call:
790    /// - inserts a row in `workflow_timers` with `fire_at = now + duration`
791    /// - appends a `TimerScheduled` event so the worker can replay and
792    ///   know it's been scheduled (otherwise replays would yield it again)
793    pub async fn schedule_timer(
794        &self,
795        workflow_id: &str,
796        seq: i32,
797        duration_secs: f64,
798    ) -> Result<WorkflowTimer> {
799        if let Some(existing) = self
800            .store
801            .get_timer_by_workflow_seq(workflow_id, seq)
802            .await?
803        {
804            return Ok(existing);
805        }
806
807        let now = timestamp_now();
808        let mut timer = WorkflowTimer {
809            id: None,
810            workflow_id: workflow_id.to_string(),
811            seq,
812            fire_at: now + duration_secs,
813            fired: false,
814        };
815        let id = self.store.create_timer(&timer).await?;
816        timer.id = Some(id);
817
818        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
819        self.store
820            .append_event(&WorkflowEvent {
821                id: None,
822                workflow_id: workflow_id.to_string(),
823                seq: event_seq,
824                event_type: "TimerScheduled".to_string(),
825                payload: Some(
826                    serde_json::json!({
827                        "timer_id": id,
828                        "timer_seq": seq,
829                        "fire_at": timer.fire_at,
830                        "duration_secs": duration_secs,
831                    })
832                    .to_string(),
833                ),
834                timestamp: now,
835            })
836            .await?;
837
838        Ok(timer)
839    }
840
841    /// Finalise a cancellation: flips status to CANCELLED and appends the
842    /// terminal WorkflowCancelled event. Called by the CancelWorkflow
843    /// command handler (worker acknowledged cancel) and by cancel_workflow
844    /// directly when the workflow has no worker yet.
845    pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()> {
846        // Avoid double-finalising
847        if let Some(wf) = self.store.get_workflow(workflow_id).await?
848            && wf.status == "CANCELLED"
849        {
850            return Ok(());
851        }
852        self.store
853            .update_workflow_status(workflow_id, WorkflowStatus::Cancelled, None, None)
854            .await?;
855        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
856        self.store
857            .append_event(&WorkflowEvent {
858                id: None,
859                workflow_id: workflow_id.to_string(),
860                seq: event_seq,
861                event_type: "WorkflowCancelled".to_string(),
862                payload: None,
863                timestamp: timestamp_now(),
864            })
865            .await?;
866        Ok(())
867    }
868
869    /// Mark a workflow COMPLETED with a result + append WorkflowCompleted event.
870    /// If the workflow has a parent, also notifies the parent with a
871    /// ChildWorkflowCompleted event and marks it dispatchable so it can
872    /// replay past `ctx:start_child_workflow` and pick up the child's result.
873    pub async fn complete_workflow(&self, workflow_id: &str, result: Option<&str>) -> Result<()> {
874        self.store
875            .update_workflow_status(workflow_id, WorkflowStatus::Completed, result, None)
876            .await?;
877        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
878        self.store
879            .append_event(&WorkflowEvent {
880                id: None,
881                workflow_id: workflow_id.to_string(),
882                seq: event_seq,
883                event_type: "WorkflowCompleted".to_string(),
884                payload: result.map(String::from),
885                timestamp: timestamp_now(),
886            })
887            .await?;
888        self.notify_parent_of_child_outcome(
889            workflow_id,
890            "ChildWorkflowCompleted",
891            serde_json::json!({
892                "child_workflow_id": workflow_id,
893                "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
894            }),
895        )
896        .await?;
897        Ok(())
898    }
899
900    /// Mark a workflow FAILED with an error + append WorkflowFailed event.
901    /// Notifies the parent if any (ChildWorkflowFailed).
902    pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()> {
903        self.store
904            .update_workflow_status(workflow_id, WorkflowStatus::Failed, None, Some(error))
905            .await?;
906        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
907        self.store
908            .append_event(&WorkflowEvent {
909                id: None,
910                workflow_id: workflow_id.to_string(),
911                seq: event_seq,
912                event_type: "WorkflowFailed".to_string(),
913                payload: Some(serde_json::json!({"error": error}).to_string()),
914                timestamp: timestamp_now(),
915            })
916            .await?;
917        self.notify_parent_of_child_outcome(
918            workflow_id,
919            "ChildWorkflowFailed",
920            serde_json::json!({
921                "child_workflow_id": workflow_id,
922                "error": error,
923            }),
924        )
925        .await?;
926        Ok(())
927    }
928
929    /// Append a parent-side event when a child reaches a terminal state and
930    /// re-dispatch the parent so it can replay past its `start_child_workflow`
931    /// call. No-op for top-level workflows (no parent_id).
932    async fn notify_parent_of_child_outcome(
933        &self,
934        child_workflow_id: &str,
935        event_type: &str,
936        payload: serde_json::Value,
937    ) -> Result<()> {
938        let Some(child) = self.store.get_workflow(child_workflow_id).await? else {
939            return Ok(());
940        };
941        let Some(parent_id) = child.parent_id else {
942            return Ok(());
943        };
944        let event_seq = self.store.get_event_count(&parent_id).await? as i32 + 1;
945        self.store
946            .append_event(&WorkflowEvent {
947                id: None,
948                workflow_id: parent_id.clone(),
949                seq: event_seq,
950                event_type: event_type.to_string(),
951                payload: Some(payload.to_string()),
952                timestamp: timestamp_now(),
953            })
954            .await?;
955        self.store.mark_workflow_dispatchable(&parent_id).await?;
956        Ok(())
957    }
958
    /// Forward an activity heartbeat (by activity row id, with optional
    /// progress details) to the store.
    pub async fn heartbeat_activity(&self, id: i64, details: Option<&str>) -> Result<()> {
        self.store.heartbeat_activity(id, details).await
    }
962
963    // ── Schedule Operations ─────────────────────────────────
964
    /// Persist a new workflow schedule. Delegates to the store.
    pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()> {
        self.store.create_schedule(schedule).await
    }
968
    /// List all schedules in `namespace`. Delegates to the store.
    pub async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        self.store.list_schedules(namespace).await
    }
972
    /// Look up a schedule by `(namespace, name)`; `None` if absent.
    pub async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        self.store.get_schedule(namespace, name).await
    }
976
    /// Delete a schedule by `(namespace, name)`; the returned bool comes
    /// straight from the store (presumably whether a row was removed —
    /// confirm against the store impl).
    pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
        self.store.delete_schedule(namespace, name).await
    }
980
    /// Apply `patch` to the schedule identified by `(namespace, name)`.
    /// Returns the updated schedule, or `None` if it does not exist.
    pub async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
        self.store.update_schedule(namespace, name, patch).await
    }
989
    /// Pause or resume the schedule identified by `(namespace, name)`.
    /// Returns the updated schedule, or `None` if it does not exist.
    pub async fn set_schedule_paused(
        &self,
        namespace: &str,
        name: &str,
        paused: bool,
    ) -> Result<Option<WorkflowSchedule>> {
        self.store.set_schedule_paused(namespace, name, paused).await
    }
998
999    // ── Namespace Operations ────────────────────────────────
1000
    /// Create a namespace with the given name. Delegates to the store.
    pub async fn create_namespace(&self, name: &str) -> Result<()> {
        self.store.create_namespace(name).await
    }
1004
    /// List all known namespaces. Delegates to the store.
    pub async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
        self.store.list_namespaces().await
    }
1008
    /// Delete a namespace by name; the returned bool comes straight from the
    /// store (presumably whether anything was removed — confirm against the
    /// store impl).
    pub async fn delete_namespace(&self, name: &str) -> Result<bool> {
        self.store.delete_namespace(name).await
    }
1012
    /// Fetch aggregate statistics for `namespace` from the store.
    pub async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
        self.store.get_namespace_stats(namespace).await
    }
1016
    /// Fetch per-queue statistics for `namespace` from the store.
    pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
        self.store.get_queue_stats(namespace).await
    }
1020
1021    // ── Child Workflow Operations ───────────────────────────
1022
1023    pub async fn start_child_workflow(
1024        &self,
1025        namespace: &str,
1026        parent_id: &str,
1027        workflow_type: &str,
1028        workflow_id: &str,
1029        input: Option<&str>,
1030        task_queue: &str,
1031    ) -> Result<WorkflowRecord> {
1032        let now = timestamp_now();
1033        let run_id = format!("run-{workflow_id}-{}", now as u64);
1034
1035        let wf = WorkflowRecord {
1036            id: workflow_id.to_string(),
1037            namespace: namespace.to_string(),
1038            run_id,
1039            workflow_type: workflow_type.to_string(),
1040            task_queue: task_queue.to_string(),
1041            status: "PENDING".to_string(),
1042            input: input.map(String::from),
1043            result: None,
1044            error: None,
1045            parent_id: Some(parent_id.to_string()),
1046            claimed_by: None,
1047            search_attributes: None,
1048            archived_at: None,
1049            archive_uri: None,
1050            created_at: now,
1051            updated_at: now,
1052            completed_at: None,
1053        };
1054
1055        self.store.create_workflow(&wf).await?;
1056
1057        // Record events on both parent and child
1058        self.store
1059            .append_event(&WorkflowEvent {
1060                id: None,
1061                workflow_id: workflow_id.to_string(),
1062                seq: 1,
1063                event_type: "WorkflowStarted".to_string(),
1064                payload: input.map(String::from),
1065                timestamp: now,
1066            })
1067            .await?;
1068
1069        let parent_seq = self.store.get_event_count(parent_id).await? as i32 + 1;
1070        self.store
1071            .append_event(&WorkflowEvent {
1072                id: None,
1073                workflow_id: parent_id.to_string(),
1074                seq: parent_seq,
1075                event_type: "ChildWorkflowStarted".to_string(),
1076                payload: Some(
1077                    serde_json::json!({
1078                        "child_workflow_id": workflow_id,
1079                        "workflow_type": workflow_type,
1080                    })
1081                    .to_string(),
1082                ),
1083                timestamp: now,
1084            })
1085            .await?;
1086
1087        Ok(wf)
1088    }
1089
    /// List all workflows whose `parent_id` is `parent_id`. Delegates to
    /// the store.
    pub async fn list_child_workflows(
        &self,
        parent_id: &str,
    ) -> Result<Vec<WorkflowRecord>> {
        self.store.list_child_workflows(parent_id).await
    }
1096
1097    // ── Continue-as-New ─────────────────────────────────────
1098
1099    pub async fn continue_as_new(
1100        &self,
1101        workflow_id: &str,
1102        input: Option<&str>,
1103    ) -> Result<WorkflowRecord> {
1104        let old_wf = self
1105            .store
1106            .get_workflow(workflow_id)
1107            .await?
1108            .ok_or_else(|| anyhow::anyhow!("workflow not found: {workflow_id}"))?;
1109
1110        // Complete the old workflow
1111        self.store
1112            .update_workflow_status(workflow_id, WorkflowStatus::Completed, None, None)
1113            .await?;
1114
1115        // Start a new run with the same type, namespace, and queue
1116        let new_id = format!("{workflow_id}-continued-{}", timestamp_now() as u64);
1117        self.start_workflow(
1118            &old_wf.namespace,
1119            &old_wf.workflow_type,
1120            &new_id,
1121            input,
1122            &old_wf.task_queue,
1123            old_wf.search_attributes.as_deref(),
1124        )
1125        .await
1126    }
1127
1128    // ── Snapshots ───────────────────────────────────────────
1129
    /// Persist a state snapshot for `workflow_id`, keyed by `event_seq`.
    /// Delegates to the store.
    pub async fn create_snapshot(
        &self,
        workflow_id: &str,
        event_seq: i32,
        state_json: &str,
    ) -> Result<()> {
        self.store
            .create_snapshot(workflow_id, event_seq, state_json)
            .await
    }
1140
    /// Fetch the most recent snapshot for `workflow_id`, if any. Delegates
    /// to the store.
    pub async fn get_latest_snapshot(
        &self,
        workflow_id: &str,
    ) -> Result<Option<WorkflowSnapshot>> {
        self.store.get_latest_snapshot(workflow_id).await
    }
1147
1148    // ── Side Effects ────────────────────────────────────────
1149
1150    pub async fn record_side_effect(
1151        &self,
1152        workflow_id: &str,
1153        value: &str,
1154    ) -> Result<()> {
1155        let now = timestamp_now();
1156        let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
1157        self.store
1158            .append_event(&WorkflowEvent {
1159                id: None,
1160                workflow_id: workflow_id.to_string(),
1161                seq,
1162                event_type: "SideEffectRecorded".to_string(),
1163                payload: Some(value.to_string()),
1164                timestamp: now,
1165            })
1166            .await?;
1167        Ok(())
1168    }
1169}
1170
/// Current wall-clock time as fractional seconds since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch — a
/// broken-clock condition the engine treats as unrecoverable.
fn timestamp_now() -> f64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // Bare `unwrap()` replaced with an `expect` stating the invariant.
        .expect("system clock is set before the UNIX epoch")
        .as_secs_f64()
}
1177
// `WorkflowStatus::from_str` returns Result; bring `FromStr` into scope so
// it is callable here (note: this is an import, not a re-export — a true
// re-export would need `pub use`).
1179use std::str::FromStr;