Skip to main content

assay_workflow/
engine.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio::task::JoinHandle;
5use tracing::info;
6
7use crate::dispatch_recovery;
8use crate::health;
9use crate::scheduler;
10use crate::store::WorkflowStore;
11use crate::timers;
12use crate::types::*;
13
/// The workflow engine. Owns the store and manages background tasks
/// (scheduler, timer poller, health monitor).
///
/// The API layer holds an `Arc<Engine<S>>` and delegates all operations here.
pub struct Engine<S: WorkflowStore> {
    // Shared with every background task spawned in `Engine::start`.
    store: Arc<S>,
    // Handles for the spawned background loops. They are kept only to tie
    // them to the engine value in source; NOTE(review): dropping a tokio
    // `JoinHandle` detaches the task rather than aborting it, so dropping
    // the Engine presumably leaves these loops running — confirm intended.
    _scheduler: JoinHandle<()>,
    _timer_poller: JoinHandle<()>,
    _health_monitor: JoinHandle<()>,
    _dispatch_recovery: JoinHandle<()>,
    // Optional S3 archival loop; `None` when no archival config was found
    // in the environment at startup (see `Engine::start`).
    #[cfg(feature = "s3-archival")]
    _archival: Option<JoinHandle<()>>,
}
27
28impl<S: WorkflowStore> Engine<S> {
29    /// Start the engine with all background tasks.
30    pub fn start(store: S) -> Self {
31        let store = Arc::new(store);
32
33        let _scheduler = tokio::spawn(scheduler::run_scheduler(Arc::clone(&store)));
34        let _timer_poller = tokio::spawn(timers::run_timer_poller(Arc::clone(&store)));
35        let _health_monitor = tokio::spawn(health::run_health_monitor(Arc::clone(&store)));
36        let _dispatch_recovery = tokio::spawn(dispatch_recovery::run_dispatch_recovery(
37            Arc::clone(&store),
38        ));
39
40        #[cfg(feature = "s3-archival")]
41        let _archival = crate::archival::ArchivalConfig::from_env().map(|cfg| {
42            tokio::spawn(crate::archival::run_archival(Arc::clone(&store), cfg))
43        });
44
45        info!("Workflow engine started");
46
47        Self {
48            store,
49            _scheduler,
50            _timer_poller,
51            _health_monitor,
52            _dispatch_recovery,
53            #[cfg(feature = "s3-archival")]
54            _archival,
55        }
56    }
57
58    /// Access the underlying store (for the API layer).
59    pub fn store(&self) -> &S {
60        &self.store
61    }
62
63    // ── Workflow Operations ─────────────────────────────────
64
65    pub async fn start_workflow(
66        &self,
67        namespace: &str,
68        workflow_type: &str,
69        workflow_id: &str,
70        input: Option<&str>,
71        task_queue: &str,
72        search_attributes: Option<&str>,
73    ) -> Result<WorkflowRecord> {
74        let now = timestamp_now();
75        let run_id = format!("run-{workflow_id}-{}", now as u64);
76
77        // Auto-stamp the engine version that started this run into its
78        // search attributes. Makes post-mortem triage concrete: "this
79        // run was v0.11.9, that's why it was stuck in main instead of
80        // deployments" is the kind of question that's otherwise guesswork
81        // once multiple engine versions have coexisted in a deployment.
82        // The operator's own attributes take precedence if they also
83        // supplied `assay_engine_version` — we don't overwrite, just
84        // backfill on the "not set" case.
85        let stamped_attrs = inject_engine_version(search_attributes);
86
87        let wf = WorkflowRecord {
88            id: workflow_id.to_string(),
89            namespace: namespace.to_string(),
90            run_id,
91            workflow_type: workflow_type.to_string(),
92            task_queue: task_queue.to_string(),
93            status: "PENDING".to_string(),
94            input: input.map(String::from),
95            result: None,
96            error: None,
97            parent_id: None,
98            claimed_by: None,
99            search_attributes: stamped_attrs,
100            archived_at: None,
101            archive_uri: None,
102            created_at: now,
103            updated_at: now,
104            completed_at: None,
105        };
106
107        self.store.create_workflow(&wf).await?;
108
109        self.store
110            .append_event(&WorkflowEvent {
111                id: None,
112                workflow_id: workflow_id.to_string(),
113                seq: 1,
114                event_type: "WorkflowStarted".to_string(),
115                payload: input.map(String::from),
116                timestamp: now,
117            })
118            .await?;
119
120        // Phase 9: a freshly-started workflow has new events (WorkflowStarted)
121        // that need a worker to replay against — make it dispatchable.
122        self.store.mark_workflow_dispatchable(workflow_id).await?;
123
124        Ok(wf)
125    }
126
127    pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
128        self.store.get_workflow(id).await
129    }
130
131    pub async fn list_workflows(
132        &self,
133        namespace: &str,
134        status: Option<WorkflowStatus>,
135        workflow_type: Option<&str>,
136        search_attrs_filter: Option<&str>,
137        limit: i64,
138        offset: i64,
139    ) -> Result<Vec<WorkflowRecord>> {
140        self.store
141            .list_workflows(
142                namespace,
143                status,
144                workflow_type,
145                search_attrs_filter,
146                limit,
147                offset,
148            )
149            .await
150    }
151
152    pub async fn upsert_search_attributes(
153        &self,
154        workflow_id: &str,
155        patch_json: &str,
156    ) -> Result<()> {
157        self.store
158            .upsert_search_attributes(workflow_id, patch_json)
159            .await
160    }
161
    /// Request cancellation of a workflow and, recursively, of all its
    /// child workflows.
    ///
    /// Returns `Ok(false)` when the workflow doesn't exist or is already in
    /// a terminal state; `Ok(true)` once the cancel has been requested.
    pub async fn cancel_workflow(&self, id: &str) -> Result<bool> {
        let wf = self.store.get_workflow(id).await?;
        match wf {
            None => Ok(false),
            Some(wf) => {
                let status = WorkflowStatus::from_str(&wf.status)
                    .map_err(|e| anyhow::anyhow!(e))?;
                if status.is_terminal() {
                    return Ok(false);
                }

                // Two-phase cancel:
                //   1. Append WorkflowCancelRequested + mark dispatchable.
                //      The next worker replay sees the request, raises a
                //      cancellation error inside the handler, and submits
                //      a CancelWorkflow command.
                //   2. CancelWorkflow command flips status to CANCELLED,
                //      cancels pending activities/timers, appends
                //      WorkflowCancelled.
                //
                // We cancel pending activities + timers up-front too so a
                // worker that's about to claim them sees CANCELLED instead.
                self.store.cancel_pending_activities(id).await?;
                self.store.cancel_pending_timers(id).await?;

                // NOTE(review): seq is derived by read-then-append, which is
                // not atomic on its own — presumably the store serializes
                // appends per workflow; confirm against the store impl.
                let seq = self.store.get_event_count(id).await? as i32 + 1;
                self.store
                    .append_event(&WorkflowEvent {
                        id: None,
                        workflow_id: id.to_string(),
                        seq,
                        event_type: "WorkflowCancelRequested".to_string(),
                        payload: None,
                        timestamp: timestamp_now(),
                    })
                    .await?;

                self.store.mark_workflow_dispatchable(id).await?;

                // Propagate cancellation to all child workflows.
                // Box::pin is required here: direct async recursion would
                // produce an infinitely-sized future type.
                let children = self.store.list_child_workflows(id).await?;
                for child in children {
                    Box::pin(self.cancel_workflow(&child.id)).await?;
                }

                // For workflows that have NO worker registered (or no
                // handler running), cancellation would never complete.
                // Fall back: if the workflow has no events past
                // WorkflowStarted (handler hasn't actually run yet, e.g.
                // PENDING with no claim), finalise immediately.
                if matches!(status, WorkflowStatus::Pending) {
                    self.finalise_cancellation(id).await?;
                }

                Ok(true)
            }
        }
    }
