1use crate::context::WorkflowContext;
16use crate::task::{RetryPolicy, UntypedCoreTask};
17use sha2::{Digest, Sha256};
18use std::collections::{HashMap, HashSet};
19use std::marker::PhantomData;
20use std::ops::Deref;
21use std::sync::Arc;
22
/// What to do when a `Loop` node reaches its `max_iterations` bound.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    strum::EnumString,
    strum::Display,
)]
pub enum MaxIterationsPolicy {
    /// Treat hitting the iteration cap as a workflow failure.
    #[strum(serialize = "fail")]
    Fail,
    /// Stop looping and continue with the result of the last iteration.
    #[strum(serialize = "exit_with_last")]
    ExitWithLast,
}
43
/// Generates a `find_duplicate_id` method for a continuation-shaped enum.
///
/// `WorkflowContinuation` and `SerializableContinuation` share the same node
/// structure but wrap `Fork`/`Branch` children in different pointer types, so
/// the macro is parameterized by:
/// - `task_fields` / `delay_extra`: extra pattern tokens (typically `..`) so
///   each enum's match arms stay exhaustive over its fields;
/// - `deref_branch` / `deref_branch_map`: fn-pointer-coercible closures that
///   turn a stored fork-branch / branch-map value into a `&$name`.
///
/// The generated method walks the whole tree depth-first, inserting every
/// node id into a `HashSet`, and returns the first id seen twice (`None`
/// when all ids are unique).
macro_rules! impl_find_duplicate_id {
    ($name:ident, task_fields: { $($task_extra:tt)* }, delay_extra: { $($delay_extra:tt)* }, deref_branch: $deref:expr, deref_branch_map: $deref_map:expr) => {
        impl $name {
            pub(crate) fn find_duplicate_id(&self) -> Option<String> {
                // Recursive helper: returns the first duplicated id, if any.
                fn collect(cont: &$name, seen: &mut HashSet<String>) -> Option<String> {
                    match cont {
                        $name::Task { id, next, $($task_extra)* } => {
                            // `insert` returns false when the id was already present.
                            if !seen.insert(id.clone()) {
                                return Some(id.clone());
                            }
                            next.as_ref().and_then(|n| collect(n, seen))
                        }
                        $name::Fork { id, branches, join } => {
                            if !seen.insert(id.clone()) {
                                return Some(id.clone());
                            }
                            // Coerce the caller-supplied closure to a fn pointer so
                            // it can be applied to whatever pointer type `branches` holds.
                            let deref_fn: fn(&_) -> &$name = $deref;
                            branches
                                .iter()
                                .find_map(|b| collect(deref_fn(b), seen))
                                .or_else(|| join.as_ref().and_then(|j| collect(j, seen)))
                        }
                        $name::Branch { id, branches, default, next, .. } => {
                            if !seen.insert(id.clone()) {
                                return Some(id.clone());
                            }
                            let deref_map_fn: fn(&_) -> &$name = $deref_map;
                            branches
                                .values()
                                .find_map(|b| collect(deref_map_fn(b), seen))
                                .or_else(|| default.as_ref().and_then(|d| collect(d, seen)))
                                .or_else(|| next.as_ref().and_then(|n| collect(n, seen)))
                        }
                        // Delay and AwaitSignal share the same linear shape.
                        $name::Delay { id, next, $($delay_extra)* }
                        | $name::AwaitSignal { id, next, $($delay_extra)* } => {
                            if !seen.insert(id.clone()) {
                                return Some(id.clone());
                            }
                            next.as_ref().and_then(|n| collect(n, seen))
                        }
                        $name::Loop { id, body, next, .. } => {
                            if !seen.insert(id.clone()) {
                                return Some(id.clone());
                            }
                            collect(body, seen)
                                .or_else(|| next.as_ref().and_then(|n| collect(n, seen)))
                        }
                        $name::ChildWorkflow { id, child, next } => {
                            if !seen.insert(id.clone()) {
                                return Some(id.clone());
                            }
                            collect(child, seen)
                                .or_else(|| next.as_ref().and_then(|n| collect(n, seen)))
                        }
                    }
                }
                collect(self, &mut HashSet::new())
            }
        }
    };
}
107
/// Fieldless tag identifying which continuation variant a node is.
///
/// `strum` renders/parses each variant in snake_case (e.g. `await_signal`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::AsRefStr, strum::Display, strum::EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum NodeKind {
    Task,
    Fork,
    Delay,
    AwaitSignal,
    Branch,
    Loop,
    ChildWorkflow,
}
127
/// Borrowed metadata about a single node, yielded by [`NodeIter`].
#[derive(Debug, Clone)]
pub struct NodeInfo<'a> {
    /// The node's unique id.
    pub id: &'a str,
    /// Which continuation variant the node is.
    pub kind: NodeKind,
    /// Id of the node that linked to this one during traversal (`None` for the root).
    pub predecessor_id: Option<&'a str>,
    /// Task/AwaitSignal timeout; for `Delay` nodes this slot carries the delay duration.
    pub timeout: Option<std::time::Duration>,
    /// Retry policy (populated for `Task` nodes only).
    pub retry_policy: Option<&'a RetryPolicy>,
    /// Scheduling priority (populated for `Task` nodes only).
    pub priority: Option<u8>,
}
146
/// Depth-first iterator over every node of a [`WorkflowContinuation`] tree.
///
/// Created by [`WorkflowContinuation::iter_nodes`].
pub struct NodeIter<'a> {
    // Explicit DFS stack of (node, id of the node that pushed it).
    stack: Vec<(&'a WorkflowContinuation, Option<&'a str>)>,
}
153
impl<'a> Iterator for NodeIter<'a> {
    type Item = NodeInfo<'a>;

