1use std::sync::Arc;
2
3use anyhow::Result;
4use tokio::task::JoinHandle;
5use tracing::info;
6
7use crate::dispatch_recovery;
8use crate::health;
9use crate::scheduler;
10use crate::store::WorkflowStore;
11use crate::timers;
12use crate::types::*;
13
14pub struct Engine<S: WorkflowStore> {
19 store: Arc<S>,
20 _scheduler: JoinHandle<()>,
21 _timer_poller: JoinHandle<()>,
22 _health_monitor: JoinHandle<()>,
23 _dispatch_recovery: JoinHandle<()>,
24}
25
26impl<S: WorkflowStore> Engine<S> {
27 pub fn start(store: S) -> Self {
29 let store = Arc::new(store);
30
31 let _scheduler = tokio::spawn(scheduler::run_scheduler(Arc::clone(&store)));
32 let _timer_poller = tokio::spawn(timers::run_timer_poller(Arc::clone(&store)));
33 let _health_monitor = tokio::spawn(health::run_health_monitor(Arc::clone(&store)));
34 let _dispatch_recovery = tokio::spawn(dispatch_recovery::run_dispatch_recovery(
35 Arc::clone(&store),
36 ));
37
38 info!("Workflow engine started");
39
40 Self {
41 store,
42 _scheduler,
43 _timer_poller,
44 _health_monitor,
45 _dispatch_recovery,
46 }
47 }
48
49 pub fn store(&self) -> &S {
51 &self.store
52 }
53
54 pub async fn start_workflow(
57 &self,
58 namespace: &str,
59 workflow_type: &str,
60 workflow_id: &str,
61 input: Option<&str>,
62 task_queue: &str,
63 ) -> Result<WorkflowRecord> {
64 let now = timestamp_now();
65 let run_id = format!("run-{workflow_id}-{}", now as u64);
66
67 let wf = WorkflowRecord {
68 id: workflow_id.to_string(),
69 namespace: namespace.to_string(),
70 run_id,
71 workflow_type: workflow_type.to_string(),
72 task_queue: task_queue.to_string(),
73 status: "PENDING".to_string(),
74 input: input.map(String::from),
75 result: None,
76 error: None,
77 parent_id: None,
78 claimed_by: None,
79 created_at: now,
80 updated_at: now,
81 completed_at: None,
82 };
83
84 self.store.create_workflow(&wf).await?;
85
86 self.store
87 .append_event(&WorkflowEvent {
88 id: None,
89 workflow_id: workflow_id.to_string(),
90 seq: 1,
91 event_type: "WorkflowStarted".to_string(),
92 payload: input.map(String::from),
93 timestamp: now,
94 })
95 .await?;
96
97 self.store.mark_workflow_dispatchable(workflow_id).await?;
100
101 Ok(wf)
102 }
103
104 pub async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
105 self.store.get_workflow(id).await
106 }
107
108 pub async fn list_workflows(
109 &self,
110 namespace: &str,
111 status: Option<WorkflowStatus>,
112 workflow_type: Option<&str>,
113 limit: i64,
114 offset: i64,
115 ) -> Result<Vec<WorkflowRecord>> {
116 self.store
117 .list_workflows(namespace, status, workflow_type, limit, offset)
118 .await
119 }
120
121 pub async fn cancel_workflow(&self, id: &str) -> Result<bool> {
122 let wf = self.store.get_workflow(id).await?;
123 match wf {
124 None => Ok(false),
125 Some(wf) => {
126 let status = WorkflowStatus::from_str(&wf.status)
127 .map_err(|e| anyhow::anyhow!(e))?;
128 if status.is_terminal() {
129 return Ok(false);
130 }
131
132 self.store.cancel_pending_activities(id).await?;
144 self.store.cancel_pending_timers(id).await?;
145
146 let seq = self.store.get_event_count(id).await? as i32 + 1;
147 self.store
148 .append_event(&WorkflowEvent {
149 id: None,
150 workflow_id: id.to_string(),
151 seq,
152 event_type: "WorkflowCancelRequested".to_string(),
153 payload: None,
154 timestamp: timestamp_now(),
155 })
156 .await?;
157
158 self.store.mark_workflow_dispatchable(id).await?;
159
160 let children = self.store.list_child_workflows(id).await?;
162 for child in children {
163 Box::pin(self.cancel_workflow(&child.id)).await?;
164 }
165
166 if matches!(status, WorkflowStatus::Pending) {
172 self.finalise_cancellation(id).await?;
173 }
174
175 Ok(true)
176 }
177 }
178 }
179
180 pub async fn terminate_workflow(&self, id: &str, reason: Option<&str>) -> Result<bool> {
181 let wf = self.store.get_workflow(id).await?;
182 match wf {
183 None => Ok(false),
184 Some(wf) => {
185 let status = WorkflowStatus::from_str(&wf.status)
186 .map_err(|e| anyhow::anyhow!(e))?;
187 if status.is_terminal() {
188 return Ok(false);
189 }
190
191 self.store
192 .update_workflow_status(
193 id,
194 WorkflowStatus::Failed,
195 None,
196 Some(reason.unwrap_or("terminated")),
197 )
198 .await?;
199
200 Ok(true)
201 }
202 }
203 }
204
205 pub async fn send_signal(
208 &self,
209 workflow_id: &str,
210 name: &str,
211 payload: Option<&str>,
212 ) -> Result<()> {
213 let now = timestamp_now();
214
215 self.store
216 .send_signal(&WorkflowSignal {
217 id: None,
218 workflow_id: workflow_id.to_string(),
219 name: name.to_string(),
220 payload: payload.map(String::from),
221 consumed: false,
222 received_at: now,
223 })
224 .await?;
225
226 let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
227 let payload_value: serde_json::Value = payload
232 .and_then(|s| serde_json::from_str(s).ok())
233 .unwrap_or(serde_json::Value::Null);
234 self.store
235 .append_event(&WorkflowEvent {
236 id: None,
237 workflow_id: workflow_id.to_string(),
238 seq,
239 event_type: "SignalReceived".to_string(),
240 payload: Some(
241 serde_json::json!({ "signal": name, "payload": payload_value }).to_string(),
242 ),
243 timestamp: now,
244 })
245 .await?;
246
247 self.store.mark_workflow_dispatchable(workflow_id).await?;
250
251 Ok(())
252 }
253
254 pub async fn get_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
257 self.store.list_events(workflow_id).await
258 }
259
260 pub async fn register_worker(&self, worker: &WorkflowWorker) -> Result<()> {
263 self.store.register_worker(worker).await
264 }
265
266 pub async fn heartbeat_worker(&self, id: &str) -> Result<()> {
267 self.store.heartbeat_worker(id, timestamp_now()).await
268 }
269
270 pub async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
271 self.store.list_workers(namespace).await
272 }
273
274 pub async fn schedule_activity(
290 &self,
291 workflow_id: &str,
292 seq: i32,
293 name: &str,
294 input: Option<&str>,
295 task_queue: &str,
296 opts: ScheduleActivityOpts,
297 ) -> Result<WorkflowActivity> {
298 if let Some(existing) = self
300 .store
301 .get_activity_by_workflow_seq(workflow_id, seq)
302 .await?
303 {
304 return Ok(existing);
305 }
306
307 let now = timestamp_now();
308 let mut act = WorkflowActivity {
309 id: None,
310 workflow_id: workflow_id.to_string(),
311 seq,
312 name: name.to_string(),
313 task_queue: task_queue.to_string(),
314 input: input.map(String::from),
315 status: "PENDING".to_string(),
316 result: None,
317 error: None,
318 attempt: 1,
319 max_attempts: opts.max_attempts.unwrap_or(3),
320 initial_interval_secs: opts.initial_interval_secs.unwrap_or(1.0),
321 backoff_coefficient: opts.backoff_coefficient.unwrap_or(2.0),
322 start_to_close_secs: opts.start_to_close_secs.unwrap_or(300.0),
323 heartbeat_timeout_secs: opts.heartbeat_timeout_secs,
324 claimed_by: None,
325 scheduled_at: now,
326 started_at: None,
327 completed_at: None,
328 last_heartbeat: None,
329 };
330
331 let id = self.store.create_activity(&act).await?;
332 act.id = Some(id);
333
334 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
336 self.store
337 .append_event(&WorkflowEvent {
338 id: None,
339 workflow_id: workflow_id.to_string(),
340 seq: event_seq,
341 event_type: "ActivityScheduled".to_string(),
342 payload: Some(
343 serde_json::json!({
344 "activity_id": id,
345 "activity_seq": seq,
346 "name": name,
347 "task_queue": task_queue,
348 "input": input,
349 })
350 .to_string(),
351 ),
352 timestamp: now,
353 })
354 .await?;
355
356 if let Some(wf) = self.store.get_workflow(workflow_id).await?
358 && wf.status == "PENDING"
359 {
360 self.store
361 .update_workflow_status(workflow_id, WorkflowStatus::Running, None, None)
362 .await?;
363 }
364
365 Ok(act)
366 }
367
368 pub async fn claim_activity(
369 &self,
370 task_queue: &str,
371 worker_id: &str,
372 ) -> Result<Option<WorkflowActivity>> {
373 self.store.claim_activity(task_queue, worker_id).await
374 }
375
376 pub async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
377 self.store.get_activity(id).await
378 }
379
380 pub async fn complete_activity(
388 &self,
389 id: i64,
390 result: Option<&str>,
391 error: Option<&str>,
392 failed: bool,
393 ) -> Result<()> {
394 self.store.complete_activity(id, result, error, failed).await?;
395
396 let act = match self.store.get_activity(id).await? {
398 Some(a) => a,
399 None => return Ok(()),
400 };
401
402 let event_type = if failed {
403 "ActivityFailed"
404 } else {
405 "ActivityCompleted"
406 };
407 let payload = serde_json::json!({
408 "activity_id": id,
409 "activity_seq": act.seq,
410 "name": act.name,
411 "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
412 "error": error,
413 });
414 let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
415 let workflow_id = act.workflow_id.clone();
416 self.store
417 .append_event(&WorkflowEvent {
418 id: None,
419 workflow_id: act.workflow_id,
420 seq: event_seq,
421 event_type: event_type.to_string(),
422 payload: Some(payload.to_string()),
423 timestamp: timestamp_now(),
424 })
425 .await?;
426 self.store.mark_workflow_dispatchable(&workflow_id).await?;
429 Ok(())
430 }
431
432 pub async fn fail_activity(&self, id: i64, error: &str) -> Result<()> {
442 let act = match self.store.get_activity(id).await? {
443 Some(a) => a,
444 None => return Ok(()),
445 };
446
447 if act.attempt < act.max_attempts {
448 let backoff = act.initial_interval_secs
450 * act.backoff_coefficient.powi(act.attempt - 1);
451 let next_scheduled_at = timestamp_now() + backoff;
452 self.store
453 .requeue_activity_for_retry(id, act.attempt + 1, next_scheduled_at)
454 .await?;
455 return Ok(());
456 }
457
458 self.store
460 .complete_activity(id, None, Some(error), true)
461 .await?;
462
463 let event_seq = self.store.get_event_count(&act.workflow_id).await? as i32 + 1;
464 let workflow_id = act.workflow_id.clone();
465 self.store
466 .append_event(&WorkflowEvent {
467 id: None,
468 workflow_id: act.workflow_id,
469 seq: event_seq,
470 event_type: "ActivityFailed".to_string(),
471 payload: Some(
472 serde_json::json!({
473 "activity_id": id,
474 "activity_seq": act.seq,
475 "name": act.name,
476 "error": error,
477 "final_attempt": act.attempt,
478 })
479 .to_string(),
480 ),
481 timestamp: timestamp_now(),
482 })
483 .await?;
484 self.store.mark_workflow_dispatchable(&workflow_id).await?;
486 Ok(())
487 }
488
489 pub async fn claim_workflow_task(
496 &self,
497 task_queue: &str,
498 worker_id: &str,
499 ) -> Result<Option<(WorkflowRecord, Vec<WorkflowEvent>)>> {
500 let Some(mut wf) = self
501 .store
502 .claim_workflow_task(task_queue, worker_id)
503 .await?
504 else {
505 return Ok(None);
506 };
507 if wf.status == "PENDING" {
511 self.store
512 .update_workflow_status(&wf.id, WorkflowStatus::Running, None, None)
513 .await?;
514 wf.status = "RUNNING".to_string();
515 }
516 let history = self.store.list_events(&wf.id).await?;
517 Ok(Some((wf, history)))
518 }
519
520 pub async fn submit_workflow_commands(
529 &self,
530 workflow_id: &str,
531 worker_id: &str,
532 commands: &[serde_json::Value],
533 ) -> Result<()> {
534 for cmd in commands {
535 let cmd_type = cmd.get("type").and_then(|v| v.as_str()).unwrap_or("");
536 match cmd_type {
537 "ScheduleActivity" => {
538 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
539 let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
540 let queue = cmd
541 .get("task_queue")
542 .and_then(|v| v.as_str())
543 .unwrap_or("default");
544 let input = cmd.get("input").map(|v| v.to_string());
545 let opts = ScheduleActivityOpts {
546 max_attempts: cmd
547 .get("max_attempts")
548 .and_then(|v| v.as_i64())
549 .map(|n| n as i32),
550 initial_interval_secs: cmd
551 .get("initial_interval_secs")
552 .and_then(|v| v.as_f64()),
553 backoff_coefficient: cmd
554 .get("backoff_coefficient")
555 .and_then(|v| v.as_f64()),
556 start_to_close_secs: cmd
557 .get("start_to_close_secs")
558 .and_then(|v| v.as_f64()),
559 heartbeat_timeout_secs: cmd
560 .get("heartbeat_timeout_secs")
561 .and_then(|v| v.as_f64()),
562 };
563 self.schedule_activity(
564 workflow_id,
565 seq,
566 name,
567 input.as_deref(),
568 queue,
569 opts,
570 )
571 .await?;
572 }
573 "CancelWorkflow" => {
574 self.finalise_cancellation(workflow_id).await?;
576 }
577 "WaitForSignal" => {
578 let signal_name =
583 cmd.get("name").and_then(|v| v.as_str()).unwrap_or("?");
584 let event_seq =
585 self.store.get_event_count(workflow_id).await? as i32 + 1;
586 self.store
587 .append_event(&WorkflowEvent {
588 id: None,
589 workflow_id: workflow_id.to_string(),
590 seq: event_seq,
591 event_type: "WorkflowAwaitingSignal".to_string(),
592 payload: Some(
593 serde_json::json!({ "signal": signal_name }).to_string(),
594 ),
595 timestamp: timestamp_now(),
596 })
597 .await?;
598 }
599 "StartChildWorkflow" => {
600 let workflow_type = cmd
601 .get("workflow_type")
602 .and_then(|v| v.as_str())
603 .unwrap_or("");
604 let child_id =
605 cmd.get("workflow_id").and_then(|v| v.as_str()).unwrap_or("");
606 let task_queue = cmd
607 .get("task_queue")
608 .and_then(|v| v.as_str())
609 .unwrap_or("default");
610 let input = cmd.get("input").map(|v| v.to_string());
611 let namespace = self
613 .store
614 .get_workflow(workflow_id)
615 .await?
616 .map(|wf| wf.namespace)
617 .unwrap_or_else(|| "main".to_string());
618
619 if self.store.get_workflow(child_id).await?.is_none() {
624 self.start_child_workflow(
625 &namespace,
626 workflow_id,
627 workflow_type,
628 child_id,
629 input.as_deref(),
630 task_queue,
631 )
632 .await?;
633 self.store.mark_workflow_dispatchable(child_id).await?;
636 }
637 }
638 "RecordSideEffect" => {
639 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
640 let name = cmd.get("name").and_then(|v| v.as_str()).unwrap_or("");
641 let value =
642 cmd.get("value").cloned().unwrap_or(serde_json::Value::Null);
643 let event_seq =
644 self.store.get_event_count(workflow_id).await? as i32 + 1;
645 self.store
646 .append_event(&WorkflowEvent {
647 id: None,
648 workflow_id: workflow_id.to_string(),
649 seq: event_seq,
650 event_type: "SideEffectRecorded".to_string(),
651 payload: Some(
652 serde_json::json!({
653 "side_effect_seq": seq,
654 "name": name,
655 "value": value,
656 })
657 .to_string(),
658 ),
659 timestamp: timestamp_now(),
660 })
661 .await?;
662 self.store.mark_workflow_dispatchable(workflow_id).await?;
666 }
667 "ScheduleTimer" => {
668 let seq = cmd.get("seq").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
669 let duration = cmd
670 .get("duration_secs")
671 .and_then(|v| v.as_f64())
672 .unwrap_or(0.0);
673 self.schedule_timer(workflow_id, seq, duration).await?;
674 }
675 "CompleteWorkflow" => {
676 let result = cmd.get("result").map(|v| v.to_string());
677 self.complete_workflow(workflow_id, result.as_deref()).await?;
678 }
679 "FailWorkflow" => {
680 let error = cmd
681 .get("error")
682 .and_then(|v| v.as_str())
683 .unwrap_or("workflow handler raised an error");
684 self.fail_workflow(workflow_id, error).await?;
685 }
686 other => {
687 tracing::warn!("submit_workflow_commands: unknown command type {other:?}");
688 }
689 }
690 }
691
692 self.store
693 .release_workflow_task(workflow_id, worker_id)
694 .await?;
695 Ok(())
696 }
697
698 pub async fn schedule_timer(
710 &self,
711 workflow_id: &str,
712 seq: i32,
713 duration_secs: f64,
714 ) -> Result<WorkflowTimer> {
715 if let Some(existing) = self
716 .store
717 .get_timer_by_workflow_seq(workflow_id, seq)
718 .await?
719 {
720 return Ok(existing);
721 }
722
723 let now = timestamp_now();
724 let mut timer = WorkflowTimer {
725 id: None,
726 workflow_id: workflow_id.to_string(),
727 seq,
728 fire_at: now + duration_secs,
729 fired: false,
730 };
731 let id = self.store.create_timer(&timer).await?;
732 timer.id = Some(id);
733
734 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
735 self.store
736 .append_event(&WorkflowEvent {
737 id: None,
738 workflow_id: workflow_id.to_string(),
739 seq: event_seq,
740 event_type: "TimerScheduled".to_string(),
741 payload: Some(
742 serde_json::json!({
743 "timer_id": id,
744 "timer_seq": seq,
745 "fire_at": timer.fire_at,
746 "duration_secs": duration_secs,
747 })
748 .to_string(),
749 ),
750 timestamp: now,
751 })
752 .await?;
753
754 Ok(timer)
755 }
756
757 pub async fn finalise_cancellation(&self, workflow_id: &str) -> Result<()> {
762 if let Some(wf) = self.store.get_workflow(workflow_id).await?
764 && wf.status == "CANCELLED"
765 {
766 return Ok(());
767 }
768 self.store
769 .update_workflow_status(workflow_id, WorkflowStatus::Cancelled, None, None)
770 .await?;
771 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
772 self.store
773 .append_event(&WorkflowEvent {
774 id: None,
775 workflow_id: workflow_id.to_string(),
776 seq: event_seq,
777 event_type: "WorkflowCancelled".to_string(),
778 payload: None,
779 timestamp: timestamp_now(),
780 })
781 .await?;
782 Ok(())
783 }
784
785 pub async fn complete_workflow(&self, workflow_id: &str, result: Option<&str>) -> Result<()> {
790 self.store
791 .update_workflow_status(workflow_id, WorkflowStatus::Completed, result, None)
792 .await?;
793 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
794 self.store
795 .append_event(&WorkflowEvent {
796 id: None,
797 workflow_id: workflow_id.to_string(),
798 seq: event_seq,
799 event_type: "WorkflowCompleted".to_string(),
800 payload: result.map(String::from),
801 timestamp: timestamp_now(),
802 })
803 .await?;
804 self.notify_parent_of_child_outcome(
805 workflow_id,
806 "ChildWorkflowCompleted",
807 serde_json::json!({
808 "child_workflow_id": workflow_id,
809 "result": result.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok()),
810 }),
811 )
812 .await?;
813 Ok(())
814 }
815
816 pub async fn fail_workflow(&self, workflow_id: &str, error: &str) -> Result<()> {
819 self.store
820 .update_workflow_status(workflow_id, WorkflowStatus::Failed, None, Some(error))
821 .await?;
822 let event_seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
823 self.store
824 .append_event(&WorkflowEvent {
825 id: None,
826 workflow_id: workflow_id.to_string(),
827 seq: event_seq,
828 event_type: "WorkflowFailed".to_string(),
829 payload: Some(serde_json::json!({"error": error}).to_string()),
830 timestamp: timestamp_now(),
831 })
832 .await?;
833 self.notify_parent_of_child_outcome(
834 workflow_id,
835 "ChildWorkflowFailed",
836 serde_json::json!({
837 "child_workflow_id": workflow_id,
838 "error": error,
839 }),
840 )
841 .await?;
842 Ok(())
843 }
844
845 async fn notify_parent_of_child_outcome(
849 &self,
850 child_workflow_id: &str,
851 event_type: &str,
852 payload: serde_json::Value,
853 ) -> Result<()> {
854 let Some(child) = self.store.get_workflow(child_workflow_id).await? else {
855 return Ok(());
856 };
857 let Some(parent_id) = child.parent_id else {
858 return Ok(());
859 };
860 let event_seq = self.store.get_event_count(&parent_id).await? as i32 + 1;
861 self.store
862 .append_event(&WorkflowEvent {
863 id: None,
864 workflow_id: parent_id.clone(),
865 seq: event_seq,
866 event_type: event_type.to_string(),
867 payload: Some(payload.to_string()),
868 timestamp: timestamp_now(),
869 })
870 .await?;
871 self.store.mark_workflow_dispatchable(&parent_id).await?;
872 Ok(())
873 }
874
875 pub async fn heartbeat_activity(&self, id: i64, details: Option<&str>) -> Result<()> {
876 self.store.heartbeat_activity(id, details).await
877 }
878
879 pub async fn create_schedule(&self, schedule: &WorkflowSchedule) -> Result<()> {
882 self.store.create_schedule(schedule).await
883 }
884
885 pub async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
886 self.store.list_schedules(namespace).await
887 }
888
889 pub async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
890 self.store.get_schedule(namespace, name).await
891 }
892
893 pub async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
894 self.store.delete_schedule(namespace, name).await
895 }
896
897 pub async fn create_namespace(&self, name: &str) -> Result<()> {
900 self.store.create_namespace(name).await
901 }
902
903 pub async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
904 self.store.list_namespaces().await
905 }
906
907 pub async fn delete_namespace(&self, name: &str) -> Result<bool> {
908 self.store.delete_namespace(name).await
909 }
910
911 pub async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
912 self.store.get_namespace_stats(namespace).await
913 }
914
915 pub async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
916 self.store.get_queue_stats(namespace).await
917 }
918
919 pub async fn start_child_workflow(
922 &self,
923 namespace: &str,
924 parent_id: &str,
925 workflow_type: &str,
926 workflow_id: &str,
927 input: Option<&str>,
928 task_queue: &str,
929 ) -> Result<WorkflowRecord> {
930 let now = timestamp_now();
931 let run_id = format!("run-{workflow_id}-{}", now as u64);
932
933 let wf = WorkflowRecord {
934 id: workflow_id.to_string(),
935 namespace: namespace.to_string(),
936 run_id,
937 workflow_type: workflow_type.to_string(),
938 task_queue: task_queue.to_string(),
939 status: "PENDING".to_string(),
940 input: input.map(String::from),
941 result: None,
942 error: None,
943 parent_id: Some(parent_id.to_string()),
944 claimed_by: None,
945 created_at: now,
946 updated_at: now,
947 completed_at: None,
948 };
949
950 self.store.create_workflow(&wf).await?;
951
952 self.store
954 .append_event(&WorkflowEvent {
955 id: None,
956 workflow_id: workflow_id.to_string(),
957 seq: 1,
958 event_type: "WorkflowStarted".to_string(),
959 payload: input.map(String::from),
960 timestamp: now,
961 })
962 .await?;
963
964 let parent_seq = self.store.get_event_count(parent_id).await? as i32 + 1;
965 self.store
966 .append_event(&WorkflowEvent {
967 id: None,
968 workflow_id: parent_id.to_string(),
969 seq: parent_seq,
970 event_type: "ChildWorkflowStarted".to_string(),
971 payload: Some(
972 serde_json::json!({
973 "child_workflow_id": workflow_id,
974 "workflow_type": workflow_type,
975 })
976 .to_string(),
977 ),
978 timestamp: now,
979 })
980 .await?;
981
982 Ok(wf)
983 }
984
985 pub async fn list_child_workflows(
986 &self,
987 parent_id: &str,
988 ) -> Result<Vec<WorkflowRecord>> {
989 self.store.list_child_workflows(parent_id).await
990 }
991
992 pub async fn continue_as_new(
995 &self,
996 workflow_id: &str,
997 input: Option<&str>,
998 ) -> Result<WorkflowRecord> {
999 let old_wf = self
1000 .store
1001 .get_workflow(workflow_id)
1002 .await?
1003 .ok_or_else(|| anyhow::anyhow!("workflow not found: {workflow_id}"))?;
1004
1005 self.store
1007 .update_workflow_status(workflow_id, WorkflowStatus::Completed, None, None)
1008 .await?;
1009
1010 let new_id = format!("{workflow_id}-continued-{}", timestamp_now() as u64);
1012 self.start_workflow(
1013 &old_wf.namespace,
1014 &old_wf.workflow_type,
1015 &new_id,
1016 input,
1017 &old_wf.task_queue,
1018 )
1019 .await
1020 }
1021
1022 pub async fn create_snapshot(
1025 &self,
1026 workflow_id: &str,
1027 event_seq: i32,
1028 state_json: &str,
1029 ) -> Result<()> {
1030 self.store
1031 .create_snapshot(workflow_id, event_seq, state_json)
1032 .await
1033 }
1034
1035 pub async fn get_latest_snapshot(
1036 &self,
1037 workflow_id: &str,
1038 ) -> Result<Option<WorkflowSnapshot>> {
1039 self.store.get_latest_snapshot(workflow_id).await
1040 }
1041
1042 pub async fn record_side_effect(
1045 &self,
1046 workflow_id: &str,
1047 value: &str,
1048 ) -> Result<()> {
1049 let now = timestamp_now();
1050 let seq = self.store.get_event_count(workflow_id).await? as i32 + 1;
1051 self.store
1052 .append_event(&WorkflowEvent {
1053 id: None,
1054 workflow_id: workflow_id.to_string(),
1055 seq,
1056 event_type: "SideEffectRecorded".to_string(),
1057 payload: Some(value.to_string()),
1058 timestamp: now,
1059 })
1060 .await?;
1061 Ok(())
1062 }
1063}
1064
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// `duration_since` fails when the system clock is set before 1970. Rather
/// than panicking in that case, the distance to the epoch is returned as a
/// negative number, which is still the correct Unix timestamp for such a
/// clock.
fn timestamp_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // `SystemTimeError::duration` is the positive gap between the two
        // instants; negate it to place the time before the epoch.
        Err(before_epoch) => -before_epoch.duration().as_secs_f64(),
    }
}
1071
1072use std::str::FromStr;