220
221    pub async fn terminate_workflow(&self, id: &str, reason: Option<&str>) -> Result<bool> {
222        let wf = self.store.get_workflow(id).await?;
223        match wf {
224            None => Ok(false),
225            Some(wf) => {
226                let status = WorkflowStatus::from_str(&wf.status)
227                    .map_err(|e| anyhow::anyhow!(e))?;
228                if status.is_terminal() {
229                    return Ok(false);
230                }
231
232                self.store
233                    .update_workflow_status(
234                        id,
235                        WorkflowStatus::Failed,
236                        None,
237                        Some(reason.unwrap_or("terminated")),
238                    )
239                    .await?;
240
241                Ok(true)
242            }
243        }
244    }
245
246    // ── Signal Operations ───────────────────────────────────
247
248    pub async fn send_signal(
249        &self,
250        workflow_id: &str,
251        name: &str,
252        payload: Option<&str>,
253    ) -> Result<()> {
254        let now = timestamp_now();
255
256        self.store
257            .send_signal(&WorkflowSignal {
258                id: None,
259                workflow_id: workflow_id.to_string(),
260                name: name.to_string(),
261                payload: payload.map(String::from),
262                consumed: false,
263                received_at: now,
264            })
265            .await?;
266
267        let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
268        // Parse the incoming payload string back to a JSON value so the
269        // event payload nests cleanly (otherwise the recorded payload is
270        // a stringified JSON-inside-JSON and Lua workers would have to
271        // double-decode).
272        let payload_value: serde_json::Value = payload
273            .and_then(|s| serde_json::from_str(s).ok())
274            .unwrap_or(serde_json::Value::Null);
275        self.store
276            .append_event(&WorkflowEvent {
277                id: None,
278                workflow_id: workflow_id.to_string(),
279                seq,
280                event_type: "SignalReceived".to_string(),
281                payload: Some(
282                    serde_json::json!({ "signal": name, "payload": payload_value }).to_string(),
283                ),
284                timestamp: now,
285            })
286            .await?;
287
288        // Phase 9: a workflow waiting on this signal needs to be re-dispatched
289        // so the worker can replay and notice the signal in history.
290        self.store.mark_workflow_dispatchable(workflow_id).await?;
291
292        Ok(())
293    }
294
295    // ── Event History ───────────────────────────────────────
296
297    pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
298        self.store.list_events(workflow_id).await
299    }
300
301    // ── Worker Operations ───────────────────────────────────
302
303    pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()> {
304        self.store.register_worker(worker).await
305    }
306
307    pub async fn heartbeat_worker(&self, id: &str) -> Result<()> {
308        self.store.heartbeat_worker(id, timestamp_now()).await
309    }
310
311    pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
312        self.store.list_workers(namespace).await
313    }
314
315    // ── Task Operations (for worker polling) ────────────────
316
317    /// Schedule an activity within a workflow.
318    ///
319    /// Idempotent on `(workflow_id, seq)` — if an activity with this sequence
320    /// number already exists for the workflow, returns its id without
321    /// creating a duplicate row or duplicate `ActivityScheduled` event. This
322    /// is essential for deterministic replay: a worker can re-run the
323    /// workflow function and call `schedule_activity(seq=1, ...)` repeatedly
324    /// without producing side effects.
325    ///
326    /// On the first call for a `seq`:
327    /// - inserts a row in `workflow_activities` with status `PENDING`
328    /// - appends an `ActivityScheduled` event to the workflow event log
329    /// - if the workflow is still `PENDING`, transitions it to `RUNNING`
330    pub async fn schedule_activity(
331        &self,
332        workflow_id: &str,
333        seq: i32,
334        name: &str,
335        input: Option<&str>,
336        task_queue: &str,
337        opts: ScheduleActivityOpts,
338    ) -> Result<WorkflowActivity> {
339        // Idempotency: short-circuit if (workflow_id, seq) already exists.
340        if let Some(existing) = self
341            .store
342            .get_activity_by_workflow_seq(workflow_id, seq)
343            .await?
344        {
345            return Ok(existing);
346        }
347
348        let now = timestamp_now();
349        let mut act = WorkflowActivity {
350            id: None,
351            workflow_id: workflow_id.to_string(),
352            seq,
353            name: name.to_string(),
354            task_queue: task_queue.to_string(),
355            input: input.map(String::from),
356            status: "PENDING".to_string(),
357            result: None,
358            error: None,
359            attempt: 1,
360            max_attempts: opts.max_attempts.unwrap_or(3),
361            initial_interval_secs: opts.initial_interval_secs.unwrap_or(1.0),
362            backoff_coefficient: opts.backoff_coefficient.unwrap_or(2.0),
363            start_to_close_secs: opts.start_to_close_secs.unwrap_or(300.0),
364            heartbeat_timeout_secs: opts.heartbeat_timeout_secs,
365            claimed_by: None,
366            scheduled_at: now,
367            started_at: None,
368            completed_at: None,
369            last_heartbeat: None,
370        };
371
372        let id = self.store.create_activity(&act).await?;
373        act.id = Some(id);
374
375        // Append ActivityScheduled event with the activity's seq
376        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
377        self.store
378            .append_event(&WorkflowEvent {
379                id: None,
380                workflow_id: workflow_id.to_string(),
381                seq: event_seq,
382                event_type: "ActivityScheduled".to_string(),
383                payload: Some(
384                    serde_json::json!({
385                        "activity_id": id,
386                        "activity_seq": seq,
387                        "name": name,
388                        "task_queue": task_queue,
389                        "input": input,
390                    })
391                    .to_string(),
392                ),
393                timestamp: now,
394            })
395            .await?;
396
397        // Transition workflow from PENDING to RUNNING on first scheduled activity
398        if let Some(wf) = self.store.get_workflow(workflow_id).await?
399            && wf.status == "PENDING"
400        {
401            self.store
402                .update_workflow_status(workflow_id, WorkflowStatus::Running, None, None)
403                .await?;
404        }
405
406        Ok(act)
407    }
408
409    pub async fn claim_activity(
410        &self,
411        task_queue: &str,
412        worker_id: &str,
413    ) -> Result<Option<WorkflowActivity>> {
414        self.store.claim_activity(task_queue, worker_id).await
415    }
416
417    pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
418        self.store.get_activity(id).await
419    }
420
421    /// Mark a successfully-executed activity complete and append an
422    /// `ActivityCompleted` event to the workflow event log so a replaying
423    /// workflow can pick up the cached result.
424    ///
425    /// `failed=true` is preserved for legacy callers that go straight
426    /// through complete with a non-retry path; new code should call
427    /// [`Engine::fail_activity`] instead so retry policy is honored.
428    pub async fn complete_activity(
429        &self,
430        id: i64,
431        result: Option<&str>,
432        error: Option<&str>,
433        failed: bool,
434    ) -> Result<()> {
435        self.store.complete_activity(id, result, error, failed).await?;
436
437        // Look up the activity so we can attribute the event correctly
438        let act = match self.store.get_activity(id).await? {
439            Some(a) => a,
440            None => return Ok(()),
441        };
442
443        let event_type = if failed {
444            "ActivityFailed"
445        } else {
446            "ActivityCompleted"
447        };
448        let payload = serde_json::json!({
449            "activity_id": id,
450            "activity_seq": act.seq,
451            "name": act.name,
452            "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
453            "error": error,
454        });
455        let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
456        let workflow_id = act.workflow_id.clone();
457        self.store
458            .append_event(&WorkflowEvent {
459                id: None,
460                workflow_id: act.workflow_id,
461                seq: event_seq,
462                event_type: event_type.to_string(),
463                payload: Some(payload.to_string()),
464                timestamp: timestamp_now(),
465            })
466            .await?;
467        // Phase 9: the workflow has a new event the handler needs to see —
468        // wake the workflow task back up.
469        self.store.mark_workflow_dispatchable(&workflow_id).await?;
470        Ok(())
471    }
472
473    /// Fail an activity, honoring its retry policy.
474    ///
475    /// If `attempt < max_attempts`, the activity is re-queued with
476    /// exponential backoff (`initial_interval_secs * backoff_coefficient^(attempt-1)`)
477    /// and `attempt` is incremented. **No event is appended** — retries
478    /// are an internal-engine concern, not workflow-visible.
479    ///
480    /// If `attempt >= max_attempts`, the activity is permanently FAILED
481    /// and an `ActivityFailed` event is appended so the workflow can react.
482    pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()> {
483        let act = match self.store.get_activity(id).await? {
484            Some(a) => a,
485            None => return Ok(()),
486        };
487
488        if act.attempt < act.max_attempts {
489            // Compute exponential backoff: interval * coefficient^(attempt-1)
490            let backoff = act.initial_interval_secs
491                * act.backoff_coefficient.powi(act.attempt - 1);
492            let next_scheduled_at = timestamp_now() + backoff;
493            self.store
494                .requeue_activity_for_retry(id, act.attempt + 1, next_scheduled_at)
495                .await?;
496            return Ok(());
497        }
498
499        // Out of retries — mark FAILED and surface to the workflow
500        self.store
501            .complete_activity(id, None, Some(error), true)
502            .await?;
503
504        let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
505        let workflow_id = act.workflow_id.clone();
506        self.store
507            .append_event(&WorkflowEvent {
508                id: None,
509                workflow_id: act.workflow_id,
510                seq: event_seq,
511                event_type: "ActivityFailed".to_string(),
512                payload: Some(
513                    serde_json::json!({
514                        "activity_id": id,
515                        "activity_seq": act.seq,
516                        "name": act.name,
517                        "error": error,
518                        "final_attempt": act.attempt,
519                    })
520                    .to_string(),
521                ),
522                timestamp: timestamp_now(),
523            })
524            .await?;
525        // Wake the workflow task — handler needs to see the failure.
526        self.store.mark_workflow_dispatchable(&workflow_id).await?;
527        Ok(())
528    }
529
530    // ── Workflow-task dispatch (Phase 9) ────────────────────
531
532    /// Claim a dispatchable workflow task on a queue. Returns the workflow
533    /// record + full event history so the worker can replay the handler
534    /// deterministically. Atomic — multiple workers polling the same queue
535    /// will each get a different task or None.
536    pub async fn claim_workflow_task(
537        &self,
538        task_queue: &str,
539        worker_id: &str,
540    ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>> {
541        let Some(mut wf) = self
542            .store
543            .claim_workflow_task(task_queue, worker_id)
544            .await?
545        else {
546            return Ok(None);
547        };
548        // Once a worker is processing the workflow it's RUNNING — even if
549        // it ultimately just yields and pauses on a signal/timer. PENDING
550        // means "no worker has touched this yet."
551        if wf.status == "PENDING" {
552            self.store
553                .update_workflow_status(&wf.id, WorkflowStatus::Running, None, None)
554                .await?;
555            wf.status = "RUNNING".to_string();
556        }
557        let history = self.store.list_events(&wf.id).await?;
558        Ok(Some((wf, history)))
559    }
560
561    /// Submit a worker's batch of commands for a workflow it claimed.
562    /// Each command produces durable events / rows transactionally and
563    /// the dispatch lease is released on return.
564    ///
565    /// Supported command types:
566    /// - `ScheduleActivity` { seq, name, task_queue, input?, max_attempts?, ... }
567    /// - `CompleteWorkflow` { result }
568    /// - `FailWorkflow`     { error }
569    pub async fn submit_workflow_commands(
570        &self,
571        workflow_id: &str,
572        worker_id: &str,
573        commands: &[serde_json::Value],
574    ) -> Result<()> {
575        for cmd in commands {
576            let cmd_type = cmd.get("type").and_then(|v| v.as_str()).unwrap_or("");
577            match cmd_type {
578                "ScheduleActivity" => {
579                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
580                    let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
581                    let queue = cmd
582                        .get("task_queue")
583                        .and_then(|v| v.as_str())
584                        .unwrap_or("default");
585                    let input = cmd.get("input").map(|v| v.to_string());
586                    let opts = ScheduleActivityOpts {
587                        max_attempts: cmd
588                            .get("max_attempts")
589                            .and_then(|v| v.as_i64())
590                            .map(|n| n as i32),
591                        initial_interval_secs: cmd
592                            .get("initial_interval_secs")
593                            .and_then(|v| v.as_f64()),
594                        backoff_coefficient: cmd
595                            .get("backoff_coefficient")
596                            .and_then(|v| v.as_f64()),
597                        start_to_close_secs: cmd
598                            .get("start_to_close_secs")
599                            .and_then(|v| v.as_f64()),
600                        heartbeat_timeout_secs: cmd
601                            .get("heartbeat_timeout_secs")
602                            .and_then(|v| v.as_f64()),
603                    };
604                    self.schedule_activity(
605                        workflow_id,
606                        seq,
607                        name,
608                        input.as_deref(),
609                        queue,
610                        opts,
611                    )
612                    .await?;
613                }
614                "CancelWorkflow" => {
615                    // Worker acknowledged a cancellation — finalise.
616                    self.finalise_cancellation(workflow_id).await?;
617                }
618                "WaitForSignal" => {
619                    // No engine-side state to write — the workflow has paused
620                    // and will be re-dispatched when a matching signal arrives.
621                    // Releasing the lease (below) is enough; record the wait
622                    // intent for the dashboard / debugging.
623                    //
624                    // When the command carries `timer_seq`, the wait is paired
625                    // with a `ScheduleTimer` yielded in the same batch — the
626                    // worker uses the timer_seq to pick the winner on replay
627                    // (signal vs timeout). The engine stores the pairing on
628                    // the event for observability only.
629                    let signal_name =
630                        cmd.get("name").and_then(|v| v.as_str()).unwrap_or("?");
631                    let timer_seq = cmd.get("timer_seq").and_then(|v| v.as_i64());
632                    let payload = match timer_seq {
633                        Some(ts) => serde_json::json!({
634                            "signal": signal_name,
635                            "timer_seq": ts,
636                        }),
637                        None => serde_json::json!({ "signal": signal_name }),
638                    };
639                    let event_seq =
640                        self.store.get_event_count(workflow_id).await? as i32 + 1;
641                    self.store
642                        .append_event(&WorkflowEvent {
643                            id: None,
644                            workflow_id: workflow_id.to_string(),
645                            seq: event_seq,
646                            event_type: "WorkflowAwaitingSignal".to_string(),
647                            payload: Some(payload.to_string()),
648                            timestamp: timestamp_now(),
649                        })
650                        .await?;
651                }
652                "StartChildWorkflow" => {
653                    let workflow_type = cmd
654                        .get("workflow_type")
655                        .and_then(|v| v.as_str())
656                        .unwrap_or("");
657                    let child_id =
658                        cmd.get("workflow_id").and_then(|v| v.as_str()).unwrap_or("");
659                    let task_queue = cmd
660                        .get("task_queue")
661                        .and_then(|v| v.as_str())
662                        .unwrap_or("default");
663                    let input = cmd.get("input").map(|v| v.to_string());
664                    // Determine the namespace from the parent workflow
665                    let namespace = self
666                        .store
667                        .get_workflow(workflow_id)
668                        .await?
669                        .map(|wf| wf.namespace)
670                        .unwrap_or_else(|| "main".to_string());
671
672                    // Idempotent: if a workflow with this id already exists,
673                    // skip creation (deterministic replay calls this command
674                    // for the same child id on every re-run until the parent
675                    // has the ChildWorkflowCompleted event).
676                    if self.store.get_workflow(child_id).await?.is_none() {
677                        self.start_child_workflow(
678                            &namespace,
679                            workflow_id,
680                            workflow_type,
681                            child_id,
682                            input.as_deref(),
683                            task_queue,
684                        )
685                        .await?;
686                        // Make the child immediately dispatchable so a worker
687                        // picks it up.
688                        self.store.mark_workflow_dispatchable(child_id).await?;
689                    }
690                }
691                "RecordSideEffect" => {
692                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
693                    let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
694                    let value =
695                        cmd.get("value").cloned().unwrap_or(serde_json::Value::Null);
696                    let event_seq =
697                        self.store.get_event_count(workflow_id).await? as i32 + 1;
698                    self.store
699                        .append_event(&WorkflowEvent {
700                            id: None,
701                            workflow_id: workflow_id.to_string(),
702                            seq: event_seq,
703                            event_type: "SideEffectRecorded".to_string(),
704                            payload: Some(
705                                serde_json::json!({
706                                    "side_effect_seq": seq,
707                                    "name": name,
708                                    "value": value,
709                                })
710                                .to_string(),
711                            ),
712                            timestamp: timestamp_now(),
713                        })
714                        .await?;
715                    // Side effects don't trigger anything external — the
716                    // workflow needs to immediately continue so it picks
717                    // up the cached value on next replay.
718                    self.store.mark_workflow_dispatchable(workflow_id).await?;
719                }
720                "ScheduleTimer" => {
721                    let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
722                    let duration = cmd
723                        .get("duration_secs")
724                        .and_then(|v| v.as_f64())
725                        .unwrap_or(0.0);
726                    self.schedule_timer(workflow_id, seq, duration).await?;
727                }
728                "UpsertSearchAttributes" => {
729                    // Merge the patch object into the workflow's stored
730                    // search_attributes. Workflow code can call this from
731                    // `ctx:upsert_search_attributes(...)` to surface live
732                    // progress / tenant / env tags that downstream callers
733                    // can filter on via the list endpoint.
734                    let patch = cmd
735                        .get("patch")
736                        .cloned()
737                        .unwrap_or(serde_json::Value::Object(Default::default()));
738                    self.store
739                        .upsert_search_attributes(workflow_id, &patch.to_string())
740                        .await?;
741                }
742                "ContinueAsNew" => {
743                    // Close out the current run and start a new one with the
744                    // same type / namespace / queue under a fresh id. Input
745                    // may be any JSON value; it's serialised and becomes the
746                    // new run's `input`. Called from workflow code via
747                    // `ctx:continue_as_new(input)` to reset event history
748                    // when a handler would otherwise loop forever.
749                    let input = cmd.get("input").map(|v| v.to_string());
750                    self.continue_as_new(workflow_id, input.as_deref())
751                        .await?;
752                }
753                "RecordSnapshot" => {
754                    // Persist the workflow's current query-handler state. Each
755                    // snapshot is keyed by the current event seq so the latest
756                    // is easy to retrieve via `get_latest_snapshot`. Runs on
757                    // every worker replay, which is fine — `create_snapshot`
758                    // is an insert, so each replay adds a new row reflecting
759                    // the state at that point in history.
760                    let state = cmd
761                        .get("state")
762                        .cloned()
763                        .unwrap_or(serde_json::Value::Null);
764                    let event_seq = self.store.get_event_count(workflow_id).await? as i32;
765                    self.store
766                        .create_snapshot(workflow_id, event_seq, &state.to_string())
767                        .await?;
768                }
769                "CompleteWorkflow" => {
770                    let result = cmd.get("result").map(|v| v.to_string());
771                    self.complete_workflow(workflow_id, result.as_deref()).await?;
772                }
773                "FailWorkflow" => {
774                    let error = cmd
775                        .get("error")
776                        .and_then(|v| v.as_str())
777                        .unwrap_or("workflow handler raised an error");
778                    self.fail_workflow(workflow_id, error).await?;
779                }
780                other => {
781                    tracing::warn!("submit_workflow_commands: unknown command type {other:?}");
782                }
783            }
784        }
785
786        self.store
787            .release_workflow_task(workflow_id, worker_id)
788            .await?;
789        Ok(())
790    }
791
792    /// Schedule a durable timer for a workflow.
793    ///
794    /// Idempotent on `(workflow_id, seq)` — a workflow that yields the same
795    /// `ScheduleTimer{seq=N}` on retry will reuse the existing timer, not
796    /// schedule a second one. This is the timer counterpart to
797    /// `schedule_activity`'s replay-safe behaviour.
798    ///
799    /// On the first call:
800    /// - inserts a row in `workflow_timers` with `fire_at = now + duration`
801    /// - appends a `TimerScheduled` event so the worker can replay and
802    ///   know it's been scheduled (otherwise replays would yield it again)
803    pub async fn schedule_timer(
804        &self,
805        workflow_id: &str,
806        seq: i32,
807        duration_secs: f64,
808    ) -> Result<WorkflowTimer> {
809        if let Some(existing) = self
810            .store
811            .get_timer_by_workflow_seq(workflow_id, seq)
812            .await?
813        {
814            return Ok(existing);
815        }
816
817        let now = timestamp_now();
818        let mut timer = WorkflowTimer {
819            id: None,
820            workflow_id: workflow_id.to_string(),
821            seq,
822            fire_at: now + duration_secs,
823            fired: false,
824        };
825        let id = self.store.create_timer(&timer).await?;
826        timer.id = Some(id);
827
828        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
829        self.store
830            .append_event(&WorkflowEvent {
831                id: None,
832                workflow_id: workflow_id.to_string(),
833                seq: event_seq,
834                event_type: "TimerScheduled".to_string(),
835                payload: Some(
836                    serde_json::json!({
837                        "timer_id": id,
838                        "timer_seq": seq,
839                        "fire_at": timer.fire_at,
840                        "duration_secs": duration_secs,
841                    })
842                    .to_string(),
843                ),
844                timestamp: now,
845            })
846            .await?;
847
848        Ok(timer)
849    }
850
851    /// Finalise a cancellation: flips status to CANCELLED and appends the
852    /// terminal WorkflowCancelled event. Called by the CancelWorkflow
853    /// command handler (worker acknowledged cancel) and by cancel_workflow
854    /// directly when the workflow has no worker yet.
855    pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()> {
856        // Avoid double-finalising
857        if let Some(wf) = self.store.get_workflow(workflow_id).await?
858            && wf.status == "CANCELLED"
859        {
860            return Ok(());
861        }
862        self.store
863            .update_workflow_status(workflow_id, WorkflowStatus::Cancelled, None, None)
864            .await?;
865        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
866        self.store
867            .append_event(&WorkflowEvent {
868                id: None,
869                workflow_id: workflow_id.to_string(),
870                seq: event_seq,
871                event_type: "WorkflowCancelled".to_string(),
872                payload: None,
873                timestamp: timestamp_now(),
874            })
875            .await?;
876        Ok(())
877    }
878
879    /// Mark a workflow COMPLETED with a result + append WorkflowCompleted event.
880    /// If the workflow has a parent, also notifies the parent with a
881    /// ChildWorkflowCompleted event and marks it dispatchable so it can
882    /// replay past `ctx:start_child_workflow` and pick up the child's result.
883    pub async fn complete_workflow(&self, workflow_id: &str, result: Option<&str>) -> Result<()> {
884        self.store
885            .update_workflow_status(workflow_id, WorkflowStatus::Completed, result, None)
886            .await?;
887        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
888        self.store
889            .append_event(&WorkflowEvent {
890                id: None,
891                workflow_id: workflow_id.to_string(),
892                seq: event_seq,
893                event_type: "WorkflowCompleted".to_string(),
894                payload: result.map(String::from),
895                timestamp: timestamp_now(),
896            })
897            .await?;
898        self.notify_parent_of_child_outcome(
899            workflow_id,
900            "ChildWorkflowCompleted",
901            serde_json::json!({
902                "child_workflow_id": workflow_id,
903                "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
904            }),
905        )
906        .await?;
907        Ok(())
908    }
909
910    /// Mark a workflow FAILED with an error + append WorkflowFailed event.
911    /// Notifies the parent if any (ChildWorkflowFailed).
912    pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()> {
913        self.store
914            .update_workflow_status(workflow_id, WorkflowStatus::Failed, None, Some(error))
915            .await?;
916        let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
917        self.store
918            .append_event(&WorkflowEvent {
919                id: None,
920                workflow_id: workflow_id.to_string(),
921                seq: event_seq,
922                event_type: "WorkflowFailed".to_string(),
923                payload: Some(serde_json::json!({"error": error}).to_string()),
924                timestamp: timestamp_now(),
925            })
926            .await?;
927        self.notify_parent_of_child_outcome(
928            workflow_id,
929            "ChildWorkflowFailed",
930            serde_json::json!({
931                "child_workflow_id": workflow_id,
932                "error": error,
933            }),
934        )
935        .await?;
936        Ok(())
937    }
938
939    /// Append a parent-side event when a child reaches a terminal state and
940    /// re-dispatch the parent so it can replay past its `start_child_workflow`
941    /// call. No-op for top-level workflows (no parent_id).
942    async fn notify_parent_of_child_outcome(
943        &self,
944        child_workflow_id: &str,
945        event_type: &str,
946        payload: serde_json::Value,
947    ) -> Result<()> {
948        let Some(child) = self.store.get_workflow(child_workflow_id).await? else {
949            return Ok(());
950        };
951        let Some(parent_id) = child.parent_id else {
952            return Ok(());
953        };
954        let event_seq = self.store.get_event_count(&parent_id).await? as i32 + 1;
955        self.store
956            .append_event(&WorkflowEvent {
957                id: None,
958                workflow_id: parent_id.clone(),
959                seq: event_seq,
960                event_type: event_type.to_string(),
961                payload: Some(payload.to_string()),
962                timestamp: timestamp_now(),
963            })
964            .await?;
965        self.store.mark_workflow_dispatchable(&parent_id).await?;
966        Ok(())
967    }
968
    /// Record a heartbeat for activity `id`, optionally attaching progress
    /// `details`. Thin pass-through to `WorkflowStore::heartbeat_activity`.
    pub async fn heartbeat_activity(&self, id: i64, details: Option<&str>) -> Result<()> {
        self.store.heartbeat_activity(id, details).await
    }
972
973    // ── Schedule Operations ─────────────────────────────────
974
    /// Persist a new workflow schedule. Delegates to the store.
    pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()> {
        self.store.create_schedule(schedule).await
    }
978
    /// List the schedules in `namespace`. Delegates to the store.
    pub async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        self.store.list_schedules(namespace).await
    }