    #[allow(clippy::too_many_lines)]
    fn next(&mut self) -> Option<Self::Item> {
        // Pop the next node; `predecessor` is the id of the node that pushed it.
        let (cont, predecessor) = self.stack.pop()?;

        // Step 1: extract the per-node metadata for the NodeInfo we will yield.
        let (id, kind, timeout, retry_policy, priority) = match cont {
            WorkflowContinuation::Task {
                id,
                timeout,
                retry_policy,
                priority,
                ..
            } => (
                id.as_str(),
                NodeKind::Task,
                *timeout,
                retry_policy.as_ref(),
                *priority,
            ),
            WorkflowContinuation::Fork { id, .. } => {
                (id.as_str(), NodeKind::Fork, None, None, None)
            }
            // A Delay's duration is surfaced through NodeInfo's `timeout` slot.
            WorkflowContinuation::Delay { id, duration, .. } => {
                (id.as_str(), NodeKind::Delay, Some(*duration), None, None)
            }
            WorkflowContinuation::AwaitSignal { id, timeout, .. } => {
                (id.as_str(), NodeKind::AwaitSignal, *timeout, None, None)
            }
            WorkflowContinuation::Branch { id, .. } => {
                (id.as_str(), NodeKind::Branch, None, None, None)
            }
            WorkflowContinuation::Loop { id, .. } => {
                (id.as_str(), NodeKind::Loop, None, None, None)
            }
            WorkflowContinuation::ChildWorkflow { id, .. } => {
                (id.as_str(), NodeKind::ChildWorkflow, None, None, None)
            }
        };

        // Step 2: push this node's children. Because the stack pops last-in
        // first, children are pushed in REVERSE of the desired visit order
        // (e.g. join before branches, `next` before `body`).
        match cont {
            WorkflowContinuation::Task { id, next, .. }
            | WorkflowContinuation::Delay { id, next, .. }
            | WorkflowContinuation::AwaitSignal { id, next, .. } => {
                if let Some(n) = next {
                    self.stack.push((n, Some(id)));
                }
            }
            WorkflowContinuation::Fork { id, branches, join } => {
                // join is pushed first so it is visited after all branches.
                if let Some(j) = join {
                    self.stack.push((j, Some(id)));
                }
                for b in branches.iter().rev() {
                    self.stack.push((b, Some(id)));
                }
            }
            WorkflowContinuation::Branch {
                id,
                branches,
                default,
                next,
                ..
            } => {
                if let Some(n) = next {
                    self.stack.push((n, Some(id)));
                }
                if let Some(d) = default {
                    self.stack.push((d, Some(id)));
                }
                // Sort the map keys so traversal order is deterministic
                // despite HashMap's arbitrary iteration order.
                let mut keys: Vec<&String> = branches.keys().collect();
                keys.sort();
                for k in keys.into_iter().rev() {
                    self.stack.push((&branches[k], Some(id)));
                }
            }
            WorkflowContinuation::Loop { id, body, next, .. } => {
                if let Some(n) = next {
                    self.stack.push((n, Some(id)));
                }
                self.stack.push((body, Some(id)));
            }
            WorkflowContinuation::ChildWorkflow {
                id, child, next, ..
            } => {
                if let Some(n) = next {
                    self.stack.push((n, Some(id)));
                }
                self.stack.push((child, Some(id)));
            }
        }

        Some(NodeInfo {
            id,
            kind,
            predecessor_id: predecessor,
            timeout,
            retry_policy,
            priority,
        })
    }
}
258
/// A linked, branching description of the remaining work in a workflow.
///
/// Every variant carries a unique node `id`. Linear variants chain through an
/// optional `next`; `Fork` chains through `join` instead.
pub enum WorkflowContinuation {
    /// A single executable task.
    Task {
        id: String,
        /// The callable; `None` until a concrete task is bound (e.g. via
        /// [`SerializableContinuation::to_runnable`] and a registry).
        func: Option<UntypedCoreTask>,
        timeout: Option<std::time::Duration>,
        retry_policy: Option<RetryPolicy>,
        version: Option<String>,
        priority: Option<u8>,
        tags: Vec<String>,
        next: Option<Box<WorkflowContinuation>>,
    },
    /// Parallel branches; `join` runs after the branches.
    Fork {
        id: String,
        /// `Arc` makes each branch subtree cheaply shareable.
        branches: Box<[Arc<WorkflowContinuation>]>,
        join: Option<Box<WorkflowContinuation>>,
    },
    /// A timed pause before `next`.
    Delay {
        id: String,
        duration: std::time::Duration,
        next: Option<Box<WorkflowContinuation>>,
    },
    /// Waits for an external signal (optionally bounded by `timeout`).
    AwaitSignal {
        id: String,
        signal_name: String,
        timeout: Option<std::time::Duration>,
        next: Option<Box<WorkflowContinuation>>,
    },
    /// Keyed routing between sub-continuations.
    Branch {
        id: String,
        /// Produces the routing key; `None` until bound from a registry
        /// (looked up under `key_fn_id(id)`).
        key_fn: Option<UntypedCoreTask>,
        branches: HashMap<String, Box<WorkflowContinuation>>,
        /// Fallback when the key matches no entry in `branches`.
        default: Option<Box<WorkflowContinuation>>,
        next: Option<Box<WorkflowContinuation>>,
    },
    /// Repeats `body`, bounded by `max_iterations`.
    Loop {
        id: String,
        body: Box<WorkflowContinuation>,
        max_iterations: u32,
        /// Behavior when the iteration cap is reached.
        on_max: MaxIterationsPolicy,
        next: Option<Box<WorkflowContinuation>>,
    },
    /// Runs a nested workflow, then `next`.
    ChildWorkflow {
        id: String,
        child: Arc<WorkflowContinuation>,
        next: Option<Box<WorkflowContinuation>>,
    },
}
352
// Generate `find_duplicate_id` for the runnable tree: fork branches are
// `Arc`-wrapped and branch-map values are `Box`ed (both deref to `&`).
impl_find_duplicate_id!(
    WorkflowContinuation,
    task_fields: { .. },
    delay_extra: { .. },
    deref_branch: |b: &Arc<WorkflowContinuation>| -> &WorkflowContinuation { b },
    deref_branch_map: |b: &WorkflowContinuation| -> &WorkflowContinuation { b }
);
360
/// Derives the registry id under which a `Branch` node's key function is
/// stored: `"<branch_id>::key_fn"`.
#[must_use]
pub fn key_fn_id(branch_id: &str) -> String {
    const SUFFIX: &str = "::key_fn";
    let mut id = String::with_capacity(branch_id.len() + SUFFIX.len());
    id.push_str(branch_id);
    id.push_str(SUFFIX);
    id
}
369
/// Derives a deterministic id for the `counter`-th loop node: `"loop_<n>"`.
#[must_use]
pub fn loop_node_id(counter: usize) -> String {
    let mut id = String::from("loop_");
    id.push_str(&counter.to_string());
    id
}
378
379impl WorkflowContinuation {
380 #[must_use]
384 pub fn derive_fork_id(branch_ids: &[&str]) -> String {
385 branch_ids.join("||")
386 }
387
    /// The node's own unique id, for any variant.
    #[must_use]
    pub fn id(&self) -> &str {
        match self {
            WorkflowContinuation::Task { id, .. }
            | WorkflowContinuation::Fork { id, .. }
            | WorkflowContinuation::Delay { id, .. }
            | WorkflowContinuation::AwaitSignal { id, .. }
            | WorkflowContinuation::Branch { id, .. }
            | WorkflowContinuation::Loop { id, .. }
            | WorkflowContinuation::ChildWorkflow { id, .. } => id,
        }
    }
401
    /// The continuation chained after this node, if any.
    ///
    /// For `Fork` this is the `join`; for `Branch`/`Loop`/`ChildWorkflow` it
    /// is the `next` chain (their branch/body/child subtrees are skipped).
    #[must_use]
    pub fn get_next(&self) -> Option<&WorkflowContinuation> {
        match self {
            Self::Task { next, .. }
            | Self::Delay { next, .. }
            | Self::AwaitSignal { next, .. }
            | Self::Branch { next, .. }
            | Self::Loop { next, .. }
            | Self::ChildWorkflow { next, .. } => next.as_deref(),
            Self::Fork { join, .. } => join.as_deref(),
        }
    }
416
417 #[must_use]
422 pub fn first_task_id(&self) -> &str {
423 match self {
424 WorkflowContinuation::Task { id, .. }
425 | WorkflowContinuation::Delay { id, .. }
426 | WorkflowContinuation::AwaitSignal { id, .. }
427 | WorkflowContinuation::Branch { id, .. } => id,
428 WorkflowContinuation::Fork { branches, .. } => {
429 if let Some(first_branch) = branches.first() {
430 first_branch.first_task_id()
431 } else {
432 "unknown"
433 }
434 }
435 WorkflowContinuation::Loop { body, .. } => body.first_task_id(),
436 WorkflowContinuation::ChildWorkflow { child, .. } => child.first_task_id(),
437 }
438 }
439
    /// Priority of the first `Task` that would run, resolved like
    /// [`Self::first_task_id`]; `None` for node kinds without a priority or
    /// for a fork with no branches.
    #[must_use]
    pub fn first_task_priority(&self) -> Option<u8> {
        match self {
            WorkflowContinuation::Task { priority, .. } => *priority,
            WorkflowContinuation::Delay { .. }
            | WorkflowContinuation::AwaitSignal { .. }
            | WorkflowContinuation::Branch { .. } => None,
            WorkflowContinuation::Fork { branches, .. } => {
                branches.first().and_then(|b| b.first_task_priority())
            }
            WorkflowContinuation::Loop { body, .. } => body.first_task_priority(),
            WorkflowContinuation::ChildWorkflow { child, .. } => child.first_task_priority(),
        }
    }
458
    /// Tags of the first `Task` that would run, resolved like
    /// [`Self::first_task_id`]; empty for node kinds without tags or for a
    /// fork with no branches.
    #[must_use]
    pub fn first_task_tags(&self) -> Vec<String> {
        match self {
            WorkflowContinuation::Task { tags, .. } => tags.clone(),
            WorkflowContinuation::Delay { .. }
            | WorkflowContinuation::AwaitSignal { .. }
            | WorkflowContinuation::Branch { .. } => vec![],
            WorkflowContinuation::Fork { branches, .. } => branches
                .first()
                .map(|b| b.first_task_tags())
                .unwrap_or_default(),
            WorkflowContinuation::Loop { body, .. } => body.first_task_tags(),
            WorkflowContinuation::ChildWorkflow { child, .. } => child.first_task_tags(),
        }
    }
478
479 #[must_use]
484 pub fn first_task_hint(&self) -> crate::snapshot::TaskHint {
485 crate::snapshot::TaskHint {
486 id: self.first_task_id().to_string(),
487 priority: self.first_task_priority(),
488 tags: self.first_task_tags(),
489 }
490 }
491
492 #[must_use]
498 pub fn terminal_task_id(&self) -> &str {
499 let mut current = self;
500 while let Some(next) = current.get_next() {
501 current = next;
502 }
503 current.first_task_id()
504 }
505
    /// Depth-first search for the `Task` node whose id equals `target_id`.
    ///
    /// Only `Task` nodes can match: the ids of Fork/Branch/Loop/Delay/
    /// AwaitSignal/ChildWorkflow nodes are traversed through but never
    /// compared against `target_id`.
    fn find_task(&self, target_id: &str) -> Option<&Self> {
        match self {
            WorkflowContinuation::Task { id, next, .. } => {
                if id == target_id {
                    return Some(self);
                }
                next.as_ref().and_then(|n| n.find_task(target_id))
            }
            WorkflowContinuation::Delay { next, .. }
            | WorkflowContinuation::AwaitSignal { next, .. } => {
                next.as_ref().and_then(|n| n.find_task(target_id))
            }
            WorkflowContinuation::Fork { branches, join, .. } => {
                // Search every parallel branch before the join continuation.
                for branch in branches {
                    if let Some(found) = branch.find_task(target_id) {
                        return Some(found);
                    }
                }
                join.as_ref().and_then(|j| j.find_task(target_id))
            }
            WorkflowContinuation::Branch {
                branches,
                default,
                next,
                ..
            } => {
                for branch in branches.values() {
                    if let Some(found) = branch.find_task(target_id) {
                        return Some(found);
                    }
                }
                if let Some(d) = default
                    && let Some(found) = d.find_task(target_id)
                {
                    return Some(found);
                }
                next.as_ref().and_then(|n| n.find_task(target_id))
            }
            WorkflowContinuation::Loop { body, next, .. } => body
                .find_task(target_id)
                .or_else(|| next.as_ref().and_then(|n| n.find_task(target_id))),
            WorkflowContinuation::ChildWorkflow { child, next, .. } => child
                .find_task(target_id)
                .or_else(|| next.as_ref().and_then(|n| n.find_task(target_id))),
        }
    }
556
    /// Mutable counterpart of [`Self::find_task`], used by the `set_task_*`
    /// helpers for in-place edits.
    ///
    /// NOTE(review): unlike `find_task`, this does NOT descend into
    /// `Fork::branches` or `ChildWorkflow::child` — those subtrees are held
    /// behind `Arc`s and cannot be borrowed mutably here. Setters built on
    /// this method therefore silently skip tasks inside fork branches and
    /// child workflows; confirm that asymmetry is intended.
    fn find_task_mut(&mut self, target_id: &str) -> Option<&mut Self> {
        match self {
            WorkflowContinuation::Task { id, .. } if id == target_id => Some(self),
            WorkflowContinuation::Task { next, .. } => {
                next.as_mut().and_then(|n| n.find_task_mut(target_id))
            }
            WorkflowContinuation::Delay { next, .. }
            | WorkflowContinuation::AwaitSignal { next, .. } => {
                next.as_mut().and_then(|n| n.find_task_mut(target_id))
            }
            WorkflowContinuation::Fork { join, .. } => {
                join.as_mut().and_then(|j| j.find_task_mut(target_id))
            }
            WorkflowContinuation::Branch {
                branches,
                default,
                next,
                ..
            } => {
                for branch in branches.values_mut() {
                    if let Some(found) = branch.find_task_mut(target_id) {
                        return Some(found);
                    }
                }
                if let Some(d) = default
                    && let Some(found) = d.find_task_mut(target_id)
                {
                    return Some(found);
                }
                next.as_mut().and_then(|n| n.find_task_mut(target_id))
            }
            WorkflowContinuation::Loop { body, next, .. } => {
                if let Some(found) = body.find_task_mut(target_id) {
                    return Some(found);
                }
                next.as_mut().and_then(|n| n.find_task_mut(target_id))
            }
            WorkflowContinuation::ChildWorkflow { next, .. } => {
                next.as_mut().and_then(|n| n.find_task_mut(target_id))
            }
        }
    }
605
    /// Sets (or clears) the timeout of the `Task` named `target_id`.
    ///
    /// Silently a no-op when the task is not found — including tasks inside
    /// fork branches or child workflows, which `find_task_mut` cannot reach.
    pub fn set_task_timeout(&mut self, target_id: &str, timeout: Option<std::time::Duration>) {
        if let Some(WorkflowContinuation::Task { timeout: t, .. }) = self.find_task_mut(target_id) {
            *t = timeout;
        }
    }
612
    /// Sets (or clears) the retry policy of the `Task` named `target_id`.
    ///
    /// Silently a no-op when the task is not reachable (see `find_task_mut`).
    pub fn set_task_retry_policy(&mut self, target_id: &str, policy: Option<RetryPolicy>) {
        if let Some(WorkflowContinuation::Task { retry_policy, .. }) = self.find_task_mut(target_id)
        {
            *retry_policy = policy;
        }
    }
620
    /// Sets (or clears) the version string of the `Task` named `target_id`.
    ///
    /// Silently a no-op when the task is not reachable (see `find_task_mut`).
    pub fn set_task_version(&mut self, target_id: &str, ver: Option<String>) {
        if let Some(WorkflowContinuation::Task { version, .. }) = self.find_task_mut(target_id) {
            *version = ver;
        }
    }
627
628 #[must_use]
630 pub fn get_task_retry_policy(&self, task_id: &str) -> Option<&RetryPolicy> {
631 match self.find_task(task_id)? {
632 WorkflowContinuation::Task { retry_policy, .. } => retry_policy.as_ref(),
633 _ => None,
634 }
635 }
636
637 #[must_use]
639 pub fn get_task_timeout(&self, task_id: &str) -> Option<std::time::Duration> {
640 match self.find_task(task_id)? {
641 WorkflowContinuation::Task { timeout, .. } => *timeout,
642 _ => None,
643 }
644 }
645
    /// Priority of the task named `task_id`, or `None` when the task is
    /// absent or has no priority configured.
    #[must_use]
    pub fn get_task_priority(&self, task_id: &str) -> Option<u8> {
        match self.find_task(task_id)? {
            WorkflowContinuation::Task { priority, .. } => *priority,
            _ => None,
        }
    }
654
655 #[must_use]
657 pub fn get_task_tags(&self, task_id: &str) -> Vec<String> {
658 match self.find_task(task_id) {
659 Some(WorkflowContinuation::Task { tags, .. }) => tags.clone(),
660 _ => vec![],
661 }
662 }
663
664 pub fn set_task_tags(&mut self, target_id: &str, new_tags: Vec<String>) {
666 if let Some(WorkflowContinuation::Task { tags, .. }) = self.find_task_mut(target_id) {
667 *tags = new_tags;
668 }
669 }
670
    /// Assembles a [`crate::task::TaskMetadata`] from the `Task` named
    /// `task_id`, falling back to the default metadata when the task is not
    /// found.
    ///
    /// A stored `priority` that `Priority::from_u8` rejects collapses to
    /// `None`.
    #[must_use]
    pub fn build_task_metadata(&self, task_id: &str) -> crate::task::TaskMetadata {
        match self.find_task(task_id) {
            Some(WorkflowContinuation::Task {
                timeout,
                retry_policy,
                version,
                priority,
                tags,
                ..
            }) => crate::task::TaskMetadata {
                timeout: *timeout,
                retries: retry_policy.clone(),
                version: version.clone(),
                priority: priority.and_then(crate::priority::Priority::from_u8),
                tags: tags.clone(),
                ..Default::default()
            },
            _ => crate::task::TaskMetadata::default(),
        }
    }
698
    /// Returns a depth-first iterator over every node in the tree.
    ///
    /// The root is yielded first with `predecessor_id == None`; see
    /// [`NodeIter`] for the child visit order.
    #[must_use]
    pub fn iter_nodes(&self) -> NodeIter<'_> {
        NodeIter {
            stack: vec![(self, None)],
        }
    }
