Skip to main content

sayiir_core/
workflow.rs

1//! Workflow structures, continuation tree, and serializable representations.
2//!
3//! The continuation tree ([`WorkflowContinuation`]) is the in-memory
4//! representation of a workflow's execution graph. Each node is either a
5//! [`Task`](WorkflowContinuation::Task),
6//! [`Fork`](WorkflowContinuation::Fork),
7//! [`Delay`](WorkflowContinuation::Delay),
8//! [`AwaitSignal`](WorkflowContinuation::AwaitSignal),
9//! [`Branch`](WorkflowContinuation::Branch), or
10//! [`ChildWorkflow`](WorkflowContinuation::ChildWorkflow).
11//!
12//! [`SerializableContinuation`] strips out function pointers so the tree
13//! can be persisted and later rehydrated via a [`TaskRegistry`].
14
15use crate::context::WorkflowContext;
16use crate::task::{RetryPolicy, UntypedCoreTask};
17use sha2::{Digest, Sha256};
18use std::collections::{HashMap, HashSet};
19use std::marker::PhantomData;
20use std::ops::Deref;
21use std::sync::Arc;
22
23/// Policy for what happens when a loop reaches its maximum iteration count.
24#[derive(
25    Debug,
26    Clone,
27    Copy,
28    PartialEq,
29    Eq,
30    serde::Serialize,
31    serde::Deserialize,
32    strum::EnumString,
33    strum::Display,
34)]
35pub enum MaxIterationsPolicy {
36    /// Fail the workflow with a `MaxIterationsExceeded` error.
37    #[strum(serialize = "fail")]
38    Fail,
39    /// Exit the loop with the last iteration's output (unwrapped from `LoopResult`).
40    #[strum(serialize = "exit_with_last")]
41    ExitWithLast,
42}
43
44/// Generate a `find_duplicate_id` method for continuation-like enums
45///
46macro_rules! impl_find_duplicate_id {
47    ($name:ident, task_fields: { $($task_extra:tt)* }, delay_extra: { $($delay_extra:tt)* }, deref_branch: $deref:expr, deref_branch_map: $deref_map:expr) => {
48        impl $name {
49            pub(crate) fn find_duplicate_id(&self) -> Option<String> {
50                fn collect(cont: &$name, seen: &mut HashSet<String>) -> Option<String> {
51                    match cont {
52                        $name::Task { id, next, $($task_extra)* } => {
53                            if !seen.insert(id.clone()) {
54                                return Some(id.clone());
55                            }
56                            next.as_ref().and_then(|n| collect(n, seen))
57                        }
58                        $name::Fork { id, branches, join } => {
59                            if !seen.insert(id.clone()) {
60                                return Some(id.clone());
61                            }
62                            let deref_fn: fn(&_) -> &$name = $deref;
63                            branches
64                                .iter()
65                                .find_map(|b| collect(deref_fn(b), seen))
66                                .or_else(|| join.as_ref().and_then(|j| collect(j, seen)))
67                        }
68                        $name::Branch { id, branches, default, next, .. } => {
69                            if !seen.insert(id.clone()) {
70                                return Some(id.clone());
71                            }
72                            let deref_map_fn: fn(&_) -> &$name = $deref_map;
73                            branches
74                                .values()
75                                .find_map(|b| collect(deref_map_fn(b), seen))
76                                .or_else(|| default.as_ref().and_then(|d| collect(d, seen)))
77                                .or_else(|| next.as_ref().and_then(|n| collect(n, seen)))
78                        }
79                        $name::Delay { id, next, $($delay_extra)* }
80                        | $name::AwaitSignal { id, next, $($delay_extra)* } => {
81                            if !seen.insert(id.clone()) {
82                                return Some(id.clone());
83                            }
84                            next.as_ref().and_then(|n| collect(n, seen))
85                        }
86                        $name::Loop { id, body, next, .. } => {
87                            if !seen.insert(id.clone()) {
88                                return Some(id.clone());
89                            }
90                            collect(body, seen)
91                                .or_else(|| next.as_ref().and_then(|n| collect(n, seen)))
92                        }
93                        $name::ChildWorkflow { id, child, next } => {
94                            if !seen.insert(id.clone()) {
95                                return Some(id.clone());
96                            }
97                            collect(child, seen)
98                                .or_else(|| next.as_ref().and_then(|n| collect(n, seen)))
99                        }
100                    }
101                }
102                collect(self, &mut HashSet::new())
103            }
104        }
105    };
106}
107
108/// The kind of node in a workflow continuation tree.
109#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::AsRefStr, strum::Display, strum::EnumString)]
110#[strum(serialize_all = "snake_case")]
111pub enum NodeKind {
112    /// A sequential task node.
113    Task,
114    /// A parallel fork node.
115    Fork,
116    /// A durable delay node.
117    Delay,
118    /// A signal-wait node.
119    AwaitSignal,
120    /// A conditional branching node.
121    Branch,
122    /// A loop node.
123    Loop,
124    /// A child workflow node.
125    ChildWorkflow,
126}
127
128/// Metadata about a single node in the workflow DAG, returned by
129/// topological iteration.
130#[derive(Debug, Clone)]
131pub struct NodeInfo<'a> {
132    /// Unique node identifier.
133    pub id: &'a str,
134    /// The structural kind of this node.
135    pub kind: NodeKind,
136    /// ID of the node that precedes this one in execution order.
137    /// `None` for the root node.
138    pub predecessor_id: Option<&'a str>,
139    /// Timeout (task timeout, delay duration, or signal timeout).
140    pub timeout: Option<std::time::Duration>,
141    /// Retry policy (only populated for [`NodeKind::Task`]).
142    pub retry_policy: Option<&'a RetryPolicy>,
143    /// Execution priority (only populated for [`NodeKind::Task`]).
144    pub priority: Option<u8>,
145}
146
147/// Lazy, stack-based iterator over workflow nodes in topological order.
148///
149/// Created by [`WorkflowContinuation::iter_nodes`].
150pub struct NodeIter<'a> {
151    stack: Vec<(&'a WorkflowContinuation, Option<&'a str>)>,
152}
153
154impl<'a> Iterator for NodeIter<'a> {
155    type Item = NodeInfo<'a>;
156
157    #[allow(clippy::too_many_lines)]
158    fn next(&mut self) -> Option<Self::Item> {
159        let (cont, predecessor) = self.stack.pop()?;
160
161        let (id, kind, timeout, retry_policy, priority) = match cont {
162            WorkflowContinuation::Task {
163                id,
164                timeout,
165                retry_policy,
166                priority,
167                ..
168            } => (
169                id.as_str(),
170                NodeKind::Task,
171                *timeout,
172                retry_policy.as_ref(),
173                *priority,
174            ),
175            WorkflowContinuation::Fork { id, .. } => {
176                (id.as_str(), NodeKind::Fork, None, None, None)
177            }
178            WorkflowContinuation::Delay { id, duration, .. } => {
179                (id.as_str(), NodeKind::Delay, Some(*duration), None, None)
180            }
181            WorkflowContinuation::AwaitSignal { id, timeout, .. } => {
182                (id.as_str(), NodeKind::AwaitSignal, *timeout, None, None)
183            }
184            WorkflowContinuation::Branch { id, .. } => {
185                (id.as_str(), NodeKind::Branch, None, None, None)
186            }
187            WorkflowContinuation::Loop { id, .. } => {
188                (id.as_str(), NodeKind::Loop, None, None, None)
189            }
190            WorkflowContinuation::ChildWorkflow { id, .. } => {
191                (id.as_str(), NodeKind::ChildWorkflow, None, None, None)
192            }
193        };
194
195        // Push children in reverse order so the first child is popped next.
196        match cont {
197            WorkflowContinuation::Task { id, next, .. }
198            | WorkflowContinuation::Delay { id, next, .. }
199            | WorkflowContinuation::AwaitSignal { id, next, .. } => {
200                if let Some(n) = next {
201                    self.stack.push((n, Some(id)));
202                }
203            }
204            WorkflowContinuation::Fork { id, branches, join } => {
205                if let Some(j) = join {
206                    self.stack.push((j, Some(id)));
207                }
208                for b in branches.iter().rev() {
209                    self.stack.push((b, Some(id)));
210                }
211            }
212            WorkflowContinuation::Branch {
213                id,
214                branches,
215                default,
216                next,
217                ..
218            } => {
219                if let Some(n) = next {
220                    self.stack.push((n, Some(id)));
221                }
222                if let Some(d) = default {
223                    self.stack.push((d, Some(id)));
224                }
225                // stable sort for deterministic iteration
226                let mut keys: Vec<&String> = branches.keys().collect();
227                keys.sort();
228                for k in keys.into_iter().rev() {
229                    self.stack.push((&branches[k], Some(id)));
230                }
231            }
232            WorkflowContinuation::Loop { id, body, next, .. } => {
233                if let Some(n) = next {
234                    self.stack.push((n, Some(id)));
235                }
236                self.stack.push((body, Some(id)));
237            }
238            WorkflowContinuation::ChildWorkflow {
239                id, child, next, ..
240            } => {
241                if let Some(n) = next {
242                    self.stack.push((n, Some(id)));
243                }
244                self.stack.push((child, Some(id)));
245            }
246        }
247
248        Some(NodeInfo {
249            id,
250            kind,
251            predecessor_id: predecessor,
252            timeout,
253            retry_policy,
254            priority,
255        })
256    }
257}
258
259/// A workflow structure representing the tasks to execute.
260pub enum WorkflowContinuation {
261    /// A sequential task node.
262    Task {
263        /// Unique task identifier.
264        id: String,
265        /// Task implementation. `None` for registry-based execution
266        /// where tasks are looked up by `id` at runtime.
267        func: Option<UntypedCoreTask>,
268        /// Maximum time the task is allowed to run before being cancelled.
269        timeout: Option<std::time::Duration>,
270        /// Retry policy for failed task executions.
271        retry_policy: Option<RetryPolicy>,
272        /// Schema version string (included in definition hash).
273        version: Option<String>,
274        /// Execution priority (1–5). `None` inherits the default (Normal = 3).
275        priority: Option<u8>,
276        /// Affinity tags for worker routing.
277        tags: Vec<String>,
278        /// Next node in the chain.
279        next: Option<Box<WorkflowContinuation>>,
280    },
281    /// A parallel fork node.
282    Fork {
283        /// Fork identifier (derived from branch IDs).
284        id: String,
285        /// Parallel branch continuations.
286        branches: Box<[Arc<WorkflowContinuation>]>,
287        /// Optional join task after all branches complete.
288        join: Option<Box<WorkflowContinuation>>,
289    },
290    /// A durable delay node. Input passes through unchanged.
291    Delay {
292        /// Unique delay identifier.
293        id: String,
294        /// How long to wait.
295        duration: std::time::Duration,
296        /// Next node in the chain.
297        next: Option<Box<WorkflowContinuation>>,
298    },
299    /// Wait for an external signal (event). Input passes through unchanged
300    /// when no signal payload is provided; otherwise the signal payload
301    /// becomes the input to the next step.
302    AwaitSignal {
303        /// Unique signal-wait identifier.
304        id: String,
305        /// Name of the signal to wait for.
306        signal_name: String,
307        /// Optional timeout duration.
308        timeout: Option<std::time::Duration>,
309        /// Next node in the chain.
310        next: Option<Box<WorkflowContinuation>>,
311    },
312    /// Conditional branching node. A key function extracts a routing key
313    /// from the previous step's output and dispatches to one of the named
314    /// sub-continuations.
315    Branch {
316        /// Unique branch identifier.
317        id: String,
318        /// Key function implementation. `None` for registry-based execution
319        /// where the key function is looked up by [`key_fn_id`] at runtime.
320        key_fn: Option<UntypedCoreTask>,
321        /// Named branch continuations keyed by routing key.
322        branches: HashMap<String, Box<WorkflowContinuation>>,
323        /// Optional default branch if no key matches.
324        default: Option<Box<WorkflowContinuation>>,
325        /// Continuation after the chosen branch completes.
326        next: Option<Box<WorkflowContinuation>>,
327    },
328    /// A loop node. Repeatedly executes its body until the task returns
329    /// `LoopResult::Done`, or until `max_iterations` is reached.
330    Loop {
331        /// Unique loop identifier.
332        id: String,
333        /// The body continuation to execute on each iteration.
334        body: Box<WorkflowContinuation>,
335        /// Maximum number of iterations before applying `on_max` policy.
336        max_iterations: u32,
337        /// What to do when `max_iterations` is reached.
338        on_max: MaxIterationsPolicy,
339        /// Continuation after the loop completes.
340        next: Option<Box<WorkflowContinuation>>,
341    },
342    /// A child workflow node. Executes another workflow's continuation inline.
343    ChildWorkflow {
344        /// Unique child workflow identifier.
345        id: String,
346        /// The child workflow's continuation tree (inlined, not a reference).
347        child: Arc<WorkflowContinuation>,
348        /// Continuation after the child workflow completes.
349        next: Option<Box<WorkflowContinuation>>,
350    },
351}
352
353impl_find_duplicate_id!(
354    WorkflowContinuation,
355    task_fields: { .. },
356    delay_extra: { .. },
357    deref_branch: |b: &Arc<WorkflowContinuation>| -> &WorkflowContinuation { b },
358    deref_branch_map: |b: &WorkflowContinuation| -> &WorkflowContinuation { b }
359);
360
361/// Derive the key-function task ID for a Branch node.
362///
363/// By convention the key function is registered under `"{branch_id}::key_fn"`.
364/// This helper centralises that convention so callers don't repeat the suffix.
365#[must_use]
366pub fn key_fn_id(branch_id: &str) -> String {
367    format!("{branch_id}::key_fn")
368}
369
370/// Derive a loop node ID from a counter value.
371///
372/// By convention loop nodes are named `"loop_0"`, `"loop_1"`, etc.,
373/// matching the pattern used by branch nodes (`"branch_0"`, …).
374#[must_use]
375pub fn loop_node_id(counter: usize) -> String {
376    format!("loop_{counter}")
377}
378
379impl WorkflowContinuation {
380    /// Derive a fork ID from a list of branch IDs.
381    ///
382    /// The fork ID is a concatenation of branch IDs separated by `||`.
383    #[must_use]
384    pub fn derive_fork_id(branch_ids: &[&str]) -> String {
385        branch_ids.join("||")
386    }
387
388    /// Get the ID of this continuation node.
389    #[must_use]
390    pub fn id(&self) -> &str {
391        match self {
392            WorkflowContinuation::Task { id, .. }
393            | WorkflowContinuation::Fork { id, .. }
394            | WorkflowContinuation::Delay { id, .. }
395            | WorkflowContinuation::AwaitSignal { id, .. }
396            | WorkflowContinuation::Branch { id, .. }
397            | WorkflowContinuation::Loop { id, .. }
398            | WorkflowContinuation::ChildWorkflow { id, .. } => id,
399        }
400    }
401
402    /// Get the next continuation in the chain, if any.
403    ///
404    #[must_use]
405    pub fn get_next(&self) -> Option<&WorkflowContinuation> {
406        match self {
407            Self::Task { next, .. }
408            | Self::Delay { next, .. }
409            | Self::AwaitSignal { next, .. }
410            | Self::Branch { next, .. }
411            | Self::Loop { next, .. }
412            | Self::ChildWorkflow { next, .. } => next.as_deref(),
413            Self::Fork { join, .. } => join.as_deref(),
414        }
415    }
416
417    /// Get the first task ID from this continuation.
418    ///
419    /// For a `Task`, returns its ID. For a `Fork`, returns the first task ID
420    /// from the first branch.
421    #[must_use]
422    pub fn first_task_id(&self) -> &str {
423        match self {
424            WorkflowContinuation::Task { id, .. }
425            | WorkflowContinuation::Delay { id, .. }
426            | WorkflowContinuation::AwaitSignal { id, .. }
427            | WorkflowContinuation::Branch { id, .. } => id,
428            WorkflowContinuation::Fork { branches, .. } => {
429                if let Some(first_branch) = branches.first() {
430                    first_branch.first_task_id()
431                } else {
432                    "unknown"
433                }
434            }
435            WorkflowContinuation::Loop { body, .. } => body.first_task_id(),
436            WorkflowContinuation::ChildWorkflow { child, .. } => child.first_task_id(),
437        }
438    }
439
440    /// Get the execution priority of the first task in this continuation.
441    ///
442    /// Returns `Some(priority)` for `Task` nodes, `None` for non-task nodes
443    /// (Delay, Signal, Branch). Recurses through Fork, Loop, and `ChildWorkflow`.
444    #[must_use]
445    pub fn first_task_priority(&self) -> Option<u8> {
446        match self {
447            WorkflowContinuation::Task { priority, .. } => *priority,
448            WorkflowContinuation::Delay { .. }
449            | WorkflowContinuation::AwaitSignal { .. }
450            | WorkflowContinuation::Branch { .. } => None,
451            WorkflowContinuation::Fork { branches, .. } => {
452                branches.first().and_then(|b| b.first_task_priority())
453            }
454            WorkflowContinuation::Loop { body, .. } => body.first_task_priority(),
455            WorkflowContinuation::ChildWorkflow { child, .. } => child.first_task_priority(),
456        }
457    }
458
459    /// Get the affinity tags of the first task in this continuation.
460    ///
461    /// Returns the tags for `Task` nodes, empty for non-task nodes
462    /// (Delay, Signal, Branch). Recurses through Fork, Loop, and `ChildWorkflow`.
463    #[must_use]
464    pub fn first_task_tags(&self) -> Vec<String> {
465        match self {
466            WorkflowContinuation::Task { tags, .. } => tags.clone(),
467            WorkflowContinuation::Delay { .. }
468            | WorkflowContinuation::AwaitSignal { .. }
469            | WorkflowContinuation::Branch { .. } => vec![],
470            WorkflowContinuation::Fork { branches, .. } => branches
471                .first()
472                .map(|b| b.first_task_tags())
473                .unwrap_or_default(),
474            WorkflowContinuation::Loop { body, .. } => body.first_task_tags(),
475            WorkflowContinuation::ChildWorkflow { child, .. } => child.first_task_tags(),
476        }
477    }
478
479    /// Build a [`TaskHint`] from the first task in this continuation.
480    ///
481    /// Combines [`first_task_id`], [`first_task_priority`], and [`first_task_tags`]
482    /// into a single struct for passing through `prepare_run` and `ParkReason`.
483    #[must_use]
484    pub fn first_task_hint(&self) -> crate::snapshot::TaskHint {
485        crate::snapshot::TaskHint {
486            id: self.first_task_id().to_string(),
487            priority: self.first_task_priority(),
488            tags: self.first_task_tags(),
489        }
490    }
491
492    /// Get the terminal task ID of this continuation chain.
493    ///
494    /// Follows `get_next()` pointers to the end and returns the ID of the
495    /// last node. This is the task whose output is the "final" output of the
496    /// chain (e.g. the `LoopResult` envelope for a loop body).
497    #[must_use]
498    pub fn terminal_task_id(&self) -> &str {
499        let mut current = self;
500        while let Some(next) = current.get_next() {
501            current = next;
502        }
503        current.first_task_id()
504    }
505
506    /// Find a task node by ID (immutable).
507    ///
508    /// Recursively walks the full continuation tree, including through `Arc`
509    /// fork branches, and returns a reference to the matching `Task` node.
510    fn find_task(&self, target_id: &str) -> Option<&Self> {
511        match self {
512            WorkflowContinuation::Task { id, next, .. } => {
513                if id == target_id {
514                    return Some(self);
515                }
516                next.as_ref().and_then(|n| n.find_task(target_id))
517            }
518            WorkflowContinuation::Delay { next, .. }
519            | WorkflowContinuation::AwaitSignal { next, .. } => {
520                next.as_ref().and_then(|n| n.find_task(target_id))
521            }
522            WorkflowContinuation::Fork { branches, join, .. } => {
523                for branch in branches {
524                    if let Some(found) = branch.find_task(target_id) {
525                        return Some(found);
526                    }
527                }
528                join.as_ref().and_then(|j| j.find_task(target_id))
529            }
530            WorkflowContinuation::Branch {
531                branches,
532                default,
533                next,
534                ..
535            } => {
536                for branch in branches.values() {
537                    if let Some(found) = branch.find_task(target_id) {
538                        return Some(found);
539                    }
540                }
541                if let Some(d) = default
542                    && let Some(found) = d.find_task(target_id)
543                {
544                    return Some(found);
545                }
546                next.as_ref().and_then(|n| n.find_task(target_id))
547            }
548            WorkflowContinuation::Loop { body, next, .. } => body
549                .find_task(target_id)
550                .or_else(|| next.as_ref().and_then(|n| n.find_task(target_id))),
551            WorkflowContinuation::ChildWorkflow { child, next, .. } => child
552                .find_task(target_id)
553                .or_else(|| next.as_ref().and_then(|n| n.find_task(target_id))),
554        }
555    }
556
557    /// Find a task node by ID (mutable).
558    ///
559    /// Same traversal as [`find_task`](Self::find_task) but returns a mutable
560    /// reference. Fork branches behind `Arc` are skipped since they cannot be
561    /// mutated; only the join continuation is searched.
562    fn find_task_mut(&mut self, target_id: &str) -> Option<&mut Self> {
563        match self {
564            WorkflowContinuation::Task { id, .. } if id == target_id => Some(self),
565            WorkflowContinuation::Task { next, .. } => {
566                next.as_mut().and_then(|n| n.find_task_mut(target_id))
567            }
568            WorkflowContinuation::Delay { next, .. }
569            | WorkflowContinuation::AwaitSignal { next, .. } => {
570                next.as_mut().and_then(|n| n.find_task_mut(target_id))
571            }
572            WorkflowContinuation::Fork { join, .. } => {
573                join.as_mut().and_then(|j| j.find_task_mut(target_id))
574            }
575            WorkflowContinuation::Branch {
576                branches,
577                default,
578                next,
579                ..
580            } => {
581                for branch in branches.values_mut() {
582                    if let Some(found) = branch.find_task_mut(target_id) {
583                        return Some(found);
584                    }
585                }
586                if let Some(d) = default
587                    && let Some(found) = d.find_task_mut(target_id)
588                {
589                    return Some(found);
590                }
591                next.as_mut().and_then(|n| n.find_task_mut(target_id))
592            }
593            WorkflowContinuation::Loop { body, next, .. } => {
594                if let Some(found) = body.find_task_mut(target_id) {
595                    return Some(found);
596                }
597                next.as_mut().and_then(|n| n.find_task_mut(target_id))
598            }
599            WorkflowContinuation::ChildWorkflow { next, .. } => {
600                // Arc child branches cannot be mutated; only search next.
601                next.as_mut().and_then(|n| n.find_task_mut(target_id))
602            }
603        }
604    }
605
606    /// Set the timeout on a specific task node found by ID.
607    pub fn set_task_timeout(&mut self, target_id: &str, timeout: Option<std::time::Duration>) {
608        if let Some(WorkflowContinuation::Task { timeout: t, .. }) = self.find_task_mut(target_id) {
609            *t = timeout;
610        }
611    }
612
613    /// Set the retry policy on a specific task node found by ID.
614    pub fn set_task_retry_policy(&mut self, target_id: &str, policy: Option<RetryPolicy>) {
615        if let Some(WorkflowContinuation::Task { retry_policy, .. }) = self.find_task_mut(target_id)
616        {
617            *retry_policy = policy;
618        }
619    }
620
621    /// Set the schema version on a specific task node found by ID.
622    pub fn set_task_version(&mut self, target_id: &str, ver: Option<String>) {
623        if let Some(WorkflowContinuation::Task { version, .. }) = self.find_task_mut(target_id) {
624            *version = ver;
625        }
626    }
627
628    /// Look up the retry policy configured on a specific task by ID.
629    #[must_use]
630    pub fn get_task_retry_policy(&self, task_id: &str) -> Option<&RetryPolicy> {
631        match self.find_task(task_id)? {
632            WorkflowContinuation::Task { retry_policy, .. } => retry_policy.as_ref(),
633            _ => None,
634        }
635    }
636
637    /// Look up the timeout configured on a specific task by ID.
638    #[must_use]
639    pub fn get_task_timeout(&self, task_id: &str) -> Option<std::time::Duration> {
640        match self.find_task(task_id)? {
641            WorkflowContinuation::Task { timeout, .. } => *timeout,
642            _ => None,
643        }
644    }
645
646    /// Look up the priority configured on a specific task by ID.
647    #[must_use]
648    pub fn get_task_priority(&self, task_id: &str) -> Option<u8> {
649        match self.find_task(task_id)? {
650            WorkflowContinuation::Task { priority, .. } => *priority,
651            _ => None,
652        }
653    }
654
655    /// Look up the affinity tags configured on a specific task by ID.
656    #[must_use]
657    pub fn get_task_tags(&self, task_id: &str) -> Vec<String> {
658        match self.find_task(task_id) {
659            Some(WorkflowContinuation::Task { tags, .. }) => tags.clone(),
660            _ => vec![],
661        }
662    }
663
664    /// Set the affinity tags on a specific task node found by ID.
665    pub fn set_task_tags(&mut self, target_id: &str, new_tags: Vec<String>) {
666        if let Some(WorkflowContinuation::Task { tags, .. }) = self.find_task_mut(target_id) {
667            *tags = new_tags;
668        }
669    }
670
671    /// Build a [`TaskMetadata`](crate::task::TaskMetadata) from the fields
672    /// available on the continuation node for the given task.
673    ///
674    /// Only `timeout`, `retries`, `version`, and `tags` are populated — display
675    /// name and description are left as defaults since they are not stored in
676    /// the continuation tree.
677    #[must_use]
678    pub fn build_task_metadata(&self, task_id: &str) -> crate::task::TaskMetadata {
679        match self.find_task(task_id) {
680            Some(WorkflowContinuation::Task {
681                timeout,
682                retry_policy,
683                version,
684                priority,
685                tags,
686                ..
687            }) => crate::task::TaskMetadata {
688                timeout: *timeout,
689                retries: retry_policy.clone(),
690                version: version.clone(),
691                priority: priority.and_then(crate::priority::Priority::from_u8),
692                tags: tags.clone(),
693                ..Default::default()
694            },
695            _ => crate::task::TaskMetadata::default(),
696        }
697    }
698
699    /// Returns a lazy iterator over all nodes in topological (execution) order.
700    ///
701    /// The traversal mirrors the order that the workflow engine would visit
702    /// each node during execution, making the result useful for introspection,
703    /// UI visualisation, and documentation generation.
704    ///
705    /// Each [`NodeInfo`] includes a `predecessor_id` linking back to the node
706    /// whose completion triggers this one. The root node has `None`.
707    #[must_use]
708    pub fn iter_nodes(&self) -> NodeIter<'_> {
709        NodeIter {
710            stack: vec![(self, None)],
711        }
712    }
713
714    /// Convert to a serializable representation (strips out task implementations).
715    #[must_use]
716    pub fn to_serializable(&self) -> SerializableContinuation {
717        match self {
718            #[allow(clippy::cast_possible_truncation)] // Durations > u64::MAX ms are not realistic
719            WorkflowContinuation::Task {
720                id,
721                timeout,
722                retry_policy,
723                version,
724                priority,
725                tags,
726                next,
727                ..
728            } => SerializableContinuation::Task {
729                id: id.clone(),
730                timeout_ms: timeout.map(|d| d.as_millis() as u64),
731                retry_policy: retry_policy.clone(),
732                version: version.clone(),
733                priority: *priority,
734                tags: tags.clone(),
735                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
736            },
737            WorkflowContinuation::Fork { id, branches, join } => SerializableContinuation::Fork {
738                id: id.clone(),
739                branches: branches.iter().map(|b| b.to_serializable()).collect(),
740                join: join.as_ref().map(|j| Box::new(j.to_serializable())),
741            },
742            #[allow(clippy::cast_possible_truncation)] // Durations > u64::MAX ms are not realistic
743            WorkflowContinuation::Delay { id, duration, next } => SerializableContinuation::Delay {
744                id: id.clone(),
745                duration_ms: duration.as_millis() as u64,
746                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
747            },
748            #[allow(clippy::cast_possible_truncation)]
749            WorkflowContinuation::AwaitSignal {
750                id,
751                signal_name,
752                timeout,
753                next,
754            } => SerializableContinuation::AwaitSignal {
755                id: id.clone(),
756                signal_name: signal_name.clone(),
757                timeout_ms: timeout.map(|d| d.as_millis() as u64),
758                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
759            },
760            WorkflowContinuation::Branch {
761                id,
762                branches,
763                default,
764                next,
765                ..
766            } => SerializableContinuation::Branch {
767                id: id.clone(),
768                branches: branches
769                    .iter()
770                    .map(|(k, v)| (k.clone(), Box::new(v.to_serializable())))
771                    .collect(),
772                default: default.as_ref().map(|d| Box::new(d.to_serializable())),
773                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
774            },
775            WorkflowContinuation::ChildWorkflow { id, child, next } => {
776                SerializableContinuation::ChildWorkflow {
777                    id: id.clone(),
778                    child: Box::new(child.to_serializable()),
779                    next: next.as_ref().map(|n| Box::new(n.to_serializable())),
780                }
781            }
782            WorkflowContinuation::Loop {
783                id,
784                body,
785                max_iterations,
786                on_max,
787                next,
788            } => SerializableContinuation::Loop {
789                id: id.clone(),
790                body: Box::new(body.to_serializable()),
791                max_iterations: *max_iterations,
792                on_max: *on_max,
793                next: next.as_ref().map(|n| Box::new(n.to_serializable())),
794            },
795        }
796    }
797
798    /// Append a new node to the end of this continuation chain.
799    ///
800    /// Recursively walks the chain to find the tail and attaches `new_node` there.
801    pub fn append_to_chain(&mut self, new_node: WorkflowContinuation) {
802        match self {
803            WorkflowContinuation::Task { next, .. }
804            | WorkflowContinuation::Delay { next, .. }
805            | WorkflowContinuation::AwaitSignal { next, .. }
806            | WorkflowContinuation::Branch { next, .. }
807            | WorkflowContinuation::Loop { next, .. }
808            | WorkflowContinuation::ChildWorkflow { next, .. } => match next {
809                Some(next_box) => next_box.append_to_chain(new_node),
810                None => *next = Some(Box::new(new_node)),
811            },
812            WorkflowContinuation::Fork { join, .. } => match join {
813                Some(join_box) => join_box.append_to_chain(new_node),
814                None => *join = Some(Box::new(new_node)),
815            },
816        }
817    }
818}
819
820/// A serializable workflow continuation (stores only IDs and structure).
821///
822/// This type can be serialized/deserialized and later converted back into a runnable
823/// `WorkflowContinuation` using a `TaskRegistry`.
824///
825/// # Serialization
826///
827/// ```rust
828/// # use sayiir_core::prelude::*;
829/// # use sayiir_core::codec::{Encoder, Decoder, sealed};
830/// # use sayiir_core::workflow::SerializableContinuation;
831/// # use bytes::Bytes;
832/// # use std::sync::Arc;
833/// # struct MyCodec;
834/// # impl Encoder for MyCodec {}
835/// # impl Decoder for MyCodec {}
836/// # impl<T> sealed::EncodeValue<T> for MyCodec {
837/// #     fn encode_value(&self, _: &T) -> Result<Bytes, BoxError> { Ok(Bytes::new()) }
838/// # }
839/// # impl<T> sealed::DecodeValue<T> for MyCodec {
840/// #     fn decode_value(&self, _: Bytes) -> Result<T, BoxError> { Err("dummy".into()) }
841/// # }
842/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
843/// # let codec = Arc::new(MyCodec);
844/// # let ctx = WorkflowContext::new("wf", codec.clone(), Arc::new(()));
845/// # let workflow = WorkflowBuilder::new(ctx)
846/// #     .with_registry()
847/// #     .then("step1", |i: u32| async move { Ok(i + 1) })
848/// #     .build()?;
849/// # let mut registry = TaskRegistry::new();
850/// # registry.register_fn("step1", codec, |i: u32| async move { Ok(i + 1) });
851/// // Serialize a workflow
852/// let serializable = workflow.continuation().to_serializable();
853/// let json = serde_json::to_string(&serializable)?;
854///
855/// // Deserialize and convert to runnable
856/// let serializable: SerializableContinuation = serde_json::from_str(&json)?;
857/// let continuation = serializable.to_runnable(&registry)?;
858/// # Ok(())
859/// # }
860/// ```
861#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
862pub enum SerializableContinuation {
863    /// A sequential task node.
864    Task {
865        /// Unique task identifier.
866        id: String,
867        /// Optional timeout in milliseconds.
868        #[serde(default, skip_serializing_if = "Option::is_none")]
869        timeout_ms: Option<u64>,
870        /// Optional retry policy.
871        #[serde(default, skip_serializing_if = "Option::is_none")]
872        retry_policy: Option<RetryPolicy>,
873        /// Schema version string (included in definition hash).
874        #[serde(default, skip_serializing_if = "Option::is_none")]
875        version: Option<String>,
876        /// Execution priority (1–5). `None` inherits the default (Normal = 3).
877        #[serde(default, skip_serializing_if = "Option::is_none")]
878        priority: Option<u8>,
879        /// Affinity tags for worker routing.
880        #[serde(default, skip_serializing_if = "Vec::is_empty")]
881        tags: Vec<String>,
882        /// Next node in the chain.
883        next: Option<Box<SerializableContinuation>>,
884    },
885    /// A parallel fork node.
886    Fork {
887        /// Fork identifier (derived from branch IDs).
888        id: String,
889        /// Parallel branches.
890        branches: Vec<SerializableContinuation>,
891        /// Optional join task after all branches complete.
892        join: Option<Box<SerializableContinuation>>,
893    },
894    /// A durable delay node.
895    Delay {
896        /// Unique delay identifier.
897        id: String,
898        /// Duration in milliseconds.
899        duration_ms: u64,
900        /// Next node in the chain.
901        next: Option<Box<SerializableContinuation>>,
902    },
903    /// A signal-wait node.
904    AwaitSignal {
905        /// Unique signal-wait identifier.
906        id: String,
907        /// Name of the signal to wait for.
908        signal_name: String,
909        /// Optional timeout in milliseconds.
910        #[serde(default, skip_serializing_if = "Option::is_none")]
911        timeout_ms: Option<u64>,
912        /// Next node in the chain.
913        next: Option<Box<SerializableContinuation>>,
914    },
915    /// A conditional branching node.
916    Branch {
917        /// Unique branch identifier.
918        id: String,
919        /// Named branch continuations keyed by routing key.
920        branches: HashMap<String, Box<SerializableContinuation>>,
921        /// Optional default branch if no key matches.
922        #[serde(default, skip_serializing_if = "Option::is_none")]
923        default: Option<Box<SerializableContinuation>>,
924        /// Continuation after the chosen branch completes.
925        next: Option<Box<SerializableContinuation>>,
926    },
927    /// A loop node.
928    Loop {
929        /// Unique loop identifier.
930        id: String,
931        /// The body continuation to execute on each iteration.
932        body: Box<SerializableContinuation>,
933        /// Maximum number of iterations.
934        max_iterations: u32,
935        /// What to do when `max_iterations` is reached.
936        on_max: MaxIterationsPolicy,
937        /// Continuation after the loop completes.
938        next: Option<Box<SerializableContinuation>>,
939    },
940    /// A child workflow node.
941    ChildWorkflow {
942        /// Unique child workflow identifier.
943        id: String,
944        /// The child workflow's continuation tree.
945        child: Box<SerializableContinuation>,
946        /// Continuation after the child workflow completes.
947        next: Option<Box<SerializableContinuation>>,
948    },
949}
950
951impl_find_duplicate_id!(
952    SerializableContinuation,
953    task_fields: { .. },
954    delay_extra: { .. },
955    deref_branch: |b: &SerializableContinuation| -> &SerializableContinuation { b },
956    deref_branch_map: |b: &SerializableContinuation| -> &SerializableContinuation { b }
957);
958
959impl SerializableContinuation {
960    /// Convert this serializable continuation into a runnable `WorkflowContinuation`.
961    ///
962    /// Looks up each task ID in the registry to get the actual implementation.
963    ///
964    /// # Errors
965    ///
966    /// Returns `BuildError::TaskNotFound` if any task ID is not in the registry.
967    pub fn to_runnable(
968        &self,
969        registry: &crate::registry::TaskRegistry,
970    ) -> Result<WorkflowContinuation, crate::error::BuildError> {
971        if let Some(dup) = self.find_duplicate_id() {
972            return Err(crate::error::BuildError::DuplicateTaskId(dup));
973        }
974
975        self.to_runnable_unchecked(registry)
976    }
977
978    /// Convert without duplicate check (called after validation).
979    #[allow(clippy::too_many_lines)]
980    fn to_runnable_unchecked(
981        &self,
982        registry: &crate::registry::TaskRegistry,
983    ) -> Result<WorkflowContinuation, crate::error::BuildError> {
984        match self {
985            SerializableContinuation::Task {
986                id,
987                timeout_ms,
988                retry_policy,
989                version,
990                priority,
991                tags,
992                next,
993            } => {
994                let func = registry
995                    .get(id)
996                    .ok_or_else(|| crate::error::BuildError::TaskNotFound(id.clone()))?;
997                let next = next
998                    .as_ref()
999                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
1000                    .transpose()?;
1001                Ok(WorkflowContinuation::Task {
1002                    id: id.clone(),
1003                    func: Some(func),
1004                    timeout: timeout_ms.map(std::time::Duration::from_millis),
1005                    retry_policy: retry_policy.clone(),
1006                    version: version.clone(),
1007                    priority: *priority,
1008                    tags: tags.clone(),
1009                    next,
1010                })
1011            }
1012            SerializableContinuation::Fork { id, branches, join } => {
1013                let branches: Result<Vec<_>, _> = branches
1014                    .iter()
1015                    .map(|b| b.to_runnable_unchecked(registry).map(Arc::new))
1016                    .collect();
1017                let join = join
1018                    .as_ref()
1019                    .map(|j| j.to_runnable_unchecked(registry).map(Box::new))
1020                    .transpose()?;
1021                Ok(WorkflowContinuation::Fork {
1022                    id: id.clone(),
1023                    branches: branches?.into_boxed_slice(),
1024                    join,
1025                })
1026            }
1027            SerializableContinuation::Delay {
1028                id,
1029                duration_ms,
1030                next,
1031            } => {
1032                let next = next
1033                    .as_ref()
1034                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
1035                    .transpose()?;
1036                Ok(WorkflowContinuation::Delay {
1037                    id: id.clone(),
1038                    duration: std::time::Duration::from_millis(*duration_ms),
1039                    next,
1040                })
1041            }
1042            SerializableContinuation::AwaitSignal {
1043                id,
1044                signal_name,
1045                timeout_ms,
1046                next,
1047            } => {
1048                let next = next
1049                    .as_ref()
1050                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
1051                    .transpose()?;
1052                Ok(WorkflowContinuation::AwaitSignal {
1053                    id: id.clone(),
1054                    signal_name: signal_name.clone(),
1055                    timeout: timeout_ms.map(std::time::Duration::from_millis),
1056                    next,
1057                })
1058            }
1059            SerializableContinuation::Branch {
1060                id,
1061                branches,
1062                default,
1063                next,
1064            } => {
1065                let kf_id = key_fn_id(id);
1066                let key_fn = registry
1067                    .get(&kf_id)
1068                    .ok_or(crate::error::BuildError::TaskNotFound(kf_id))?;
1069                let branches: Result<HashMap<_, _>, _> = branches
1070                    .iter()
1071                    .map(|(k, v)| {
1072                        v.to_runnable_unchecked(registry)
1073                            .map(|c| (k.clone(), Box::new(c)))
1074                    })
1075                    .collect();
1076                let default = default
1077                    .as_ref()
1078                    .map(|d| d.to_runnable_unchecked(registry).map(Box::new))
1079                    .transpose()?;
1080                let next = next
1081                    .as_ref()
1082                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
1083                    .transpose()?;
1084                Ok(WorkflowContinuation::Branch {
1085                    id: id.clone(),
1086                    key_fn: Some(key_fn),
1087                    branches: branches?,
1088                    default,
1089                    next,
1090                })
1091            }
1092            SerializableContinuation::Loop {
1093                id,
1094                body,
1095                max_iterations,
1096                on_max,
1097                next,
1098            } => {
1099                let body = body.to_runnable_unchecked(registry)?;
1100                let next = next
1101                    .as_ref()
1102                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
1103                    .transpose()?;
1104                Ok(WorkflowContinuation::Loop {
1105                    id: id.clone(),
1106                    body: Box::new(body),
1107                    max_iterations: *max_iterations,
1108                    on_max: *on_max,
1109                    next,
1110                })
1111            }
1112            SerializableContinuation::ChildWorkflow { id, child, next } => {
1113                let child = child.to_runnable_unchecked(registry)?;
1114                let next = next
1115                    .as_ref()
1116                    .map(|n| n.to_runnable_unchecked(registry).map(Box::new))
1117                    .transpose()?;
1118                Ok(WorkflowContinuation::ChildWorkflow {
1119                    id: id.clone(),
1120                    child: Arc::new(child),
1121                    next,
1122                })
1123            }
1124        }
1125    }
1126
1127    /// Get all task IDs referenced in this continuation.
1128    #[must_use]
1129    pub fn task_ids(&self) -> Vec<&str> {
1130        fn collect<'a>(cont: &'a SerializableContinuation, ids: &mut Vec<&'a str>) {
1131            match cont {
1132                SerializableContinuation::Task { id, next, .. }
1133                | SerializableContinuation::Delay { id, next, .. }
1134                | SerializableContinuation::AwaitSignal { id, next, .. } => {
1135                    ids.push(id.as_str());
1136                    if let Some(n) = next {
1137                        collect(n, ids);
1138                    }
1139                }
1140                SerializableContinuation::Fork { id, branches, join } => {
1141                    ids.push(id.as_str());
1142                    for b in branches {
1143                        collect(b, ids);
1144                    }
1145                    if let Some(j) = join {
1146                        collect(j, ids);
1147                    }
1148                }
1149                SerializableContinuation::Branch {
1150                    id,
1151                    branches,
1152                    default,
1153                    next,
1154                } => {
1155                    ids.push(id.as_str());
1156                    for b in branches.values() {
1157                        collect(b, ids);
1158                    }
1159                    if let Some(d) = default {
1160                        collect(d, ids);
1161                    }
1162                    if let Some(n) = next {
1163                        collect(n, ids);
1164                    }
1165                }
1166                SerializableContinuation::Loop { id, body, next, .. } => {
1167                    ids.push(id.as_str());
1168                    collect(body, ids);
1169                    if let Some(n) = next {
1170                        collect(n, ids);
1171                    }
1172                }
1173                SerializableContinuation::ChildWorkflow { id, child, next } => {
1174                    ids.push(id.as_str());
1175                    collect(child, ids);
1176                    if let Some(n) = next {
1177                        collect(n, ids);
1178                    }
1179                }
1180            }
1181        }
1182        let mut ids = vec![];
1183        collect(self, &mut ids);
1184        ids
1185    }
1186
1187    /// Compute a SHA256 hash of this continuation's structure.
1188    ///
1189    /// This hash serves as a "version" identifier for the workflow definition.
1190    /// It can be used to detect when a serialized workflow state was created
1191    /// with a different workflow definition than the current one.
1192    ///
1193    /// The hash is computed from the canonical structure of task IDs and their
1194    /// arrangement.
1195    #[must_use]
1196    #[allow(clippy::too_many_lines)]
1197    pub fn compute_definition_hash(&self) -> String {
1198        #[allow(clippy::too_many_lines)]
1199        fn hash_continuation(cont: &SerializableContinuation, hasher: &mut Sha256) {
1200            match cont {
1201                SerializableContinuation::Task {
1202                    id,
1203                    timeout_ms,
1204                    retry_policy,
1205                    version,
1206                    next,
1207                    ..
1208                } => {
1209                    hasher.update(b"T:"); // Tag for Task
1210                    hasher.update(id.as_bytes());
1211                    if let Some(ms) = timeout_ms {
1212                        hasher.update(b":t:");
1213                        hasher.update(ms.to_string().as_bytes());
1214                    }
1215                    if let Some(rp) = retry_policy {
1216                        hasher.update(b":r:");
1217                        hasher.update(rp.max_retries.to_string().as_bytes());
1218                        hasher.update(b":");
1219                        hasher.update(rp.initial_delay.as_millis().to_string().as_bytes());
1220                        hasher.update(b":");
1221                        hasher.update(rp.backoff_multiplier.to_string().as_bytes());
1222                    }
1223                    if let Some(v) = version {
1224                        hasher.update(b":v:");
1225                        hasher.update(v.as_bytes());
1226                    }
1227                    hasher.update(b";");
1228                    if let Some(n) = next {
1229                        hash_continuation(n, hasher);
1230                    }
1231                }
1232                SerializableContinuation::Fork { id, branches, join } => {
1233                    hasher.update(b"F:");
1234                    hasher.update(id.as_bytes());
1235                    hasher.update(b"[");
1236                    for branch in branches {
1237                        hash_continuation(branch, hasher);
1238                        hasher.update(b",");
1239                    }
1240                    hasher.update(b"]");
1241                    if let Some(j) = join {
1242                        hasher.update(b"J:");
1243                        hash_continuation(j, hasher);
1244                    }
1245                }
1246                SerializableContinuation::Delay {
1247                    id,
1248                    duration_ms,
1249                    next,
1250                } => {
1251                    hasher.update(b"D:");
1252                    hasher.update(id.as_bytes());
1253                    hasher.update(b":");
1254                    hasher.update(duration_ms.to_string().as_bytes());
1255                    hasher.update(b";");
1256                    if let Some(n) = next {
1257                        hash_continuation(n, hasher);
1258                    }
1259                }
1260                SerializableContinuation::AwaitSignal {
1261                    id,
1262                    signal_name,
1263                    timeout_ms,
1264                    next,
1265                } => {
1266                    hasher.update(b"S:");
1267                    hasher.update(id.as_bytes());
1268                    hasher.update(b":");
1269                    hasher.update(signal_name.as_bytes());
1270                    if let Some(ms) = timeout_ms {
1271                        hasher.update(b":t:");
1272                        hasher.update(ms.to_string().as_bytes());
1273                    }
1274                    hasher.update(b";");
1275                    if let Some(n) = next {
1276                        hash_continuation(n, hasher);
1277                    }
1278                }
1279                SerializableContinuation::Branch {
1280                    id,
1281                    branches,
1282                    default,
1283                    next,
1284                } => {
1285                    hasher.update(b"B:");
1286                    hasher.update(id.as_bytes());
1287                    hasher.update(b"{");
1288                    // Sort keys for deterministic hashing
1289                    let mut keys: Vec<&String> = branches.keys().collect();
1290                    keys.sort();
1291                    for key in keys {
1292                        hasher.update(key.as_bytes());
1293                        hasher.update(b"=>");
1294                        if let Some(branch) = branches.get(key) {
1295                            hash_continuation(branch, hasher);
1296                        }
1297                        hasher.update(b",");
1298                    }
1299                    hasher.update(b"}");
1300                    if let Some(d) = default {
1301                        hasher.update(b"_=>");
1302                        hash_continuation(d, hasher);
1303                    }
1304                    hasher.update(b";");
1305                    if let Some(n) = next {
1306                        hash_continuation(n, hasher);
1307                    }
1308                }
1309                SerializableContinuation::Loop {
1310                    id,
1311                    body,
1312                    max_iterations,
1313                    on_max,
1314                    next,
1315                } => {
1316                    hasher.update(b"L:");
1317                    hasher.update(id.as_bytes());
1318                    hasher.update(b":");
1319                    hasher.update(max_iterations.to_string().as_bytes());
1320                    hasher.update(b":");
1321                    hasher.update(on_max.to_string().as_bytes());
1322                    hasher.update(b"{");
1323                    hash_continuation(body, hasher);
1324                    hasher.update(b"}");
1325                    hasher.update(b";");
1326                    if let Some(n) = next {
1327                        hash_continuation(n, hasher);
1328                    }
1329                }
1330                SerializableContinuation::ChildWorkflow { id, child, next } => {
1331                    hasher.update(b"CW:");
1332                    hasher.update(id.as_bytes());
1333                    hasher.update(b"{");
1334                    hash_continuation(child, hasher);
1335                    hasher.update(b"}");
1336                    hasher.update(b";");
1337                    if let Some(n) = next {
1338                        hash_continuation(n, hasher);
1339                    }
1340                }
1341            }
1342        }
1343
1344        let mut hasher = Sha256::new();
1345        hash_continuation(self, &mut hasher);
1346        let result = hasher.finalize();
1347        format!("{result:x}")
1348    }
1349}
1350
1351/// A complete serializable workflow state including version information.
1352///
1353/// This type wraps `SerializableContinuation` with workflow identification and
1354/// a definition hash that serves as a version check. When deserializing, the
1355/// hash is verified to ensure the serialized state matches the current workflow
1356/// definition.
1357#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1358pub struct SerializedWorkflowState {
1359    /// The workflow identifier.
1360    pub workflow_id: String,
1361    /// SHA256 hash of the workflow definition structure.
1362    /// Used to detect version mismatches during deserialization.
1363    pub definition_hash: String,
1364    /// The serializable continuation structure.
1365    pub continuation: SerializableContinuation,
1366}
1367
1368/// Policy controlling what happens when [`run()`] is called with an
1369/// `instance_id` that already has a persisted snapshot.
1370#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, strum::EnumString, strum::Display)]
1371#[strum(serialize_all = "snake_case")]
1372pub enum ConflictPolicy {
1373    /// Return an error if the instance already exists (default).
1374    #[default]
1375    Fail,
1376    /// Reuse the existing snapshot: return its current status without re-executing.
1377    #[strum(serialize = "use_existing", serialize = "useExisting")]
1378    UseExisting,
1379    /// Terminate the existing instance (delete snapshot + clear signals) and start fresh.
1380    #[strum(serialize = "terminate_existing", serialize = "terminateExisting")]
1381    TerminateExisting,
1382}
1383
1384/// The status of a workflow execution.
1385#[derive(Debug, strum::AsRefStr, strum::EnumDiscriminants)]
1386#[strum_discriminants(name(WorkflowStatusKind))]
1387#[strum_discriminants(derive(strum::AsRefStr))]
1388#[strum_discriminants(strum(serialize_all = "snake_case"))]
1389#[strum_discriminants(doc = "Fieldless discriminant of [`WorkflowStatus`] for string comparisons.")]
1390pub enum WorkflowStatus {
1391    /// The workflow is still in progress (task completed, workflow continues).
1392    #[strum(serialize = "in_progress")]
1393    InProgress,
1394    /// The workflow completed successfully.
1395    #[strum(serialize = "completed")]
1396    Completed,
1397    /// The workflow failed with an error.
1398    #[strum(serialize = "failed")]
1399    Failed(String),
1400    /// The workflow was cancelled.
1401    #[strum(serialize = "cancelled")]
1402    Cancelled {
1403        /// Optional reason for the cancellation.
1404        reason: Option<String>,
1405        /// Optional identifier of who cancelled the workflow.
1406        cancelled_by: Option<String>,
1407    },
1408    /// The workflow was paused.
1409    #[strum(serialize = "paused")]
1410    Paused {
1411        /// Optional reason for the pause.
1412        reason: Option<String>,
1413        /// Optional identifier of who paused the workflow.
1414        paused_by: Option<String>,
1415    },
1416    /// The workflow is waiting for a delay to expire.
1417    #[strum(serialize = "waiting")]
1418    Waiting {
1419        /// When the delay expires.
1420        wake_at: chrono::DateTime<chrono::Utc>,
1421        /// The delay node ID.
1422        delay_id: String,
1423    },
1424    /// The workflow is waiting for an external signal.
1425    #[strum(serialize = "awaiting_signal")]
1426    AwaitingSignal {
1427        /// The signal node ID.
1428        signal_id: String,
1429        /// The named signal being waited on.
1430        signal_name: String,
1431        /// Optional timeout deadline.
1432        wake_at: Option<chrono::DateTime<chrono::Utc>>,
1433    },
1434}
1435
1436/// Flattened representation of [`WorkflowStatus`] for binding crates.
1437///
1438/// Both the Node.js and Python bindings expose a flat struct with string
1439/// fields to their respective languages. This struct holds the common
1440/// fields so bindings only need to map the language-specific output.
1441#[derive(Debug, Default)]
1442pub struct FlatWorkflowStatus {
1443    /// One of: `"completed"`, `"in_progress"`, `"failed"`, `"cancelled"`,
1444    /// `"paused"`, `"waiting"`, `"awaiting_signal"`.
1445    pub status: String,
1446    /// Error message (present when `status == "failed"`).
1447    pub error: Option<String>,
1448    /// Reason (present when `status` is `"cancelled"` or `"paused"`).
1449    pub reason: Option<String>,
1450    /// Who cancelled (present when `status == "cancelled"`).
1451    pub cancelled_by: Option<String>,
1452    /// Who paused (present when `status == "paused"`).
1453    pub paused_by: Option<String>,
1454    /// ISO-8601 wake-up timestamp (present when `status` is `"waiting"` or `"awaiting_signal"`).
1455    pub wake_at: Option<String>,
1456    /// Delay step identifier (present when `status == "waiting"`).
1457    pub delay_id: Option<String>,
1458    /// Signal step identifier (present when `status == "awaiting_signal"`).
1459    pub signal_id: Option<String>,
1460    /// Signal name (present when `status == "awaiting_signal"`).
1461    pub signal_name: Option<String>,
1462}
1463
1464impl From<WorkflowStatus> for FlatWorkflowStatus {
1465    fn from(status: WorkflowStatus) -> Self {
1466        let mut flat = Self {
1467            status: status.as_ref().to_string(),
1468            ..Self::default()
1469        };
1470        match status {
1471            WorkflowStatus::Completed | WorkflowStatus::InProgress => {}
1472            WorkflowStatus::Failed(e) => flat.error = Some(e),
1473            WorkflowStatus::Cancelled {
1474                reason,
1475                cancelled_by,
1476            } => {
1477                flat.reason = reason;
1478                flat.cancelled_by = cancelled_by;
1479            }
1480            WorkflowStatus::Paused { reason, paused_by } => {
1481                flat.reason = reason;
1482                flat.paused_by = paused_by;
1483            }
1484            WorkflowStatus::Waiting { wake_at, delay_id } => {
1485                flat.wake_at = Some(wake_at.to_rfc3339());
1486                flat.delay_id = Some(delay_id);
1487            }
1488            WorkflowStatus::AwaitingSignal {
1489                signal_id,
1490                signal_name,
1491                wake_at,
1492            } => {
1493                flat.signal_id = Some(signal_id);
1494                flat.signal_name = Some(signal_name);
1495                flat.wake_at = wake_at.map(|t| t.to_rfc3339());
1496            }
1497        }
1498        flat
1499    }
1500}
1501
1502// Re-export builder types for backwards compatibility.
1503pub use crate::builder::{
1504    BranchCollector, ContinuationState, ForkBuilder, NoContinuation, NoRegistry, RegistryBehavior,
1505    RouteBuilder, SubBuilder, WorkflowBuilder,
1506};
1507
1508use crate::registry::TaskRegistry;
1509
1510/// A built workflow that can be executed.
1511pub struct Workflow<C, Input, M = ()> {
1512    pub(crate) definition_hash: String,
1513    pub(crate) context: WorkflowContext<C, M>,
1514    pub(crate) continuation: WorkflowContinuation,
1515    pub(crate) _phantom: PhantomData<Input>,
1516}
1517
1518impl<C, Input, M> Workflow<C, Input, M> {
1519    /// Get the workflow ID.
1520    #[must_use]
1521    pub fn workflow_id(&self) -> &str {
1522        &self.context.workflow_id
1523    }
1524
1525    /// Get the definition hash.
1526    ///
1527    /// This hash is computed from the workflow's continuation structure and serves
1528    /// as a version identifier. It can be used to detect when a serialized workflow
1529    /// state was created with a different workflow definition.
1530    #[must_use]
1531    pub fn definition_hash(&self) -> &str {
1532        &self.definition_hash
1533    }
1534
1535    /// Get a reference to the context of this workflow.
1536    #[must_use]
1537    pub fn context(&self) -> &WorkflowContext<C, M> {
1538        &self.context
1539    }
1540
1541    /// Get a reference to the codec used by this workflow.
1542    #[must_use]
1543    pub fn codec(&self) -> &Arc<C> {
1544        &self.context.codec
1545    }
1546
1547    /// Get a reference to the continuation of this workflow.
1548    #[must_use]
1549    pub fn continuation(&self) -> &WorkflowContinuation {
1550        &self.continuation
1551    }
1552
1553    /// Get a reference to the metadata attached to this workflow.
1554    #[must_use]
1555    pub fn metadata(&self) -> &Arc<M> {
1556        &self.context.metadata
1557    }
1558
1559    /// Returns a lazy iterator over all nodes in topological (execution) order.
1560    ///
1561    /// Convenience wrapper around [`WorkflowContinuation::iter_nodes`].
1562    #[must_use]
1563    pub fn iter_nodes(&self) -> NodeIter<'_> {
1564        self.continuation.iter_nodes()
1565    }
1566
1567    /// Consume the workflow and return its continuation tree.
1568    ///
1569    /// Useful for inlining this workflow as a child inside another workflow.
1570    #[must_use]
1571    pub fn into_continuation(self) -> WorkflowContinuation {
1572        self.continuation
1573    }
1574}
1575
1576// ============================================================================
1577// Serializable Workflow
1578// ============================================================================
1579
1580/// A workflow that can be serialized and deserialized.
1581///
1582/// This is a wrapper around `Workflow` that carries an internal `TaskRegistry`,
1583/// automatically populated during building. This enables serialization without
1584/// manually setting up a separate registry.
1585///
1586/// # Example
1587///
1588/// ```rust
1589/// # use sayiir_core::prelude::*;
1590/// # use sayiir_core::codec::{Encoder, Decoder, sealed};
1591/// # use sayiir_core::workflow::SerializedWorkflowState;
1592/// # use bytes::Bytes;
1593/// # use std::sync::Arc;
1594/// # struct MyCodec;
1595/// # impl Encoder for MyCodec {}
1596/// # impl Decoder for MyCodec {}
1597/// # impl<T> sealed::EncodeValue<T> for MyCodec {
1598/// #     fn encode_value(&self, _: &T) -> Result<Bytes, BoxError> { Ok(Bytes::new()) }
1599/// # }
1600/// # impl<T> sealed::DecodeValue<T> for MyCodec {
1601/// #     fn decode_value(&self, _: Bytes) -> Result<T, BoxError> { Err("dummy".into()) }
1602/// # }
1603/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1604/// # let codec = Arc::new(MyCodec);
1605/// # let ctx = WorkflowContext::new("my-workflow", codec, Arc::new(()));
1606/// // Build a serializable workflow
1607/// let workflow = WorkflowBuilder::new(ctx)
1608///     .with_registry()  // Enable serialization
1609///     .then("step1", |i: u32| async move { Ok(i + 1) })
1610///     .then("step2", |i: u32| async move { Ok(i * 2) })
1611///     .build()?;
1612///
1613/// // Serialize
1614/// let serialized = workflow.to_serializable();
1615/// let json = serde_json::to_string(&serialized)?;
1616///
1617/// // Deserialize (uses internal registry)
1618/// let deserialized: SerializedWorkflowState = serde_json::from_str(&json)?;
1619/// let restored = workflow.to_runnable(&deserialized)?;
1620/// # Ok(())
1621/// # }
1622/// ```
1623pub struct SerializableWorkflow<C, Input, M = ()> {
1624    pub(crate) inner: Workflow<C, Input, M>,
1625    pub(crate) registry: TaskRegistry,
1626}
1627
1628impl<C, Input, M> SerializableWorkflow<C, Input, M> {
1629    /// Get the workflow ID.
1630    #[must_use]
1631    pub fn workflow_id(&self) -> &str {
1632        self.inner.workflow_id()
1633    }
1634
1635    /// Get the definition hash.
1636    #[must_use]
1637    pub fn definition_hash(&self) -> &str {
1638        self.inner.definition_hash()
1639    }
1640
1641    /// Get a reference to the inner workflow.
1642    #[must_use]
1643    pub fn workflow(&self) -> &Workflow<C, Input, M> {
1644        &self.inner
1645    }
1646
1647    /// Get a reference to the context.
1648    #[must_use]
1649    pub fn context(&self) -> &WorkflowContext<C, M> {
1650        self.inner.context()
1651    }
1652
1653    /// Get a reference to the codec.
1654    #[must_use]
1655    pub fn codec(&self) -> &Arc<C> {
1656        self.inner.codec()
1657    }
1658
1659    /// Get a reference to the continuation.
1660    #[must_use]
1661    pub fn continuation(&self) -> &WorkflowContinuation {
1662        self.inner.continuation()
1663    }
1664
1665    /// Get a reference to the metadata.
1666    #[must_use]
1667    pub fn metadata(&self) -> &Arc<M> {
1668        self.inner.metadata()
1669    }
1670
1671    /// Get a reference to the internal task registry.
1672    #[must_use]
1673    pub fn registry(&self) -> &TaskRegistry {
1674        &self.registry
1675    }
1676
1677    /// Consume the workflow and return its continuation tree and task registry.
1678    ///
1679    /// Useful for inlining this workflow as a child inside another workflow
1680    /// while merging task registries.
1681    #[must_use]
1682    pub fn into_parts(self) -> (WorkflowContinuation, TaskRegistry) {
1683        (self.inner.continuation, self.registry)
1684    }
1685
1686    /// Convert to a serializable state representation.
1687    ///
1688    /// Returns a `SerializedWorkflowState` that includes the workflow ID,
1689    /// definition hash, and continuation structure. This can be serialized
1690    /// and later deserialized to resume the workflow.
1691    #[must_use]
1692    pub fn to_serializable(&self) -> SerializedWorkflowState {
1693        SerializedWorkflowState {
1694            workflow_id: self.inner.workflow_id().to_string(),
1695            definition_hash: self.inner.definition_hash.clone(),
1696            continuation: self.inner.continuation().to_serializable(),
1697        }
1698    }
1699
1700    /// Convert a serialized workflow state to runnable using the internal registry.
1701    ///
1702    /// # Errors
1703    ///
1704    /// Returns `BuildError::DefinitionMismatch` if the definition hash doesn't
1705    /// match this workflow's hash, indicating the serialized state was created with
1706    /// a different workflow definition.
1707    ///
1708    /// Returns `BuildError::TaskNotFound` if any task ID is not in the registry.
1709    pub fn to_runnable(
1710        &self,
1711        state: &SerializedWorkflowState,
1712    ) -> Result<WorkflowContinuation, crate::error::BuildError> {
1713        if state.definition_hash != self.inner.definition_hash {
1714            return Err(crate::error::BuildError::DefinitionMismatch {
1715                expected: self.inner.definition_hash.clone(),
1716                found: state.definition_hash.clone(),
1717            });
1718        }
1719        state.continuation.to_runnable(&self.registry)
1720    }
1721}
1722
1723impl<C, Input, M> Deref for SerializableWorkflow<C, Input, M> {
1724    type Target = Workflow<C, Input, M>;
1725
1726    fn deref(&self) -> &Self::Target {
1727        &self.inner
1728    }
1729}
1730
1731#[cfg(test)]
1732#[allow(
1733    clippy::unwrap_used,
1734    clippy::panic,
1735    clippy::cast_lossless,
1736    clippy::cast_possible_truncation,
1737    clippy::uninlined_format_args,
1738    clippy::manual_let_else,
1739    clippy::too_many_lines,
1740    clippy::items_after_statements
1741)]
1742mod tests {
1743    use crate::codec::{Decoder, Encoder, sealed};
1744    use crate::error::BoxError;
1745    use crate::workflow::WorkflowBuilder;
1746    use bytes::Bytes;
1747
1748    struct DummyCodec;
1749
1750    impl Encoder for DummyCodec {}
1751    impl Decoder for DummyCodec {}
1752
1753    impl<Input> sealed::EncodeValue<Input> for DummyCodec {
1754        fn encode_value(&self, _value: &Input) -> Result<Bytes, BoxError> {
1755            Ok(Bytes::new())
1756        }
1757    }
1758    impl<Output> sealed::DecodeValue<Output> for DummyCodec {
1759        fn decode_value(&self, _bytes: Bytes) -> Result<Output, BoxError> {
1760            Err("Not implemented".into())
1761        }
1762    }
1763
1764    #[test]
1765    fn test_workflow_build() {
1766        use crate::context::WorkflowContext;
1767        use crate::workflow::Workflow;
1768        use std::sync::Arc;
1769
1770        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
1771        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
1772            .then("test", |i: u32| async move { Ok(i + 1) })
1773            .build()
1774            .unwrap();
1775
1776        // Verify the workflow was built successfully
1777        // The workflow can be executed using a WorkflowRunner from sayiir-runtime
1778        let _workflow_ref = &workflow;
1779    }
1780
1781    #[test]
1782    fn test_workflow_with_metadata() {
1783        use crate::context::WorkflowContext;
1784        use crate::workflow::Workflow;
1785        use std::sync::Arc;
1786
1787        let ctx = WorkflowContext::new(
1788            "test-workflow",
1789            Arc::new(DummyCodec),
1790            Arc::new("test_metadata"),
1791        );
1792        let workflow: Workflow<DummyCodec, u32, &str> = WorkflowBuilder::new(ctx)
1793            .then("test", |i: u32| async move { Ok(i + 1) })
1794            .build()
1795            .unwrap();
1796
1797        assert_eq!(**workflow.metadata(), "test_metadata");
1798    }
1799
1800    #[test]
1801    fn test_task_order() {
1802        use crate::context::WorkflowContext;
1803        use crate::workflow::Workflow;
1804        use std::sync::Arc;
1805
1806        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
1807        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
1808            .then("first", |i: u32| async move { Ok(i + 1) })
1809            .then("second", |i: u32| async move { Ok(i + 2) })
1810            .then("third", |i: u32| async move { Ok(i + 3) })
1811            .build()
1812            .unwrap();
1813
1814        // Verify the continuation chain structure
1815        // Tasks should be linked in order: first -> second -> third
1816        let mut current = workflow.continuation();
1817        let mut task_ids = vec![];
1818
1819        loop {
1820            match current {
1821                crate::workflow::WorkflowContinuation::Task { id, next, .. } => {
1822                    task_ids.push(id.clone());
1823                    match next {
1824                        Some(next_box) => current = next_box.as_ref(),
1825                        None => break,
1826                    }
1827                }
1828                _ => break,
1829            }
1830        }
1831
1832        assert_eq!(
1833            task_ids,
1834            vec!["first", "second", "third"],
1835            "Tasks should execute in the order they were added"
1836        );
1837    }
1838
1839    #[test]
1840    fn test_heterogeneous_fork_join_compiles() {
1841        use crate::context::WorkflowContext;
1842        use crate::task::BranchOutputs;
1843        use crate::workflow::Workflow;
1844        use std::sync::Arc;
1845
1846        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
1847        // This test verifies that the heterogeneous fork-join API compiles correctly.
1848        // Each branch can return a different type thanks to type erasure.
1849        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
1850            .then("prepare", |i: u32| async move { Ok(i) })
1851            .branches(|b| {
1852                // Returns u32
1853                b.add("count", |i: u32| async move { Ok(i * 2) });
1854                // Returns String - heterogeneous output type!
1855                b.add("name", |i: u32| async move { Ok(format!("item_{}", i)) });
1856                // Returns f64 - another different type!
1857                b.add("ratio", |i: u32| async move { Ok(i as f64 / 100.0) });
1858            })
1859            .join("combine", |outputs: BranchOutputs<DummyCodec>| async move {
1860                // In a real workflow with a proper codec, you would:
1861                // let count: u32 = outputs.get_by_id("count")?;
1862                // let name: String = outputs.get_by_id("name")?;
1863                // let ratio: f64 = outputs.get_by_id("ratio")?;
1864                // For this test, just verify the API compiles
1865                let _ = outputs.len();
1866                Ok(format!("combined {} branches", outputs.len()))
1867            })
1868            .then("final", |s: String| async move { Ok(s.len() as u32) })
1869            .build()
1870            .unwrap();
1871
1872        let _workflow_ref = &workflow;
1873    }
1874
1875    #[test]
1876    fn test_duplicate_branch_id_returns_error() {
1877        use crate::context::WorkflowContext;
1878        use crate::error::BuildError;
1879        use std::sync::Arc;
1880
1881        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
1882        let result = WorkflowBuilder::<_, u32, _>::new(ctx)
1883            .then("prepare", |i: u32| async move { Ok(i) })
1884            .branches(|b| {
1885                b.add("count", |i: u32| async move { Ok(i * 2) });
1886                b.add("count", |i: u32| async move { Ok(i * 3) }); // Duplicate!
1887            })
1888            .join("combine", |_outputs| async move { Ok(0u32) })
1889            .build();
1890
1891        let err = match result {
1892            Err(e) => e,
1893            Ok(_) => panic!("expected build error"),
1894        };
1895        assert!(
1896            err.iter()
1897                .any(|e| matches!(e, BuildError::DuplicateTaskId(id) if id == "count"))
1898        );
1899    }
1900
1901    #[test]
1902    fn test_serializable_continuation() {
1903        use crate::context::WorkflowContext;
1904        use crate::error::BuildError;
1905        use crate::registry::TaskRegistry;
1906        use std::sync::Arc;
1907
1908        // Build a workflow
1909        let codec = Arc::new(DummyCodec);
1910        let ctx = WorkflowContext::new("test-workflow", codec.clone(), Arc::new(()));
1911        let workflow = WorkflowBuilder::new(ctx)
1912            .then("step1", |i: u32| async move { Ok(i + 1) })
1913            .then("step2", |i: u32| async move { Ok(i * 2) })
1914            .build()
1915            .unwrap();
1916
1917        // Convert to serializable
1918        let serializable = workflow.continuation().to_serializable();
1919
1920        // Check structure
1921        let task_ids = serializable.task_ids();
1922        assert_eq!(task_ids, vec!["step1", "step2"]);
1923
1924        // Hydration fails without registry
1925        let empty_registry = TaskRegistry::new();
1926        let result = serializable.to_runnable(&empty_registry);
1927        assert!(matches!(result, Err(BuildError::TaskNotFound(id)) if id == "step1"));
1928
1929        // Hydration succeeds with proper registry
1930        let mut registry = TaskRegistry::new();
1931        registry.register_fn("step1", codec.clone(), |i: u32| async move { Ok(i + 1) });
1932        registry.register_fn("step2", codec.clone(), |i: u32| async move { Ok(i * 2) });
1933
1934        let hydrated = serializable.to_runnable(&registry);
1935        assert!(hydrated.is_ok());
1936    }
1937
1938    #[test]
1939    fn test_serializable_fork_join() {
1940        use crate::context::WorkflowContext;
1941        use crate::task::BranchOutputs;
1942        use std::sync::Arc;
1943
1944        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
1945        let workflow = WorkflowBuilder::new(ctx)
1946            .then("prepare", |i: u32| async move { Ok(i) })
1947            .branches(|b| {
1948                b.add("branch_a", |i: u32| async move { Ok(i * 2) });
1949                b.add("branch_b", |i: u32| async move { Ok(i + 10) });
1950            })
1951            .join(
1952                "merge",
1953                |_: BranchOutputs<DummyCodec>| async move { Ok(0u32) },
1954            )
1955            .build()
1956            .unwrap();
1957
1958        let serializable = workflow.continuation().to_serializable();
1959        let task_ids = serializable.task_ids();
1960
1961        // Should contain: prepare, fork (branch_a||branch_b), branch_a, branch_b, merge
1962        assert!(task_ids.contains(&"prepare"));
1963        assert!(task_ids.contains(&"branch_a||branch_b"));
1964        assert!(task_ids.contains(&"branch_a"));
1965        assert!(task_ids.contains(&"branch_b"));
1966        assert!(task_ids.contains(&"merge"));
1967        assert_eq!(task_ids.len(), 5);
1968    }
1969
1970    #[test]
1971    fn test_serializable_workflow_builder() {
1972        use crate::context::WorkflowContext;
1973        use std::sync::Arc;
1974
1975        let codec = Arc::new(DummyCodec);
1976        let ctx = WorkflowContext::new("test-workflow", codec, Arc::new(()));
1977
1978        // Build with with_registry() - registry is auto-populated
1979        let workflow = WorkflowBuilder::new(ctx)
1980            .with_registry()
1981            .then("step1", |i: u32| async move { Ok(i + 1) })
1982            .then("step2", |i: u32| async move { Ok(i * 2) })
1983            .build()
1984            .unwrap();
1985
1986        // Registry was auto-populated
1987        assert!(workflow.registry().contains("step1"));
1988        assert!(workflow.registry().contains("step2"));
1989        assert_eq!(workflow.registry().len(), 2);
1990
1991        // Can serialize
1992        let serializable = workflow.to_serializable();
1993        assert_eq!(serializable.continuation.task_ids(), vec!["step1", "step2"]);
1994
1995        // Can hydrate using internal registry
1996        let hydrated = workflow.to_runnable(&serializable);
1997        assert!(hydrated.is_ok());
1998    }
1999
2000    #[test]
2001    fn test_with_existing_registry_and_then_registered() {
2002        use crate::context::WorkflowContext;
2003        use crate::registry::TaskRegistry;
2004        use crate::workflow::SerializableWorkflow;
2005        use std::sync::Arc;
2006
2007        let codec = Arc::new(DummyCodec);
2008
2009        // Pre-register tasks in a registry
2010        let mut registry = TaskRegistry::new();
2011        registry.register_fn("double", codec.clone(), |i: u32| async move { Ok(i * 2) });
2012        registry.register_fn("add_ten", codec.clone(), |i: u32| async move { Ok(i + 10) });
2013
2014        // Build workflow using existing registry and referencing pre-registered tasks
2015        let ctx = WorkflowContext::new("test-workflow", codec.clone(), Arc::new(()));
2016        let workflow: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx)
2017            .with_existing_registry(registry)
2018            .then_registered::<u32>("double")
2019            .then_registered::<u32>("add_ten")
2020            .build()
2021            .unwrap();
2022
2023        // Registry should contain the pre-registered tasks
2024        assert!(workflow.registry().contains("double"));
2025        assert!(workflow.registry().contains("add_ten"));
2026
2027        // Workflow structure should reference those tasks
2028        let serializable = workflow.to_serializable();
2029        assert_eq!(
2030            serializable.continuation.task_ids(),
2031            vec!["double", "add_ten"]
2032        );
2033
2034        // Can hydrate using the same registry
2035        let hydrated = workflow.to_runnable(&serializable);
2036        assert!(hydrated.is_ok());
2037    }
2038
2039    #[test]
2040    fn test_mixed_inline_and_registered_tasks() {
2041        use crate::context::WorkflowContext;
2042        use crate::registry::TaskRegistry;
2043        use crate::workflow::SerializableWorkflow;
2044        use std::sync::Arc;
2045
2046        let codec = Arc::new(DummyCodec);
2047
2048        // Pre-register one task
2049        let mut registry = TaskRegistry::new();
2050        registry.register_fn(
2051            "preregistered",
2052            codec.clone(),
2053            |i: u32| async move { Ok(i * 2) },
2054        );
2055
2056        // Build workflow mixing pre-registered and inline tasks
2057        let ctx = WorkflowContext::new("test-workflow", codec.clone(), Arc::new(()));
2058        let workflow: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx)
2059            .with_existing_registry(registry)
2060            .then_registered::<u32>("preregistered") // Use pre-registered
2061            .then("inline", |i: u32| async move { Ok(i + 5) }) // Define inline
2062            .build()
2063            .unwrap();
2064
2065        // Registry should have both tasks
2066        assert!(workflow.registry().contains("preregistered"));
2067        assert!(workflow.registry().contains("inline"));
2068        assert_eq!(workflow.registry().len(), 2);
2069    }
2070
2071    #[test]
2072    fn test_workflow_id_and_definition_hash() {
2073        use crate::context::WorkflowContext;
2074        use std::sync::Arc;
2075
2076        let ctx = WorkflowContext::new("my-workflow-id", Arc::new(DummyCodec), Arc::new(()));
2077        let workflow = WorkflowBuilder::new(ctx)
2078            .with_registry()
2079            .then("step1", |i: u32| async move { Ok(i + 1) })
2080            .then("step2", |i: u32| async move { Ok(i * 2) })
2081            .build()
2082            .unwrap();
2083
2084        // Check workflow_id is set correctly
2085        assert_eq!(workflow.workflow_id(), "my-workflow-id");
2086
2087        // Definition hash should be non-empty
2088        assert!(!workflow.definition_hash().is_empty());
2089
2090        // Serializable state should contain the same id and hash
2091        let state = workflow.to_serializable();
2092        assert_eq!(state.workflow_id, "my-workflow-id");
2093        assert_eq!(state.definition_hash, workflow.definition_hash());
2094    }
2095
2096    #[test]
2097    fn test_definition_hash_changes_with_structure() {
2098        use crate::context::WorkflowContext;
2099        use std::sync::Arc;
2100
2101        // Build two workflows with different structures
2102        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2103        let workflow1 = WorkflowBuilder::new(ctx1)
2104            .with_registry()
2105            .then("step1", |i: u32| async move { Ok(i + 1) })
2106            .build()
2107            .unwrap();
2108
2109        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2110        let workflow2 = WorkflowBuilder::new(ctx2)
2111            .with_registry()
2112            .then("step1", |i: u32| async move { Ok(i + 1) })
2113            .then("step2", |i: u32| async move { Ok(i * 2) })
2114            .build()
2115            .unwrap();
2116
2117        assert_ne!(workflow1.definition_hash(), workflow2.definition_hash());
2118    }
2119
2120    #[test]
2121    fn test_definition_mismatch_error() {
2122        use crate::context::WorkflowContext;
2123        use crate::error::BuildError;
2124        use std::sync::Arc;
2125
2126        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2127        let workflow = WorkflowBuilder::new(ctx)
2128            .with_registry()
2129            .then("step1", |i: u32| async move { Ok(i + 1) })
2130            .build()
2131            .unwrap();
2132
2133        // Create a state with wrong hash
2134        let mut state = workflow.to_serializable();
2135        state.definition_hash = "wrong-hash".to_string();
2136
2137        // to_runnable should fail with DefinitionMismatch
2138        let result = workflow.to_runnable(&state);
2139        assert!(matches!(result, Err(BuildError::DefinitionMismatch { .. })));
2140    }
2141
2142    #[test]
2143    fn test_duplicate_id_tampering_detection() {
2144        use crate::error::BuildError;
2145        use crate::registry::TaskRegistry;
2146        use crate::workflow::SerializableContinuation;
2147        use std::sync::Arc;
2148
2149        let codec = Arc::new(DummyCodec);
2150
2151        // Create a registry with tasks
2152        let mut registry = TaskRegistry::new();
2153        registry.register_fn("step1", codec.clone(), |i: u32| async move { Ok(i + 1) });
2154        registry.register_fn("step2", codec.clone(), |i: u32| async move { Ok(i * 2) });
2155
2156        // Manually construct a tampered continuation with duplicate IDs
2157        let tampered = SerializableContinuation::Task {
2158            id: "step1".to_string(),
2159            timeout_ms: None,
2160            retry_policy: None,
2161            version: None,
2162            priority: None,
2163
2164            tags: vec![],
2165            next: Some(Box::new(SerializableContinuation::Task {
2166                id: "step1".to_string(), // Duplicate!
2167                timeout_ms: None,
2168                retry_policy: None,
2169                version: None,
2170                priority: None,
2171
2172                tags: vec![],
2173                next: None,
2174            })),
2175        };
2176
2177        // to_runnable should detect the tampering
2178        let result = tampered.to_runnable(&registry);
2179        assert!(matches!(
2180            result,
2181            Err(BuildError::DuplicateTaskId(id)) if id == "step1"
2182        ));
2183    }
2184
2185    // ========================================================================
2186    // Delay tests
2187    // ========================================================================
2188
2189    #[test]
2190    fn test_delay_builder() {
2191        use crate::context::WorkflowContext;
2192        use crate::workflow::{Workflow, WorkflowContinuation};
2193        use std::sync::Arc;
2194        use std::time::Duration;
2195
2196        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2197        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2198            .then("step1", |i: u32| async move { Ok(i + 1) })
2199            .delay("wait_1s", Duration::from_secs(1))
2200            .then("step2", |i: u32| async move { Ok(i * 2) })
2201            .build()
2202            .unwrap();
2203
2204        // Verify the chain structure: Task -> Delay -> Task
2205        let mut ids = vec![];
2206        let mut current = workflow.continuation();
2207        loop {
2208            match current {
2209                WorkflowContinuation::Task { id, next, .. } => {
2210                    ids.push(format!("task:{id}"));
2211                    match next {
2212                        Some(n) => current = n,
2213                        None => break,
2214                    }
2215                }
2216                WorkflowContinuation::Delay {
2217                    id, duration, next, ..
2218                } => {
2219                    ids.push(format!("delay:{id}:{}ms", duration.as_millis()));
2220                    match next {
2221                        Some(n) => current = n,
2222                        None => break,
2223                    }
2224                }
2225                _ => break,
2226            }
2227        }
2228
2229        assert_eq!(
2230            ids,
2231            vec!["task:step1", "delay:wait_1s:1000ms", "task:step2"]
2232        );
2233    }
2234
2235    #[test]
2236    fn test_delay_serialization_roundtrip() {
2237        use crate::context::WorkflowContext;
2238        use crate::workflow::SerializableContinuation;
2239        use std::sync::Arc;
2240        use std::time::Duration;
2241
2242        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2243        let workflow = WorkflowBuilder::new(ctx)
2244            .with_registry()
2245            .then("step1", |i: u32| async move { Ok(i + 1) })
2246            .delay("wait_5s", Duration::from_secs(5))
2247            .then("step2", |i: u32| async move { Ok(i * 2) })
2248            .build()
2249            .unwrap();
2250
2251        // Convert to serializable
2252        let serializable = workflow.to_serializable();
2253
2254        // Check structure
2255        let task_ids = serializable.continuation.task_ids();
2256        assert_eq!(task_ids, vec!["step1", "wait_5s", "step2"]);
2257
2258        // Check delay duration is preserved
2259        match &serializable.continuation {
2260            SerializableContinuation::Task { next, .. } => {
2261                let next = next.as_ref().unwrap();
2262                match next.as_ref() {
2263                    SerializableContinuation::Delay {
2264                        id, duration_ms, ..
2265                    } => {
2266                        assert_eq!(id, "wait_5s");
2267                        assert_eq!(*duration_ms, 5000);
2268                    }
2269                    other => panic!("Expected Delay, got {other:?}"),
2270                }
2271            }
2272            other => panic!("Expected Task, got {other:?}"),
2273        }
2274
2275        // Hydrate back to runnable
2276        let hydrated = workflow.to_runnable(&serializable);
2277        assert!(hydrated.is_ok());
2278    }
2279
2280    #[test]
2281    fn test_delay_first_task_id() {
2282        use crate::context::WorkflowContext;
2283        use std::sync::Arc;
2284        use std::time::Duration;
2285
2286        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2287        let workflow = WorkflowBuilder::new(ctx)
2288            .delay("initial_delay", Duration::from_secs(10))
2289            .then("step1", |i: u32| async move { Ok(i + 1) })
2290            .build()
2291            .unwrap();
2292
2293        assert_eq!(workflow.continuation().first_task_id(), "initial_delay");
2294    }
2295
2296    #[test]
2297    fn test_delay_duplicate_id_detection() {
2298        use crate::context::WorkflowContext;
2299        use crate::error::BuildError;
2300        use std::sync::Arc;
2301        use std::time::Duration;
2302
2303        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2304        let result = WorkflowBuilder::<_, u32, _>::new(ctx)
2305            .then("dup", |i: u32| async move { Ok(i + 1) })
2306            .delay("dup", Duration::from_secs(1))
2307            .build();
2308
2309        let err = match result {
2310            Err(e) => e,
2311            Ok(_) => panic!("expected build error"),
2312        };
2313        assert!(
2314            err.iter()
2315                .any(|e| matches!(e, BuildError::DuplicateTaskId(id) if id == "dup"))
2316        );
2317    }
2318
2319    #[test]
2320    fn test_delay_definition_hash_includes_duration() {
2321        use crate::context::WorkflowContext;
2322        use crate::workflow::SerializableWorkflow;
2323        use std::sync::Arc;
2324        use std::time::Duration;
2325
2326        // Workflow with 1-second delay
2327        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2328        let wf1: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx1)
2329            .with_registry()
2330            .then("step1", |i: u32| async move { Ok(i + 1) })
2331            .delay("wait", Duration::from_secs(1))
2332            .build()
2333            .unwrap();
2334
2335        // Workflow with 60-second delay (same ID, different duration)
2336        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2337        let wf2: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx2)
2338            .with_registry()
2339            .then("step1", |i: u32| async move { Ok(i + 1) })
2340            .delay("wait", Duration::from_secs(60))
2341            .build()
2342            .unwrap();
2343
2344        // Hashes should differ because duration differs
2345        assert_ne!(wf1.definition_hash(), wf2.definition_hash());
2346    }
2347
2348    #[test]
2349    fn test_delay_definition_hash_differs_from_task() {
2350        use crate::context::WorkflowContext;
2351        use crate::workflow::SerializableWorkflow;
2352        use std::sync::Arc;
2353        use std::time::Duration;
2354
2355        // Workflow with task
2356        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2357        let wf1: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx1)
2358            .with_registry()
2359            .then("step1", |i: u32| async move { Ok(i + 1) })
2360            .build()
2361            .unwrap();
2362
2363        // Workflow with delay instead
2364        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2365        let wf2: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx2)
2366            .with_registry()
2367            .delay("step1", Duration::from_secs(1))
2368            .build()
2369            .unwrap();
2370
2371        // Hashes should differ (Task vs Delay are tagged differently)
2372        assert_ne!(wf1.definition_hash(), wf2.definition_hash());
2373    }
2374
2375    #[test]
2376    fn test_delay_task_ids() {
2377        use crate::context::WorkflowContext;
2378        use std::sync::Arc;
2379        use std::time::Duration;
2380
2381        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2382        let workflow = WorkflowBuilder::new(ctx)
2383            .then("fetch", |i: u32| async move { Ok(i) })
2384            .delay("wait_24h", Duration::from_secs(86400))
2385            .then("process", |i: u32| async move { Ok(i + 1) })
2386            .build()
2387            .unwrap();
2388
2389        let serializable = workflow.continuation().to_serializable();
2390        let ids = serializable.task_ids();
2391        assert_eq!(ids, vec!["fetch", "wait_24h", "process"]);
2392    }
2393
2394    #[test]
2395    fn test_delay_only_workflow() {
2396        use crate::context::WorkflowContext;
2397        use std::sync::Arc;
2398        use std::time::Duration;
2399
2400        use crate::workflow::Workflow;
2401
2402        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2403        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2404            .delay("just_wait", Duration::from_millis(10))
2405            .build()
2406            .unwrap();
2407
2408        assert_eq!(workflow.continuation().first_task_id(), "just_wait");
2409
2410        let serializable = workflow.continuation().to_serializable();
2411        assert_eq!(serializable.task_ids(), vec!["just_wait"]);
2412    }
2413
2414    #[test]
2415    fn test_delay_to_runnable_no_registry_needed() {
2416        use crate::registry::TaskRegistry;
2417        use crate::workflow::SerializableContinuation;
2418
2419        // A delay doesn't need a registry entry (it has no func)
2420        let delay = SerializableContinuation::Delay {
2421            id: "wait".to_string(),
2422            duration_ms: 5000,
2423            next: None,
2424        };
2425
2426        let empty_registry = TaskRegistry::new();
2427        let result = delay.to_runnable(&empty_registry);
2428        assert!(result.is_ok());
2429
2430        let runnable = result.unwrap();
2431        match runnable {
2432            crate::workflow::WorkflowContinuation::Delay {
2433                id, duration, next, ..
2434            } => {
2435                assert_eq!(id, "wait");
2436                assert_eq!(duration, std::time::Duration::from_millis(5000));
2437                assert!(next.is_none());
2438            }
2439            _ => panic!("Expected Delay variant"),
2440        }
2441    }
2442
2443    // ========================================================================
2444    // Timeout tests
2445    // ========================================================================
2446
2447    #[test]
2448    fn test_timeout_serialization_roundtrip() {
2449        use crate::context::WorkflowContext;
2450        use crate::task::TaskMetadata;
2451        use crate::workflow::SerializableContinuation;
2452        use std::sync::Arc;
2453        use std::time::Duration;
2454
2455        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2456        let workflow = WorkflowBuilder::new(ctx)
2457            .with_registry()
2458            .then("step1", |i: u32| async move { Ok(i + 1) })
2459            .with_metadata(TaskMetadata {
2460                timeout: Some(Duration::from_secs(30)),
2461                ..Default::default()
2462            })
2463            .then("step2", |i: u32| async move { Ok(i * 2) })
2464            .build()
2465            .unwrap();
2466
2467        // Convert to serializable
2468        let serializable = workflow.to_serializable();
2469
2470        // Check timeout is preserved in serialization
2471        match &serializable.continuation {
2472            SerializableContinuation::Task { id, timeout_ms, .. } => {
2473                assert_eq!(id, "step1");
2474                assert_eq!(*timeout_ms, Some(30_000));
2475            }
2476            other => panic!("Expected Task, got {other:?}"),
2477        }
2478
2479        // Hydrate back to runnable and verify timeout
2480        let hydrated = workflow.to_runnable(&serializable).unwrap();
2481        match &hydrated {
2482            crate::workflow::WorkflowContinuation::Task { id, timeout, .. } => {
2483                assert_eq!(id, "step1");
2484                assert_eq!(*timeout, Some(Duration::from_secs(30)));
2485            }
2486            _ => panic!("Expected Task variant"),
2487        }
2488    }
2489
2490    #[test]
2491    fn test_timeout_changes_definition_hash() {
2492        use crate::context::WorkflowContext;
2493        use crate::task::TaskMetadata;
2494        use crate::workflow::SerializableWorkflow;
2495        use std::sync::Arc;
2496        use std::time::Duration;
2497
2498        // Workflow without timeout
2499        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2500        let wf1: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx1)
2501            .with_registry()
2502            .then("step1", |i: u32| async move { Ok(i + 1) })
2503            .build()
2504            .unwrap();
2505
2506        // Workflow with timeout (same ID, different timeout)
2507        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2508        let wf2: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx2)
2509            .with_registry()
2510            .then("step1", |i: u32| async move { Ok(i + 1) })
2511            .with_metadata(TaskMetadata {
2512                timeout: Some(Duration::from_secs(30)),
2513                ..Default::default()
2514            })
2515            .build()
2516            .unwrap();
2517
2518        // Hashes should differ because timeout differs
2519        assert_ne!(wf1.definition_hash(), wf2.definition_hash());
2520    }
2521
2522    #[test]
2523    fn test_no_timeout_field_absent_in_serialization() {
2524        use crate::context::WorkflowContext;
2525        use std::sync::Arc;
2526
2527        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2528        let workflow = WorkflowBuilder::new(ctx)
2529            .with_registry()
2530            .then("step1", |i: u32| async move { Ok(i + 1) })
2531            .build()
2532            .unwrap();
2533
2534        let serializable = workflow.to_serializable();
2535        // With serde skip_serializing_if, timeout_ms should not appear in JSON
2536        let json = serde_json::to_string(&serializable.continuation).unwrap();
2537        assert!(
2538            !json.contains("timeout_ms"),
2539            "timeout_ms should be absent when None: {json}"
2540        );
2541    }
2542
2543    #[test]
2544    fn test_task_version_changes_definition_hash() {
2545        use crate::context::WorkflowContext;
2546        use crate::task::TaskMetadata;
2547        use crate::workflow::SerializableWorkflow;
2548        use std::sync::Arc;
2549
2550        // Workflow without version
2551        let ctx1 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2552        let wf_no_version: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx1)
2553            .with_registry()
2554            .then("step1", |i: u32| async move { Ok(i + 1) })
2555            .build()
2556            .unwrap();
2557
2558        // Workflow with version "1.0"
2559        let ctx2 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2560        let wf_v1: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx2)
2561            .with_registry()
2562            .then("step1", |i: u32| async move { Ok(i + 1) })
2563            .with_metadata(TaskMetadata {
2564                version: Some("1.0".into()),
2565                ..Default::default()
2566            })
2567            .build()
2568            .unwrap();
2569
2570        // Workflow with version "2.0"
2571        let ctx3 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2572        let wf_v2: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx3)
2573            .with_registry()
2574            .then("step1", |i: u32| async move { Ok(i + 1) })
2575            .with_metadata(TaskMetadata {
2576                version: Some("2.0".into()),
2577                ..Default::default()
2578            })
2579            .build()
2580            .unwrap();
2581
2582        // Same version produces same hash
2583        let ctx4 = WorkflowContext::new("workflow", Arc::new(DummyCodec), Arc::new(()));
2584        let wf_v1_again: SerializableWorkflow<_, u32> = WorkflowBuilder::new(ctx4)
2585            .with_registry()
2586            .then("step1", |i: u32| async move { Ok(i + 1) })
2587            .with_metadata(TaskMetadata {
2588                version: Some("1.0".into()),
2589                ..Default::default()
2590            })
2591            .build()
2592            .unwrap();
2593
2594        assert_ne!(
2595            wf_no_version.definition_hash(),
2596            wf_v1.definition_hash(),
2597            "Adding version should change hash"
2598        );
2599        assert_ne!(
2600            wf_v1.definition_hash(),
2601            wf_v2.definition_hash(),
2602            "Different versions should produce different hashes"
2603        );
2604        assert_eq!(
2605            wf_v1.definition_hash(),
2606            wf_v1_again.definition_hash(),
2607            "Same version should produce same hash"
2608        );
2609    }
2610
2611    #[test]
2612    fn test_version_absent_in_serialization_when_none() {
2613        use crate::context::WorkflowContext;
2614        use std::sync::Arc;
2615
2616        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2617        let workflow = WorkflowBuilder::new(ctx)
2618            .with_registry()
2619            .then("step1", |i: u32| async move { Ok(i + 1) })
2620            .build()
2621            .unwrap();
2622
2623        let serializable = workflow.to_serializable();
2624        let json = serde_json::to_string(&serializable.continuation).unwrap();
2625        assert!(
2626            !json.contains("version"),
2627            "version should be absent when None: {json}"
2628        );
2629    }
2630
2631    #[test]
2632    fn test_version_present_in_serialization_when_set() {
2633        use crate::context::WorkflowContext;
2634        use crate::task::TaskMetadata;
2635        use std::sync::Arc;
2636
2637        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2638        let workflow = WorkflowBuilder::new(ctx)
2639            .with_registry()
2640            .then("step1", |i: u32| async move { Ok(i + 1) })
2641            .with_metadata(TaskMetadata {
2642                version: Some("3.0".into()),
2643                ..Default::default()
2644            })
2645            .build()
2646            .unwrap();
2647
2648        let serializable = workflow.to_serializable();
2649        let json = serde_json::to_string(&serializable.continuation).unwrap();
2650        assert!(
2651            json.contains(r#""version":"3.0""#),
2652            "version should be present in JSON: {json}"
2653        );
2654    }
2655
2656    // ========================================================================
2657    // Topological nodes() tests
2658    // ========================================================================
2659
2660    #[test]
2661    fn test_nodes_single_task() {
2662        use crate::context::WorkflowContext;
2663        use crate::workflow::{NodeKind, Workflow};
2664        use std::sync::Arc;
2665
2666        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2667        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2668            .then("only", |i: u32| async move { Ok(i + 1) })
2669            .build()
2670            .unwrap();
2671
2672        let nodes: Vec<_> = workflow.iter_nodes().collect();
2673        assert_eq!(nodes.len(), 1);
2674        assert_eq!(nodes[0].id, "only");
2675        assert_eq!(nodes[0].kind, NodeKind::Task);
2676        assert!(nodes[0].predecessor_id.is_none());
2677    }
2678
2679    #[test]
2680    fn test_nodes_chain_order() {
2681        use crate::context::WorkflowContext;
2682        use crate::workflow::{NodeKind, Workflow};
2683        use std::sync::Arc;
2684
2685        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2686        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2687            .then("a", |i: u32| async move { Ok(i + 1) })
2688            .then("b", |i: u32| async move { Ok(i + 2) })
2689            .then("c", |i: u32| async move { Ok(i + 3) })
2690            .build()
2691            .unwrap();
2692
2693        let nodes: Vec<_> = workflow.iter_nodes().collect();
2694        let ids: Vec<&str> = nodes.iter().map(|n| n.id).collect();
2695        assert_eq!(ids, vec!["a", "b", "c"]);
2696        assert!(nodes.iter().all(|n| n.kind == NodeKind::Task));
2697
2698        // Predecessor chain
2699        assert_eq!(nodes[0].predecessor_id, None);
2700        assert_eq!(nodes[1].predecessor_id, Some("a"));
2701        assert_eq!(nodes[2].predecessor_id, Some("b"));
2702    }
2703
2704    #[test]
2705    fn test_nodes_fork_with_join() {
2706        use crate::context::WorkflowContext;
2707        use crate::task::BranchOutputs;
2708        use crate::workflow::{NodeKind, Workflow};
2709        use std::sync::Arc;
2710
2711        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2712        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2713            .then("prepare", |i: u32| async move { Ok(i) })
2714            .branches(|b| {
2715                b.add("left", |i: u32| async move { Ok(i * 2) });
2716                b.add("right", |i: u32| async move { Ok(i + 10) });
2717            })
2718            .join(
2719                "merge",
2720                |_: BranchOutputs<DummyCodec>| async move { Ok(0u32) },
2721            )
2722            .build()
2723            .unwrap();
2724
2725        let nodes: Vec<_> = workflow.iter_nodes().collect();
2726        let ids: Vec<&str> = nodes.iter().map(|n| n.id).collect();
2727
2728        // prepare → fork → (left, right) → merge
2729        assert_eq!(ids[0], "prepare");
2730        assert_eq!(nodes[1].kind, NodeKind::Fork);
2731        assert!(ids.contains(&"left"));
2732        assert!(ids.contains(&"right"));
2733        assert_eq!(*ids.last().unwrap(), "merge");
2734
2735        // Fork's predecessor is prepare
2736        assert_eq!(nodes[1].predecessor_id, Some("prepare"));
2737
2738        // Branches' predecessor is the fork node
2739        let fork_id = nodes[1].id;
2740        let left_node = nodes.iter().find(|n| n.id == "left").unwrap();
2741        let right_node = nodes.iter().find(|n| n.id == "right").unwrap();
2742        assert_eq!(left_node.predecessor_id, Some(fork_id));
2743        assert_eq!(right_node.predecessor_id, Some(fork_id));
2744
2745        // Merge's predecessor is the fork node
2746        let merge_node = nodes.iter().find(|n| n.id == "merge").unwrap();
2747        assert_eq!(merge_node.predecessor_id, Some(fork_id));
2748    }
2749
2750    #[test]
2751    fn test_nodes_loop() {
2752        use crate::context::WorkflowContext;
2753        use crate::loop_result::LoopResult;
2754        use crate::workflow::{NodeKind, Workflow};
2755        use std::sync::Arc;
2756
2757        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2758        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2759            .loop_task(
2760                "iterate",
2761                |i: u32| async move { Ok(LoopResult::Done(i)) },
2762                5,
2763            )
2764            .then("after", |i: u32| async move { Ok(i) })
2765            .build()
2766            .unwrap();
2767
2768        let nodes: Vec<_> = workflow.iter_nodes().collect();
2769
2770        // loop_0 (Loop) → iterate (Task, body) → after (Task, next)
2771        assert_eq!(nodes[0].kind, NodeKind::Loop);
2772        assert_eq!(nodes[1].id, "iterate");
2773        assert_eq!(nodes[1].kind, NodeKind::Task);
2774        assert_eq!(nodes[2].id, "after");
2775        assert_eq!(nodes[2].kind, NodeKind::Task);
2776
2777        // Predecessors
2778        assert_eq!(nodes[0].predecessor_id, None);
2779        assert_eq!(nodes[1].predecessor_id, Some(nodes[0].id)); // body → loop
2780        assert_eq!(nodes[2].predecessor_id, Some(nodes[0].id)); // next → loop
2781    }
2782
2783    #[test]
2784    fn test_nodes_delay_reports_duration_as_timeout() {
2785        use crate::context::WorkflowContext;
2786        use crate::workflow::{NodeKind, Workflow};
2787        use std::sync::Arc;
2788        use std::time::Duration;
2789
2790        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2791        let workflow: Workflow<DummyCodec, u32> = WorkflowBuilder::new(ctx)
2792            .delay("wait_5s", Duration::from_secs(5))
2793            .then("after", |i: u32| async move { Ok(i) })
2794            .build()
2795            .unwrap();
2796
2797        let nodes: Vec<_> = workflow.iter_nodes().collect();
2798        assert_eq!(nodes[0].id, "wait_5s");
2799        assert_eq!(nodes[0].kind, NodeKind::Delay);
2800        assert_eq!(nodes[0].timeout, Some(Duration::from_secs(5)));
2801        assert_eq!(nodes[0].predecessor_id, None);
2802
2803        assert_eq!(nodes[1].id, "after");
2804        assert_eq!(nodes[1].predecessor_id, Some("wait_5s"));
2805    }
2806
2807    #[test]
2808    fn test_nodes_metadata_extraction() {
2809        use crate::context::WorkflowContext;
2810        use crate::task::{RetryPolicy, TaskMetadata};
2811        use crate::workflow::NodeKind;
2812        use std::sync::Arc;
2813        use std::time::Duration;
2814
2815        let retry = RetryPolicy {
2816            max_retries: 3,
2817            initial_delay: Duration::from_millis(100),
2818            backoff_multiplier: 2.0,
2819            max_delay: Some(Duration::from_secs(10)),
2820        };
2821
2822        let ctx = WorkflowContext::new("test-workflow", Arc::new(DummyCodec), Arc::new(()));
2823        let workflow = WorkflowBuilder::new(ctx)
2824            .with_registry()
2825            .then("step", |i: u32| async move { Ok(i) })
2826            .with_metadata(TaskMetadata {
2827                timeout: Some(Duration::from_secs(30)),
2828                retries: Some(retry.clone()),
2829                version: Some("2.0".into()),
2830                ..Default::default()
2831            })
2832            .build()
2833            .unwrap();
2834
2835        let nodes: Vec<_> = workflow.iter_nodes().collect();
2836        assert_eq!(nodes.len(), 1);
2837        let node = &nodes[0];
2838
2839        assert_eq!(node.id, "step");
2840        assert_eq!(node.kind, NodeKind::Task);
2841        assert_eq!(node.timeout, Some(Duration::from_secs(30)));
2842        assert_eq!(node.retry_policy.unwrap().max_retries, 3);
2843    }
2844}
2845
2846#[cfg(test)]
2847#[allow(
2848    clippy::unwrap_used,
2849    clippy::expect_used,
2850    clippy::panic,
2851    clippy::indexing_slicing,
2852    clippy::too_many_lines,
2853    clippy::items_after_statements
2854)]
2855mod proptests {
2856    use super::{MaxIterationsPolicy, SerializableContinuation};
2857    use proptest::prelude::*;
2858
2859    /// Strategy for alphanumeric IDs (1..8 chars).
2860    fn arb_id() -> impl Strategy<Value = String> {
2861        "[a-z0-9]{1,8}"
2862    }
2863
2864    /// Recursive strategy for `SerializableContinuation` with bounded depth.
2865    fn arb_continuation(depth: usize) -> BoxedStrategy<SerializableContinuation> {
2866        let leaf = arb_id().prop_map(|id| SerializableContinuation::Task {
2867            id,
2868            timeout_ms: None,
2869            retry_policy: None,
2870            version: None,
2871            priority: None,
2872
2873            tags: vec![],
2874            next: None,
2875        });
2876
2877        if depth == 0 {
2878            return leaf.boxed();
2879        }
2880
2881        prop_oneof![
2882            // Task with optional next and optional timeout
2883            (
2884                arb_id(),
2885                prop::option::of(any::<u64>()),
2886                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
2887            )
2888                .prop_map(|(id, timeout_ms, next)| SerializableContinuation::Task {
2889                    id,
2890                    timeout_ms,
2891                    retry_policy: None,
2892                    version: None,
2893                    priority: None,
2894
2895                    tags: vec![],
2896                    next,
2897                }),
2898            // Fork with branches and optional join
2899            (
2900                arb_id(),
2901                prop::collection::vec(arb_continuation(depth - 1), 0..3),
2902                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
2903            )
2904                .prop_map(|(id, branches, join)| SerializableContinuation::Fork {
2905                    id,
2906                    branches,
2907                    join,
2908                }),
2909            // Delay with optional next
2910            (
2911                arb_id(),
2912                any::<u64>(),
2913                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
2914            )
2915                .prop_map(|(id, duration_ms, next)| SerializableContinuation::Delay {
2916                    id,
2917                    duration_ms,
2918                    next,
2919                }),
2920            // AwaitSignal with optional next
2921            (
2922                arb_id(),
2923                arb_id(),
2924                prop::option::of(any::<u64>()),
2925                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
2926            )
2927                .prop_map(|(id, signal_name, timeout_ms, next)| {
2928                    SerializableContinuation::AwaitSignal {
2929                        id,
2930                        signal_name,
2931                        timeout_ms,
2932                        next,
2933                    }
2934                }),
2935            // Branch with named branches, optional default and next
2936            (
2937                arb_id(),
2938                prop::collection::hash_map(
2939                    arb_id(),
2940                    arb_continuation(depth - 1).prop_map(Box::new),
2941                    0..3
2942                ),
2943                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
2944                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
2945            )
2946                .prop_map(|(id, branches, default, next)| {
2947                    SerializableContinuation::Branch {
2948                        id,
2949                        branches,
2950                        default,
2951                        next,
2952                    }
2953                }),
2954            // Loop with body and optional next
2955            (
2956                arb_id(),
2957                arb_continuation(depth - 1).prop_map(Box::new),
2958                1..100u32,
2959                prop::bool::ANY.prop_map(|b| if b {
2960                    MaxIterationsPolicy::Fail
2961                } else {
2962                    MaxIterationsPolicy::ExitWithLast
2963                }),
2964                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
2965            )
2966                .prop_map(|(id, body, max_iterations, on_max, next)| {
2967                    SerializableContinuation::Loop {
2968                        id,
2969                        body,
2970                        max_iterations,
2971                        on_max,
2972                        next,
2973                    }
2974                }),
2975            // ChildWorkflow with child and optional next
2976            (
2977                arb_id(),
2978                arb_continuation(depth - 1).prop_map(Box::new),
2979                prop::option::of(arb_continuation(depth - 1).prop_map(Box::new)),
2980            )
2981                .prop_map(|(id, child, next)| {
2982                    SerializableContinuation::ChildWorkflow { id, child, next }
2983                }),
2984        ]
2985        .boxed()
2986    }
2987
2988    /// Strategy for a continuation tree where all IDs are guaranteed unique.
2989    ///
2990    /// Each node gets an ID formed by its path index to prevent collisions.
2991    fn arb_unique_continuation(
2992        depth: usize,
2993        prefix: &str,
2994    ) -> BoxedStrategy<SerializableContinuation> {
2995        let id = format!("{prefix}n");
2996
2997        if depth == 0 {
2998            return Just(SerializableContinuation::Task {
2999                id,
3000                timeout_ms: None,
3001                retry_policy: None,
3002                version: None,
3003                priority: None,
3004
3005                tags: vec![],
3006                next: None,
3007            })
3008            .boxed();
3009        }
3010
3011        let id_clone = id.clone();
3012        prop_oneof![
3013            // Task with optional next
3014            prop::option::of(
3015                arb_unique_continuation(depth - 1, &format!("{prefix}0_")).prop_map(Box::new),
3016            )
3017            .prop_map(move |next| SerializableContinuation::Task {
3018                id: id_clone.clone(),
3019                timeout_ms: None,
3020                retry_policy: None,
3021                version: None,
3022                priority: None,
3023
3024                tags: vec![],
3025                next,
3026            }),
3027            // Fork with 0..3 branches (each gets unique prefix) and optional join
3028            {
3029                let id_f = id.clone();
3030                let prefix_f = prefix.to_string();
3031                (0..3u8)
3032                    .prop_flat_map(move |branch_count| {
3033                        let id_inner = id_f.clone();
3034                        let prefix_inner = prefix_f.clone();
3035                        let branches: Vec<BoxedStrategy<SerializableContinuation>> = (0
3036                            ..branch_count)
3037                            .map(|i| {
3038                                arb_unique_continuation(depth - 1, &format!("{prefix_inner}b{i}_"))
3039                            })
3040                            .collect();
3041                        let join = prop::option::of(
3042                            arb_unique_continuation(depth - 1, &format!("{prefix_inner}j_"))
3043                                .prop_map(Box::new),
3044                        );
3045                        (branches, join).prop_map(move |(branches, join)| {
3046                            SerializableContinuation::Fork {
3047                                id: id_inner.clone(),
3048                                branches,
3049                                join,
3050                            }
3051                        })
3052                    })
3053                    .boxed()
3054            },
3055            // Delay with optional next
3056            {
3057                let id_d = id.clone();
3058                let prefix_d = prefix.to_string();
3059                (
3060                    any::<u64>(),
3061                    prop::option::of(
3062                        arb_unique_continuation(depth - 1, &format!("{prefix_d}d_"))
3063                            .prop_map(Box::new),
3064                    ),
3065                )
3066                    .prop_map(move |(duration_ms, next)| {
3067                        SerializableContinuation::Delay {
3068                            id: id_d.clone(),
3069                            duration_ms,
3070                            next,
3071                        }
3072                    })
3073            },
3074            // AwaitSignal with optional next
3075            {
3076                let id_s = id.clone();
3077                let prefix_s = prefix.to_string();
3078                (
3079                    arb_id(),
3080                    prop::option::of(any::<u64>()),
3081                    prop::option::of(
3082                        arb_unique_continuation(depth - 1, &format!("{prefix_s}s_"))
3083                            .prop_map(Box::new),
3084                    ),
3085                )
3086                    .prop_map(move |(signal_name, timeout_ms, next)| {
3087                        SerializableContinuation::AwaitSignal {
3088                            id: id_s.clone(),
3089                            signal_name,
3090                            timeout_ms,
3091                            next,
3092                        }
3093                    })
3094            },
3095            // Branch with two named branches, optional default and next
3096            {
3097                let id_b = id.clone();
3098                let prefix_b = prefix.to_string();
3099                let b0 = arb_unique_continuation(depth - 1, &format!("{prefix_b}br0_"))
3100                    .prop_map(Box::new);
3101                let b1 = arb_unique_continuation(depth - 1, &format!("{prefix_b}br1_"))
3102                    .prop_map(Box::new);
3103                let default = prop::option::of(
3104                    arb_unique_continuation(depth - 1, &format!("{prefix_b}bd_"))
3105                        .prop_map(Box::new),
3106                );
3107                let next = prop::option::of(
3108                    arb_unique_continuation(depth - 1, &format!("{prefix_b}bn_"))
3109                        .prop_map(Box::new),
3110                );
3111                (b0, b1, default, next).prop_map(move |(branch0, branch1, default, next)| {
3112                    let mut branches = std::collections::HashMap::new();
3113                    branches.insert("k0".to_string(), branch0);
3114                    branches.insert("k1".to_string(), branch1);
3115                    SerializableContinuation::Branch {
3116                        id: id_b.clone(),
3117                        branches,
3118                        default,
3119                        next,
3120                    }
3121                })
3122            },
3123            // Loop with body and optional next
3124            {
3125                let id_l = id.clone();
3126                let prefix_l = prefix.to_string();
3127                let body = arb_unique_continuation(depth - 1, &format!("{prefix_l}lb_"))
3128                    .prop_map(Box::new);
3129                let next = prop::option::of(
3130                    arb_unique_continuation(depth - 1, &format!("{prefix_l}ln_"))
3131                        .prop_map(Box::new),
3132                );
3133                (
3134                    body,
3135                    1..100u32,
3136                    prop::bool::ANY.prop_map(|b| {
3137                        if b {
3138                            MaxIterationsPolicy::Fail
3139                        } else {
3140                            MaxIterationsPolicy::ExitWithLast
3141                        }
3142                    }),
3143                    next,
3144                )
3145                    .prop_map(move |(body, max_iterations, on_max, next)| {
3146                        SerializableContinuation::Loop {
3147                            id: id_l.clone(),
3148                            body,
3149                            max_iterations,
3150                            on_max,
3151                            next,
3152                        }
3153                    })
3154            },
3155            // ChildWorkflow with child and optional next
3156            {
3157                let id_cw = id;
3158                let prefix_cw = prefix.to_string();
3159                let child = arb_unique_continuation(depth - 1, &format!("{prefix_cw}cc_"))
3160                    .prop_map(Box::new);
3161                let next = prop::option::of(
3162                    arb_unique_continuation(depth - 1, &format!("{prefix_cw}cn_"))
3163                        .prop_map(Box::new),
3164                );
3165                (child, next).prop_map(move |(child, next)| {
3166                    SerializableContinuation::ChildWorkflow {
3167                        id: id_cw.clone(),
3168                        child,
3169                        next,
3170                    }
3171                })
3172            },
3173        ]
3174        .boxed()
3175    }
3176
3177    /// Collect all IDs in a continuation tree.
3178    fn collect_ids(cont: &SerializableContinuation) -> Vec<String> {
3179        let mut ids = vec![];
3180        fn walk(c: &SerializableContinuation, out: &mut Vec<String>) {
3181            match c {
3182                SerializableContinuation::Task { id, next, .. }
3183                | SerializableContinuation::Delay { id, next, .. }
3184                | SerializableContinuation::AwaitSignal { id, next, .. } => {
3185                    out.push(id.clone());
3186                    if let Some(n) = next {
3187                        walk(n, out);
3188                    }
3189                }
3190                SerializableContinuation::Fork { id, branches, join } => {
3191                    out.push(id.clone());
3192                    for b in branches {
3193                        walk(b, out);
3194                    }
3195                    if let Some(j) = join {
3196                        walk(j, out);
3197                    }
3198                }
3199                SerializableContinuation::Branch {
3200                    id,
3201                    branches,
3202                    default,
3203                    next,
3204                } => {
3205                    out.push(id.clone());
3206                    for b in branches.values() {
3207                        walk(b, out);
3208                    }
3209                    if let Some(d) = default {
3210                        walk(d, out);
3211                    }
3212                    if let Some(n) = next {
3213                        walk(n, out);
3214                    }
3215                }
3216                SerializableContinuation::Loop { id, body, next, .. } => {
3217                    out.push(id.clone());
3218                    walk(body, out);
3219                    if let Some(n) = next {
3220                        walk(n, out);
3221                    }
3222                }
3223                SerializableContinuation::ChildWorkflow { id, child, next } => {
3224                    out.push(id.clone());
3225                    walk(child, out);
3226                    if let Some(n) = next {
3227                        walk(n, out);
3228                    }
3229                }
3230            }
3231        }
3232        walk(cont, &mut ids);
3233        ids
3234    }
3235
3236    /// Inject a duplicate ID into a continuation by replacing the first node's ID.
3237    fn inject_duplicate(cont: &SerializableContinuation, dup_id: &str) -> SerializableContinuation {
3238        match cont {
3239            SerializableContinuation::Task {
3240                timeout_ms,
3241                retry_policy,
3242                version,
3243                next,
3244                ..
3245            } => SerializableContinuation::Task {
3246                id: dup_id.to_string(),
3247                timeout_ms: *timeout_ms,
3248                retry_policy: retry_policy.clone(),
3249                version: version.clone(),
3250                priority: None,
3251                tags: vec![],
3252                next: next.clone(),
3253            },
3254            SerializableContinuation::Fork { branches, join, .. } => {
3255                SerializableContinuation::Fork {
3256                    id: dup_id.to_string(),
3257                    branches: branches.clone(),
3258                    join: join.clone(),
3259                }
3260            }
3261            SerializableContinuation::Delay {
3262                duration_ms, next, ..
3263            } => SerializableContinuation::Delay {
3264                id: dup_id.to_string(),
3265                duration_ms: *duration_ms,
3266                next: next.clone(),
3267            },
3268            SerializableContinuation::AwaitSignal {
3269                signal_name,
3270                timeout_ms,
3271                next,
3272                ..
3273            } => SerializableContinuation::AwaitSignal {
3274                id: dup_id.to_string(),
3275                signal_name: signal_name.clone(),
3276                timeout_ms: *timeout_ms,
3277                next: next.clone(),
3278            },
3279            SerializableContinuation::Branch {
3280                branches,
3281                default,
3282                next,
3283                ..
3284            } => SerializableContinuation::Branch {
3285                id: dup_id.to_string(),
3286                branches: branches.clone(),
3287                default: default.clone(),
3288                next: next.clone(),
3289            },
3290            SerializableContinuation::Loop {
3291                body,
3292                max_iterations,
3293                on_max,
3294                next,
3295                ..
3296            } => SerializableContinuation::Loop {
3297                id: dup_id.to_string(),
3298                body: body.clone(),
3299                max_iterations: *max_iterations,
3300                on_max: *on_max,
3301                next: next.clone(),
3302            },
3303            SerializableContinuation::ChildWorkflow { child, next, .. } => {
3304                SerializableContinuation::ChildWorkflow {
3305                    id: dup_id.to_string(),
3306                    child: child.clone(),
3307                    next: next.clone(),
3308                }
3309            }
3310        }
3311    }
3312
3313    proptest! {
3314        // Property 4: `compute_definition_hash` is deterministic.
3315        #[test]
3316        fn hash_is_deterministic(cont in arb_continuation(3)) {
3317            let h1 = cont.compute_definition_hash();
3318            let h2 = cont.compute_definition_hash();
3319            prop_assert_eq!(h1, h2);
3320        }
3321
3322        // Property 5: serde roundtrip preserves the definition hash.
3323        #[test]
3324        fn serde_roundtrip_preserves_hash(cont in arb_continuation(3)) {
3325            let original_hash = cont.compute_definition_hash();
3326            let json = serde_json::to_string(&cont).unwrap();
3327            let recovered: SerializableContinuation = serde_json::from_str(&json).unwrap();
3328            prop_assert_eq!(original_hash, recovered.compute_definition_hash());
3329        }
3330
3331        // Property 6: a tree with guaranteed-unique IDs has no duplicates.
3332        #[test]
3333        fn unique_ids_means_none(cont in arb_unique_continuation(3, "r_")) {
3334            prop_assert!(cont.find_duplicate_id().is_none());
3335        }
3336
3337        // Property 7: injecting a duplicate ID is always detected.
3338        #[test]
3339        fn injected_duplicate_is_detected(cont in arb_unique_continuation(3, "r_")) {
3340            let ids = collect_ids(&cont);
3341            // Need at least 2 nodes to have a meaningful duplicate injection
3342            if ids.len() >= 2 {
3343                // Pick the second ID and inject it into the root (which has the first ID)
3344                let dup_id = &ids[1];
3345                let tampered = inject_duplicate(&cont, dup_id);
3346                prop_assert!(tampered.find_duplicate_id().is_some());
3347            }
3348        }
3349    }
3350}