982
    /// Look up one schedule by namespace + name; `None` when it doesn't
    /// exist. Delegates to the store.
    pub async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        self.store.get_schedule(namespace, name).await
    }
986
    /// Delete a schedule by namespace + name. Delegates to the store; the
    /// returned bool presumably reports whether a schedule was removed —
    /// confirm against the store implementation.
    pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
        self.store.delete_schedule(namespace, name).await
    }
990
    /// Apply `patch` to an existing schedule. Delegates to the store;
    /// `None` presumably means no schedule matched — confirm against the
    /// store implementation.
    pub async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
        self.store.update_schedule(namespace, name, patch).await
    }
999
    /// Pause or resume a schedule. Delegates to the store; `None`
    /// presumably means no schedule matched — confirm against the store
    /// implementation.
    pub async fn set_schedule_paused(
        &self,
        namespace: &str,
        name: &str,
        paused: bool,
    ) -> Result<Option<WorkflowSchedule>> {
        self.store.set_schedule_paused(namespace, name, paused).await
    }
1008
1009    // ── Namespace Operations ────────────────────────────────
1010
    /// Create a namespace record. Delegates to the store.
    pub async fn create_namespace(&self, name: &str) -> Result<()> {
        self.store.create_namespace(name).await
    }
1014
    /// List all namespace records. Delegates to the store.
    pub async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
        self.store.list_namespaces().await
    }
