1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio::task::JoinHandle;
5use tracing::info;
6
7use crate::dispatch_recovery;
8use crate::health;
9use crate::scheduler;
10use crate::store::WorkflowStore;
11use crate::timers;
12use crate::types::*;
13
14pub struct Engine<S: WorkflowStore> {
19 store: Arc<S>,
20 _scheduler: JoinHandle<()>,
21 _timer_poller: JoinHandle<()>,
22 _health_monitor: JoinHandle<()>,
23 _dispatch_recovery: JoinHandle<()>,
24 #[cfg(feature = "s3-archival")]
25 _archival: Option<JoinHandle<()>>,
26}
27
28impl<S: WorkflowStore> Engine<S> {
29 pub fn start(store: S) -> Self {
31 let store = Arc::new(store);
32
33 let _scheduler = tokio::spawn(scheduler::run_scheduler(Arc::clone(&store)));
34 let _timer_poller = tokio::spawn(timers::run_timer_poller(Arc::clone(&store)));
35 let _health_monitor = tokio::spawn(health::run_health_monitor(Arc::clone(&store)));
36 let _dispatch_recovery = tokio::spawn(dispatch_recovery::run_dispatch_recovery(
37 Arc::clone(&store),
38 ));
39
40 #[cfg(feature = "s3-archival")]
41 let _archival = crate::archival::ArchivalConfig::from_env().map(|cfg| {
42 tokio::spawn(crate::archival::run_archival(Arc::clone(&store), cfg))
43 });
44
45 info!("Workflow engine started");
46
47 Self {
48 store,
49 _scheduler,
50 _timer_poller,
51 _health_monitor,
52 _dispatch_recovery,
53 #[cfg(feature = "s3-archival")]
54 _archival,
55 }
56 }
57
58 pub fn store(&self) -> &S {
60 &self.store
61 }
62
63 pub async fn start_workflow(
66 &self,
67 namespace: &str,
68 workflow_type: &str,
69 workflow_id: &str,
70 input: Option<&str>,
71 task_queue: &str,
72 search_attributes: Option<&str>,
73 ) -> Result<WorkflowRecord> {
74 let now = timestamp_now();
75 let run_id = format!("run-{workflow_id}-{}", now as u64);
76
77 let wf = WorkflowRecord {
78 id: workflow_id.to_string(),
79 namespace: namespace.to_string(),
80 run_id,
81 workflow_type: workflow_type.to_string(),
82 task_queue: task_queue.to_string(),
83 status: "PENDING".to_string(),
84 input: input.map(String::from),
85 result: None,
86 error: None,
87 parent_id: None,
88 claimed_by: None,
89 search_attributes: search_attributes.map(String::from),
90 archived_at: None,
91 archive_uri: None,
92 created_at: now,
93 updated_at: now,
94 completed_at: None,
95 };
96
97 self.store.create_workflow(&wf).await?;
98
99 self.store
100 .append_event(&WorkflowEvent {
101 id: None,
102 workflow_id: workflow_id.to_string(),
103 seq: 1,
104 event_type: "WorkflowStarted".to_string(),
105 payload: input.map(String::from),
106 timestamp: now,
107 })
108 .await?;
109
110 self.store.mark_workflow_dispatchable(workflow_id).await?;
113
114 Ok(wf)
115 }
116
117 pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
118 self.store.get_workflow(id).await
119 }
120
121 pub async fn list_workflows(
122 &self,
123 namespace: &str,
124 status: Option<WorkflowStatus>,
125 workflow_type: Option<&str>,
126 search_attrs_filter: Option<&str>,
127 limit: i64,
128 offset: i64,
129 ) -> Result<Vec<WorkflowRecord>> {
130 self.store
131 .list_workflows(
132 namespace,
133 status,
134 workflow_type,
135 search_attrs_filter,
136 limit,
137 offset,
138 )
139 .await
140 }
141
142 pub async fn upsert_search_attributes(
143 &self,
144 workflow_id: &str,
145 patch_json: &str,
146 ) -> Result<()> {
147 self.store
148 .upsert_search_attributes(workflow_id, patch_json)
149 .await
150 }
151
152 pub async fn cancel_workflow(&self, id: &str) -> Result<bool> {
153 let wf = self.store.get_workflow(id).await?;
154 match wf {
155 None => Ok(false),
156 Some(wf) => {
157 let status = WorkflowStatus::from_str(&wf.status)
158 .map_err(|e| anyhow::anyhow!(e))?;
159 if status.is_terminal() {
160 return Ok(false);
161 }
162
163 self.store.cancel_pending_activities(id).await?;
175 self.store.cancel_pending_timers(id).await?;
176
177 let seq = self.store.get_event_count(id).await? as i32 + 1;
178 self.store
179 .append_event(&WorkflowEvent {
180 id: None,
181 workflow_id: id.to_string(),
182 seq,
183 event_type: "WorkflowCancelRequested".to_string(),
184 payload: None,
185 timestamp: timestamp_now(),
186 })
187 .await?;
188
189 self.store.mark_workflow_dispatchable(id).await?;
190
191 let children = self.store.list_child_workflows(id).await?;
193 for child in children {
194 Box::pin(self.cancel_workflow(&child.id)).await?;
195 }
196
197 if matches!(status, WorkflowStatus::Pending) {
203 self.finalise_cancellation(id).await?;
204 }
205
206 Ok(true)
207 }
208 }
209 }
210
211 pub async fn terminate_workflow(&self, id: &str, reason: Option<&str>) -> Result<bool> {
212 let wf = self.store.get_workflow(id).await?;
213 match wf {
214 None => Ok(false),
215 Some(wf) => {
216 let status = WorkflowStatus::from_str(&wf.status)
217 .map_err(|e| anyhow::anyhow!(e))?;
218 if status.is_terminal() {
219 return Ok(false);
220 }
221
222 self.store
223 .update_workflow_status(
224 id,
225 WorkflowStatus::Failed,
226 None,
227 Some(reason.unwrap_or("terminated")),
228 )
229 .await?;
230
231 Ok(true)
232 }
233 }
234 }
235
236 pub async fn send_signal(
239 &self,
240 workflow_id: &str,
241 name: &str,
242 payload: Option<&str>,
243 ) -> Result<()> {
244 let now = timestamp_now();
245
246 self.store
247 .send_signal(&WorkflowSignal {
248 id: None,
249 workflow_id: workflow_id.to_string(),
250 name: name.to_string(),
251 payload: payload.map(String::from),
252 consumed: false,
253 received_at: now,
254 })
255 .await?;
256
257 let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
258 let payload_value: serde_json::Value = payload
263 .and_then(|s| serde_json::from_str(s).ok())
264 .unwrap_or(serde_json::Value::Null);
265 self.store
266 .append_event(&WorkflowEvent {
267 id: None,
268 workflow_id: workflow_id.to_string(),
269 seq,
270 event_type: "SignalReceived".to_string(),
271 payload: Some(
272 serde_json::json!({ "signal": name, "payload": payload_value }).to_string(),
273 ),
274 timestamp: now,
275 })
276 .await?;
277
278 self.store.mark_workflow_dispatchable(workflow_id).await?;
281
282 Ok(())
283 }
284
285 pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
288 self.store.list_events(workflow_id).await
289 }
290
291 pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()> {
294 self.store.register_worker(worker).await
295 }
296
297 pub async fn heartbeat_worker(&self, id: &str) -> Result<()> {
298 self.store.heartbeat_worker(id, timestamp_now()).await
299 }
300
301 pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
302 self.store.list_workers(namespace).await
303 }
304
305 pub async fn schedule_activity(
321 &self,
322 workflow_id: &str,
323 seq: i32,
324 name: &str,
325 input: Option<&str>,
326 task_queue: &str,
327 opts: ScheduleActivityOpts,
328 ) -> Result<WorkflowActivity> {
329 if let Some(existing) = self
331 .store
332 .get_activity_by_workflow_seq(workflow_id, seq)
333 .await?
334 {
335 return Ok(existing);
336 }
337
338 let now = timestamp_now();
339 let mut act = WorkflowActivity {
340 id: None,
341 workflow_id: workflow_id.to_string(),
342 seq,
343 name: name.to_string(),
344 task_queue: task_queue.to_string(),
345 input: input.map(String::from),
346 status: "PENDING".to_string(),
347 result: None,
348 error: None,
349 attempt: 1,
350 max_attempts: opts.max_attempts.unwrap_or(3),
351 initial_interval_secs: opts.initial_interval_secs.unwrap_or(1.0),
352 backoff_coefficient: opts.backoff_coefficient.unwrap_or(2.0),
353 start_to_close_secs: opts.start_to_close_secs.unwrap_or(300.0),
354 heartbeat_timeout_secs: opts.heartbeat_timeout_secs,
355 claimed_by: None,
356 scheduled_at: now,
357 started_at: None,
358 completed_at: None,
359 last_heartbeat: None,
360 };
361
362 let id = self.store.create_activity(&act).await?;
363 act.id = Some(id);
364
365 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
367 self.store
368 .append_event(&WorkflowEvent {
369 id: None,
370 workflow_id: workflow_id.to_string(),
371 seq: event_seq,
372 event_type: "ActivityScheduled".to_string(),
373 payload: Some(
374 serde_json::json!({
375 "activity_id": id,
376 "activity_seq": seq,
377 "name": name,
378 "task_queue": task_queue,
379 "input": input,
380 })
381 .to_string(),
382 ),
383 timestamp: now,
384 })
385 .await?;
386
387 if let Some(wf) = self.store.get_workflow(workflow_id).await?
389 && wf.status == "PENDING"
390 {
391 self.store
392 .update_workflow_status(workflow_id, WorkflowStatus::Running, None, None)
393 .await?;
394 }
395
396 Ok(act)
397 }
398
399 pub async fn claim_activity(
400 &self,
401 task_queue: &str,
402 worker_id: &str,
403 ) -> Result<Option<WorkflowActivity>> {
404 self.store.claim_activity(task_queue, worker_id).await
405 }
406
407 pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
408 self.store.get_activity(id).await
409 }
410
411 pub async fn complete_activity(
419 &self,
420 id: i64,
421 result: Option<&str>,
422 error: Option<&str>,
423 failed: bool,
424 ) -> Result<()> {
425 self.store.complete_activity(id, result, error, failed).await?;
426
427 let act = match self.store.get_activity(id).await? {
429 Some(a) => a,
430 None => return Ok(()),
431 };
432
433 let event_type = if failed {
434 "ActivityFailed"
435 } else {
436 "ActivityCompleted"
437 };
438 let payload = serde_json::json!({
439 "activity_id": id,
440 "activity_seq": act.seq,
441 "name": act.name,
442 "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
443 "error": error,
444 });
445 let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
446 let workflow_id = act.workflow_id.clone();
447 self.store
448 .append_event(&WorkflowEvent {
449 id: None,
450 workflow_id: act.workflow_id,
451 seq: event_seq,
452 event_type: event_type.to_string(),
453 payload: Some(payload.to_string()),
454 timestamp: timestamp_now(),
455 })
456 .await?;
457 self.store.mark_workflow_dispatchable(&workflow_id).await?;
460 Ok(())
461 }
462
463 pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()> {
473 let act = match self.store.get_activity(id).await? {
474 Some(a) => a,
475 None => return Ok(()),
476 };
477
478 if act.attempt < act.max_attempts {
479 let backoff = act.initial_interval_secs
481 * act.backoff_coefficient.powi(act.attempt - 1);
482 let next_scheduled_at = timestamp_now() + backoff;
483 self.store
484 .requeue_activity_for_retry(id, act.attempt + 1, next_scheduled_at)
485 .await?;
486 return Ok(());
487 }
488
489 self.store
491 .complete_activity(id, None, Some(error), true)
492 .await?;
493
494 let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
495 let workflow_id = act.workflow_id.clone();
496 self.store
497 .append_event(&WorkflowEvent {
498 id: None,
499 workflow_id: act.workflow_id,
500 seq: event_seq,
501 event_type: "ActivityFailed".to_string(),
502 payload: Some(
503 serde_json::json!({
504 "activity_id": id,
505 "activity_seq": act.seq,
506 "name": act.name,
507 "error": error,
508 "final_attempt": act.attempt,
509 })
510 .to_string(),
511 ),
512 timestamp: timestamp_now(),
513 })
514 .await?;
515 self.store.mark_workflow_dispatchable(&workflow_id).await?;
517 Ok(())
518 }
519
520 pub async fn claim_workflow_task(
527 &self,
528 task_queue: &str,
529 worker_id: &str,
530 ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>> {
531 let Some(mut wf) = self
532 .store
533 .claim_workflow_task(task_queue, worker_id)
534 .await?
535 else {
536 return Ok(None);
537 };
538 if wf.status == "PENDING" {
542 self.store
543 .update_workflow_status(&wf.id, WorkflowStatus::Running, None, None)
544 .await?;
545 wf.status = "RUNNING".to_string();
546 }
547 let history = self.store.list_events(&wf.id).await?;
548 Ok(Some((wf, history)))
549 }
550
551 pub async fn submit_workflow_commands(
560 &self,
561 workflow_id: &str,
562 worker_id: &str,
563 commands: &[serde_json::Value],
564 ) -> Result<()> {
565 for cmd in commands {
566 let cmd_type = cmd.get("type").and_then(|v| v.as_str()).unwrap_or("");
567 match cmd_type {
568 "ScheduleActivity" => {
569 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
570 let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
571 let queue = cmd
572 .get("task_queue")
573 .and_then(|v| v.as_str())
574 .unwrap_or("default");
575 let input = cmd.get("input").map(|v| v.to_string());
576 let opts = ScheduleActivityOpts {
577 max_attempts: cmd
578 .get("max_attempts")
579 .and_then(|v| v.as_i64())
580 .map(|n| n as i32),
581 initial_interval_secs: cmd
582 .get("initial_interval_secs")
583 .and_then(|v| v.as_f64()),
584 backoff_coefficient: cmd
585 .get("backoff_coefficient")
586 .and_then(|v| v.as_f64()),
587 start_to_close_secs: cmd
588 .get("start_to_close_secs")
589 .and_then(|v| v.as_f64()),
590 heartbeat_timeout_secs: cmd
591 .get("heartbeat_timeout_secs")
592 .and_then(|v| v.as_f64()),
593 };
594 self.schedule_activity(
595 workflow_id,
596 seq,
597 name,
598 input.as_deref(),
599 queue,
600 opts,
601 )
602 .await?;
603 }
604 "CancelWorkflow" => {
605 self.finalise_cancellation(workflow_id).await?;
607 }
608 "WaitForSignal" => {
609 let signal_name =
620 cmd.get("name").and_then(|v| v.as_str()).unwrap_or("?");
621 let timer_seq = cmd.get("timer_seq").and_then(|v| v.as_i64());
622 let payload = match timer_seq {
623 Some(ts) => serde_json::json!({
624 "signal": signal_name,
625 "timer_seq": ts,
626 }),
627 None => serde_json::json!({ "signal": signal_name }),
628 };
629 let event_seq =
630 self.store.get_event_count(workflow_id).await? as i32 + 1;
631 self.store
632 .append_event(&WorkflowEvent {
633 id: None,
634 workflow_id: workflow_id.to_string(),
635 seq: event_seq,
636 event_type: "WorkflowAwaitingSignal".to_string(),
637 payload: Some(payload.to_string()),
638 timestamp: timestamp_now(),
639 })
640 .await?;
641 }
642 "StartChildWorkflow" => {
643 let workflow_type = cmd
644 .get("workflow_type")
645 .and_then(|v| v.as_str())
646 .unwrap_or("");
647 let child_id =
648 cmd.get("workflow_id").and_then(|v| v.as_str()).unwrap_or("");
649 let task_queue = cmd
650 .get("task_queue")
651 .and_then(|v| v.as_str())
652 .unwrap_or("default");
653 let input = cmd.get("input").map(|v| v.to_string());
654 let namespace = self
656 .store
657 .get_workflow(workflow_id)
658 .await?
659 .map(|wf| wf.namespace)
660 .unwrap_or_else(|| "main".to_string());
661
662 if self.store.get_workflow(child_id).await?.is_none() {
667 self.start_child_workflow(
668 &namespace,
669 workflow_id,
670 workflow_type,
671 child_id,
672 input.as_deref(),
673 task_queue,
674 )
675 .await?;
676 self.store.mark_workflow_dispatchable(child_id).await?;
679 }
680 }
681 "RecordSideEffect" => {
682 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
683 let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
684 let value =
685 cmd.get("value").cloned().unwrap_or(serde_json::Value::Null);
686 let event_seq =
687 self.store.get_event_count(workflow_id).await? as i32 + 1;
688 self.store
689 .append_event(&WorkflowEvent {
690 id: None,
691 workflow_id: workflow_id.to_string(),
692 seq: event_seq,
693 event_type: "SideEffectRecorded".to_string(),
694 payload: Some(
695 serde_json::json!({
696 "side_effect_seq": seq,
697 "name": name,
698 "value": value,
699 })
700 .to_string(),
701 ),
702 timestamp: timestamp_now(),
703 })
704 .await?;
705 self.store.mark_workflow_dispatchable(workflow_id).await?;
709 }
710 "ScheduleTimer" => {
711 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
712 let duration = cmd
713 .get("duration_secs")
714 .and_then(|v| v.as_f64())
715 .unwrap_or(0.0);
716 self.schedule_timer(workflow_id, seq, duration).await?;
717 }
718 "UpsertSearchAttributes" => {
719 let patch = cmd
725 .get("patch")
726 .cloned()
727 .unwrap_or(serde_json::Value::Object(Default::default()));
728 self.store
729 .upsert_search_attributes(workflow_id, &patch.to_string())
730 .await?;
731 }
732 "ContinueAsNew" => {
733 let input = cmd.get("input").map(|v| v.to_string());
740 self.continue_as_new(workflow_id, input.as_deref())
741 .await?;
742 }
743 "RecordSnapshot" => {
744 let state = cmd
751 .get("state")
752 .cloned()
753 .unwrap_or(serde_json::Value::Null);
754 let event_seq = self.store.get_event_count(workflow_id).await? as i32;
755 self.store
756 .create_snapshot(workflow_id, event_seq, &state.to_string())
757 .await?;
758 }
759 "CompleteWorkflow" => {
760 let result = cmd.get("result").map(|v| v.to_string());
761 self.complete_workflow(workflow_id, result.as_deref()).await?;
762 }
763 "FailWorkflow" => {
764 let error = cmd
765 .get("error")
766 .and_then(|v| v.as_str())
767 .unwrap_or("workflow handler raised an error");
768 self.fail_workflow(workflow_id, error).await?;
769 }
770 other => {
771 tracing::warn!("submit_workflow_commands: unknown command type {other:?}");
772 }
773 }
774 }
775
776 self.store
777 .release_workflow_task(workflow_id, worker_id)
778 .await?;
779 Ok(())
780 }
781
782 pub async fn schedule_timer(
794 &self,
795 workflow_id: &str,
796 seq: i32,
797 duration_secs: f64,
798 ) -> Result<WorkflowTimer> {
799 if let Some(existing) = self
800 .store
801 .get_timer_by_workflow_seq(workflow_id, seq)
802 .await?
803 {
804 return Ok(existing);
805 }
806
807 let now = timestamp_now();
808 let mut timer = WorkflowTimer {
809 id: None,
810 workflow_id: workflow_id.to_string(),
811 seq,
812 fire_at: now + duration_secs,
813 fired: false,
814 };
815 let id = self.store.create_timer(&timer).await?;
816 timer.id = Some(id);
817
818 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
819 self.store
820 .append_event(&WorkflowEvent {
821 id: None,
822 workflow_id: workflow_id.to_string(),
823 seq: event_seq,
824 event_type: "TimerScheduled".to_string(),
825 payload: Some(
826 serde_json::json!({
827 "timer_id": id,
828 "timer_seq": seq,
829 "fire_at": timer.fire_at,
830 "duration_secs": duration_secs,
831 })
832 .to_string(),
833 ),
834 timestamp: now,
835 })
836 .await?;
837
838 Ok(timer)
839 }
840
841 pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()> {
846 if let Some(wf) = self.store.get_workflow(workflow_id).await?
848 && wf.status == "CANCELLED"
849 {
850 return Ok(());
851 }
852 self.store
853 .update_workflow_status(workflow_id, WorkflowStatus::Cancelled, None, None)
854 .await?;
855 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
856 self.store
857 .append_event(&WorkflowEvent {
858 id: None,
859 workflow_id: workflow_id.to_string(),
860 seq: event_seq,
861 event_type: "WorkflowCancelled".to_string(),
862 payload: None,
863 timestamp: timestamp_now(),
864 })
865 .await?;
866 Ok(())
867 }
868
869 pub async fn complete_workflow(&self, workflow_id: &str, result: Option<&str>) -> Result<()> {
874 self.store
875 .update_workflow_status(workflow_id, WorkflowStatus::Completed, result, None)
876 .await?;
877 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
878 self.store
879 .append_event(&WorkflowEvent {
880 id: None,
881 workflow_id: workflow_id.to_string(),
882 seq: event_seq,
883 event_type: "WorkflowCompleted".to_string(),
884 payload: result.map(String::from),
885 timestamp: timestamp_now(),
886 })
887 .await?;
888 self.notify_parent_of_child_outcome(
889 workflow_id,
890 "ChildWorkflowCompleted",
891 serde_json::json!({
892 "child_workflow_id": workflow_id,
893 "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
894 }),
895 )
896 .await?;
897 Ok(())
898 }
899
900 pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()> {
903 self.store
904 .update_workflow_status(workflow_id, WorkflowStatus::Failed, None, Some(error))
905 .await?;
906 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
907 self.store
908 .append_event(&WorkflowEvent {
909 id: None,
910 workflow_id: workflow_id.to_string(),
911 seq: event_seq,
912 event_type: "WorkflowFailed".to_string(),
913 payload: Some(serde_json::json!({"error": error}).to_string()),
914 timestamp: timestamp_now(),
915 })
916 .await?;
917 self.notify_parent_of_child_outcome(
918 workflow_id,
919 "ChildWorkflowFailed",
920 serde_json::json!({
921 "child_workflow_id": workflow_id,
922 "error": error,
923 }),
924 )
925 .await?;
926 Ok(())
927 }
928
929 async fn notify_parent_of_child_outcome(
933 &self,
934 child_workflow_id: &str,
935 event_type: &str,
936 payload: serde_json::Value,
937 ) -> Result<()> {
938 let Some(child) = self.store.get_workflow(child_workflow_id).await? else {
939 return Ok(());
940 };
941 let Some(parent_id) = child.parent_id else {
942 return Ok(());
943 };
944 let event_seq = self.store.get_event_count(&parent_id).await? as i32 + 1;
945 self.store
946 .append_event(&WorkflowEvent {
947 id: None,
948 workflow_id: parent_id.clone(),
949 seq: event_seq,
950 event_type: event_type.to_string(),
951 payload: Some(payload.to_string()),
952 timestamp: timestamp_now(),
953 })
954 .await?;
955 self.store.mark_workflow_dispatchable(&parent_id).await?;
956 Ok(())
957 }
958
959 pub async fn heartbeat_activity(&self, id: i64, details: Option<&str>) -> Result<()> {
960 self.store.heartbeat_activity(id, details).await
961 }
962
963 pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()> {
966 self.store.create_schedule(schedule).await
967 }
968
969 pub async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
970 self.store.list_schedules(namespace).await
971 }
972
973 pub async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
974 self.store.get_schedule(namespace, name).await
975 }
976
977 pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
978 self.store.delete_schedule(namespace, name).await
979 }
980
981 pub async fn update_schedule(
982 &self,
983 namespace: &str,
984 name: &str,
985 patch: &SchedulePatch,
986 ) -> Result<Option<WorkflowSchedule>> {
987 self.store.update_schedule(namespace, name, patch).await
988 }
989
990 pub async fn set_schedule_paused(
991 &self,
992 namespace: &str,
993 name: &str,
994 paused: bool,
995 ) -> Result<Option<WorkflowSchedule>> {
996 self.store.set_schedule_paused(namespace, name, paused).await
997 }
998
999 pub async fn create_namespace(&self, name: &str) -> Result<()> {
1002 self.store.create_namespace(name).await
1003 }
1004
1005 pub async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
1006 self.store.list_namespaces().await
1007 }
1008
1009 pub async fn delete_namespace(&self, name: &str) -> Result<bool> {
1010 self.store.delete_namespace(name).await
1011 }
1012
1013 pub async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
1014 self.store.get_namespace_stats(namespace).await
1015 }
1016
1017 pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
1018 self.store.get_queue_stats(namespace).await
1019 }
1020
1021 pub async fn start_child_workflow(
1024 &self,
1025 namespace: &str,
1026 parent_id: &str,
1027 workflow_type: &str,
1028 workflow_id: &str,
1029 input: Option<&str>,
1030 task_queue: &str,
1031 ) -> Result<WorkflowRecord> {
1032 let now = timestamp_now();
1033 let run_id = format!("run-{workflow_id}-{}", now as u64);
1034
1035 let wf = WorkflowRecord {
1036 id: workflow_id.to_string(),
1037 namespace: namespace.to_string(),
1038 run_id,
1039 workflow_type: workflow_type.to_string(),
1040 task_queue: task_queue.to_string(),
1041 status: "PENDING".to_string(),
1042 input: input.map(String::from),
1043 result: None,
1044 error: None,
1045 parent_id: Some(parent_id.to_string()),
1046 claimed_by: None,
1047 search_attributes: None,
1048 archived_at: None,
1049 archive_uri: None,
1050 created_at: now,
1051 updated_at: now,
1052 completed_at: None,
1053 };
1054
1055 self.store.create_workflow(&wf).await?;
1056
1057 self.store
1059 .append_event(&WorkflowEvent {
1060 id: None,
1061 workflow_id: workflow_id.to_string(),
1062 seq: 1,
1063 event_type: "WorkflowStarted".to_string(),
1064 payload: input.map(String::from),
1065 timestamp: now,
1066 })
1067 .await?;
1068
1069 let parent_seq = self.store.get_event_count(parent_id).await? as i32 + 1;
1070 self.store
1071 .append_event(&WorkflowEvent {
1072 id: None,
1073 workflow_id: parent_id.to_string(),
1074 seq: parent_seq,
1075 event_type: "ChildWorkflowStarted".to_string(),
1076 payload: Some(
1077 serde_json::json!({
1078 "child_workflow_id": workflow_id,
1079 "workflow_type": workflow_type,
1080 })
1081 .to_string(),
1082 ),
1083 timestamp: now,
1084 })
1085 .await?;
1086
1087 Ok(wf)
1088 }
1089
1090 pub async fn list_child_workflows(
1091 &self,
1092 parent_id: &str,
1093 ) -> Result<Vec<WorkflowRecord>> {
1094 self.store.list_child_workflows(parent_id).await
1095 }
1096
1097 pub async fn continue_as_new(
1100 &self,
1101 workflow_id: &str,
1102 input: Option<&str>,
1103 ) -> Result<WorkflowRecord> {
1104 let old_wf = self
1105 .store
1106 .get_workflow(workflow_id)
1107 .await?
1108 .ok_or_else(|| anyhow::anyhow!("workflow not found: {workflow_id}"))?;
1109
1110 self.store
1112 .update_workflow_status(workflow_id, WorkflowStatus::Completed, None, None)
1113 .await?;
1114
1115 let new_id = format!("{workflow_id}-continued-{}", timestamp_now() as u64);
1117 self.start_workflow(
1118 &old_wf.namespace,
1119 &old_wf.workflow_type,
1120 &new_id,
1121 input,
1122 &old_wf.task_queue,
1123 old_wf.search_attributes.as_deref(),
1124 )
1125 .await
1126 }
1127
1128 pub async fn create_snapshot(
1131 &self,
1132 workflow_id: &str,
1133 event_seq: i32,
1134 state_json: &str,
1135 ) -> Result<()> {
1136 self.store
1137 .create_snapshot(workflow_id, event_seq, state_json)
1138 .await
1139 }
1140
1141 pub async fn get_latest_snapshot(
1142 &self,
1143 workflow_id: &str,
1144 ) -> Result<Option<WorkflowSnapshot>> {
1145 self.store.get_latest_snapshot(workflow_id).await
1146 }
1147
1148 pub async fn record_side_effect(
1151 &self,
1152 workflow_id: &str,
1153 value: &str,
1154 ) -> Result<()> {
1155 let now = timestamp_now();
1156 let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
1157 self.store
1158 .append_event(&WorkflowEvent {
1159 id: None,
1160 workflow_id: workflow_id.to_string(),
1161 seq,
1162 event_type: "SideEffectRecorded".to_string(),
1163 payload: Some(value.to_string()),
1164 timestamp: now,
1165 })
1166 .await?;
1167 Ok(())
1168 }
1169}
1170
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock is set before the epoch, the result is the negative
/// offset from the epoch rather than a panic.
fn timestamp_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // `SystemTimeError::duration` is how far before the epoch the clock is.
        Err(before_epoch) => -before_epoch.duration().as_secs_f64(),
    }
}
1177
1178use std::str::FromStr;