713
    /// Converts to the serde-friendly mirror type.
    ///
    /// Callables (`Task::func`, `Branch::key_fn`) are dropped here and
    /// re-bound from a registry by [`SerializableContinuation::to_runnable`].
    /// Durations are stored as whole milliseconds; the `as u64` casts
    /// truncate the `u128` from `as_millis` (hence the clippy allows).
    #[must_use]
    pub fn to_serializable(&self) -> SerializableContinuation {
        match self {
            #[allow(clippy::cast_possible_truncation)]
            WorkflowContinuation::Task {
                id,
                timeout,
                retry_policy,
                version,
                priority,
                tags,
                next,
                ..
            } => SerializableContinuation::Task {
                id: id.clone(),
                timeout_ms: timeout.map(|d| d.as_millis() as u64),
                retry_policy: retry_policy.clone(),
                version: version.clone(),
                priority: *priority,
                tags: tags.clone(),
                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
            },
            WorkflowContinuation::Fork { id, branches, join } => SerializableContinuation::Fork {
                id: id.clone(),
                branches: branches.iter().map(|b| b.to_serializable()).collect(),
                join: join.as_ref().map(|j| Box::new(j.to_serializable())),
            },
            #[allow(clippy::cast_possible_truncation)]
            WorkflowContinuation::Delay { id, duration, next } => SerializableContinuation::Delay {
                id: id.clone(),
                duration_ms: duration.as_millis() as u64,
                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
            },
            #[allow(clippy::cast_possible_truncation)]
            WorkflowContinuation::AwaitSignal {
                id,
                signal_name,
                timeout,
                next,
            } => SerializableContinuation::AwaitSignal {
                id: id.clone(),
                signal_name: signal_name.clone(),
                timeout_ms: timeout.map(|d| d.as_millis() as u64),
                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
            },
            WorkflowContinuation::Branch {
                id,
                branches,
                default,
                next,
                ..
            } => SerializableContinuation::Branch {
                id: id.clone(),
                branches: branches
                    .iter()
                    .map(|(k, v)| (k.clone(), Box::new(v.to_serializable())))
                    .collect(),
                default: default.as_ref().map(|d| Box::new(d.to_serializable())),
                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
            },
            WorkflowContinuation::ChildWorkflow { id, child, next } => {
                SerializableContinuation::ChildWorkflow {
                    id: id.clone(),
                    child: Box::new(child.to_serializable()),
                    next: next.as_ref().map(|n| Box::new(n.to_serializable())),
                }
            }
            WorkflowContinuation::Loop {
                id,
                body,
                max_iterations,
                on_max,
                next,
            } => SerializableContinuation::Loop {
                id: id.clone(),
                body: Box::new(body.to_serializable()),
                max_iterations: *max_iterations,
                on_max: *on_max,
                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
            },
        }
    }
797
798 pub fn append_to_chain(&mut self, new_node: WorkflowContinuation) {
802 match self {
803 WorkflowContinuation::Task { next, .. }
804 | WorkflowContinuation::Delay { next, .. }
805 | WorkflowContinuation::AwaitSignal { next, .. }
806 | WorkflowContinuation::Branch { next, .. }
807 | WorkflowContinuation::Loop { next, .. }
808 | WorkflowContinuation::ChildWorkflow { next, .. } => match next {
809 Some(next_box) => next_box.append_to_chain(new_node),
810 None => *next = Some(Box::new(new_node)),
811 },
812 WorkflowContinuation::Fork { join, .. } => match join {
813 Some(join_box) => join_box.append_to_chain(new_node),
814 None => *join = Some(Box::new(new_node)),
815 },
816 }
817 }
818}
819
/// Serde-friendly mirror of [`WorkflowContinuation`]: same tree shape, but
/// callables are omitted and durations are stored as milliseconds.
///
/// Rebind to a runnable tree with [`Self::to_runnable`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum SerializableContinuation {
    Task {
        id: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_ms: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        retry_policy: Option<RetryPolicy>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        version: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        priority: Option<u8>,
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        tags: Vec<String>,
        next: Option<Box<SerializableContinuation>>,
    },
    Fork {
        id: String,
        branches: Vec<SerializableContinuation>,
        join: Option<Box<SerializableContinuation>>,
    },
    Delay {
        id: String,
        duration_ms: u64,
        next: Option<Box<SerializableContinuation>>,
    },
    AwaitSignal {
        id: String,
        signal_name: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        timeout_ms: Option<u64>,
        next: Option<Box<SerializableContinuation>>,
    },
    Branch {
        id: String,
        branches: HashMap<String, Box<SerializableContinuation>>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        default: Option<Box<SerializableContinuation>>,
        next: Option<Box<SerializableContinuation>>,
    },
    Loop {
        id: String,
        body: Box<SerializableContinuation>,
        max_iterations: u32,
        on_max: MaxIterationsPolicy,
        next: Option<Box<SerializableContinuation>>,
    },
    ChildWorkflow {
        id: String,
        child: Box<SerializableContinuation>,
        next: Option<Box<SerializableContinuation>>,
    },
}
950
// Generate `find_duplicate_id` for the serializable tree, whose branch
// values are plain/`Box`ed nodes rather than `Arc`s.
impl_find_duplicate_id!(
    SerializableContinuation,
    task_fields: { .. },
    delay_extra: { .. },
    deref_branch: |b: &SerializableContinuation| -> &SerializableContinuation { b },
    deref_branch_map: |b: &SerializableContinuation| -> &SerializableContinuation { b }
);
958
959impl SerializableContinuation {
960 pub fn to_runnable(
968 &self,
969 registry: &crate::registry::TaskRegistry,
970 ) -> Result<WorkflowContinuation, crate::error::BuildError> {
971 if let Some(dup) = self.find_duplicate_id() {
972 return Err(crate::error::BuildError::DuplicateTaskId(dup));
973 }
974
975 self.to_runnable_unchecked(registry)
976 }
977
    /// Rebinds the tree against `registry` without re-checking for duplicate
    /// ids (external callers go through [`Self::to_runnable`]).
    ///
    /// - `Task::func` is looked up in the registry by the task's own id.
    /// - `Branch::key_fn` is looked up under `key_fn_id(branch_id)`.
    /// - `*_ms` fields are converted back into `Duration`s.
    ///
    /// # Errors
    /// `BuildError::TaskNotFound` when a task or branch key function is
    /// missing from the registry.
    #[allow(clippy::too_many_lines)]
    fn to_runnable_unchecked(
        &self,
        registry: &crate::registry::TaskRegistry,
    ) -> Result<WorkflowContinuation, crate::error::BuildError> {
        match self {
            SerializableContinuation::Task {
                id,
                timeout_ms,
                retry_policy,
                version,
                priority,
                tags,
                next,
            } => {
                let func = registry
                    .get(id)
                    .ok_or_else(|| crate::error::BuildError::TaskNotFound(id.clone()))?;
                let next = next
                    .as_ref()
                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
                    .transpose()?;
                Ok(WorkflowContinuation::Task {
                    id: id.clone(),
                    func: Some(func),
                    timeout: timeout_ms.map(std::time::Duration::from_millis),
                    retry_policy: retry_policy.clone(),
                    version: version.clone(),
                    priority: *priority,
                    tags: tags.clone(),
                    next,
                })
            }
            SerializableContinuation::Fork { id, branches, join } => {
                // Collect so the first failing branch short-circuits.
                let branches: Result<Vec<_>, _> = branches
                    .iter()
                    .map(|b| b.to_runnable_unchecked(registry).map(Arc::new))
                    .collect();
                let join = join
                    .as_ref()
                    .map(|j| j.to_runnable_unchecked(registry).map(Box::new))
                    .transpose()?;
                Ok(WorkflowContinuation::Fork {
                    id: id.clone(),
                    branches: branches?.into_boxed_slice(),
                    join,
                })
            }
            SerializableContinuation::Delay {
                id,
                duration_ms,
                next,
            } => {
                let next = next
                    .as_ref()
                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
                    .transpose()?;
                Ok(WorkflowContinuation::Delay {
                    id: id.clone(),
                    duration: std::time::Duration::from_millis(*duration_ms),
                    next,
                })
            }
            SerializableContinuation::AwaitSignal {
                id,
                signal_name,
                timeout_ms,
                next,
            } => {
                let next = next
                    .as_ref()
                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
                    .transpose()?;
                Ok(WorkflowContinuation::AwaitSignal {
                    id: id.clone(),
                    signal_name: signal_name.clone(),
                    timeout: timeout_ms.map(std::time::Duration::from_millis),
                    next,
                })
            }
            SerializableContinuation::Branch {
                id,
                branches,
                default,
                next,
            } => {
                // The routing key function lives under a derived id.
                let kf_id = key_fn_id(id);
                let key_fn = registry
                    .get(&kf_id)
                    .ok_or(crate::error::BuildError::TaskNotFound(kf_id))?;
                let branches: Result<HashMap<_, _>, _> = branches
                    .iter()
                    .map(|(k, v)| {
                        v.to_runnable_unchecked(registry)
                            .map(|c| (k.clone(), Box::new(c)))
                    })
                    .collect();
                let default = default
                    .as_ref()
                    .map(|d| d.to_runnable_unchecked(registry).map(Box::new))
                    .transpose()?;
                let next = next
                    .as_ref()
                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
                    .transpose()?;
                Ok(WorkflowContinuation::Branch {
                    id: id.clone(),
                    key_fn: Some(key_fn),
                    branches: branches?,
                    default,
                    next,
                })
            }
            SerializableContinuation::Loop {
                id,
                body,
                max_iterations,
                on_max,
                next,
            } => {
                let body = body.to_runnable_unchecked(registry)?;
                let next = next
                    .as_ref()
                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
                    .transpose()?;
                Ok(WorkflowContinuation::Loop {
                    id: id.clone(),
                    body: Box::new(body),
                    max_iterations: *max_iterations,
                    on_max: *on_max,
                    next,
                })
            }
            SerializableContinuation::ChildWorkflow { id, child, next } => {
                let child = child.to_runnable_unchecked(registry)?;
                let next = next
                    .as_ref()
                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
                    .transpose()?;
                Ok(WorkflowContinuation::ChildWorkflow {
                    id: id.clone(),
                    child: Arc::new(child),
                    next,
                })
            }
        }
    }
1126
    /// Collects the ids of every node in the tree, in depth-first preorder.
    ///
    /// Despite the name, this includes ALL node kinds (forks, delays,
    /// branches, loops, child workflows), not only `Task` nodes. `Branch`
    /// map entries are visited in `HashMap` iteration order, so their
    /// relative order is not deterministic.
    #[must_use]
    pub fn task_ids(&self) -> Vec<&str> {
        // Recursive helper accumulating ids into `ids`.
        fn collect<'a>(cont: &'a SerializableContinuation, ids: &mut Vec<&'a str>) {
            match cont {
                SerializableContinuation::Task { id, next, .. }
                | SerializableContinuation::Delay { id, next, .. }
                | SerializableContinuation::AwaitSignal { id, next, .. } => {
                    ids.push(id.as_str());
                    if let Some(n) = next {
                        collect(n, ids);
                    }
                }
                SerializableContinuation::Fork { id, branches, join } => {
                    ids.push(id.as_str());
                    for b in branches {
                        collect(b, ids);
                    }
                    if let Some(j) = join {
                        collect(j, ids);
                    }
                }
                SerializableContinuation::Branch {
                    id,
                    branches,
                    default,
                    next,
                } => {
                    ids.push(id.as_str());
                    for b in branches.values() {
                        collect(b, ids);
                    }
                    if let Some(d) = default {
                        collect(d, ids);
                    }
                    if let Some(n) = next {
                        collect(n, ids);
                    }
                }
                SerializableContinuation::Loop { id, body, next, .. } => {
                    ids.push(id.as_str());
                    collect(body, ids);
                    if let Some(n) = next {
                        collect(n, ids);
                    }
                }
                SerializableContinuation::ChildWorkflow { id, child, next } => {
                    ids.push(id.as_str());
                    collect(child, ids);
                    if let Some(n) = next {
                        collect(n, ids);
                    }
                }
            }
        }
        let mut ids = vec![];
        collect(self, &mut ids);
        ids
    }