1018
    /// Delete a namespace by name. Delegates to the store; the returned
    /// bool presumably reports whether a namespace was removed — confirm
    /// against the store implementation.
    pub async fn delete_namespace(&self, name: &str) -> Result<bool> {
        self.store.delete_namespace(name).await
    }
1022
    /// Fetch aggregate stats for one namespace. Delegates to the store.
    pub async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
        self.store.get_namespace_stats(namespace).await
    }
1026
    /// Fetch per-task-queue stats for one namespace. Delegates to the store.
    pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
        self.store.get_queue_stats(namespace).await
    }
1030
1031    // ── Child Workflow Operations ───────────────────────────
1032
1033    pub async fn start_child_workflow(
1034        &self,
1035        namespace: &str,
1036        parent_id: &str,
1037        workflow_type: &str,
1038        workflow_id: &str,
1039        input: Option<&str>,
1040        task_queue: &str,
1041    ) -> Result<WorkflowRecord> {
1042        let now = timestamp_now();
1043        let run_id = format!("run-{workflow_id}-{}", now as u64);
1044
1045        let wf = WorkflowRecord {
1046            id: workflow_id.to_string(),
1047            namespace: namespace.to_string(),
1048            run_id,
1049            workflow_type: workflow_type.to_string(),
1050            task_queue: task_queue.to_string(),
1051            status: "PENDING".to_string(),
1052            input: input.map(String::from),
1053            result: None,
1054            error: None,
1055            parent_id: Some(parent_id.to_string()),
1056            claimed_by: None,
1057            search_attributes: None,
1058            archived_at: None,
1059            archive_uri: None,
1060            created_at: now,
1061            updated_at: now,
1062            completed_at: None,
1063        };
1064
1065        self.store.create_workflow(&wf).await?;
1066
1067        // Record events on both parent and child
1068        self.store
1069            .append_event(&WorkflowEvent {
1070                id: None,
1071                workflow_id: workflow_id.to_string(),
1072                seq: 1,
1073                event_type: "WorkflowStarted".to_string(),
1074                payload: input.map(String::from),
1075                timestamp: now,
1076            })
1077            .await?;
1078
1079        let parent_seq = self.store.get_event_count(parent_id).await? as i32 + 1;
1080        self.store
1081            .append_event(&WorkflowEvent {
1082                id: None,
1083                workflow_id: parent_id.to_string(),
1084                seq: parent_seq,
1085                event_type: "ChildWorkflowStarted".to_string(),
1086                payload: Some(
1087                    serde_json::json!({
1088                        "child_workflow_id": workflow_id,
1089                        "workflow_type": workflow_type,
1090                    })
1091                    .to_string(),
1092                ),
1093                timestamp: now,
1094            })
1095            .await?;
1096
1097        Ok(wf)
1098    }
1099
    /// List the direct child workflows of `parent_id`. Delegates to the
    /// store.
    pub async fn list_child_workflows(
        &self,
        parent_id: &str,
    ) -> Result<Vec<WorkflowRecord>> {
        self.store.list_child_workflows(parent_id).await
    }
