1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio::task::JoinHandle;
5use tracing::info;
6
7use crate::dispatch_recovery;
8use crate::health;
9use crate::scheduler;
10use crate::store::WorkflowStore;
11use crate::timers;
12use crate::types::*;
13
14pub struct Engine<S: WorkflowStore> {
19 store: Arc<S>,
20 _scheduler: JoinHandle<()>,
21 _timer_poller: JoinHandle<()>,
22 _health_monitor: JoinHandle<()>,
23 _dispatch_recovery: JoinHandle<()>,
24 #[cfg(feature = "s3-archival")]
25 _archival: Option<JoinHandle<()>>,
26}
27
28impl<S: WorkflowStore> Engine<S> {
29 pub fn start(store: S) -> Self {
31 let store = Arc::new(store);
32
33 let _scheduler = tokio::spawn(scheduler::run_scheduler(Arc::clone(&store)));
34 let _timer_poller = tokio::spawn(timers::run_timer_poller(Arc::clone(&store)));
35 let _health_monitor = tokio::spawn(health::run_health_monitor(Arc::clone(&store)));
36 let _dispatch_recovery = tokio::spawn(dispatch_recovery::run_dispatch_recovery(
37 Arc::clone(&store),
38 ));
39
40 #[cfg(feature = "s3-archival")]
41 let _archival = crate::archival::ArchivalConfig::from_env().map(|cfg| {
42 tokio::spawn(crate::archival::run_archival(Arc::clone(&store), cfg))
43 });
44
45 info!("Workflow engine started");
46
47 Self {
48 store,
49 _scheduler,
50 _timer_poller,
51 _health_monitor,
52 _dispatch_recovery,
53 #[cfg(feature = "s3-archival")]
54 _archival,
55 }
56 }
57
58 pub fn store(&self) -> &S {
60 &self.store
61 }
62
63 pub async fn start_workflow(
66 &self,
67 namespace: &str,
68 workflow_type: &str,
69 workflow_id: &str,
70 input: Option<&str>,
71 task_queue: &str,
72 search_attributes: Option<&str>,
73 ) -> Result<WorkflowRecord> {
74 let now = timestamp_now();
75 let run_id = format!("run-{workflow_id}-{}", now as u64);
76
77 let stamped_attrs = inject_engine_version(search_attributes);
86
87 let wf = WorkflowRecord {
88 id: workflow_id.to_string(),
89 namespace: namespace.to_string(),
90 run_id,
91 workflow_type: workflow_type.to_string(),
92 task_queue: task_queue.to_string(),
93 status: "PENDING".to_string(),
94 input: input.map(String::from),
95 result: None,
96 error: None,
97 parent_id: None,
98 claimed_by: None,
99 search_attributes: stamped_attrs,
100 archived_at: None,
101 archive_uri: None,
102 created_at: now,
103 updated_at: now,
104 completed_at: None,
105 };
106
107 self.store.create_workflow(&wf).await?;
108
109 self.store
110 .append_event(&WorkflowEvent {
111 id: None,
112 workflow_id: workflow_id.to_string(),
113 seq: 1,
114 event_type: "WorkflowStarted".to_string(),
115 payload: input.map(String::from),
116 timestamp: now,
117 })
118 .await?;
119
120 self.store.mark_workflow_dispatchable(workflow_id).await?;
123
124 Ok(wf)
125 }
126
127 pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
128 self.store.get_workflow(id).await
129 }
130
131 pub async fn list_workflows(
132 &self,
133 namespace: &str,
134 status: Option<WorkflowStatus>,
135 workflow_type: Option<&str>,
136 search_attrs_filter: Option<&str>,
137 limit: i64,
138 offset: i64,
139 ) -> Result<Vec<WorkflowRecord>> {
140 self.store
141 .list_workflows(
142 namespace,
143 status,
144 workflow_type,
145 search_attrs_filter,
146 limit,
147 offset,
148 )
149 .await
150 }
151
152 pub async fn upsert_search_attributes(
153 &self,
154 workflow_id: &str,
155 patch_json: &str,
156 ) -> Result<()> {
157 self.store
158 .upsert_search_attributes(workflow_id, patch_json)
159 .await
160 }
161
162 pub async fn cancel_workflow(&self, id: &str) -> Result<bool> {
163 let wf = self.store.get_workflow(id).await?;
164 match wf {
165 None => Ok(false),
166 Some(wf) => {
167 let status = WorkflowStatus::from_str(&wf.status)
168 .map_err(|e| anyhow::anyhow!(e))?;
169 if status.is_terminal() {
170 return Ok(false);
171 }
172
173 self.store.cancel_pending_activities(id).await?;
185 self.store.cancel_pending_timers(id).await?;
186
187 let seq = self.store.get_event_count(id).await? as i32 + 1;
188 self.store
189 .append_event(&WorkflowEvent {
190 id: None,
191 workflow_id: id.to_string(),
192 seq,
193 event_type: "WorkflowCancelRequested".to_string(),
194 payload: None,
195 timestamp: timestamp_now(),
196 })
197 .await?;
198
199 self.store.mark_workflow_dispatchable(id).await?;
200
201 let children = self.store.list_child_workflows(id).await?;
203 for child in children {
204 Box::pin(self.cancel_workflow(&child.id)).await?;
205 }
206
207 if matches!(status, WorkflowStatus::Pending) {
213 self.finalise_cancellation(id).await?;
214 }
215
216 Ok(true)
217 }
218 }
219 }
220
221 pub async fn terminate_workflow(&self, id: &str, reason: Option<&str>) -> Result<bool> {
222 let wf = self.store.get_workflow(id).await?;
223 match wf {
224 None => Ok(false),
225 Some(wf) => {
226 let status = WorkflowStatus::from_str(&wf.status)
227 .map_err(|e| anyhow::anyhow!(e))?;
228 if status.is_terminal() {
229 return Ok(false);
230 }
231
232 self.store
233 .update_workflow_status(
234 id,
235 WorkflowStatus::Failed,
236 None,
237 Some(reason.unwrap_or("terminated")),
238 )
239 .await?;
240
241 Ok(true)
242 }
243 }
244 }
245
246 pub async fn send_signal(
249 &self,
250 workflow_id: &str,
251 name: &str,
252 payload: Option<&str>,
253 ) -> Result<()> {
254 let now = timestamp_now();
255
256 self.store
257 .send_signal(&WorkflowSignal {
258 id: None,
259 workflow_id: workflow_id.to_string(),
260 name: name.to_string(),
261 payload: payload.map(String::from),
262 consumed: false,
263 received_at: now,
264 })
265 .await?;
266
267 let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
268 let payload_value: serde_json::Value = payload
273 .and_then(|s| serde_json::from_str(s).ok())
274 .unwrap_or(serde_json::Value::Null);
275 self.store
276 .append_event(&WorkflowEvent {
277 id: None,
278 workflow_id: workflow_id.to_string(),
279 seq,
280 event_type: "SignalReceived".to_string(),
281 payload: Some(
282 serde_json::json!({ "signal": name, "payload": payload_value }).to_string(),
283 ),
284 timestamp: now,
285 })
286 .await?;
287
288 self.store.mark_workflow_dispatchable(workflow_id).await?;
291
292 Ok(())
293 }
294
295 pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
298 self.store.list_events(workflow_id).await
299 }
300
301 pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()> {
304 self.store.register_worker(worker).await
305 }
306
307 pub async fn heartbeat_worker(&self, id: &str) -> Result<()> {
308 self.store.heartbeat_worker(id, timestamp_now()).await
309 }
310
311 pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
312 self.store.list_workers(namespace).await
313 }
314
315 pub async fn schedule_activity(
331 &self,
332 workflow_id: &str,
333 seq: i32,
334 name: &str,
335 input: Option<&str>,
336 task_queue: &str,
337 opts: ScheduleActivityOpts,
338 ) -> Result<WorkflowActivity> {
339 if let Some(existing) = self
341 .store
342 .get_activity_by_workflow_seq(workflow_id, seq)
343 .await?
344 {
345 return Ok(existing);
346 }
347
348 let now = timestamp_now();
349 let mut act = WorkflowActivity {
350 id: None,
351 workflow_id: workflow_id.to_string(),
352 seq,
353 name: name.to_string(),
354 task_queue: task_queue.to_string(),
355 input: input.map(String::from),
356 status: "PENDING".to_string(),
357 result: None,
358 error: None,
359 attempt: 1,
360 max_attempts: opts.max_attempts.unwrap_or(3),
361 initial_interval_secs: opts.initial_interval_secs.unwrap_or(1.0),
362 backoff_coefficient: opts.backoff_coefficient.unwrap_or(2.0),
363 start_to_close_secs: opts.start_to_close_secs.unwrap_or(300.0),
364 heartbeat_timeout_secs: opts.heartbeat_timeout_secs,
365 claimed_by: None,
366 scheduled_at: now,
367 started_at: None,
368 completed_at: None,
369 last_heartbeat: None,
370 };
371
372 let id = self.store.create_activity(&act).await?;
373 act.id = Some(id);
374
375 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
377 self.store
378 .append_event(&WorkflowEvent {
379 id: None,
380 workflow_id: workflow_id.to_string(),
381 seq: event_seq,
382 event_type: "ActivityScheduled".to_string(),
383 payload: Some(
384 serde_json::json!({
385 "activity_id": id,
386 "activity_seq": seq,
387 "name": name,
388 "task_queue": task_queue,
389 "input": input,
390 })
391 .to_string(),
392 ),
393 timestamp: now,
394 })
395 .await?;
396
397 if let Some(wf) = self.store.get_workflow(workflow_id).await?
399 && wf.status == "PENDING"
400 {
401 self.store
402 .update_workflow_status(workflow_id, WorkflowStatus::Running, None, None)
403 .await?;
404 }
405
406 Ok(act)
407 }
408
409 pub async fn claim_activity(
410 &self,
411 task_queue: &str,
412 worker_id: &str,
413 ) -> Result<Option<WorkflowActivity>> {
414 self.store.claim_activity(task_queue, worker_id).await
415 }
416
417 pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
418 self.store.get_activity(id).await
419 }
420
421 pub async fn complete_activity(
429 &self,
430 id: i64,
431 result: Option<&str>,
432 error: Option<&str>,
433 failed: bool,
434 ) -> Result<()> {
435 self.store.complete_activity(id, result, error, failed).await?;
436
437 let act = match self.store.get_activity(id).await? {
439 Some(a) => a,
440 None => return Ok(()),
441 };
442
443 let event_type = if failed {
444 "ActivityFailed"
445 } else {
446 "ActivityCompleted"
447 };
448 let payload = serde_json::json!({
449 "activity_id": id,
450 "activity_seq": act.seq,
451 "name": act.name,
452 "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
453 "error": error,
454 });
455 let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
456 let workflow_id = act.workflow_id.clone();
457 self.store
458 .append_event(&WorkflowEvent {
459 id: None,
460 workflow_id: act.workflow_id,
461 seq: event_seq,
462 event_type: event_type.to_string(),
463 payload: Some(payload.to_string()),
464 timestamp: timestamp_now(),
465 })
466 .await?;
467 self.store.mark_workflow_dispatchable(&workflow_id).await?;
470 Ok(())
471 }
472
473 pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()> {
483 let act = match self.store.get_activity(id).await? {
484 Some(a) => a,
485 None => return Ok(()),
486 };
487
488 if act.attempt < act.max_attempts {
489 let backoff = act.initial_interval_secs
491 * act.backoff_coefficient.powi(act.attempt - 1);
492 let next_scheduled_at = timestamp_now() + backoff;
493 self.store
494 .requeue_activity_for_retry(id, act.attempt + 1, next_scheduled_at)
495 .await?;
496 return Ok(());
497 }
498
499 self.store
501 .complete_activity(id, None, Some(error), true)
502 .await?;
503
504 let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
505 let workflow_id = act.workflow_id.clone();
506 self.store
507 .append_event(&WorkflowEvent {
508 id: None,
509 workflow_id: act.workflow_id,
510 seq: event_seq,
511 event_type: "ActivityFailed".to_string(),
512 payload: Some(
513 serde_json::json!({
514 "activity_id": id,
515 "activity_seq": act.seq,
516 "name": act.name,
517 "error": error,
518 "final_attempt": act.attempt,
519 })
520 .to_string(),
521 ),
522 timestamp: timestamp_now(),
523 })
524 .await?;
525 self.store.mark_workflow_dispatchable(&workflow_id).await?;
527 Ok(())
528 }
529
530 pub async fn claim_workflow_task(
537 &self,
538 task_queue: &str,
539 worker_id: &str,
540 ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>> {
541 let Some(mut wf) = self
542 .store
543 .claim_workflow_task(task_queue, worker_id)
544 .await?
545 else {
546 return Ok(None);
547 };
548 if wf.status == "PENDING" {
552 self.store
553 .update_workflow_status(&wf.id, WorkflowStatus::Running, None, None)
554 .await?;
555 wf.status = "RUNNING".to_string();
556 }
557 let history = self.store.list_events(&wf.id).await?;
558 Ok(Some((wf, history)))
559 }
560
561 pub async fn submit_workflow_commands(
570 &self,
571 workflow_id: &str,
572 worker_id: &str,
573 commands: &[serde_json::Value],
574 ) -> Result<()> {
575 for cmd in commands {
576 let cmd_type = cmd.get("type").and_then(|v| v.as_str()).unwrap_or("");
577 match cmd_type {
578 "ScheduleActivity" => {
579 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
580 let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
581 let queue = cmd
582 .get("task_queue")
583 .and_then(|v| v.as_str())
584 .unwrap_or("default");
585 let input = cmd.get("input").map(|v| v.to_string());
586 let opts = ScheduleActivityOpts {
587 max_attempts: cmd
588 .get("max_attempts")
589 .and_then(|v| v.as_i64())
590 .map(|n| n as i32),
591 initial_interval_secs: cmd
592 .get("initial_interval_secs")
593 .and_then(|v| v.as_f64()),
594 backoff_coefficient: cmd
595 .get("backoff_coefficient")
596 .and_then(|v| v.as_f64()),
597 start_to_close_secs: cmd
598 .get("start_to_close_secs")
599 .and_then(|v| v.as_f64()),
600 heartbeat_timeout_secs: cmd
601 .get("heartbeat_timeout_secs")
602 .and_then(|v| v.as_f64()),
603 };
604 self.schedule_activity(
605 workflow_id,
606 seq,
607 name,
608 input.as_deref(),
609 queue,
610 opts,
611 )
612 .await?;
613 }
614 "CancelWorkflow" => {
615 self.finalise_cancellation(workflow_id).await?;
617 }
618 "WaitForSignal" => {
619 let signal_name =
630 cmd.get("name").and_then(|v| v.as_str()).unwrap_or("?");
631 let timer_seq = cmd.get("timer_seq").and_then(|v| v.as_i64());
632 let payload = match timer_seq {
633 Some(ts) => serde_json::json!({
634 "signal": signal_name,
635 "timer_seq": ts,
636 }),
637 None => serde_json::json!({ "signal": signal_name }),
638 };
639 let event_seq =
640 self.store.get_event_count(workflow_id).await? as i32 + 1;
641 self.store
642 .append_event(&WorkflowEvent {
643 id: None,
644 workflow_id: workflow_id.to_string(),
645 seq: event_seq,
646 event_type: "WorkflowAwaitingSignal".to_string(),
647 payload: Some(payload.to_string()),
648 timestamp: timestamp_now(),
649 })
650 .await?;
651 }
652 "StartChildWorkflow" => {
653 let workflow_type = cmd
654 .get("workflow_type")
655 .and_then(|v| v.as_str())
656 .unwrap_or("");
657 let child_id =
658 cmd.get("workflow_id").and_then(|v| v.as_str()).unwrap_or("");
659 let task_queue = cmd
660 .get("task_queue")
661 .and_then(|v| v.as_str())
662 .unwrap_or("default");
663 let input = cmd.get("input").map(|v| v.to_string());
664 let namespace = self
666 .store
667 .get_workflow(workflow_id)
668 .await?
669 .map(|wf| wf.namespace)
670 .unwrap_or_else(|| "main".to_string());
671
672 if self.store.get_workflow(child_id).await?.is_none() {
677 self.start_child_workflow(
678 &namespace,
679 workflow_id,
680 workflow_type,
681 child_id,
682 input.as_deref(),
683 task_queue,
684 )
685 .await?;
686 self.store.mark_workflow_dispatchable(child_id).await?;
689 }
690 }
691 "RecordSideEffect" => {
692 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
693 let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
694 let value =
695 cmd.get("value").cloned().unwrap_or(serde_json::Value::Null);
696 let event_seq =
697 self.store.get_event_count(workflow_id).await? as i32 + 1;
698 self.store
699 .append_event(&WorkflowEvent {
700 id: None,
701 workflow_id: workflow_id.to_string(),
702 seq: event_seq,
703 event_type: "SideEffectRecorded".to_string(),
704 payload: Some(
705 serde_json::json!({
706 "side_effect_seq": seq,
707 "name": name,
708 "value": value,
709 })
710 .to_string(),
711 ),
712 timestamp: timestamp_now(),
713 })
714 .await?;
715 self.store.mark_workflow_dispatchable(workflow_id).await?;
719 }
720 "ScheduleTimer" => {
721 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
722 let duration = cmd
723 .get("duration_secs")
724 .and_then(|v| v.as_f64())
725 .unwrap_or(0.0);
726 self.schedule_timer(workflow_id, seq, duration).await?;
727 }
728 "UpsertSearchAttributes" => {
729 let patch = cmd
735 .get("patch")
736 .cloned()
737 .unwrap_or(serde_json::Value::Object(Default::default()));
738 self.store
739 .upsert_search_attributes(workflow_id, &patch.to_string())
740 .await?;
741 }
742 "ContinueAsNew" => {
743 let input = cmd.get("input").map(|v| v.to_string());
750 self.continue_as_new(workflow_id, input.as_deref())
751 .await?;
752 }
753 "RecordSnapshot" => {
754 let state = cmd
761 .get("state")
762 .cloned()
763 .unwrap_or(serde_json::Value::Null);
764 let event_seq = self.store.get_event_count(workflow_id).await? as i32;
765 self.store
766 .create_snapshot(workflow_id, event_seq, &state.to_string())
767 .await?;
768 }
769 "CompleteWorkflow" => {
770 let result = cmd.get("result").map(|v| v.to_string());
771 self.complete_workflow(workflow_id, result.as_deref()).await?;
772 }
773 "FailWorkflow" => {
774 let error = cmd
775 .get("error")
776 .and_then(|v| v.as_str())
777 .unwrap_or("workflow handler raised an error");
778 self.fail_workflow(workflow_id, error).await?;
779 }
780 other => {
781 tracing::warn!("submit_workflow_commands: unknown command type {other:?}");
782 }
783 }
784 }
785
786 self.store
787 .release_workflow_task(workflow_id, worker_id)
788 .await?;
789 Ok(())
790 }
791
792 pub async fn schedule_timer(
804 &self,
805 workflow_id: &str,
806 seq: i32,
807 duration_secs: f64,
808 ) -> Result<WorkflowTimer> {
809 if let Some(existing) = self
810 .store
811 .get_timer_by_workflow_seq(workflow_id, seq)
812 .await?
813 {
814 return Ok(existing);
815 }
816
817 let now = timestamp_now();
818 let mut timer = WorkflowTimer {
819 id: None,
820 workflow_id: workflow_id.to_string(),
821 seq,
822 fire_at: now + duration_secs,
823 fired: false,
824 };
825 let id = self.store.create_timer(&timer).await?;
826 timer.id = Some(id);
827
828 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
829 self.store
830 .append_event(&WorkflowEvent {
831 id: None,
832 workflow_id: workflow_id.to_string(),
833 seq: event_seq,
834 event_type: "TimerScheduled".to_string(),
835 payload: Some(
836 serde_json::json!({
837 "timer_id": id,
838 "timer_seq": seq,
839 "fire_at": timer.fire_at,
840 "duration_secs": duration_secs,
841 })
842 .to_string(),
843 ),
844 timestamp: now,
845 })
846 .await?;
847
848 Ok(timer)
849 }
850
851 pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()> {
856 if let Some(wf) = self.store.get_workflow(workflow_id).await?
858 && wf.status == "CANCELLED"
859 {
860 return Ok(());
861 }
862 self.store
863 .update_workflow_status(workflow_id, WorkflowStatus::Cancelled, None, None)
864 .await?;
865 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
866 self.store
867 .append_event(&WorkflowEvent {
868 id: None,
869 workflow_id: workflow_id.to_string(),
870 seq: event_seq,
871 event_type: "WorkflowCancelled".to_string(),
872 payload: None,
873 timestamp: timestamp_now(),
874 })
875 .await?;
876 Ok(())
877 }
878
879 pub async fn complete_workflow(&self, workflow_id: &str, result: Option<&str>) -> Result<()> {
884 self.store
885 .update_workflow_status(workflow_id, WorkflowStatus::Completed, result, None)
886 .await?;
887 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
888 self.store
889 .append_event(&WorkflowEvent {
890 id: None,
891 workflow_id: workflow_id.to_string(),
892 seq: event_seq,
893 event_type: "WorkflowCompleted".to_string(),
894 payload: result.map(String::from),
895 timestamp: timestamp_now(),
896 })
897 .await?;
898 self.notify_parent_of_child_outcome(
899 workflow_id,
900 "ChildWorkflowCompleted",
901 serde_json::json!({
902 "child_workflow_id": workflow_id,
903 "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
904 }),
905 )
906 .await?;
907 Ok(())
908 }
909
910 pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()> {
913 self.store
914 .update_workflow_status(workflow_id, WorkflowStatus::Failed, None, Some(error))
915 .await?;
916 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
917 self.store
918 .append_event(&WorkflowEvent {
919 id: None,
920 workflow_id: workflow_id.to_string(),
921 seq: event_seq,
922 event_type: "WorkflowFailed".to_string(),
923 payload: Some(serde_json::json!({"error": error}).to_string()),
924 timestamp: timestamp_now(),
925 })
926 .await?;
927 self.notify_parent_of_child_outcome(
928 workflow_id,
929 "ChildWorkflowFailed",
930 serde_json::json!({
931 "child_workflow_id": workflow_id,
932 "error": error,
933 }),
934 )
935 .await?;
936 Ok(())
937 }
938
939 async fn notify_parent_of_child_outcome(
943 &self,
944 child_workflow_id: &str,
945 event_type: &str,
946 payload: serde_json::Value,
947 ) -> Result<()> {
948 let Some(child) = self.store.get_workflow(child_workflow_id).await? else {
949 return Ok(());
950 };
951 let Some(parent_id) = child.parent_id else {
952 return Ok(());
953 };
954 let event_seq = self.store.get_event_count(&parent_id).await? as i32 + 1;
955 self.store
956 .append_event(&WorkflowEvent {
957 id: None,
958 workflow_id: parent_id.clone(),
959 seq: event_seq,
960 event_type: event_type.to_string(),
961 payload: Some(payload.to_string()),
962 timestamp: timestamp_now(),
963 })
964 .await?;
965 self.store.mark_workflow_dispatchable(&parent_id).await?;
966 Ok(())
967 }
968
969 pub async fn heartbeat_activity(&self, id: i64, details: Option<&str>) -> Result<()> {
970 self.store.heartbeat_activity(id, details).await
971 }
972
973 pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()> {
976 self.store.create_schedule(schedule).await
977 }
978
979 pub async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
980 self.store.list_schedules(namespace).await
981 }
982
983 pub async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
984 self.store.get_schedule(namespace, name).await
985 }
986
987 pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
988 self.store.delete_schedule(namespace, name).await
989 }
990
991 pub async fn update_schedule(
992 &self,
993 namespace: &str,
994 name: &str,
995 patch: &SchedulePatch,
996 ) -> Result<Option<WorkflowSchedule>> {
997 self.store.update_schedule(namespace, name, patch).await
998 }
999
1000 pub async fn set_schedule_paused(
1001 &self,
1002 namespace: &str,
1003 name: &str,
1004 paused: bool,
1005 ) -> Result<Option<WorkflowSchedule>> {
1006 self.store.set_schedule_paused(namespace, name, paused).await
1007 }
1008
1009 pub async fn create_namespace(&self, name: &str) -> Result<()> {
1012 self.store.create_namespace(name).await
1013 }
1014
1015 pub async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
1016 self.store.list_namespaces().await
1017 }
1018
1019 pub async fn delete_namespace(&self, name: &str) -> Result<bool> {
1020 self.store.delete_namespace(name).await
1021 }
1022
1023 pub async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
1024 self.store.get_namespace_stats(namespace).await
1025 }
1026
1027 pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
1028 self.store.get_queue_stats(namespace).await
1029 }
1030
1031 pub async fn start_child_workflow(
1034 &self,
1035 namespace: &str,
1036 parent_id: &str,
1037 workflow_type: &str,
1038 workflow_id: &str,
1039 input: Option<&str>,
1040 task_queue: &str,
1041 ) -> Result<WorkflowRecord> {
1042 let now = timestamp_now();
1043 let run_id = format!("run-{workflow_id}-{}", now as u64);
1044
1045 let wf = WorkflowRecord {
1046 id: workflow_id.to_string(),
1047 namespace: namespace.to_string(),
1048 run_id,
1049 workflow_type: workflow_type.to_string(),
1050 task_queue: task_queue.to_string(),
1051 status: "PENDING".to_string(),
1052 input: input.map(String::from),
1053 result: None,
1054 error: None,
1055 parent_id: Some(parent_id.to_string()),
1056 claimed_by: None,
1057 search_attributes: None,
1058 archived_at: None,
1059 archive_uri: None,
1060 created_at: now,
1061 updated_at: now,
1062 completed_at: None,
1063 };
1064
1065 self.store.create_workflow(&wf).await?;
1066
1067 self.store
1069 .append_event(&WorkflowEvent {
1070 id: None,
1071 workflow_id: workflow_id.to_string(),
1072 seq: 1,
1073 event_type: "WorkflowStarted".to_string(),
1074 payload: input.map(String::from),
1075 timestamp: now,
1076 })
1077 .await?;
1078
1079 let parent_seq = self.store.get_event_count(parent_id).await? as i32 + 1;
1080 self.store
1081 .append_event(&WorkflowEvent {
1082 id: None,
1083 workflow_id: parent_id.to_string(),
1084 seq: parent_seq,
1085 event_type: "ChildWorkflowStarted".to_string(),
1086 payload: Some(
1087 serde_json::json!({
1088 "child_workflow_id": workflow_id,
1089 "workflow_type": workflow_type,
1090 })
1091 .to_string(),
1092 ),
1093 timestamp: now,
1094 })
1095 .await?;
1096
1097 Ok(wf)
1098 }
1099
1100 pub async fn list_child_workflows(
1101 &self,
1102 parent_id: &str,
1103 ) -> Result<Vec<WorkflowRecord>> {
1104 self.store.list_child_workflows(parent_id).await
1105 }
1106
1107 pub async fn continue_as_new(
1110 &self,
1111 workflow_id: &str,
1112 input: Option<&str>,
1113 ) -> Result<WorkflowRecord> {
1114 let old_wf = self
1115 .store
1116 .get_workflow(workflow_id)
1117 .await?
1118 .ok_or_else(|| anyhow::anyhow!("workflow not found: {workflow_id}"))?;
1119
1120 self.store
1122 .update_workflow_status(workflow_id, WorkflowStatus::Completed, None, None)
1123 .await?;
1124
1125 let new_id = format!("{workflow_id}-continued-{}", timestamp_now() as u64);
1127 self.start_workflow(
1128 &old_wf.namespace,
1129 &old_wf.workflow_type,
1130 &new_id,
1131 input,
1132 &old_wf.task_queue,
1133 old_wf.search_attributes.as_deref(),
1134 )
1135 .await
1136 }
1137
1138 pub async fn create_snapshot(
1141 &self,
1142 workflow_id: &str,
1143 event_seq: i32,
1144 state_json: &str,
1145 ) -> Result<()> {
1146 self.store
1147 .create_snapshot(workflow_id, event_seq, state_json)
1148 .await
1149 }
1150
1151 pub async fn get_latest_snapshot(
1152 &self,
1153 workflow_id: &str,
1154 ) -> Result<Option<WorkflowSnapshot>> {
1155 self.store.get_latest_snapshot(workflow_id).await
1156 }
1157
1158 pub async fn record_side_effect(
1161 &self,
1162 workflow_id: &str,
1163 value: &str,
1164 ) -> Result<()> {
1165 let now = timestamp_now();
1166 let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
1167 self.store
1168 .append_event(&WorkflowEvent {
1169 id: None,
1170 workflow_id: workflow_id.to_string(),
1171 seq,
1172 event_type: "SideEffectRecorded".to_string(),
1173 payload: Some(value.to_string()),
1174 timestamp: now,
1175 })
1176 .await?;
1177 Ok(())
1178 }
1179}
1180
1181fn timestamp_now() -> f64 {
1182 std::time::SystemTime::now()
1183 .duration_since(std::time::UNIX_EPOCH)
1184 .unwrap()
1185 .as_secs_f64()
1186}
1187
1188const ENGINE_VERSION: &str = env!("CARGO_PKG_VERSION");
1192
1193fn inject_engine_version(caller_attrs: Option<&str>) -> Option<String> {
1201 let mut obj: serde_json::Map<String, serde_json::Value> = match caller_attrs {
1202 Some(raw) => match serde_json::from_str::<serde_json::Value>(raw) {
1203 Ok(serde_json::Value::Object(m)) => m,
1204 Ok(other) => return Some(other.to_string()),
1207 Err(_) => return Some(raw.to_string()),
1208 },
1209 None => serde_json::Map::new(),
1210 };
1211 obj.entry("assay_engine_version".to_string())
1212 .or_insert_with(|| serde_json::Value::String(ENGINE_VERSION.to_string()));
1213 Some(serde_json::Value::Object(obj).to_string())
1214}
1215
1216#[cfg(test)]
1217mod engine_version_stamp_tests {
1218 use super::*;
1219
1220 #[test]
1221 fn no_attrs_produces_single_key_object() {
1222 let out = inject_engine_version(None).unwrap();
1223 let v: serde_json::Value = serde_json::from_str(&out).unwrap();
1224 assert_eq!(v["assay_engine_version"], ENGINE_VERSION);
1225 assert_eq!(v.as_object().unwrap().len(), 1);
1226 }
1227
1228 #[test]
1229 fn existing_attrs_gain_the_version_field() {
1230 let out = inject_engine_version(Some(r#"{"env":"prod","tenant":"acme"}"#)).unwrap();
1231 let v: serde_json::Value = serde_json::from_str(&out).unwrap();
1232 assert_eq!(v["env"], "prod");
1233 assert_eq!(v["tenant"], "acme");
1234 assert_eq!(v["assay_engine_version"], ENGINE_VERSION);
1235 }
1236
1237 #[test]
1238 fn caller_supplied_version_wins_on_conflict() {
1239 let out = inject_engine_version(Some(r#"{"assay_engine_version":"0.0.1-test"}"#)).unwrap();
1240 let v: serde_json::Value = serde_json::from_str(&out).unwrap();
1241 assert_eq!(v["assay_engine_version"], "0.0.1-test");
1242 }
1243
1244 #[test]
1245 fn non_object_json_is_preserved_unchanged() {
1246 let out = inject_engine_version(Some("[1, 2, 3]")).unwrap();
1247 assert_eq!(out, "[1,2,3]");
1248 }
1249
1250 #[test]
1251 fn unparsable_json_is_preserved_unchanged() {
1252 let out = inject_engine_version(Some("not json")).unwrap();
1253 assert_eq!(out, "not json");
1254 }
1255}
1256
1257use std::str::FromStr;