1186
    /// SHA-256 fingerprint (lowercase hex) of the workflow's structure, used
    /// to detect definition drift against persisted state.
    ///
    /// Each node contributes a type-tagged, delimiter-separated record
    /// (`T:`, `F:`, `D:`, `S:`, `B:`, `L:`, `CW:`). `Branch` keys are sorted
    /// first so the hash does not depend on `HashMap` iteration order.
    ///
    /// NOTE(review): a `Task`'s `priority` and `tags` are deliberately (?)
    /// excluded, so changing them does not change the fingerprint — confirm
    /// that is the intended notion of "definition".
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn compute_definition_hash(&self) -> String {
        #[allow(clippy::too_many_lines)]
        fn hash_continuation(cont: &SerializableContinuation, hasher: &mut Sha256) {
            match cont {
                SerializableContinuation::Task {
                    id,
                    timeout_ms,
                    retry_policy,
                    version,
                    next,
                    ..
                } => {
                    hasher.update(b"T:");
                    hasher.update(id.as_bytes());
                    // Optional fields are only hashed when present, each with
                    // its own tag so absent != empty.
                    if let Some(ms) = timeout_ms {
                        hasher.update(b":t:");
                        hasher.update(ms.to_string().as_bytes());
                    }
                    if let Some(rp) = retry_policy {
                        hasher.update(b":r:");
                        hasher.update(rp.max_retries.to_string().as_bytes());
                        hasher.update(b":");
                        hasher.update(rp.initial_delay.as_millis().to_string().as_bytes());
                        hasher.update(b":");
                        hasher.update(rp.backoff_multiplier.to_string().as_bytes());
                    }
                    if let Some(v) = version {
                        hasher.update(b":v:");
                        hasher.update(v.as_bytes());
                    }
                    hasher.update(b";");
                    if let Some(n) = next {
                        hash_continuation(n, hasher);
                    }
                }
                SerializableContinuation::Fork { id, branches, join } => {
                    hasher.update(b"F:");
                    hasher.update(id.as_bytes());
                    hasher.update(b"[");
                    for branch in branches {
                        hash_continuation(branch, hasher);
                        hasher.update(b",");
                    }
                    hasher.update(b"]");
                    if let Some(j) = join {
                        hasher.update(b"J:");
                        hash_continuation(j, hasher);
                    }
                }
                SerializableContinuation::Delay {
                    id,
                    duration_ms,
                    next,
                } => {
                    hasher.update(b"D:");
                    hasher.update(id.as_bytes());
                    hasher.update(b":");
                    hasher.update(duration_ms.to_string().as_bytes());
                    hasher.update(b";");
                    if let Some(n) = next {
                        hash_continuation(n, hasher);
                    }
                }
                SerializableContinuation::AwaitSignal {
                    id,
                    signal_name,
                    timeout_ms,
                    next,
                } => {
                    hasher.update(b"S:");
                    hasher.update(id.as_bytes());
                    hasher.update(b":");
                    hasher.update(signal_name.as_bytes());
                    if let Some(ms) = timeout_ms {
                        hasher.update(b":t:");
                        hasher.update(ms.to_string().as_bytes());
                    }
                    hasher.update(b";");
                    if let Some(n) = next {
                        hash_continuation(n, hasher);
                    }
                }
                SerializableContinuation::Branch {
                    id,
                    branches,
                    default,
                    next,
                } => {
                    hasher.update(b"B:");
                    hasher.update(id.as_bytes());
                    hasher.update(b"{");
                    // Sort keys for a deterministic hash across runs.
                    let mut keys: Vec<&String> = branches.keys().collect();
                    keys.sort();
                    for key in keys {
                        hasher.update(key.as_bytes());
                        hasher.update(b"=>");
                        if let Some(branch) = branches.get(key) {
                            hash_continuation(branch, hasher);
                        }
                        hasher.update(b",");
                    }
                    hasher.update(b"}");
                    if let Some(d) = default {
                        hasher.update(b"_=>");
                        hash_continuation(d, hasher);
                    }
                    hasher.update(b";");
                    if let Some(n) = next {
                        hash_continuation(n, hasher);
                    }
                }
                SerializableContinuation::Loop {
                    id,
                    body,
                    max_iterations,
                    on_max,
                    next,
                } => {
                    hasher.update(b"L:");
                    hasher.update(id.as_bytes());
                    hasher.update(b":");
                    hasher.update(max_iterations.to_string().as_bytes());
                    hasher.update(b":");
                    hasher.update(on_max.to_string().as_bytes());
                    hasher.update(b"{");
                    hash_continuation(body, hasher);
                    hasher.update(b"}");
                    hasher.update(b";");
                    if let Some(n) = next {
                        hash_continuation(n, hasher);
                    }
                }
                SerializableContinuation::ChildWorkflow { id, child, next } => {
                    hasher.update(b"CW:");
                    hasher.update(id.as_bytes());
                    hasher.update(b"{");
                    hash_continuation(child, hasher);
                    hasher.update(b"}");
                    hasher.update(b";");
                    if let Some(n) = next {
                        hash_continuation(n, hasher);
                    }
                }
            }
        }

        let mut hasher = Sha256::new();
        hash_continuation(self, &mut hasher);
        let result = hasher.finalize();
        format!("{result:x}")
    }
1349}
1350
/// A workflow instance's persisted state: the definition it was built from
/// and the remaining continuation to resume.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SerializedWorkflowState {
    /// Unique id of the workflow instance.
    pub workflow_id: String,
    /// [`SerializableContinuation::compute_definition_hash`] of the
    /// definition this state was captured from, for drift detection.
    pub definition_hash: String,
    /// The remaining (serializable) work.
    pub continuation: SerializableContinuation,
}
1367
/// How to handle starting a workflow whose id is already in use.
///
/// Parses from snake_case; `UseExisting`/`TerminateExisting` also accept
/// camelCase aliases.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, strum::EnumString, strum::Display)]
#[strum(serialize_all = "snake_case")]
pub enum ConflictPolicy {
    /// Reject the new run (the default).
    #[default]
    Fail,
    /// Keep the existing run (enforcement lives in the runtime, not here).
    #[strum(serialize = "use_existing", serialize = "useExisting")]
    UseExisting,
    /// Terminate the existing run in favor of the new one.
    #[strum(serialize = "terminate_existing", serialize = "terminateExisting")]
    TerminateExisting,
}
1383
/// Lifecycle state of a workflow run, with per-state payloads.
///
/// `AsRefStr` yields the snake_case name of each variant;
/// [`WorkflowStatusKind`] is the payload-free discriminant for comparisons.
#[derive(Debug, strum::AsRefStr, strum::EnumDiscriminants)]
#[strum_discriminants(name(WorkflowStatusKind))]
#[strum_discriminants(derive(strum::AsRefStr))]
#[strum_discriminants(strum(serialize_all = "snake_case"))]
#[strum_discriminants(doc = "Fieldless discriminant of [`WorkflowStatus`] for string comparisons.")]
pub enum WorkflowStatus {
    /// Actively executing.
    #[strum(serialize = "in_progress")]
    InProgress,
    /// Finished successfully.
    #[strum(serialize = "completed")]
    Completed,
    /// Failed; the payload is the error message.
    #[strum(serialize = "failed")]
    Failed(String),
    /// Cancelled, optionally recording why and by whom.
    #[strum(serialize = "cancelled")]
    Cancelled {
        reason: Option<String>,
        cancelled_by: Option<String>,
    },
    /// Paused, optionally recording why and by whom.
    #[strum(serialize = "paused")]
    Paused {
        reason: Option<String>,
        paused_by: Option<String>,
    },
    /// Sleeping on the `Delay` node `delay_id` until `wake_at`.
    #[strum(serialize = "waiting")]
    Waiting {
        wake_at: chrono::DateTime<chrono::Utc>,
        delay_id: String,
    },
    /// Blocked on an `AwaitSignal` node, with an optional timeout deadline.
    #[strum(serialize = "awaiting_signal")]
    AwaitingSignal {
        signal_id: String,
        signal_name: String,
        wake_at: Option<chrono::DateTime<chrono::Utc>>,
    },
}
1435
/// Flattened, string-only view of [`WorkflowStatus`]: the variant name plus
/// whichever detail fields that variant carries (populated by the `From` impl).
#[derive(Debug, Default)]
pub struct FlatWorkflowStatus {
    /// The variant's `strum` serialization (e.g. `"in_progress"`).
    pub status: String,
    /// Error message; set only for `Failed`.
    pub error: Option<String>,
    /// Reason; set for `Cancelled` and `Paused`.
    pub reason: Option<String>,
    /// Set only for `Cancelled`.
    pub cancelled_by: Option<String>,
    /// Set only for `Paused`.
    pub paused_by: Option<String>,
    /// RFC 3339 wake time; set for `Waiting` and (optionally) `AwaitingSignal`.
    pub wake_at: Option<String>,
    /// Set only for `Waiting`.
    pub delay_id: Option<String>,
    /// Set only for `AwaitingSignal`.
    pub signal_id: Option<String>,
    /// Set only for `AwaitingSignal`.
    pub signal_name: Option<String>,
}
1463
1464impl From<WorkflowStatus> for FlatWorkflowStatus {
1465 fn from(status: WorkflowStatus) -> Self {
1466 let mut flat = Self {
1467 status: status.as_ref().to_string(),
1468 ..Self::default()
1469 };
1470 match status {
1471 WorkflowStatus::Completed | WorkflowStatus::InProgress => {}
1472 WorkflowStatus::Failed(e) => flat.error = Some(e),
1473 WorkflowStatus::Cancelled {
1474 reason,
1475 cancelled_by,
1476 } => {
1477 flat.reason = reason;
1478 flat.cancelled_by = cancelled_by;
1479 }
1480 WorkflowStatus::Paused { reason, paused_by } => {
1481 flat.reason = reason;
1482 flat.paused_by = paused_by;
1483 }
1484 WorkflowStatus::Waiting { wake_at, delay_id } => {
1485 flat.wake_at = Some(wake_at.to_rfc3339());
1486 flat.delay_id = Some(delay_id);
1487 }
1488 WorkflowStatus::AwaitingSignal {
1489 signal_id,
1490 signal_name,
1491 wake_at,
1492 } => {
1493 flat.signal_id = Some(signal_id);
1494 flat.signal_name = Some(signal_name);
1495 flat.wake_at = wake_at.map(|t| t.to_rfc3339());
1496 }
1497 }
1498 flat
1499 }
1500}
1501
1502pub use crate::builder::{
1504 BranchCollector, ContinuationState, ForkBuilder, NoContinuation, NoRegistry, RegistryBehavior,
1505 RouteBuilder, SubBuilder, WorkflowBuilder,
1506};
1507
1508use crate::registry::TaskRegistry;
1509
/// A built workflow: a continuation chain plus the context it runs with.
///
/// `Input` is the input type of the first task; it is tracked only at the
/// type level via `PhantomData`.
pub struct Workflow<C, Input, M = ()> {
    /// Hash of the workflow definition, fixed at build time.
    pub(crate) definition_hash: String,
    /// Execution context (workflow id, codec, metadata).
    pub(crate) context: WorkflowContext<C, M>,
    /// Head of the continuation chain to execute.
    pub(crate) continuation: WorkflowContinuation,
    /// Marker for the first task's input type.
    pub(crate) _phantom: PhantomData<Input>,
}
1517
impl<C, Input, M> Workflow<C, Input, M> {
    /// Returns the workflow instance id from the context.
    #[must_use]
    pub fn workflow_id(&self) -> &str {
        &self.context.workflow_id
    }

    /// Returns the definition hash computed at build time.
    #[must_use]
    pub fn definition_hash(&self) -> &str {
        &self.definition_hash
    }

    /// Returns the workflow's execution context.
    #[must_use]
    pub fn context(&self) -> &WorkflowContext<C, M> {
        &self.context
    }

    /// Returns the shared codec from the context.
    #[must_use]
    pub fn codec(&self) -> &Arc<C> {
        &self.context.codec
    }

    /// Returns the head of the continuation chain.
    #[must_use]
    pub fn continuation(&self) -> &WorkflowContinuation {
        &self.continuation
    }

    /// Returns the shared user metadata from the context.
    #[must_use]
    pub fn metadata(&self) -> &Arc<M> {
        &self.context.metadata
    }

    /// Iterates over all nodes of the continuation chain.
    #[must_use]
    pub fn iter_nodes(&self) -> NodeIter<'_> {
        self.continuation.iter_nodes()
    }

    /// Consumes the workflow, yielding its continuation chain.
    #[must_use]
    pub fn into_continuation(self) -> WorkflowContinuation {
        self.continuation
    }
}
1575
/// A [`Workflow`] paired with the [`TaskRegistry`] needed to rehydrate it
/// from a [`SerializedWorkflowState`].
pub struct SerializableWorkflow<C, Input, M = ()> {
    /// The underlying workflow (also reachable via `Deref`).
    pub(crate) inner: Workflow<C, Input, M>,
    /// Registry of task implementations used by `to_runnable`.
    pub(crate) registry: TaskRegistry,
}
1627
impl<C, Input, M> SerializableWorkflow<C, Input, M> {
    /// Returns the workflow instance id.
    #[must_use]
    pub fn workflow_id(&self) -> &str {
        self.inner.workflow_id()
    }

    /// Returns the definition hash of the inner workflow.
    #[must_use]
    pub fn definition_hash(&self) -> &str {
        self.inner.definition_hash()
    }