1106
1107    // ── Continue-as-New ─────────────────────────────────────
1108
1109    pub async fn continue_as_new(
1110        &self,
1111        workflow_id: &str,
1112        input: Option<&str>,
1113    ) -> Result<WorkflowRecord> {
1114        let old_wf = self
1115            .store
1116            .get_workflow(workflow_id)
1117            .await?
1118            .ok_or_else(|| anyhow::anyhow!("workflow not found: {workflow_id}"))?;
1119
1120        // Complete the old workflow
1121        self.store
1122            .update_workflow_status(workflow_id, WorkflowStatus::Completed, None, None)
1123            .await?;
1124
1125        // Start a new run with the same type, namespace, and queue
1126        let new_id = format!("{workflow_id}-continued-{}", timestamp_now() as u64);
1127        self.start_workflow(
1128            &old_wf.namespace,
1129            &old_wf.workflow_type,
1130            &new_id,
1131            input,
1132            &old_wf.task_queue,
1133            old_wf.search_attributes.as_deref(),
1134        )
1135        .await
1136    }
1137
1138    // ── Snapshots ───────────────────────────────────────────
1139
    /// Insert a query-state snapshot for `workflow_id`, keyed by
    /// `event_seq`. Thin pass-through to `WorkflowStore::create_snapshot`.
    pub async fn create_snapshot(
        &self,
        workflow_id: &str,
        event_seq: i32,
        state_json: &str,
    ) -> Result<()> {
        self.store
            .create_snapshot(workflow_id, event_seq, state_json)
            .await
    }
