1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio::task::JoinHandle;
5use tracing::info;
6
7use crate::dispatch_recovery;
8use crate::health;
9use crate::scheduler;
10use crate::store::WorkflowStore;
11use crate::timers;
12use crate::types::*;
13
14pub struct Engine<S: WorkflowStore> {
19 store: Arc<S>,
20 _scheduler: JoinHandle<()>,
21 _timer_poller: JoinHandle<()>,
22 _health_monitor: JoinHandle<()>,
23 _dispatch_recovery: JoinHandle<()>,
24 #[cfg(feature = "s3-archival")]
25 _archival: Option<JoinHandle<()>>,
26}
27
28impl<S: WorkflowStore> Engine<S> {
29 pub fn start(store: S) -> Self {
31 let store = Arc::new(store);
32
33 let _scheduler = tokio::spawn(scheduler::run_scheduler(Arc::clone(&store)));
34 let _timer_poller = tokio::spawn(timers::run_timer_poller(Arc::clone(&store)));
35 let _health_monitor = tokio::spawn(health::run_health_monitor(Arc::clone(&store)));
36 let _dispatch_recovery = tokio::spawn(dispatch_recovery::run_dispatch_recovery(
37 Arc::clone(&store),
38 ));
39
40 #[cfg(feature = "s3-archival")]
41 let _archival = crate::archival::ArchivalConfig::from_env().map(|cfg| {
42 tokio::spawn(crate::archival::run_archival(Arc::clone(&store), cfg))
43 });
44
45 info!("Workflow engine started");
46
47 Self {
48 store,
49 _scheduler,
50 _timer_poller,
51 _health_monitor,
52 _dispatch_recovery,
53 #[cfg(feature = "s3-archival")]
54 _archival,
55 }
56 }
57
58 pub fn store(&self) -> &S {
60 &self.store
61 }
62
63 pub async fn start_workflow(
66 &self,
67 namespace: &str,
68 workflow_type: &str,
69 workflow_id: &str,
70 input: Option<&str>,
71 task_queue: &str,
72 search_attributes: Option<&str>,
73 ) -> Result<WorkflowRecord> {
74 let now = timestamp_now();
75 let run_id = format!("run-{workflow_id}-{}", now as u64);
76
77 let wf = WorkflowRecord {
78 id: workflow_id.to_string(),
79 namespace: namespace.to_string(),
80 run_id,
81 workflow_type: workflow_type.to_string(),
82 task_queue: task_queue.to_string(),
83 status: "PENDING".to_string(),
84 input: input.map(String::from),
85 result: None,
86 error: None,
87 parent_id: None,
88 claimed_by: None,
89 search_attributes: search_attributes.map(String::from),
90 archived_at: None,
91 archive_uri: None,
92 created_at: now,
93 updated_at: now,
94 completed_at: None,
95 };
96
97 self.store.create_workflow(&wf).await?;
98
99 self.store
100 .append_event(&WorkflowEvent {
101 id: None,
102 workflow_id: workflow_id.to_string(),
103 seq: 1,
104 event_type: "WorkflowStarted".to_string(),
105 payload: input.map(String::from),
106 timestamp: now,
107 })
108 .await?;
109
110 self.store.mark_workflow_dispatchable(workflow_id).await?;
113
114 Ok(wf)
115 }
116
117 pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
118 self.store.get_workflow(id).await
119 }
120
121 pub async fn list_workflows(
122 &self,
123 namespace: &str,
124 status: Option<WorkflowStatus>,
125 workflow_type: Option<&str>,
126 search_attrs_filter: Option<&str>,
127 limit: i64,
128 offset: i64,
129 ) -> Result<Vec<WorkflowRecord>> {
130 self.store
131 .list_workflows(
132 namespace,
133 status,
134 workflow_type,
135 search_attrs_filter,
136 limit,
137 offset,
138 )
139 .await
140 }
141
142 pub async fn upsert_search_attributes(
143 &self,
144 workflow_id: &str,
145 patch_json: &str,
146 ) -> Result<()> {
147 self.store
148 .upsert_search_attributes(workflow_id, patch_json)
149 .await
150 }
151
152 pub async fn cancel_workflow(&self, id: &str) -> Result<bool> {
153 let wf = self.store.get_workflow(id).await?;
154 match wf {
155 None => Ok(false),
156 Some(wf) => {
157 let status = WorkflowStatus::from_str(&wf.status)
158 .map_err(|e| anyhow::anyhow!(e))?;
159 if status.is_terminal() {
160 return Ok(false);
161 }
162
163 self.store.cancel_pending_activities(id).await?;
175 self.store.cancel_pending_timers(id).await?;
176
177 let seq = self.store.get_event_count(id).await? as i32 + 1;
178 self.store
179 .append_event(&WorkflowEvent {
180 id: None,
181 workflow_id: id.to_string(),
182 seq,
183 event_type: "WorkflowCancelRequested".to_string(),
184 payload: None,
185 timestamp: timestamp_now(),
186 })
187 .await?;
188
189 self.store.mark_workflow_dispatchable(id).await?;
190
191 let children = self.store.list_child_workflows(id).await?;
193 for child in children {
194 Box::pin(self.cancel_workflow(&child.id)).await?;
195 }
196
197 if matches!(status, WorkflowStatus::Pending) {
203 self.finalise_cancellation(id).await?;
204 }
205
206 Ok(true)
207 }
208 }
209 }
210
211 pub async fn terminate_workflow(&self, id: &str, reason: Option<&str>) -> Result<bool> {
212 let wf = self.store.get_workflow(id).await?;
213 match wf {
214 None => Ok(false),
215 Some(wf) => {
216 let status = WorkflowStatus::from_str(&wf.status)
217 .map_err(|e| anyhow::anyhow!(e))?;
218 if status.is_terminal() {
219 return Ok(false);
220 }
221
222 self.store
223 .update_workflow_status(
224 id,
225 WorkflowStatus::Failed,
226 None,
227 Some(reason.unwrap_or("terminated")),
228 )
229 .await?;
230
231 Ok(true)
232 }
233 }
234 }
235
236 pub async fn send_signal(
239 &self,
240 workflow_id: &str,
241 name: &str,
242 payload: Option<&str>,
243 ) -> Result<()> {
244 let now = timestamp_now();
245
246 self.store
247 .send_signal(&WorkflowSignal {
248 id: None,
249 workflow_id: workflow_id.to_string(),
250 name: name.to_string(),
251 payload: payload.map(String::from),
252 consumed: false,
253 received_at: now,
254 })
255 .await?;
256
257 let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
258 let payload_value: serde_json::Value = payload
263 .and_then(|s| serde_json::from_str(s).ok())
264 .unwrap_or(serde_json::Value::Null);
265 self.store
266 .append_event(&WorkflowEvent {
267 id: None,
268 workflow_id: workflow_id.to_string(),
269 seq,
270 event_type: "SignalReceived".to_string(),
271 payload: Some(
272 serde_json::json!({ "signal": name, "payload": payload_value }).to_string(),
273 ),
274 timestamp: now,
275 })
276 .await?;
277
278 self.store.mark_workflow_dispatchable(workflow_id).await?;
281
282 Ok(())
283 }
284
285 pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
288 self.store.list_events(workflow_id).await
289 }
290
291 pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()> {
294 self.store.register_worker(worker).await
295 }
296
297 pub async fn heartbeat_worker(&self, id: &str) -> Result<()> {
298 self.store.heartbeat_worker(id, timestamp_now()).await
299 }
300
301 pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
302 self.store.list_workers(namespace).await
303 }
304
305 pub async fn schedule_activity(
321 &self,
322 workflow_id: &str,
323 seq: i32,
324 name: &str,
325 input: Option<&str>,
326 task_queue: &str,
327 opts: ScheduleActivityOpts,
328 ) -> Result<WorkflowActivity> {
329 if let Some(existing) = self
331 .store
332 .get_activity_by_workflow_seq(workflow_id, seq)
333 .await?
334 {
335 return Ok(existing);
336 }
337
338 let now = timestamp_now();
339 let mut act = WorkflowActivity {
340 id: None,
341 workflow_id: workflow_id.to_string(),
342 seq,
343 name: name.to_string(),
344 task_queue: task_queue.to_string(),
345 input: input.map(String::from),
346 status: "PENDING".to_string(),
347 result: None,
348 error: None,
349 attempt: 1,
350 max_attempts: opts.max_attempts.unwrap_or(3),
351 initial_interval_secs: opts.initial_interval_secs.unwrap_or(1.0),
352 backoff_coefficient: opts.backoff_coefficient.unwrap_or(2.0),
353 start_to_close_secs: opts.start_to_close_secs.unwrap_or(300.0),
354 heartbeat_timeout_secs: opts.heartbeat_timeout_secs,
355 claimed_by: None,
356 scheduled_at: now,
357 started_at: None,
358 completed_at: None,
359 last_heartbeat: None,
360 };
361
362 let id = self.store.create_activity(&act).await?;
363 act.id = Some(id);
364
365 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
367 self.store
368 .append_event(&WorkflowEvent {
369 id: None,
370 workflow_id: workflow_id.to_string(),
371 seq: event_seq,
372 event_type: "ActivityScheduled".to_string(),
373 payload: Some(
374 serde_json::json!({
375 "activity_id": id,
376 "activity_seq": seq,
377 "name": name,
378 "task_queue": task_queue,
379 "input": input,
380 })
381 .to_string(),
382 ),
383 timestamp: now,
384 })
385 .await?;
386
387 if let Some(wf) = self.store.get_workflow(workflow_id).await?
389 && wf.status == "PENDING"
390 {
391 self.store
392 .update_workflow_status(workflow_id, WorkflowStatus::Running, None, None)
393 .await?;
394 }
395
396 Ok(act)
397 }
398
399 pub async fn claim_activity(
400 &self,
401 task_queue: &str,
402 worker_id: &str,
403 ) -> Result<Option<WorkflowActivity>> {
404 self.store.claim_activity(task_queue, worker_id).await
405 }
406
407 pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
408 self.store.get_activity(id).await
409 }
410
411 pub async fn complete_activity(
419 &self,
420 id: i64,
421 result: Option<&str>,
422 error: Option<&str>,
423 failed: bool,
424 ) -> Result<()> {
425 self.store.complete_activity(id, result, error, failed).await?;
426
427 let act = match self.store.get_activity(id).await? {
429 Some(a) => a,
430 None => return Ok(()),
431 };
432
433 let event_type = if failed {
434 "ActivityFailed"
435 } else {
436 "ActivityCompleted"
437 };
438 let payload = serde_json::json!({
439 "activity_id": id,
440 "activity_seq": act.seq,
441 "name": act.name,
442 "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
443 "error": error,
444 });
445 let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
446 let workflow_id = act.workflow_id.clone();
447 self.store
448 .append_event(&WorkflowEvent {
449 id: None,
450 workflow_id: act.workflow_id,
451 seq: event_seq,
452 event_type: event_type.to_string(),
453 payload: Some(payload.to_string()),
454 timestamp: timestamp_now(),
455 })
456 .await?;
457 self.store.mark_workflow_dispatchable(&workflow_id).await?;
460 Ok(())
461 }
462
463 pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()> {
473 let act = match self.store.get_activity(id).await? {
474 Some(a) => a,
475 None => return Ok(()),
476 };
477
478 if act.attempt < act.max_attempts {
479 let backoff = act.initial_interval_secs
481 * act.backoff_coefficient.powi(act.attempt - 1);
482 let next_scheduled_at = timestamp_now() + backoff;
483 self.store
484 .requeue_activity_for_retry(id, act.attempt + 1, next_scheduled_at)
485 .await?;
486 return Ok(());
487 }
488
489 self.store
491 .complete_activity(id, None, Some(error), true)
492 .await?;
493
494 let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
495 let workflow_id = act.workflow_id.clone();
496 self.store
497 .append_event(&WorkflowEvent {
498 id: None,
499 workflow_id: act.workflow_id,
500 seq: event_seq,
501 event_type: "ActivityFailed".to_string(),
502 payload: Some(
503 serde_json::json!({
504 "activity_id": id,
505 "activity_seq": act.seq,
506 "name": act.name,
507 "error": error,
508 "final_attempt": act.attempt,
509 })
510 .to_string(),
511 ),
512 timestamp: timestamp_now(),
513 })
514 .await?;
515 self.store.mark_workflow_dispatchable(&workflow_id).await?;
517 Ok(())
518 }
519
520 pub async fn claim_workflow_task(
527 &self,
528 task_queue: &str,
529 worker_id: &str,
530 ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>> {
531 let Some(mut wf) = self
532 .store
533 .claim_workflow_task(task_queue, worker_id)
534 .await?
535 else {
536 return Ok(None);
537 };
538 if wf.status == "PENDING" {
542 self.store
543 .update_workflow_status(&wf.id, WorkflowStatus::Running, None, None)
544 .await?;
545 wf.status = "RUNNING".to_string();
546 }
547 let history = self.store.list_events(&wf.id).await?;
548 Ok(Some((wf, history)))
549 }
550
551 pub async fn submit_workflow_commands(
560 &self,
561 workflow_id: &str,
562 worker_id: &str,
563 commands: &[serde_json::Value],
564 ) -> Result<()> {
565 for cmd in commands {
566 let cmd_type = cmd.get("type").and_then(|v| v.as_str()).unwrap_or("");
567 match cmd_type {
568 "ScheduleActivity" => {
569 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
570 let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
571 let queue = cmd
572 .get("task_queue")
573 .and_then(|v| v.as_str())
574 .unwrap_or("default");
575 let input = cmd.get("input").map(|v| v.to_string());
576 let opts = ScheduleActivityOpts {
577 max_attempts: cmd
578 .get("max_attempts")
579 .and_then(|v| v.as_i64())
580 .map(|n| n as i32),
581 initial_interval_secs: cmd
582 .get("initial_interval_secs")
583 .and_then(|v| v.as_f64()),
584 backoff_coefficient: cmd
585 .get("backoff_coefficient")
586 .and_then(|v| v.as_f64()),
587 start_to_close_secs: cmd
588 .get("start_to_close_secs")
589 .and_then(|v| v.as_f64()),
590 heartbeat_timeout_secs: cmd
591 .get("heartbeat_timeout_secs")
592 .and_then(|v| v.as_f64()),
593 };
594 self.schedule_activity(
595 workflow_id,
596 seq,
597 name,
598 input.as_deref(),
599 queue,
600 opts,
601 )
602 .await?;
603 }
604 "CancelWorkflow" => {
605 self.finalise_cancellation(workflow_id).await?;
607 }
608 "WaitForSignal" => {
609 let signal_name =
614 cmd.get("name").and_then(|v| v.as_str()).unwrap_or("?");
615 let event_seq =
616 self.store.get_event_count(workflow_id).await? as i32 + 1;
617 self.store
618 .append_event(&WorkflowEvent {
619 id: None,
620 workflow_id: workflow_id.to_string(),
621 seq: event_seq,
622 event_type: "WorkflowAwaitingSignal".to_string(),
623 payload: Some(
624 serde_json::json!({ "signal": signal_name }).to_string(),
625 ),
626 timestamp: timestamp_now(),
627 })
628 .await?;
629 }
630 "StartChildWorkflow" => {
631 let workflow_type = cmd
632 .get("workflow_type")
633 .and_then(|v| v.as_str())
634 .unwrap_or("");
635 let child_id =
636 cmd.get("workflow_id").and_then(|v| v.as_str()).unwrap_or("");
637 let task_queue = cmd
638 .get("task_queue")
639 .and_then(|v| v.as_str())
640 .unwrap_or("default");
641 let input = cmd.get("input").map(|v| v.to_string());
642 let namespace = self
644 .store
645 .get_workflow(workflow_id)
646 .await?
647 .map(|wf| wf.namespace)
648 .unwrap_or_else(|| "main".to_string());
649
650 if self.store.get_workflow(child_id).await?.is_none() {
655 self.start_child_workflow(
656 &namespace,
657 workflow_id,
658 workflow_type,
659 child_id,
660 input.as_deref(),
661 task_queue,
662 )
663 .await?;
664 self.store.mark_workflow_dispatchable(child_id).await?;
667 }
668 }
669 "RecordSideEffect" => {
670 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
671 let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
672 let value =
673 cmd.get("value").cloned().unwrap_or(serde_json::Value::Null);
674 let event_seq =
675 self.store.get_event_count(workflow_id).await? as i32 + 1;
676 self.store
677 .append_event(&WorkflowEvent {
678 id: None,
679 workflow_id: workflow_id.to_string(),
680 seq: event_seq,
681 event_type: "SideEffectRecorded".to_string(),
682 payload: Some(
683 serde_json::json!({
684 "side_effect_seq": seq,
685 "name": name,
686 "value": value,
687 })
688 .to_string(),
689 ),
690 timestamp: timestamp_now(),
691 })
692 .await?;
693 self.store.mark_workflow_dispatchable(workflow_id).await?;
697 }
698 "ScheduleTimer" => {
699 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
700 let duration = cmd
701 .get("duration_secs")
702 .and_then(|v| v.as_f64())
703 .unwrap_or(0.0);
704 self.schedule_timer(workflow_id, seq, duration).await?;
705 }
706 "UpsertSearchAttributes" => {
707 let patch = cmd
713 .get("patch")
714 .cloned()
715 .unwrap_or(serde_json::Value::Object(Default::default()));
716 self.store
717 .upsert_search_attributes(workflow_id, &patch.to_string())
718 .await?;
719 }
720 "ContinueAsNew" => {
721 let input = cmd.get("input").map(|v| v.to_string());
728 self.continue_as_new(workflow_id, input.as_deref())
729 .await?;
730 }
731 "RecordSnapshot" => {
732 let state = cmd
739 .get("state")
740 .cloned()
741 .unwrap_or(serde_json::Value::Null);
742 let event_seq = self.store.get_event_count(workflow_id).await? as i32;
743 self.store
744 .create_snapshot(workflow_id, event_seq, &state.to_string())
745 .await?;
746 }
747 "CompleteWorkflow" => {
748 let result = cmd.get("result").map(|v| v.to_string());
749 self.complete_workflow(workflow_id, result.as_deref()).await?;
750 }
751 "FailWorkflow" => {
752 let error = cmd
753 .get("error")
754 .and_then(|v| v.as_str())
755 .unwrap_or("workflow handler raised an error");
756 self.fail_workflow(workflow_id, error).await?;
757 }
758 other => {
759 tracing::warn!("submit_workflow_commands: unknown command type {other:?}");
760 }
761 }
762 }
763
764 self.store
765 .release_workflow_task(workflow_id, worker_id)
766 .await?;
767 Ok(())
768 }
769
770 pub async fn schedule_timer(
782 &self,
783 workflow_id: &str,
784 seq: i32,
785 duration_secs: f64,
786 ) -> Result<WorkflowTimer> {
787 if let Some(existing) = self
788 .store
789 .get_timer_by_workflow_seq(workflow_id, seq)
790 .await?
791 {
792 return Ok(existing);
793 }
794
795 let now = timestamp_now();
796 let mut timer = WorkflowTimer {
797 id: None,
798 workflow_id: workflow_id.to_string(),
799 seq,
800 fire_at: now + duration_secs,
801 fired: false,
802 };
803 let id = self.store.create_timer(&timer).await?;
804 timer.id = Some(id);
805
806 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
807 self.store
808 .append_event(&WorkflowEvent {
809 id: None,
810 workflow_id: workflow_id.to_string(),
811 seq: event_seq,
812 event_type: "TimerScheduled".to_string(),
813 payload: Some(
814 serde_json::json!({
815 "timer_id": id,
816 "timer_seq": seq,
817 "fire_at": timer.fire_at,
818 "duration_secs": duration_secs,
819 })
820 .to_string(),
821 ),
822 timestamp: now,
823 })
824 .await?;
825
826 Ok(timer)
827 }
828
829 pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()> {
834 if let Some(wf) = self.store.get_workflow(workflow_id).await?
836 && wf.status == "CANCELLED"
837 {
838 return Ok(());
839 }
840 self.store
841 .update_workflow_status(workflow_id, WorkflowStatus::Cancelled, None, None)
842 .await?;
843 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
844 self.store
845 .append_event(&WorkflowEvent {
846 id: None,
847 workflow_id: workflow_id.to_string(),
848 seq: event_seq,
849 event_type: "WorkflowCancelled".to_string(),
850 payload: None,
851 timestamp: timestamp_now(),
852 })
853 .await?;
854 Ok(())
855 }
856
857 pub async fn complete_workflow(&self, workflow_id: &str, result: Option<&str>) -> Result<()> {
862 self.store
863 .update_workflow_status(workflow_id, WorkflowStatus::Completed, result, None)
864 .await?;
865 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
866 self.store
867 .append_event(&WorkflowEvent {
868 id: None,
869 workflow_id: workflow_id.to_string(),
870 seq: event_seq,
871 event_type: "WorkflowCompleted".to_string(),
872 payload: result.map(String::from),
873 timestamp: timestamp_now(),
874 })
875 .await?;
876 self.notify_parent_of_child_outcome(
877 workflow_id,
878 "ChildWorkflowCompleted",
879 serde_json::json!({
880 "child_workflow_id": workflow_id,
881 "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
882 }),
883 )
884 .await?;
885 Ok(())
886 }
887
888 pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()> {
891 self.store
892 .update_workflow_status(workflow_id, WorkflowStatus::Failed, None, Some(error))
893 .await?;
894 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
895 self.store
896 .append_event(&WorkflowEvent {
897 id: None,
898 workflow_id: workflow_id.to_string(),
899 seq: event_seq,
900 event_type: "WorkflowFailed".to_string(),
901 payload: Some(serde_json::json!({"error": error}).to_string()),
902 timestamp: timestamp_now(),
903 })
904 .await?;
905 self.notify_parent_of_child_outcome(
906 workflow_id,
907 "ChildWorkflowFailed",
908 serde_json::json!({
909 "child_workflow_id": workflow_id,
910 "error": error,
911 }),
912 )
913 .await?;
914 Ok(())
915 }
916
917 async fn notify_parent_of_child_outcome(
921 &self,
922 child_workflow_id: &str,
923 event_type: &str,
924 payload: serde_json::Value,
925 ) -> Result<()> {
926 let Some(child) = self.store.get_workflow(child_workflow_id).await? else {
927 return Ok(());
928 };
929 let Some(parent_id) = child.parent_id else {
930 return Ok(());
931 };
932 let event_seq = self.store.get_event_count(&parent_id).await? as i32 + 1;
933 self.store
934 .append_event(&WorkflowEvent {
935 id: None,
936 workflow_id: parent_id.clone(),
937 seq: event_seq,
938 event_type: event_type.to_string(),
939 payload: Some(payload.to_string()),
940 timestamp: timestamp_now(),
941 })
942 .await?;
943 self.store.mark_workflow_dispatchable(&parent_id).await?;
944 Ok(())
945 }
946
947 pub async fn heartbeat_activity(&self, id: i64, details: Option<&str>) -> Result<()> {
948 self.store.heartbeat_activity(id, details).await
949 }
950
951 pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()> {
954 self.store.create_schedule(schedule).await
955 }
956
957 pub async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
958 self.store.list_schedules(namespace).await
959 }
960
961 pub async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
962 self.store.get_schedule(namespace, name).await
963 }
964
965 pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
966 self.store.delete_schedule(namespace, name).await
967 }
968
969 pub async fn update_schedule(
970 &self,
971 namespace: &str,
972 name: &str,
973 patch: &SchedulePatch,
974 ) -> Result<Option<WorkflowSchedule>> {
975 self.store.update_schedule(namespace, name, patch).await
976 }
977
978 pub async fn set_schedule_paused(
979 &self,
980 namespace: &str,
981 name: &str,
982 paused: bool,
983 ) -> Result<Option<WorkflowSchedule>> {
984 self.store.set_schedule_paused(namespace, name, paused).await
985 }
986
987 pub async fn create_namespace(&self, name: &str) -> Result<()> {
990 self.store.create_namespace(name).await
991 }
992
993 pub async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
994 self.store.list_namespaces().await
995 }
996
997 pub async fn delete_namespace(&self, name: &str) -> Result<bool> {
998 self.store.delete_namespace(name).await
999 }
1000
1001 pub async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
1002 self.store.get_namespace_stats(namespace).await
1003 }
1004
1005 pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
1006 self.store.get_queue_stats(namespace).await
1007 }
1008
1009 pub async fn start_child_workflow(
1012 &self,
1013 namespace: &str,
1014 parent_id: &str,
1015 workflow_type: &str,
1016 workflow_id: &str,
1017 input: Option<&str>,
1018 task_queue: &str,
1019 ) -> Result<WorkflowRecord> {
1020 let now = timestamp_now();
1021 let run_id = format!("run-{workflow_id}-{}", now as u64);
1022
1023 let wf = WorkflowRecord {
1024 id: workflow_id.to_string(),
1025 namespace: namespace.to_string(),
1026 run_id,
1027 workflow_type: workflow_type.to_string(),
1028 task_queue: task_queue.to_string(),
1029 status: "PENDING".to_string(),
1030 input: input.map(String::from),
1031 result: None,
1032 error: None,
1033 parent_id: Some(parent_id.to_string()),
1034 claimed_by: None,
1035 search_attributes: None,
1036 archived_at: None,
1037 archive_uri: None,
1038 created_at: now,
1039 updated_at: now,
1040 completed_at: None,
1041 };
1042
1043 self.store.create_workflow(&wf).await?;
1044
1045 self.store
1047 .append_event(&WorkflowEvent {
1048 id: None,
1049 workflow_id: workflow_id.to_string(),
1050 seq: 1,
1051 event_type: "WorkflowStarted".to_string(),
1052 payload: input.map(String::from),
1053 timestamp: now,
1054 })
1055 .await?;
1056
1057 let parent_seq = self.store.get_event_count(parent_id).await? as i32 + 1;
1058 self.store
1059 .append_event(&WorkflowEvent {
1060 id: None,
1061 workflow_id: parent_id.to_string(),
1062 seq: parent_seq,
1063 event_type: "ChildWorkflowStarted".to_string(),
1064 payload: Some(
1065 serde_json::json!({
1066 "child_workflow_id": workflow_id,
1067 "workflow_type": workflow_type,
1068 })
1069 .to_string(),
1070 ),
1071 timestamp: now,
1072 })
1073 .await?;
1074
1075 Ok(wf)
1076 }
1077
1078 pub async fn list_child_workflows(
1079 &self,
1080 parent_id: &str,
1081 ) -> Result<Vec<WorkflowRecord>> {
1082 self.store.list_child_workflows(parent_id).await
1083 }
1084
1085 pub async fn continue_as_new(
1088 &self,
1089 workflow_id: &str,
1090 input: Option<&str>,
1091 ) -> Result<WorkflowRecord> {
1092 let old_wf = self
1093 .store
1094 .get_workflow(workflow_id)
1095 .await?
1096 .ok_or_else(|| anyhow::anyhow!("workflow not found: {workflow_id}"))?;
1097
1098 self.store
1100 .update_workflow_status(workflow_id, WorkflowStatus::Completed, None, None)
1101 .await?;
1102
1103 let new_id = format!("{workflow_id}-continued-{}", timestamp_now() as u64);
1105 self.start_workflow(
1106 &old_wf.namespace,
1107 &old_wf.workflow_type,
1108 &new_id,
1109 input,
1110 &old_wf.task_queue,
1111 old_wf.search_attributes.as_deref(),
1112 )
1113 .await
1114 }
1115
1116 pub async fn create_snapshot(
1119 &self,
1120 workflow_id: &str,
1121 event_seq: i32,
1122 state_json: &str,
1123 ) -> Result<()> {
1124 self.store
1125 .create_snapshot(workflow_id, event_seq, state_json)
1126 .await
1127 }
1128
1129 pub async fn get_latest_snapshot(
1130 &self,
1131 workflow_id: &str,
1132 ) -> Result<Option<WorkflowSnapshot>> {
1133 self.store.get_latest_snapshot(workflow_id).await
1134 }
1135
1136 pub async fn record_side_effect(
1139 &self,
1140 workflow_id: &str,
1141 value: &str,
1142 ) -> Result<()> {
1143 let now = timestamp_now();
1144 let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
1145 self.store
1146 .append_event(&WorkflowEvent {
1147 id: None,
1148 workflow_id: workflow_id.to_string(),
1149 seq,
1150 event_type: "SideEffectRecorded".to_string(),
1151 payload: Some(value.to_string()),
1152 timestamp: now,
1153 })
1154 .await?;
1155 Ok(())
1156 }
1157}
1158
1159fn timestamp_now() -> f64 {
1160 std::time::SystemTime::now()
1161 .duration_since(std::time::UNIX_EPOCH)
1162 .unwrap()
1163 .as_secs_f64()
1164}
1165
1166use std::str::FromStr;