    /// Returns the inner [`Workflow`].
    #[must_use]
    pub fn workflow(&self) -> &Workflow<C, Input, M> {
        &self.inner
    }

    /// Returns the inner workflow's execution context.
    #[must_use]
    pub fn context(&self) -> &WorkflowContext<C, M> {
        self.inner.context()
    }

    /// Returns the shared codec.
    #[must_use]
    pub fn codec(&self) -> &Arc<C> {
        self.inner.codec()
    }

    /// Returns the head of the continuation chain.
    #[must_use]
    pub fn continuation(&self) -> &WorkflowContinuation {
        self.inner.continuation()
    }

    /// Returns the shared user metadata.
    #[must_use]
    pub fn metadata(&self) -> &Arc<M> {
        self.inner.metadata()
    }

    /// Returns the task registry backing this workflow.
    #[must_use]
    pub fn registry(&self) -> &TaskRegistry {
        &self.registry
    }

    /// Consumes `self`, yielding the continuation chain and the registry.
    #[must_use]
    pub fn into_parts(self) -> (WorkflowContinuation, TaskRegistry) {
        (self.inner.continuation, self.registry)
    }

    /// Produces a persistable snapshot: workflow id, definition hash, and the
    /// serializable continuation.
    #[must_use]
    pub fn to_serializable(&self) -> SerializedWorkflowState {
        SerializedWorkflowState {
            workflow_id: self.inner.workflow_id().to_string(),
            definition_hash: self.inner.definition_hash.clone(),
            continuation: self.inner.continuation().to_serializable(),
        }
    }

    /// Rehydrates a runnable continuation from persisted `state`.
    ///
    /// # Errors
    ///
    /// Returns [`BuildError::DefinitionMismatch`](crate::error::BuildError) if
    /// the state's definition hash differs from this workflow's; otherwise
    /// propagates any error from hydrating against the registry.
    pub fn to_runnable(
        &self,
        state: &SerializedWorkflowState,
    ) -> Result<WorkflowContinuation, crate::error::BuildError> {
        if state.definition_hash != self.inner.definition_hash {
            return Err(crate::error::BuildError::DefinitionMismatch {
                expected: self.inner.definition_hash.clone(),
                found: state.definition_hash.clone(),
            });
        }
        state.continuation.to_runnable(&self.registry)
    }
}
1722
/// Lets a [`SerializableWorkflow`] be used wherever a [`Workflow`] reference
/// is expected, by dereferencing to the inner workflow.
impl<C, Input, M> Deref for SerializableWorkflow<C, Input, M> {
    type Target = Workflow<C, Input, M>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
1730
1731#[cfg(test)]
1732#[allow(
1733 clippy::unwrap_used,
1734 clippy::panic,
1735 clippy::cast_lossless,
1736 clippy::cast_possible_truncation,
1737 clippy::uninlined_format_args,
1738 clippy::manual_let_else,
1739 clippy::too_many_lines,
1740 clippy::items_after_statements
1741)]
1742mod tests {
1743 use crate::codec::{Decoder, Encoder, sealed};
1744 use crate::error::BoxError;
1745 use crate::workflow::WorkflowBuilder;
1746 use bytes::Bytes;
1747
    /// Codec stub for tests: encodes every value to empty bytes and always
    /// fails to decode (decoding is never exercised by these tests).
    struct DummyCodec;

    impl Encoder for DummyCodec {}
    impl Decoder for DummyCodec {}

    impl<Input> sealed::EncodeValue<Input> for DummyCodec {
        fn encode_value(&self, _value: &Input) -> Result<Bytes, BoxError> {
            Ok(Bytes::new())
        }
    }
    impl<Output> sealed::DecodeValue<Output> for DummyCodec {
        fn decode_value(&self, _bytes: Bytes) -> Result<Output, BoxError> {
            Err("Not implemented".into())
        }
    }
1763
    // A one-task workflow builds successfully with the default `()` metadata.
    #[test]
    fn test_workflow_build() {
        use crate::context::WorkflowContext;
        use crate::workflow::Workflow;
        use std::sync::Arc;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
            .then("test", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let _workflow_ref = &workflow;
    }

    // Metadata supplied through the context is retrievable from the workflow.
    #[test]
    fn test_workflow_with_metadata() {
        use crate::context::WorkflowContext;
        use crate::workflow::Workflow;
        use std::sync::Arc;

        let ctx = WorkflowContext::new(
            "test-workflow",
            Arc::new(DummyCodec),
            Arc::new("test_metadata"),
        );
        let workflow: Workflow<DummyCodec, u32, &str> = WorkflowBuilder::new(ctx)
            .then("test", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        assert_eq!(**workflow.metadata(), "test_metadata");
    }
1799
1800 #[test]
1801 fn test_task_order() {
1802 use crate::context::WorkflowContext;
1803 use crate::workflow::Workflow;
1804 use std::sync::Arc;
1805
1806 let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
1807 let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
1808 .then("first", |i: u32| async move { Ok(i + 1) })
1809 .then("second", |i: u32| async move { Ok(i + 2) })
1810 .then("third", |i: u32| async move { Ok(i + 3) })
1811 .build()
1812 .unwrap();
1813
1814 let mut current = workflow.continuation();
1817 let mut task_ids = vec![];
1818
1819 loop {
1820 match current {
1821 crate::workflow::WorkflowContinuation::Task { id, next, .. } => {
1822 task_ids.push(id.clone());
1823 match next {
1824 Some(next_box) => current = next_box.as_ref(),
1825 None => break,
1826 }
1827 }
1828 _ => break,
1829 }
1830 }
1831
1832 assert_eq!(
1833 task_ids,
1834 vec!["first", "second", "third"],
1835 "Tasks should execute in the order they were added"
1836 );
1837 }
1838
    // Branches with heterogeneous output types (u32, String, f64) must type-check.
    #[test]
    fn test_heterogeneous_fork_join_compiles() {
        use crate::context::WorkflowContext;
        use crate::task::BranchOutputs;
        use crate::workflow::Workflow;
        use std::sync::Arc;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
            .then("prepare", |i: u32| async move { Ok(i) })
            .branches(|b| {
                b.add("count", |i: u32| async move { Ok(i * 2) });
                b.add("name", |i: u32| async move { Ok(format!("item_{}", i)) });
                b.add("ratio", |i: u32| async move { Ok(i as f64 / 100.0) });
            })
            .join("combine", |outputs: BranchOutputs<DummyCodec>| async move {
                let _ = outputs.len();
                Ok(format!("combined {} branches", outputs.len()))
            })
            .then("final", |s: String| async move { Ok(s.len() as u32) })
            .build()
            .unwrap();

        let _workflow_ref = &workflow;
    }
1874
1875 #[test]
1876 fn test_duplicate_branch_id_returns_error() {
1877 use crate::context::WorkflowContext;
1878 use crate::error::BuildError;
1879 use std::sync::Arc;
1880
1881 let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
1882 let result = WorkflowBuilder::<_, u32, _>::new(ctx)
1883 .then("prepare", |i: u32| async move { Ok(i) })
1884 .branches(|b| {
1885 b.add("count", |i: u32| async move { Ok(i * 2) });
1886 b.add("count", |i: u32| async move { Ok(i * 3) }); })
1888 .join("combine", |_outputs| async move { Ok(0u32) })
1889 .build();
1890
1891 let err = match result {
1892 Err(e) => e,
1893 Ok(_) => panic!("expected build error"),
1894 };
1895 assert!(
1896 err.iter()
1897 .any(|e| matches!(e, BuildError::DuplicateTaskId(id) if id == "count"))
1898 );
1899 }
1900
1901 #[test]
1902 fn test_serializable_continuation() {
1903 use crate::context::WorkflowContext;
1904 use crate::error::BuildError;
1905 use crate::registry::TaskRegistry;
1906 use std::sync::Arc;
1907
1908 let codec = Arc::new(DummyCodec);
1910 let ctx = WorkflowContext::new("test-workflow", codec.clone(), Arc::new(()));
1911 let workflow = WorkflowBuilder::new(ctx)
1912 .then("step1", |i: u32| async move { Ok(i + 1) })
1913 .then("step2", |i: u32| async move { Ok(i * 2) })
1914 .build()
1915 .unwrap();
1916
1917 let serializable = workflow.continuation().to_serializable();
1919
1920 let task_ids = serializable.task_ids();
1922 assert_eq!(task_ids, vec!["step1", "step2"]);
1923
1924 let empty_registry = TaskRegistry::new();
1926 let result = serializable.to_runnable(&empty_registry);
1927 assert!(matches!(result, Err(BuildError::TaskNotFound(id)) if id == "step1"));
1928
1929 let mut registry = TaskRegistry::new();
1931 registry.register_fn("step1", codec.clone(), |i: u32| async move { Ok(i + 1) });
1932 registry.register_fn("step2", codec.clone(), |i: u32| async move { Ok(i * 2) });
1933
1934 let hydrated = serializable.to_runnable(®istry);
1935 assert!(hydrated.is_ok());
1936 }
1937
    // Fork/join serialization yields the synthetic fork id ("a||b"), each
    // branch id, and the join id alongside ordinary task ids.
    #[test]
    fn test_serializable_fork_join() {
        use crate::context::WorkflowContext;
        use crate::task::BranchOutputs;
        use std::sync::Arc;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .then("prepare", |i: u32| async move { Ok(i) })
            .branches(|b| {
                b.add("branch_a", |i: u32| async move { Ok(i * 2) });
                b.add("branch_b", |i: u32| async move { Ok(i + 10) });
            })
            .join(
                "merge",
                |_: BranchOutputs<DummyCodec>| async move { Ok(0u32) },
            )
            .build()
            .unwrap();

        let serializable = workflow.continuation().to_serializable();
        let task_ids = serializable.task_ids();

        assert!(task_ids.contains(&"prepare"));
        assert!(task_ids.contains(&"branch_a||branch_b"));
        assert!(task_ids.contains(&"branch_a"));
        assert!(task_ids.contains(&"branch_b"));
        assert!(task_ids.contains(&"merge"));
        assert_eq!(task_ids.len(), 5);
    }
1969
    // `with_registry()` auto-registers inline tasks; the resulting workflow can
    // serialize and rehydrate against its own registry.
    #[test]
    fn test_serializable_workflow_builder() {
        use crate::context::WorkflowContext;
        use std::sync::Arc;

        let codec = Arc::new(DummyCodec);
        let ctx = WorkflowContext::new("test-workflow", codec, Arc::new(()));

        let workflow = WorkflowBuilder::new(ctx)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .then("step2", |i: u32| async move { Ok(i * 2) })
            .build()
            .unwrap();

        assert!(workflow.registry().contains("step1"));
        assert!(workflow.registry().contains("step2"));
        assert_eq!(workflow.registry().len(), 2);

        let serializable = workflow.to_serializable();
        assert_eq!(serializable.continuation.task_ids(), vec!["step1", "step2"]);

        let hydrated = workflow.to_runnable(&serializable);
        assert!(hydrated.is_ok());
    }

    // A pre-populated registry can back `then_registered` steps.
    #[test]
    fn test_with_existing_registry_and_then_registered() {
        use crate::context::WorkflowContext;
        use crate::registry::TaskRegistry;
        use crate::workflow::SerializableWorkflow;
        use std::sync::Arc;

        let codec = Arc::new(DummyCodec);

        let mut registry = TaskRegistry::new();
        registry.register_fn("double", codec.clone(), |i: u32| async move { Ok(i * 2) });
        registry.register_fn("add_ten", codec.clone(), |i: u32| async move { Ok(i + 10) });

        let ctx = WorkflowContext::new("test-workflow", codec.clone(), Arc::new(()));
        let workflow: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx)
            .with_existing_registry(registry)
            .then_registered::<u32>("double")
            .then_registered::<u32>("add_ten")
            .build()
            .unwrap();

        assert!(workflow.registry().contains("double"));
        assert!(workflow.registry().contains("add_ten"));

        let serializable = workflow.to_serializable();
        assert_eq!(
            serializable.continuation.task_ids(),
            vec!["double", "add_ten"]
        );

        let hydrated = workflow.to_runnable(&serializable);
        assert!(hydrated.is_ok());
    }
2038
2039 #[test]
2040 fn test_mixed_inline_and_registered_tasks() {
2041 use crate::context::WorkflowContext;
2042 use crate::registry::TaskRegistry;
2043 use crate::workflow::SerializableWorkflow;
2044 use std::sync::Arc;
2045
2046 let codec = Arc::new(DummyCodec);
2047
2048 let mut registry = TaskRegistry::new();
2050 registry.register_fn(
2051 "preregistered",
2052 codec.clone(),
2053 |i: u32| async move { Ok(i * 2) },
2054 );
2055
2056 let ctx = WorkflowContext::new("test-workflow", codec.clone(), Arc::new(()));
2058 let workflow: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx)
2059 .with_existing_registry(registry)
2060 .then_registered::<u32>("preregistered") .then("inline", |i: u32| async move { Ok(i + 5) }) .build()
2063 .unwrap();
2064
2065 assert!(workflow.registry().contains("preregistered"));
2067 assert!(workflow.registry().contains("inline"));
2068 assert_eq!(workflow.registry().len(), 2);
2069 }
2070
    // Workflow id and definition hash survive into the serialized state.
    #[test]
    fn test_workflow_id_and_definition_hash() {
        use crate::context::WorkflowContext;
        use std::sync::Arc;

        let ctx = WorkflowContext::new("my-workflow-id", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .then("step2", |i: u32| async move { Ok(i * 2) })
            .build()
            .unwrap();

        assert_eq!(workflow.workflow_id(), "my-workflow-id");

        assert!(!workflow.definition_hash().is_empty());

        let state = workflow.to_serializable();
        assert_eq!(state.workflow_id, "my-workflow-id");
        assert_eq!(state.definition_hash, workflow.definition_hash());
    }

    // Adding a task changes the definition hash.
    #[test]
    fn test_definition_hash_changes_with_structure() {
        use crate::context::WorkflowContext;
        use std::sync::Arc;

        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow1 = WorkflowBuilder::new(ctx1)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow2 = WorkflowBuilder::new(ctx2)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .then("step2", |i: u32| async move { Ok(i * 2) })
            .build()
            .unwrap();

        assert_ne!(workflow1.definition_hash(), workflow2.definition_hash());
    }

    // A tampered definition hash is rejected during rehydration.
    #[test]
    fn test_definition_mismatch_error() {
        use crate::context::WorkflowContext;
        use crate::error::BuildError;
        use std::sync::Arc;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let mut state = workflow.to_serializable();
        state.definition_hash = "wrong-hash".to_string();

        let result = workflow.to_runnable(&state);
        assert!(matches!(result, Err(BuildError::DefinitionMismatch { .. })));
    }
2141
2142 #[test]
2143 fn test_duplicate_id_tampering_detection() {
2144 use crate::error::BuildError;
2145 use crate::registry::TaskRegistry;
2146 use crate::workflow::SerializableContinuation;
2147 use std::sync::Arc;
2148
2149 let codec = Arc::new(DummyCodec);
2150
2151 let mut registry = TaskRegistry::new();
2153 registry.register_fn("step1", codec.clone(), |i: u32| async move { Ok(i + 1) });
2154 registry.register_fn("step2", codec.clone(), |i: u32| async move { Ok(i * 2) });
2155
2156 let tampered = SerializableContinuation::Task {
2158 id: "step1".to_string(),
2159 timeout_ms: None,
2160 retry_policy: None,
2161 version: None,
2162 priority: None,
2163
2164 tags: vec![],
2165 next: Some(Box::new(SerializableContinuation::Task {
2166 id: "step1".to_string(), timeout_ms: None,
2168 retry_policy: None,
2169 version: None,
2170 priority: None,
2171
2172 tags: vec![],
2173 next: None,
2174 })),
2175 };
2176
2177 let result = tampered.to_runnable(®istry);
2179 assert!(matches!(
2180 result,
2181 Err(BuildError::DuplicateTaskId(id)) if id == "step1"
2182 ));
2183 }
2184
    // A delay step slots into the chain between tasks, preserving order and duration.
    #[test]
    fn test_delay_builder() {
        use crate::context::WorkflowContext;
        use crate::workflow::{Workflow, WorkflowContinuation};
        use std::sync::Arc;
        use std::time::Duration;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .delay("wait_1s", Duration::from_secs(1))
            .then("step2", |i: u32| async move { Ok(i * 2) })
            .build()
            .unwrap();

        // Walk the chain, tagging each node with its kind and (for delays) duration.
        let mut ids = vec![];
        let mut current = workflow.continuation();
        loop {
            match current {
                WorkflowContinuation::Task { id, next, .. } => {
                    ids.push(format!("task:{id}"));
                    match next {
                        Some(n) => current = n,
                        None => break,
                    }
                }
                WorkflowContinuation::Delay {
                    id, duration, next, ..
                } => {
                    ids.push(format!("delay:{id}:{}ms", duration.as_millis()));
                    match next {
                        Some(n) => current = n,
                        None => break,
                    }
                }
                _ => break,
            }
        }

        assert_eq!(
            ids,
            vec!["task:step1", "delay:wait_1s:1000ms", "task:step2"]
        );
    }
2234
    // Delays serialize with their duration in milliseconds and hydrate back.
    #[test]
    fn test_delay_serialization_roundtrip() {
        use crate::context::WorkflowContext;
        use crate::workflow::SerializableContinuation;
        use std::sync::Arc;
        use std::time::Duration;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .delay("wait_5s", Duration::from_secs(5))
            .then("step2", |i: u32| async move { Ok(i * 2) })
            .build()
            .unwrap();

        let serializable = workflow.to_serializable();

        let task_ids = serializable.continuation.task_ids();
        assert_eq!(task_ids, vec!["step1", "wait_5s", "step2"]);

        // The second node must be a Delay carrying duration_ms = 5000.
        match &serializable.continuation {
            SerializableContinuation::Task { next, .. } => {
                let next = next.as_ref().unwrap();
                match next.as_ref() {
                    SerializableContinuation::Delay {
                        id, duration_ms, ..
                    } => {
                        assert_eq!(id, "wait_5s");
                        assert_eq!(*duration_ms, 5000);
                    }
                    other => panic!("Expected Delay, got {other:?}"),
                }
            }
            other => panic!("Expected Task, got {other:?}"),
        }

        let hydrated = workflow.to_runnable(&serializable);
        assert!(hydrated.is_ok());
    }
2279
    // A leading delay is reported as the first task id.
    #[test]
    fn test_delay_first_task_id() {
        use crate::context::WorkflowContext;
        use std::sync::Arc;
        use std::time::Duration;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .delay("initial_delay", Duration::from_secs(10))
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        assert_eq!(workflow.continuation().first_task_id(), "initial_delay");
    }

    // A delay sharing an id with a task is caught as a duplicate at build time.
    #[test]
    fn test_delay_duplicate_id_detection() {
        use crate::context::WorkflowContext;
        use crate::error::BuildError;
        use std::sync::Arc;
        use std::time::Duration;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let result = WorkflowBuilder::<_, u32, _>::new(ctx)
            .then("dup", |i: u32| async move { Ok(i + 1) })
            .delay("dup", Duration::from_secs(1))
            .build();

        let err = match result {
            Err(e) => e,
            Ok(_) => panic!("expected build error"),
        };
        assert!(
            err.iter()
                .any(|e| matches!(e, BuildError::DuplicateTaskId(id) if id == "dup"))
        );
    }
2318
    // Changing only a delay's duration changes the definition hash.
    #[test]
    fn test_delay_definition_hash_includes_duration() {
        use crate::context::WorkflowContext;
        use crate::workflow::SerializableWorkflow;
        use std::sync::Arc;
        use std::time::Duration;

        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf1: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx1)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .delay("wait", Duration::from_secs(1))
            .build()
            .unwrap();

        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf2: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx2)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .delay("wait", Duration::from_secs(60))
            .build()
            .unwrap();

        assert_ne!(wf1.definition_hash(), wf2.definition_hash());
    }

    // A delay and a task with the same id hash differently (node kind matters).
    #[test]
    fn test_delay_definition_hash_differs_from_task() {
        use crate::context::WorkflowContext;
        use crate::workflow::SerializableWorkflow;
        use std::sync::Arc;
        use std::time::Duration;

        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf1: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx1)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf2: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx2)
            .with_registry()
            .delay("step1", Duration::from_secs(1))
            .build()
            .unwrap();

        assert_ne!(wf1.definition_hash(), wf2.definition_hash());
    }