1150
    /// Fetch the most recent snapshot for `workflow_id`, if any.
    /// Delegates to the store.
    pub async fn get_latest_snapshot(
        &self,
        workflow_id: &str,
    ) -> Result<Option<WorkflowSnapshot>> {
        self.store.get_latest_snapshot(workflow_id).await
    }
1157
1158    // ── Side Effects ────────────────────────────────────────
1159
1160    pub async fn record_side_effect(
1161        &self,
1162        workflow_id: &str,
1163        value: &str,
1164    ) -> Result<()> {
1165        let now = timestamp_now();
1166        let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
1167        self.store
1168            .append_event(&WorkflowEvent {
1169                id: None,
1170                workflow_id: workflow_id.to_string(),
1171                seq,
1172                event_type: "SideEffectRecorded".to_string(),
1173                payload: Some(value.to_string()),
1174                timestamp: now,
1175            })
1176            .await?;
1177        Ok(())
1178    }
1179}
1180
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// All engine timestamps (events, timers, derived run ids) flow through
/// this one helper so they share a single clock and representation.
fn timestamp_now() -> f64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // Fails only if the system clock is set before 1970-01-01 —
        // an environment bug, not a recoverable condition.
        .expect("system clock is before the Unix epoch")
        .as_secs_f64()
}
1187
/// Engine version: this crate's version, captured from Cargo at build time.
/// `inject_engine_version` stamps it into a workflow's search_attributes
/// (under `assay_engine_version`) so operators can correlate runs with the
/// engine release that executed them.
const ENGINE_VERSION: &str = env!("CARGO_PKG_VERSION");
1192
1193/// Auto-stamp `assay_engine_version` into a workflow's search attributes.
1194/// Returns `Some` JSON string for the caller to store in the record.
1195///
1196/// If the caller already supplied `assay_engine_version` in their patch,
1197/// we leave their value alone (explicit override wins). Otherwise we
1198/// backfill the running engine's version. Callers who supply no
1199/// attributes at all get a single-key object with just the version.
1200fn inject_engine_version(caller_attrs: Option<&str>) -> Option<String> {
1201    let mut obj: serde_json::Map<String, serde_json::Value> = match caller_attrs {
1202        Some(raw) => match serde_json::from_str::<serde_json::Value>(raw) {
1203            Ok(serde_json::Value::Object(m)) => m,
1204            // Non-object JSON (or unparsable) — preserve as-is without
1205            // stamping; we can't safely merge a key into a non-object.
1206            Ok(other) => return Some(other.to_string()),
1207            Err(_) => return Some(raw.to_string()),
1208        },
1209        None => serde_json::Map::new(),
1210    };
1211    obj.entry("assay_engine_version".to_string())
1212        .or_insert_with(|| serde_json::Value::String(ENGINE_VERSION.to_string()));
1213    Some(serde_json::Value::Object(obj).to_string())
1214}
1215
#[cfg(test)]
mod engine_version_stamp_tests {
    //! Unit tests for `inject_engine_version`, the search-attribute
    //! version-stamping helper: covers the empty, merge, override,
    //! non-object, and unparsable input paths.
    use super::*;