2374
    // Delay ids appear in serialized task-id order alongside task ids.
    #[test]
    fn test_delay_task_ids() {
        use crate::context::WorkflowContext;
        use std::sync::Arc;
        use std::time::Duration;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .then("fetch", |i: u32| async move { Ok(i) })
            .delay("wait_24h", Duration::from_secs(86400))
            .then("process", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let serializable = workflow.continuation().to_serializable();
        let ids = serializable.task_ids();
        assert_eq!(ids, vec!["fetch", "wait_24h", "process"]);
    }

    // A workflow consisting only of a delay builds and serializes.
    #[test]
    fn test_delay_only_workflow() {
        use crate::context::WorkflowContext;
        use std::sync::Arc;
        use std::time::Duration;

        use crate::workflow::Workflow;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
            .delay("just_wait", Duration::from_millis(10))
            .build()
            .unwrap();

        assert_eq!(workflow.continuation().first_task_id(), "just_wait");

        let serializable = workflow.continuation().to_serializable();
        assert_eq!(serializable.task_ids(), vec!["just_wait"]);
    }
2413
    // Delays hydrate without registry entries — only tasks need implementations.
    #[test]
    fn test_delay_to_runnable_no_registry_needed() {
        use crate::registry::TaskRegistry;
        use crate::workflow::SerializableContinuation;

        let delay = SerializableContinuation::Delay {
            id: "wait".to_string(),
            duration_ms: 5000,
            next: None,
        };

        let empty_registry = TaskRegistry::new();
        let result = delay.to_runnable(&empty_registry);
        assert!(result.is_ok());

        let runnable = result.unwrap();
        match runnable {
            crate::workflow::WorkflowContinuation::Delay {
                id, duration, next, ..
            } => {
                assert_eq!(id, "wait");
                assert_eq!(duration, std::time::Duration::from_millis(5000));
                assert!(next.is_none());
            }
            _ => panic!("Expected Delay variant"),
        }
    }
2442
    // Task timeouts serialize as `timeout_ms` and hydrate back as a Duration.
    #[test]
    fn test_timeout_serialization_roundtrip() {
        use crate::context::WorkflowContext;
        use crate::task::TaskMetadata;
        use crate::workflow::SerializableContinuation;
        use std::sync::Arc;
        use std::time::Duration;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .with_metadata(TaskMetadata {
                timeout: Some(Duration::from_secs(30)),
                ..Default::default()
            })
            .then("step2", |i: u32| async move { Ok(i * 2) })
            .build()
            .unwrap();

        let serializable = workflow.to_serializable();

        match &serializable.continuation {
            SerializableContinuation::Task { id, timeout_ms, .. } => {
                assert_eq!(id, "step1");
                assert_eq!(*timeout_ms, Some(30_000));
            }
            other => panic!("Expected Task, got {other:?}"),
        }

        let hydrated = workflow.to_runnable(&serializable).unwrap();
        match &hydrated {
            crate::workflow::WorkflowContinuation::Task { id, timeout, .. } => {
                assert_eq!(id, "step1");
                assert_eq!(*timeout, Some(Duration::from_secs(30)));
            }
            _ => panic!("Expected Task variant"),
        }
    }
2489
    // Adding a timeout changes the definition hash.
    #[test]
    fn test_timeout_changes_definition_hash() {
        use crate::context::WorkflowContext;
        use crate::task::TaskMetadata;
        use crate::workflow::SerializableWorkflow;
        use std::sync::Arc;
        use std::time::Duration;

        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf1: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx1)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf2: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx2)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .with_metadata(TaskMetadata {
                timeout: Some(Duration::from_secs(30)),
                ..Default::default()
            })
            .build()
            .unwrap();

        assert_ne!(wf1.definition_hash(), wf2.definition_hash());
    }

    // `timeout_ms` is omitted from JSON entirely when no timeout is set.
    #[test]
    fn test_no_timeout_field_absent_in_serialization() {
        use crate::context::WorkflowContext;
        use std::sync::Arc;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let serializable = workflow.to_serializable();
        let json = serde_json::to_string(&serializable.continuation).unwrap();
        assert!(
            !json.contains("timeout_ms"),
            "timeout_ms should be absent when None: {json}"
        );
    }
2542
    // Task versions feed into the definition hash: absent vs "1.0" vs "2.0"
    // all differ, while equal versions produce equal hashes.
    #[test]
    fn test_task_version_changes_definition_hash() {
        use crate::context::WorkflowContext;
        use crate::task::TaskMetadata;
        use crate::workflow::SerializableWorkflow;
        use std::sync::Arc;

        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf_no_version: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx1)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf_v1: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx2)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .with_metadata(TaskMetadata {
                version: Some("1.0".into()),
                ..Default::default()
            })
            .build()
            .unwrap();

        let ctx3 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf_v2: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx3)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .with_metadata(TaskMetadata {
                version: Some("2.0".into()),
                ..Default::default()
            })
            .build()
            .unwrap();

        let ctx4 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
        let wf_v1_again: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx4)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .with_metadata(TaskMetadata {
                version: Some("1.0".into()),
                ..Default::default()
            })
            .build()
            .unwrap();

        assert_ne!(
            wf_no_version.definition_hash(),
            wf_v1.definition_hash(),
            "Adding version should change hash"
        );
        assert_ne!(
            wf_v1.definition_hash(),
            wf_v2.definition_hash(),
            "Different versions should produce different hashes"
        );
        assert_eq!(
            wf_v1.definition_hash(),
            wf_v1_again.definition_hash(),
            "Same version should produce same hash"
        );
    }
2610
    // `version` is omitted from JSON when unset...
    #[test]
    fn test_version_absent_in_serialization_when_none() {
        use crate::context::WorkflowContext;
        use std::sync::Arc;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .build()
            .unwrap();

        let serializable = workflow.to_serializable();
        let json = serde_json::to_string(&serializable.continuation).unwrap();
        assert!(
            !json.contains("version"),
            "version should be absent when None: {json}"
        );
    }

    // ...and serialized verbatim when set.
    #[test]
    fn test_version_present_in_serialization_when_set() {
        use crate::context::WorkflowContext;
        use crate::task::TaskMetadata;
        use std::sync::Arc;

        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
        let workflow = WorkflowBuilder::new(ctx)
            .with_registry()
            .then("step1", |i: u32| async move { Ok(i + 1) })
            .with_metadata(TaskMetadata {
                version: Some("3.0".into()),
                ..Default::default()
            })
            .build()
            .unwrap();

        let serializable = workflow.to_serializable();
        let json = serde_json::to_string(&serializable.continuation).unwrap();
        assert!(
            json.contains(r#""version":"3.0""#),
            "version should be present in JSON: {json}"
        );
    }
2655
2656 #[test]
2661 fn test_nodes_single_task() {
2662 use crate::context::WorkflowContext;
2663 use crate::workflow::{NodeKind, Workflow};
2664 use std::sync::Arc;
2665
2666 let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2667 let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2668 .then("only", |i: u32| async move { Ok(i + 1) })
2669 .build()
2670 .unwrap();
2671
2672 let nodes: Vec<_> = workflow.iter_nodes().collect();
2673 assert_eq!(nodes.len(), 1);
2674 assert_eq!(nodes[0].id, "only");
2675 assert_eq!(nodes[0].kind, NodeKind::Task);
2676 assert!(nodes[0].predecessor_id.is_none());
2677 }
2678
2679 #[test]
2680 fn test_nodes_chain_order() {
2681 use crate::context::WorkflowContext;
2682 use crate::workflow::{NodeKind, Workflow};
2683 use std::sync::Arc;
2684
2685 let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2686 let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2687 .then("a", |i: u32| async move { Ok(i + 1) })
2688 .then("b", |i: u32| async move { Ok(i + 2) })
2689 .then("c", |i: u32| async move { Ok(i + 3) })
2690 .build()
2691 .unwrap();
2692
2693 let nodes: Vec<_> = workflow.iter_nodes().collect();
2694 let ids: Vec<&str> = nodes.iter().map(|n| n.id).collect();
2695 assert_eq!(ids, vec!["a", "b", "c"]);
2696 assert!(nodes.iter().all(|n| n.kind == NodeKind::Task));
2697
2698 assert_eq!(nodes[0].predecessor_id, None);
2700 assert_eq!(nodes[1].predecessor_id, Some("a"));
2701 assert_eq!(nodes[2].predecessor_id, Some("b"));
2702 }
2703
2704 #[test]
2705 fn test_nodes_fork_with_join() {
2706 use crate::context::WorkflowContext;
2707 use crate::task::BranchOutputs;
2708 use crate::workflow::{NodeKind, Workflow};
2709 use std::sync::Arc;
2710
2711 let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2712 let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2713 .then("prepare", |i: u32| async move { Ok(i) })
2714 .branches(|b| {
2715 b.add("left", |i: u32| async move { Ok(i * 2) });
2716 b.add("right", |i: u32| async move { Ok(i + 10) });
2717 })
2718 .join(
2719 "merge",
2720 |_: BranchOutputs<DummyCodec>| async move { Ok(0u32) },
2721 )
2722 .build()
2723 .unwrap();
2724
2725 let nodes: Vec<_> = workflow.iter_nodes().collect();
2726 let ids: Vec<&str> = nodes.iter().map(|n| n.id).collect();
2727
2728 assert_eq!(ids[0], "prepare");
2730 assert_eq!(nodes[1].kind, NodeKind::Fork);
2731 assert!(ids.contains(&"left"));
2732 assert!(ids.contains(&"right"));
2733 assert_eq!(*ids.last().unwrap(), "merge");
2734
2735 assert_eq!(nodes[1].predecessor_id, Some("prepare"));
2737
2738 let fork_id = nodes[1].id;
2740 let left_node = nodes.iter().find(|n| n.id == "left").unwrap();
2741 let right_node = nodes.iter().find(|n| n.id == "right").unwrap();
2742 assert_eq!(left_node.predecessor_id, Some(fork_id));
2743 assert_eq!(right_node.predecessor_id, Some(fork_id));
2744
2745 let merge_node = nodes.iter().find(|n| n.id == "merge").unwrap();
2747 assert_eq!(merge_node.predecessor_id, Some(fork_id));
2748 }
2749
2750 #[test]
2751 fn test_nodes_loop() {
2752 use crate::context::WorkflowContext;
2753 use crate::loop_result::LoopResult;
2754 use crate::workflow::{NodeKind, Workflow};
2755 use std::sync::Arc;
2756
2757 let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2758 let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2759 .loop_task(
2760 "iterate",
2761 |i: u32| async move { Ok(LoopResult::Done(i)) },
2762 5,
2763 )
2764 .then("after", |i: u32| async move { Ok(i) })
2765 .build()
2766 .unwrap();
2767
2768 let nodes: Vec<_> = workflow.iter_nodes().collect();
2769
2770 assert_eq!(nodes[0].kind, NodeKind::Loop);
2772 assert_eq!(nodes[1].id, "iterate");
2773 assert_eq!(nodes[1].kind, NodeKind::Task);
2774 assert_eq!(nodes[2].id, "after");
2775 assert_eq!(nodes[2].kind, NodeKind::Task);
2776
2777 assert_eq!(nodes[0].predecessor_id, None);
2779 assert_eq!(nodes[1].predecessor_id, Some(nodes[0].id)); assert_eq!(nodes[2].predecessor_id, Some(nodes[0].id)); }
2782
2783 #[test]
2784 fn test_nodes_delay_reports_duration_as_timeout() {
2785 use crate::context::WorkflowContext;
2786 use crate::workflow::{NodeKind, Workflow};
2787 use std::sync::Arc;
2788 use std::time::Duration;
2789
2790 let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2791 let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2792 .delay("wait_5s", Duration::from_secs(5))
2793 .then("after", |i: u32| async move { Ok(i) })
2794 .build()
2795 .unwrap();
2796
2797 let nodes: Vec<_> = workflow.iter_nodes().collect();
2798 assert_eq!(nodes[0].id, "wait_5s");
2799 assert_eq!(nodes[0].kind, NodeKind::Delay);
2800 assert_eq!(nodes[0].timeout, Some(Duration::from_secs(5)));
2801 assert_eq!(nodes[0].predecessor_id, None);
2802
2803 assert_eq!(nodes[1].id, "after");
2804 assert_eq!(nodes[1].predecessor_id, Some("wait_5s"));
2805 }
2806
2807 #[test]
2808 fn test_nodes_metadata_extraction() {
2809 use crate::context::WorkflowContext;
2810 use crate::task::{RetryPolicy, TaskMetadata};
2811 use crate::workflow::NodeKind;
2812 use std::sync::Arc;
2813 use std::time::Duration;
2814
2815 let retry = RetryPolicy {
2816 max_retries: 3,
2817 initial_delay: Duration::from_millis(100),
2818 backoff_multiplier: 2.0,
2819 max_delay: Some(Duration::from_secs(10)),
2820 };
2821
2822 let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2823 let workflow = WorkflowBuilder::new(ctx)
2824 .with_registry()
2825 .then("step", |i: u32| async move { Ok(i) })
2826 .with_metadata(TaskMetadata {
2827 timeout: Some(Duration::from_secs(30)),
2828 retries: Some(retry.clone()),
2829 version: Some("2.0".into()),
2830 ..Default::default()
2831 })
2832 .build()
2833 .unwrap();
2834
2835 let nodes: Vec<_> = workflow.iter_nodes().collect();
2836 assert_eq!(nodes.len(), 1);
2837 let node = &nodes[0];
2838
2839 assert_eq!(node.id, "step");
2840 assert_eq!(node.kind, NodeKind::Task);
2841 assert_eq!(node.timeout, Some(Duration::from_secs(30)));
2842 assert_eq!(node.retry_policy.unwrap().max_retries, 3);
2843 }
2844}
2845
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::indexing_slicing,
    clippy::too_many_lines,
    clippy::items_after_statements
)]
mod proptests {
    use super::{MaxIterationsPolicy, SerializableContinuation};
    use proptest::prelude::*;

    /// Strategy yielding short lowercase alphanumeric node ids (1-8 chars).
    /// Ids are NOT guaranteed unique across a generated tree.
    fn arb_id() -> impl Strategy<Value = String> {
        "[a-z0-9]{1,8}"
    }

    /// Strategy yielding an arbitrary `SerializableContinuation` tree with at
    /// most `depth` levels of nesting. Recursion bottoms out at a bare `Task`
    /// leaf; every recursive call shrinks `depth` by one, bounding tree size.
    fn arb_continuation(depth: usize) -> BoxedStrategy<SerializableContinuation> {
        // Leaf case: a metadata-free task with no successor.
        let leaf = arb_id().prop_map(|id| SerializableContinuation::Task {
            id,
            timeout_ms: None,
            retry_policy: None,
            version: None,
            priority: None,

            tags: vec![],
            next: None,
        });

        if depth == 0 {
            return leaf.boxed();
        }

        prop_oneof![
            // Task: optional timeout plus an optional tail continuation.
            (
                arb_id(),
                prop::option::of(any::<u64>()),
                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
            )
            .prop_map(|(id, timeout_ms, next)| SerializableContinuation::Task {
                id,
                timeout_ms,
                retry_policy: None,
                version: None,
                priority: None,

                tags: vec![],
                next,
            }),
            // Fork: 0-2 parallel branches plus an optional join continuation.
            (
                arb_id(),
                prop::collection::vec(arb_continuation(depth - 1), 0..3),
                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
            )
            .prop_map(|(id, branches, join)| SerializableContinuation::Fork {
                id,
                branches,
                join,
            }),
            // Delay: fixed duration plus an optional tail continuation.
            (
                arb_id(),
                any::<u64>(),
                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
            )
            .prop_map(|(id, duration_ms, next)| SerializableContinuation::Delay {
                id,
                duration_ms,
                next,
            }),
            // AwaitSignal: random signal name, optional timeout, optional tail.
            (
                arb_id(),
                arb_id(),
                prop::option::of(any::<u64>()),
                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
            )
            .prop_map(|(id, signal_name, timeout_ms, next)| {
                SerializableContinuation::AwaitSignal {
                    id,
                    signal_name,
                    timeout_ms,
                    next,
                }
            }),
            // Branch: 0-2 keyed branches, optional default, optional tail.
            (
                arb_id(),
                prop::collection::hash_map(
                    arb_id(),
                    arb_continuation(depth - 1).prop_map(Box::new),
                    0..3
                ),
                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
            )
            .prop_map(|(id, branches, default, next)| {
                SerializableContinuation::Branch {
                    id,
                    branches,
                    default,
                    next,
                }
            }),
            // Loop: body, 1-99 max iterations, either on-max policy, optional tail.
            (
                arb_id(),
                arb_continuation(depth - 1).prop_map(Box::new),
                1..100u32,
                prop::bool::ANY.prop_map(|b| if b {
                    MaxIterationsPolicy::Fail
                } else {
                    MaxIterationsPolicy::ExitWithLast
                }),
                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
            )
            .prop_map(|(id, body, max_iterations, on_max, next)| {
                SerializableContinuation::Loop {
                    id,
                    body,
                    max_iterations,
                    on_max,
                    next,
                }
            }),
            // ChildWorkflow: nested child tree plus an optional tail continuation.
            (
                arb_id(),
                arb_continuation(depth - 1).prop_map(Box::new),
                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
            )
            .prop_map(|(id, child, next)| {
                SerializableContinuation::ChildWorkflow { id, child, next }
            }),
        ]
        .boxed()
    }