    #[test]
    fn no_attrs_produces_single_key_object() {
        let out = inject_engine_version(None).unwrap();
        let v: serde_json::Value = serde_json::from_str(&out).unwrap();
        assert_eq!(v["assay_engine_version"], ENGINE_VERSION);
        assert_eq!(v.as_object().unwrap().len(), 1);
    }

    #[test]
    fn existing_attrs_gain_the_version_field() {
        let out = inject_engine_version(Some(r#"{"env":"prod","tenant":"acme"}"#)).unwrap();
        let v: serde_json::Value = serde_json::from_str(&out).unwrap();
        assert_eq!(v["env"], "prod");
        assert_eq!(v["tenant"], "acme");
        assert_eq!(v["assay_engine_version"], ENGINE_VERSION);
    }

    #[test]
    fn caller_supplied_version_wins_on_conflict() {
        let out = inject_engine_version(Some(r#"{"assay_engine_version":"0.0.1-test"}"#)).unwrap();
        let v: serde_json::Value = serde_json::from_str(&out).unwrap();
        assert_eq!(v["assay_engine_version"], "0.0.1-test");
    }

    #[test]
    fn non_object_json_is_preserved_unchanged() {
        // Note: round-trips through serde_json, so whitespace normalises.
        let out = inject_engine_version(Some("[1, 2, 3]")).unwrap();
        assert_eq!(out, "[1,2,3]");
    }

    #[test]
    fn unparsable_json_is_preserved_unchanged() {
        let out = inject_engine_version(Some("not json")).unwrap();
        assert_eq!(out, "not json");
    }
}
1256
// `WorkflowStatus` implements `FromStr` (returning a `Result`); bring the
// trait into scope so `from_str`/`parse` calls in this module resolve.
1258use std::str::FromStr;