    /// Like `arb_continuation`, but every node's id is derived from its
    /// position in the tree via `prefix`, so all ids are globally unique by
    /// construction. Used to check `find_duplicate_id` returns `None` on
    /// clean trees (and as a base tree for duplicate injection).
    fn arb_unique_continuation(
        depth: usize,
        prefix: &str,
    ) -> BoxedStrategy<SerializableContinuation> {
        // This node's id is fully determined by its path prefix.
        let id = format!("{prefix}n");

        if depth == 0 {
            return Just(SerializableContinuation::Task {
                id,
                timeout_ms: None,
                retry_policy: None,
                version: None,
                priority: None,

                tags: vec![],
                next: None,
            })
            .boxed();
        }

        let id_clone = id.clone();
        prop_oneof![
            // Task: the tail recurses with child prefix "0_" to keep ids disjoint.
            prop::option::of(
                arb_unique_continuation(depth - 1, &format!("{prefix}0_")).prop_map(Box::new),
            )
            .prop_map(move |next| SerializableContinuation::Task {
                id: id_clone.clone(),
                timeout_ms: None,
                retry_policy: None,
                version: None,
                priority: None,

                tags: vec![],
                next,
            }),
            // Fork: branch i recurses with prefix "b{i}_", the join with "j_".
            {
                let id_f = id.clone();
                let prefix_f = prefix.to_string();
                (0..3u8)
                    .prop_flat_map(move |branch_count| {
                        let id_inner = id_f.clone();
                        let prefix_inner = prefix_f.clone();
                        let branches: Vec<BoxedStrategy<SerializableContinuation>> = (0
                            ..branch_count)
                            .map(|i| {
                                arb_unique_continuation(depth - 1, &format!("{prefix_inner}b{i}_"))
                            })
                            .collect();
                        let join = prop::option::of(
                            arb_unique_continuation(depth - 1, &format!("{prefix_inner}j_"))
                                .prop_map(Box::new),
                        );
                        (branches, join).prop_map(move |(branches, join)| {
                            SerializableContinuation::Fork {
                                id: id_inner.clone(),
                                branches,
                                join,
                            }
                        })
                    })
                    .boxed()
            },
            // Delay: the tail recurses with prefix "d_".
            {
                let id_d = id.clone();
                let prefix_d = prefix.to_string();
                (
                    any::<u64>(),
                    prop::option::of(
                        arb_unique_continuation(depth - 1, &format!("{prefix_d}d_"))
                            .prop_map(Box::new),
                    ),
                )
                    .prop_map(move |(duration_ms, next)| {
                        SerializableContinuation::Delay {
                            id: id_d.clone(),
                            duration_ms,
                            next,
                        }
                    })
            },
            // AwaitSignal: the signal name is random (it is not a node id, so
            // uniqueness does not apply); the tail recurses with prefix "s_".
            {
                let id_s = id.clone();
                let prefix_s = prefix.to_string();
                (
                    arb_id(),
                    prop::option::of(any::<u64>()),
                    prop::option::of(
                        arb_unique_continuation(depth - 1, &format!("{prefix_s}s_"))
                            .prop_map(Box::new),
                    ),
                )
                    .prop_map(move |(signal_name, timeout_ms, next)| {
                        SerializableContinuation::AwaitSignal {
                            id: id_s.clone(),
                            signal_name,
                            timeout_ms,
                            next,
                        }
                    })
            },
            // Branch: two fixed map keys "k0"/"k1" whose subtrees use prefixes
            // "br0_"/"br1_"; the default uses "bd_" and the tail "bn_".
            {
                let id_b = id.clone();
                let prefix_b = prefix.to_string();
                let b0 = arb_unique_continuation(depth - 1, &format!("{prefix_b}br0_"))
                    .prop_map(Box::new);
                let b1 = arb_unique_continuation(depth - 1, &format!("{prefix_b}br1_"))
                    .prop_map(Box::new);
                let default = prop::option::of(
                    arb_unique_continuation(depth - 1, &format!("{prefix_b}bd_"))
                        .prop_map(Box::new),
                );
                let next = prop::option::of(
                    arb_unique_continuation(depth - 1, &format!("{prefix_b}bn_"))
                        .prop_map(Box::new),
                );
                (b0, b1, default, next).prop_map(move |(branch0, branch1, default, next)| {
                    let mut branches = std::collections::HashMap::new();
                    branches.insert("k0".to_string(), branch0);
                    branches.insert("k1".to_string(), branch1);
                    SerializableContinuation::Branch {
                        id: id_b.clone(),
                        branches,
                        default,
                        next,
                    }
                })
            },
            // Loop: the body recurses with prefix "lb_", the tail with "ln_".
            {
                let id_l = id.clone();
                let prefix_l = prefix.to_string();
                let body = arb_unique_continuation(depth - 1, &format!("{prefix_l}lb_"))
                    .prop_map(Box::new);
                let next = prop::option::of(
                    arb_unique_continuation(depth - 1, &format!("{prefix_l}ln_"))
                        .prop_map(Box::new),
                );
                (
                    body,
                    1..100u32,
                    prop::bool::ANY.prop_map(|b| {
                        if b {
                            MaxIterationsPolicy::Fail
                        } else {
                            MaxIterationsPolicy::ExitWithLast
                        }
                    }),
                    next,
                )
                    .prop_map(move |(body, max_iterations, on_max, next)| {
                        SerializableContinuation::Loop {
                            id: id_l.clone(),
                            body,
                            max_iterations,
                            on_max,
                            next,
                        }
                    })
            },
            // ChildWorkflow: the child recurses with prefix "cc_", the tail "cn_".
            {
                let id_cw = id;
                let prefix_cw = prefix.to_string();
                let child = arb_unique_continuation(depth - 1, &format!("{prefix_cw}cc_"))
                    .prop_map(Box::new);
                let next = prop::option::of(
                    arb_unique_continuation(depth - 1, &format!("{prefix_cw}cn_"))
                        .prop_map(Box::new),
                );
                (child, next).prop_map(move |(child, next)| {
                    SerializableContinuation::ChildWorkflow {
                        id: id_cw.clone(),
                        child,
                        next,
                    }
                })
            },
        ]
        .boxed()
    }

    /// Collects every node id in the tree via a pre-order walk
    /// (node first, then branches/body/default, then `next`).
    fn collect_ids(cont: &SerializableContinuation) -> Vec<String> {
        let mut ids = vec![];
        fn walk(c: &SerializableContinuation, out: &mut Vec<String>) {
            match c {
                // Linear variants: record the id, then follow `next`.
                SerializableContinuation::Task { id, next, .. }
                | SerializableContinuation::Delay { id, next, .. }
                | SerializableContinuation::AwaitSignal { id, next, .. } => {
                    out.push(id.clone());
                    if let Some(n) = next {
                        walk(n, out);
                    }
                }
                SerializableContinuation::Fork { id, branches, join } => {
                    out.push(id.clone());
                    for b in branches {
                        walk(b, out);
                    }
                    if let Some(j) = join {
                        walk(j, out);
                    }
                }
                SerializableContinuation::Branch {
                    id,
                    branches,
                    default,
                    next,
                } => {
                    out.push(id.clone());
                    // NOTE: HashMap iteration order is unspecified, so the
                    // relative order of branch ids in the result may vary.
                    for b in branches.values() {
                        walk(b, out);
                    }
                    if let Some(d) = default {
                        walk(d, out);
                    }
                    if let Some(n) = next {
                        walk(n, out);
                    }
                }
                SerializableContinuation::Loop { id, body, next, .. } => {
                    out.push(id.clone());
                    walk(body, out);
                    if let Some(n) = next {
                        walk(n, out);
                    }
                }
                SerializableContinuation::ChildWorkflow { id, child, next } => {
                    out.push(id.clone());
                    walk(child, out);
                    if let Some(n) = next {
                        walk(n, out);
                    }
                }
            }
        }
        walk(cont, &mut ids);
        ids
    }

    /// Returns a copy of `cont` whose ROOT id is replaced with `dup_id`.
    /// Picking `dup_id` from elsewhere in the tree thus manufactures a
    /// guaranteed duplicate for `find_duplicate_id` to detect.
    ///
    /// NOTE(review): the Task arm resets `priority` to None and `tags` to
    /// empty rather than copying them — harmless here since duplicate
    /// detection looks only at ids, but worth confirming that is intended.
    fn inject_duplicate(cont: &SerializableContinuation, dup_id: &str) -> SerializableContinuation {
        match cont {
            SerializableContinuation::Task {
                timeout_ms,
                retry_policy,
                version,
                next,
                ..
            } => SerializableContinuation::Task {
                id: dup_id.to_string(),
                timeout_ms: *timeout_ms,
                retry_policy: retry_policy.clone(),
                version: version.clone(),
                priority: None,
                tags: vec![],
                next: next.clone(),
            },
            SerializableContinuation::Fork { branches, join, .. } => {
                SerializableContinuation::Fork {
                    id: dup_id.to_string(),
                    branches: branches.clone(),
                    join: join.clone(),
                }
            }
            SerializableContinuation::Delay {
                duration_ms, next, ..
            } => SerializableContinuation::Delay {
                id: dup_id.to_string(),
                duration_ms: *duration_ms,
                next: next.clone(),
            },
            SerializableContinuation::AwaitSignal {
                signal_name,
                timeout_ms,
                next,
                ..
            } => SerializableContinuation::AwaitSignal {
                id: dup_id.to_string(),
                signal_name: signal_name.clone(),
                timeout_ms: *timeout_ms,
                next: next.clone(),
            },
            SerializableContinuation::Branch {
                branches,
                default,
                next,
                ..
            } => SerializableContinuation::Branch {
                id: dup_id.to_string(),
                branches: branches.clone(),
                default: default.clone(),
                next: next.clone(),
            },
            SerializableContinuation::Loop {
                body,
                max_iterations,
                on_max,
                next,
                ..
            } => SerializableContinuation::Loop {
                id: dup_id.to_string(),
                body: body.clone(),
                max_iterations: *max_iterations,
                on_max: *on_max,
                next: next.clone(),
            },
            SerializableContinuation::ChildWorkflow { child, next, .. } => {
                SerializableContinuation::ChildWorkflow {
                    id: dup_id.to_string(),
                    child: child.clone(),
                    next: next.clone(),
                }
            }
        }
    }

    proptest! {
        // Hashing the same tree twice must yield the same digest.
        #[test]
        fn hash_is_deterministic(cont in arb_continuation(3)) {
            let h1 = cont.compute_definition_hash();
            let h2 = cont.compute_definition_hash();
            prop_assert_eq!(h1, h2);
        }

        // A JSON serialize/deserialize round-trip must not change the hash.
        #[test]
        fn serde_roundtrip_preserves_hash(cont in arb_continuation(3)) {
            let original_hash = cont.compute_definition_hash();
            let json = serde_json::to_string(&cont).unwrap();
            let recovered: SerializableContinuation = serde_json::from_str(&json).unwrap();
            prop_assert_eq!(original_hash, recovered.compute_definition_hash());
        }

        // Trees built with position-derived ids never report a duplicate.
        #[test]
        fn unique_ids_means_none(cont in arb_unique_continuation(3, "r_")) {
            prop_assert!(cont.find_duplicate_id().is_none());
        }

        // Overwriting the root id with some non-root id (ids[1]) must be
        // flagged. Trees with fewer than two nodes are skipped: there is no
        // second id to duplicate.
        #[test]
        fn injected_duplicate_is_detected(cont in arb_unique_continuation(3, "r_")) {
            let ids = collect_ids(&cont);
            if ids.len() >= 2 {
                let dup_id = &ids[1];
                let tampered = inject_duplicate(&cont, dup_id);
                prop_assert!(tampered.find_duplicate_id().is_some());
            }
        }
    }
}