Skip to main content

rustvello_core/
workflow.rs

1//! Workflow-related types and utilities.
2//!
3//! Provides deterministic execution capabilities for workflow replay,
4//! matching pynenc's `DeterministicExecutor`.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use sha2::{Digest, Sha256};
11
12use crate::error::RustvelloResult;
13use crate::state_backend::StateBackend;
14use rustvello_proto::identifiers::InvocationId;
15
16/// Handles deterministic operations for workflow execution.
17///
18/// Mirrors pynenc's `DeterministicExecutor`. Ensures that operations like
19/// random number generation, time functions, and UUIDs behave deterministically
20/// across workflow replays by using deterministic seeds and storing results
21/// in the state backend.
22pub struct DeterministicExecutor {
23    workflow_id: InvocationId,
24    state_backend: Arc<dyn StateBackend>,
25    operation_counters: HashMap<String, u64>,
26}
27
28impl DeterministicExecutor {
29    /// Create a new deterministic executor for a workflow.
30    pub fn new(workflow_id: InvocationId, state_backend: Arc<dyn StateBackend>) -> Self {
31        Self {
32            workflow_id,
33            state_backend,
34            operation_counters: HashMap::new(),
35        }
36    }
37
38    /// Get the next sequence number for an operation type.
39    pub fn get_next_sequence(&mut self, operation: &str) -> u64 {
40        let counter = self
41            .operation_counters
42            .entry(operation.to_string())
43            .or_insert(0);
44        *counter += 1;
45        *counter
46    }
47
48    /// Get the current count for an operation type.
49    pub fn get_operation_count(&self, operation: &str) -> u64 {
50        self.operation_counters.get(operation).copied().unwrap_or(0)
51    }
52
53    /// Execute an operation with deterministic results.
54    ///
55    /// Checks if a value exists for this operation+sequence in the state backend.
56    /// If found, returns the stored value (replay mode). Otherwise, generates
57    /// a new value using the provided generator and stores it.
58    pub async fn deterministic_operation<F>(
59        &mut self,
60        operation: &str,
61        generator: F,
62    ) -> RustvelloResult<String>
63    where
64        F: FnOnce() -> String,
65    {
66        let sequence = self.get_next_sequence(operation);
67        let operation_key = format!("{operation}:{sequence}");
68
69        // Check for stored value (replay)
70        if let Some(value) = self
71            .state_backend
72            .get_workflow_data(&self.workflow_id, &operation_key)
73            .await?
74        {
75            return Ok(value);
76        }
77
78        // Generate new value and store
79        let value = generator();
80        self.state_backend
81            .set_workflow_data(&self.workflow_id, &operation_key, &value)
82            .await?;
83
84        // Update total count
85        let total_count_key = format!("counter:{operation}");
86        let current_total = self
87            .state_backend
88            .get_workflow_data(&self.workflow_id, &total_count_key)
89            .await?
90            .and_then(|s| s.parse::<u64>().ok())
91            .unwrap_or(0);
92        self.state_backend
93            .set_workflow_data(
94                &self.workflow_id,
95                &total_count_key,
96                &current_total.max(sequence).to_string(),
97            )
98            .await?;
99
100        Ok(value)
101    }
102
103    /// Get or establish the workflow base time for deterministic timestamps.
104    pub async fn get_base_time(&self) -> RustvelloResult<DateTime<Utc>> {
105        let base_time_key = "workflow:base_time";
106        if let Some(stored) = self
107            .state_backend
108            .get_workflow_data(&self.workflow_id, base_time_key)
109            .await?
110        {
111            let dt = DateTime::parse_from_rfc3339(&stored)
112                .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc));
113            return Ok(dt);
114        }
115
116        let base_time = Utc::now();
117        self.state_backend
118            .set_workflow_data(&self.workflow_id, base_time_key, &base_time.to_rfc3339())
119            .await?;
120        Ok(base_time)
121    }
122
123    /// Generate a deterministic random number (0.0..1.0) using workflow-specific seed.
124    pub async fn random(&mut self) -> RustvelloResult<f64> {
125        let wf_id = self.workflow_id.as_str().to_owned();
126        let seq = self.get_operation_count("random") + 1;
127
128        let value_str = self
129            .deterministic_operation("random", move || {
130                let seed_string = format!("{wf_id}:random:{seq}");
131                let hash = Sha256::digest(seed_string.as_bytes());
132                // Use first 8 bytes as u64 seed, then map to [0, 1)
133                let bytes: [u8; 8] = hash[..8]
134                    .try_into()
135                    .expect("SHA-256 always produces ≥8 bytes");
136                let seed = u64::from_le_bytes(bytes);
137                let random_val = (seed as f64) / (u64::MAX as f64);
138                random_val.to_string()
139            })
140            .await?;
141
142        Ok(value_str.parse::<f64>().unwrap_or(0.0))
143    }
144
145    /// Get current time deterministically by advancing from base time.
146    pub async fn utc_now(&mut self) -> RustvelloResult<DateTime<Utc>> {
147        let base_time = self.get_base_time().await?;
148        let seq = self.get_operation_count("time") + 1;
149
150        let value_str = self
151            .deterministic_operation("time", move || {
152                let current_time = base_time + chrono::Duration::seconds(seq as i64);
153                current_time.to_rfc3339()
154            })
155            .await?;
156
157        let dt = DateTime::parse_from_rfc3339(&value_str)
158            .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc));
159        Ok(dt)
160    }
161
162    /// Generate a deterministic UUID using workflow-specific seed.
163    pub async fn uuid(&mut self) -> RustvelloResult<String> {
164        let wf_id = self.workflow_id.as_str().to_owned();
165        let seq = self.get_operation_count("uuid") + 1;
166
167        self.deterministic_operation("uuid", move || {
168            let seed_string = format!("{wf_id}:uuid:{seq}");
169            let hash = Sha256::digest(seed_string.as_bytes());
170            // Use first 16 bytes to construct a UUID
171            let mut bytes = [0u8; 16];
172            bytes.copy_from_slice(&hash[..16]);
173            // Set version 4 and variant bits for valid UUID format
174            bytes[6] = (bytes[6] & 0x0f) | 0x40; // version 4
175            bytes[8] = (bytes[8] & 0x3f) | 0x80; // variant 1
176            let u = uuid::Uuid::from_bytes(bytes);
177            u.to_string()
178        })
179        .await
180    }
181}
182
183#[cfg(test)]
184#[allow(clippy::clone_on_ref_ptr)]
185mod tests {
186    use super::*;
187    use crate::state_backend::{StateBackendCore, StateBackendQuery, StateBackendRunner};
188
189    // Helper to create an in-memory state backend for testing.
190    // We use a simple inline implementation to avoid depending on rustvello-mem
191    // in the core crate.
192    struct TestStateBackend {
193        data: std::sync::Mutex<HashMap<String, HashMap<String, String>>>,
194    }
195
196    impl TestStateBackend {
197        fn new() -> Self {
198            Self {
199                data: std::sync::Mutex::new(HashMap::new()),
200            }
201        }
202    }
203
204    #[async_trait::async_trait]
205    impl StateBackendCore for TestStateBackend {
206        async fn upsert_invocation(
207            &self,
208            _inv: &rustvello_proto::invocation::InvocationDTO,
209            _call: &rustvello_proto::call::CallDTO,
210        ) -> RustvelloResult<()> {
211            Ok(())
212        }
213        async fn get_invocation(
214            &self,
215            id: &InvocationId,
216        ) -> RustvelloResult<rustvello_proto::invocation::InvocationDTO> {
217            Err(crate::error::RustvelloError::InvocationNotFound {
218                invocation_id: id.clone(),
219            })
220        }
221        async fn get_call(
222            &self,
223            id: &rustvello_proto::identifiers::CallId,
224        ) -> RustvelloResult<rustvello_proto::call::CallDTO> {
225            Err(crate::error::RustvelloError::Internal {
226                message: format!("call not found: {id}"),
227            })
228        }
229        async fn store_result(&self, _id: &InvocationId, _r: &str) -> RustvelloResult<()> {
230            Ok(())
231        }
232        async fn get_result(&self, _id: &InvocationId) -> RustvelloResult<Option<String>> {
233            Ok(None)
234        }
235        async fn store_error(
236            &self,
237            _id: &InvocationId,
238            _e: &crate::error::TaskError,
239        ) -> RustvelloResult<()> {
240            Ok(())
241        }
242        async fn get_error(
243            &self,
244            _id: &InvocationId,
245        ) -> RustvelloResult<Option<crate::error::TaskError>> {
246            Ok(None)
247        }
248        async fn add_history(
249            &self,
250            _h: &rustvello_proto::invocation::InvocationHistory,
251        ) -> RustvelloResult<()> {
252            Ok(())
253        }
254        async fn get_history(
255            &self,
256            _id: &InvocationId,
257        ) -> RustvelloResult<Vec<rustvello_proto::invocation::InvocationHistory>> {
258            Ok(Vec::new())
259        }
260        async fn purge(&self) -> RustvelloResult<()> {
261            self.data.lock().unwrap().clear();
262            Ok(())
263        }
264    }
265
266    #[async_trait::async_trait]
267    impl StateBackendQuery for TestStateBackend {
268        async fn set_workflow_data(
269            &self,
270            workflow_id: &InvocationId,
271            key: &str,
272            value: &str,
273        ) -> RustvelloResult<()> {
274            self.data
275                .lock()
276                .unwrap()
277                .entry(workflow_id.as_str().to_string())
278                .or_default()
279                .insert(key.to_string(), value.to_string());
280            Ok(())
281        }
282
283        async fn get_workflow_data(
284            &self,
285            workflow_id: &InvocationId,
286            key: &str,
287        ) -> RustvelloResult<Option<String>> {
288            Ok(self
289                .data
290                .lock()
291                .unwrap()
292                .get(&workflow_id.as_str().to_string())
293                .and_then(|m| m.get(key).cloned()))
294        }
295
296        async fn get_workflow_invocations(
297            &self,
298            _workflow_id: &InvocationId,
299        ) -> RustvelloResult<Vec<InvocationId>> {
300            Err(crate::error::RustvelloError::NotSupported {
301                backend: "TestStateBackend".into(),
302                method: "get_workflow_invocations".into(),
303            })
304        }
305        async fn get_child_invocations(
306            &self,
307            _parent_invocation_id: &InvocationId,
308        ) -> RustvelloResult<Vec<InvocationId>> {
309            Err(crate::error::RustvelloError::NotSupported {
310                backend: "TestStateBackend".into(),
311                method: "get_child_invocations".into(),
312            })
313        }
314        async fn store_workflow_run(
315            &self,
316            _workflow: &rustvello_proto::invocation::WorkflowIdentity,
317        ) -> RustvelloResult<()> {
318            Err(crate::error::RustvelloError::NotSupported {
319                backend: "TestStateBackend".into(),
320                method: "store_workflow_run".into(),
321            })
322        }
323        async fn get_all_workflow_types(
324            &self,
325        ) -> RustvelloResult<Vec<rustvello_proto::identifiers::TaskId>> {
326            Err(crate::error::RustvelloError::NotSupported {
327                backend: "TestStateBackend".into(),
328                method: "get_all_workflow_types".into(),
329            })
330        }
331        async fn get_workflow_runs(
332            &self,
333            _workflow_type: &rustvello_proto::identifiers::TaskId,
334        ) -> RustvelloResult<Vec<rustvello_proto::invocation::WorkflowIdentity>> {
335            Err(crate::error::RustvelloError::NotSupported {
336                backend: "TestStateBackend".into(),
337                method: "get_workflow_runs".into(),
338            })
339        }
340        async fn store_app_info(&self, _app_id: &str, _info_json: &str) -> RustvelloResult<()> {
341            Err(crate::error::RustvelloError::NotSupported {
342                backend: "TestStateBackend".into(),
343                method: "store_app_info".into(),
344            })
345        }
346        async fn get_app_info(&self, _app_id: &str) -> RustvelloResult<Option<String>> {
347            Err(crate::error::RustvelloError::NotSupported {
348                backend: "TestStateBackend".into(),
349                method: "get_app_info".into(),
350            })
351        }
352        async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>> {
353            Err(crate::error::RustvelloError::NotSupported {
354                backend: "TestStateBackend".into(),
355                method: "get_all_app_infos".into(),
356            })
357        }
358        async fn store_workflow_sub_invocation(
359            &self,
360            _workflow_id: &InvocationId,
361            _sub_inv_id: &InvocationId,
362        ) -> RustvelloResult<()> {
363            Err(crate::error::RustvelloError::NotSupported {
364                backend: "TestStateBackend".into(),
365                method: "store_workflow_sub_invocation".into(),
366            })
367        }
368        async fn get_workflow_sub_invocations(
369            &self,
370            _workflow_id: &InvocationId,
371        ) -> RustvelloResult<Vec<InvocationId>> {
372            Err(crate::error::RustvelloError::NotSupported {
373                backend: "TestStateBackend".into(),
374                method: "get_workflow_sub_invocations".into(),
375            })
376        }
377    }
378
379    #[async_trait::async_trait]
380    impl StateBackendRunner for TestStateBackend {
381        async fn store_runner_context(
382            &self,
383            _context: &crate::state_backend::StoredRunnerContext,
384        ) -> RustvelloResult<()> {
385            Err(crate::error::RustvelloError::NotSupported {
386                backend: "TestStateBackend".into(),
387                method: "store_runner_context".into(),
388            })
389        }
390        async fn get_runner_context(
391            &self,
392            _runner_id: &str,
393        ) -> RustvelloResult<Option<crate::state_backend::StoredRunnerContext>> {
394            Err(crate::error::RustvelloError::NotSupported {
395                backend: "TestStateBackend".into(),
396                method: "get_runner_context".into(),
397            })
398        }
399        async fn get_runner_contexts_by_parent(
400            &self,
401            _parent_runner_id: &str,
402        ) -> RustvelloResult<Vec<crate::state_backend::StoredRunnerContext>> {
403            Err(crate::error::RustvelloError::NotSupported {
404                backend: "TestStateBackend".into(),
405                method: "get_runner_contexts_by_parent".into(),
406            })
407        }
408        async fn get_invocation_ids_by_runner(
409            &self,
410            _runner_id: &str,
411            _limit: usize,
412            _offset: usize,
413        ) -> RustvelloResult<Vec<InvocationId>> {
414            Err(crate::error::RustvelloError::NotSupported {
415                backend: "TestStateBackend".into(),
416                method: "get_invocation_ids_by_runner".into(),
417            })
418        }
419        async fn count_invocations_by_runner(&self, _runner_id: &str) -> RustvelloResult<usize> {
420            Err(crate::error::RustvelloError::NotSupported {
421                backend: "TestStateBackend".into(),
422                method: "count_invocations_by_runner".into(),
423            })
424        }
425        async fn get_history_in_timerange(
426            &self,
427            _start: chrono::DateTime<chrono::Utc>,
428            _end: chrono::DateTime<chrono::Utc>,
429            _limit: usize,
430            _offset: usize,
431        ) -> RustvelloResult<Vec<rustvello_proto::invocation::InvocationHistory>> {
432            Err(crate::error::RustvelloError::NotSupported {
433                backend: "TestStateBackend".into(),
434                method: "get_history_in_timerange".into(),
435            })
436        }
437        async fn get_matching_runner_contexts(
438            &self,
439            _partial_id: &str,
440        ) -> RustvelloResult<Vec<crate::state_backend::StoredRunnerContext>> {
441            Err(crate::error::RustvelloError::NotSupported {
442                backend: "TestStateBackend".into(),
443                method: "get_matching_runner_contexts".into(),
444            })
445        }
446    }
447
448    fn make_executor() -> (DeterministicExecutor, InvocationId) {
449        let wf_id = InvocationId::from_string("test-workflow-001".to_string());
450        let backend = Arc::new(TestStateBackend::new());
451        let executor = DeterministicExecutor::new(wf_id.clone(), backend);
452        (executor, wf_id)
453    }
454
455    // --- Sequence tracking tests ---
456
457    #[test]
458    fn sequence_increments_correctly() {
459        let (mut executor, _) = make_executor();
460        assert_eq!(executor.get_next_sequence("test_op"), 1);
461        assert_eq!(executor.get_next_sequence("test_op"), 2);
462        assert_eq!(executor.get_next_sequence("other_op"), 1);
463        assert_eq!(executor.get_next_sequence("test_op"), 3);
464    }
465
466    #[test]
467    fn operation_count_retrieval() {
468        let (mut executor, _) = make_executor();
469        assert_eq!(executor.get_operation_count("test_op"), 0);
470        executor.get_next_sequence("test_op");
471        executor.get_next_sequence("test_op");
472        assert_eq!(executor.get_operation_count("test_op"), 2);
473    }
474
475    #[test]
476    fn operation_count_per_instance() {
477        let (mut exec1, wf_id) = make_executor();
478        exec1.get_next_sequence("test");
479        exec1.get_next_sequence("test");
480        assert_eq!(exec1.get_operation_count("test"), 2);
481
482        let exec2 = DeterministicExecutor::new(wf_id, Arc::new(TestStateBackend::new()));
483        assert_eq!(exec2.get_operation_count("test"), 0);
484    }
485
486    #[test]
487    fn operation_count_isolated_by_type() {
488        let (mut executor, _) = make_executor();
489        executor.get_next_sequence("random");
490        executor.get_next_sequence("random");
491        executor.get_next_sequence("time");
492        assert_eq!(executor.get_operation_count("random"), 2);
493        assert_eq!(executor.get_operation_count("time"), 1);
494        assert_eq!(executor.get_operation_count("uuid"), 0);
495    }
496
497    // --- Deterministic operation tests ---
498
499    #[tokio::test]
500    async fn stores_and_retrieves_values() {
501        let (mut executor, wf_id) = make_executor();
502        let backend = executor.state_backend.clone();
503
504        let result = executor
505            .deterministic_operation("test", || "generated_value_1".to_string())
506            .await
507            .unwrap();
508        assert_eq!(result, "generated_value_1");
509
510        let stored = backend.get_workflow_data(&wf_id, "test:1").await.unwrap();
511        assert_eq!(stored, Some("generated_value_1".to_string()));
512    }
513
514    #[tokio::test]
515    async fn creates_unique_sequences() {
516        let (mut executor, wf_id) = make_executor();
517        let backend = executor.state_backend.clone();
518        let mut counter = 0u32;
519
520        let r1 = executor
521            .deterministic_operation("test", || {
522                counter += 1;
523                format!("value_{counter}")
524            })
525            .await
526            .unwrap();
527        let r2 = executor
528            .deterministic_operation("test", || {
529                counter += 1;
530                format!("value_{counter}")
531            })
532            .await
533            .unwrap();
534        let r3 = executor
535            .deterministic_operation("test", || {
536                counter += 1;
537                format!("value_{counter}")
538            })
539            .await
540            .unwrap();
541
542        assert_eq!(r1, "value_1");
543        assert_eq!(r2, "value_2");
544        assert_eq!(r3, "value_3");
545
546        assert_eq!(
547            backend.get_workflow_data(&wf_id, "test:1").await.unwrap(),
548            Some("value_1".to_string())
549        );
550        assert_eq!(
551            backend.get_workflow_data(&wf_id, "test:2").await.unwrap(),
552            Some("value_2".to_string())
553        );
554        assert_eq!(
555            backend.get_workflow_data(&wf_id, "test:3").await.unwrap(),
556            Some("value_3".to_string())
557        );
558    }
559
560    #[tokio::test]
561    async fn replays_stored_values() {
562        let wf_id = InvocationId::from_string("test-wf-replay".to_string());
563        let backend = Arc::new(TestStateBackend::new());
564
565        // First executor: generate values
566        let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
567        let r1 = exec1
568            .deterministic_operation("test", || "fresh_1".to_string())
569            .await
570            .unwrap();
571        let r2 = exec1
572            .deterministic_operation("test", || "fresh_2".to_string())
573            .await
574            .unwrap();
575        assert_eq!(r1, "fresh_1");
576        assert_eq!(r2, "fresh_2");
577
578        // Second executor: replay without calling generators
579        let mut exec2 = DeterministicExecutor::new(wf_id, backend);
580        let mut gen_called = false;
581        let replay1 = exec2
582            .deterministic_operation("test", || {
583                gen_called = true;
584                "should_not_appear".to_string()
585            })
586            .await
587            .unwrap();
588        assert_eq!(replay1, "fresh_1");
589        assert!(!gen_called, "Generator should not be called during replay");
590
591        let replay2 = exec2
592            .deterministic_operation("test", || {
593                gen_called = true;
594                "should_not_appear".to_string()
595            })
596            .await
597            .unwrap();
598        assert_eq!(replay2, "fresh_2");
599        assert!(!gen_called);
600    }
601
602    #[tokio::test]
603    async fn handles_partial_replay_then_generation() {
604        let wf_id = InvocationId::from_string("test-wf-partial".to_string());
605        let backend = Arc::new(TestStateBackend::new());
606
607        // First executor: generate 2 values
608        let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
609        let r1 = exec1
610            .deterministic_operation("test", || "value_1".to_string())
611            .await
612            .unwrap();
613        let r2 = exec1
614            .deterministic_operation("test", || "value_2".to_string())
615            .await
616            .unwrap();
617
618        // Second executor: replay 2, then generate 1 new
619        let mut exec2 = DeterministicExecutor::new(wf_id, backend);
620        let replay1 = exec2
621            .deterministic_operation("test", || "new_1".to_string())
622            .await
623            .unwrap();
624        let replay2 = exec2
625            .deterministic_operation("test", || "new_2".to_string())
626            .await
627            .unwrap();
628        let new_val = exec2
629            .deterministic_operation("test", || "value_3".to_string())
630            .await
631            .unwrap();
632
633        assert_eq!(replay1, r1);
634        assert_eq!(replay2, r2);
635        assert_eq!(new_val, "value_3");
636    }
637
638    #[tokio::test]
639    async fn isolated_by_operation_type() {
640        let (mut executor, wf_id) = make_executor();
641        let backend = executor.state_backend.clone();
642
643        executor
644            .deterministic_operation("type_a", || "a_value".to_string())
645            .await
646            .unwrap();
647        executor
648            .deterministic_operation("type_b", || "b_value".to_string())
649            .await
650            .unwrap();
651        executor
652            .deterministic_operation("type_a", || "a_value_2".to_string())
653            .await
654            .unwrap();
655
656        assert_eq!(
657            backend.get_workflow_data(&wf_id, "type_a:1").await.unwrap(),
658            Some("a_value".to_string())
659        );
660        assert_eq!(
661            backend.get_workflow_data(&wf_id, "type_b:1").await.unwrap(),
662            Some("b_value".to_string())
663        );
664        assert_eq!(
665            backend.get_workflow_data(&wf_id, "type_a:2").await.unwrap(),
666            Some("a_value_2".to_string())
667        );
668    }
669
670    // --- Built-in function tests ---
671
672    #[tokio::test]
673    async fn base_time_establishment() {
674        let (executor, _) = make_executor();
675        let base1 = executor.get_base_time().await.unwrap();
676        assert!(base1.timezone() == Utc);
677        let base2 = executor.get_base_time().await.unwrap();
678        assert_eq!(base1, base2);
679    }
680
681    #[tokio::test]
682    async fn deterministic_random_generation() {
683        let wf_id = InvocationId::from_string("test-wf-random".to_string());
684        let backend = Arc::new(TestStateBackend::new());
685
686        let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
687        let randoms: Vec<f64> = {
688            let mut v = Vec::new();
689            for _ in 0..5 {
690                v.push(exec1.random().await.unwrap());
691            }
692            v
693        };
694
695        // All in range [0, 1)
696        assert!(randoms.iter().all(|&r| (0.0..=1.0).contains(&r)));
697        // All unique
698        let unique: std::collections::HashSet<u64> = randoms.iter().map(|r| r.to_bits()).collect();
699        assert_eq!(unique.len(), 5);
700
701        // Replay produces same values
702        let mut exec2 = DeterministicExecutor::new(wf_id, backend);
703        for (i, &original) in randoms.iter().enumerate() {
704            let replayed = exec2.random().await.unwrap();
705            assert_eq!(original, replayed, "random mismatch at index {i}");
706        }
707    }
708
709    #[tokio::test]
710    async fn deterministic_time_progression() {
711        let wf_id = InvocationId::from_string("test-wf-time".to_string());
712        let backend = Arc::new(TestStateBackend::new());
713
714        let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
715        let times: Vec<DateTime<Utc>> = {
716            let mut v = Vec::new();
717            for _ in 0..3 {
718                v.push(exec1.utc_now().await.unwrap());
719            }
720            v
721        };
722
723        // All UTC
724        assert!(times.iter().all(|t| t.timezone() == Utc));
725        // Strictly ascending
726        assert!(times.windows(2).all(|w| w[0] < w[1]));
727        // At or after base time
728        let base = exec1.get_base_time().await.unwrap();
729        assert!(times[0] >= base);
730
731        // Replay produces same values
732        let mut exec2 = DeterministicExecutor::new(wf_id, backend);
733        for (i, &original) in times.iter().enumerate() {
734            let replayed = exec2.utc_now().await.unwrap();
735            assert_eq!(original, replayed, "time mismatch at index {i}");
736        }
737    }
738
739    #[tokio::test]
740    async fn deterministic_uuid_generation() {
741        let wf_id = InvocationId::from_string("test-wf-uuid".to_string());
742        let backend = Arc::new(TestStateBackend::new());
743
744        let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
745        let uuids: Vec<String> = {
746            let mut v = Vec::new();
747            for _ in 0..3 {
748                v.push(exec1.uuid().await.unwrap());
749            }
750            v
751        };
752
753        // Valid UUID format
754        assert!(uuids
755            .iter()
756            .all(|u| u.len() == 36 && u.chars().filter(|&c| c == '-').count() == 4));
757        // All unique
758        let unique: std::collections::HashSet<&String> = uuids.iter().collect();
759        assert_eq!(unique.len(), 3);
760
761        // Replay produces same values
762        let mut exec2 = DeterministicExecutor::new(wf_id, backend);
763        for (i, original) in uuids.iter().enumerate() {
764            let replayed = exec2.uuid().await.unwrap();
765            assert_eq!(original, &replayed, "uuid mismatch at index {i}");
766        }
767    }
768
769    #[tokio::test]
770    async fn mixed_deterministic_operations_sequence() {
771        let wf_id = InvocationId::from_string("test-wf-mixed".to_string());
772        let backend = Arc::new(TestStateBackend::new());
773
774        let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
775        let random1 = exec1.random().await.unwrap();
776        let time1 = exec1.utc_now().await.unwrap();
777        let uuid1 = exec1.uuid().await.unwrap();
778        let random2 = exec1.random().await.unwrap();
779        let time2 = exec1.utc_now().await.unwrap();
780
781        assert_ne!(random1, random2);
782        assert_ne!(time1, time2);
783
784        // Replay
785        let mut exec2 = DeterministicExecutor::new(wf_id, backend);
786        assert_eq!(random1, exec2.random().await.unwrap());
787        assert_eq!(time1, exec2.utc_now().await.unwrap());
788        assert_eq!(uuid1, exec2.uuid().await.unwrap());
789        assert_eq!(random2, exec2.random().await.unwrap());
790        assert_eq!(time2, exec2.utc_now().await.unwrap());
791    }
792
793    // --- Replay behavior tests ---
794
795    #[tokio::test]
796    async fn complete_workflow_replay() {
797        let wf_id = InvocationId::from_string("test-wf-complete".to_string());
798        let backend = Arc::new(TestStateBackend::new());
799
800        let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
801        let original_random = exec1.random().await.unwrap();
802        let original_time = exec1.utc_now().await.unwrap();
803        let original_uuid = exec1.uuid().await.unwrap();
804        let original_custom = exec1
805            .deterministic_operation("custom", || "custom_1".to_string())
806            .await
807            .unwrap();
808
809        let mut exec2 = DeterministicExecutor::new(wf_id, backend);
810        assert_eq!(original_random, exec2.random().await.unwrap());
811        assert_eq!(original_time, exec2.utc_now().await.unwrap());
812        assert_eq!(original_uuid, exec2.uuid().await.unwrap());
813        let replay_custom = exec2
814            .deterministic_operation("custom", || "should_not_appear".to_string())
815            .await
816            .unwrap();
817        assert_eq!(original_custom, replay_custom);
818    }
819
820    #[tokio::test]
821    async fn counter_consistency_across_replay() {
822        let wf_id = InvocationId::from_string("test-wf-counter".to_string());
823        let backend = Arc::new(TestStateBackend::new());
824
825        let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
826        for _ in 0..3 {
827            exec1
828                .deterministic_operation("test", || "val".to_string())
829                .await
830                .unwrap();
831        }
832        assert_eq!(exec1.get_operation_count("test"), 3);
833
834        let mut exec2 = DeterministicExecutor::new(wf_id, backend);
835        for _ in 0..3 {
836            exec2
837                .deterministic_operation("test", || "val".to_string())
838                .await
839                .unwrap();
840        }
841        assert_eq!(exec2.get_operation_count("test"), 3);
842
843        exec2
844            .deterministic_operation("test", || "val_4".to_string())
845            .await
846            .unwrap();
847        assert_eq!(exec2.get_operation_count("test"), 4);
848    }
849
850    // --- Workflow isolation tests ---
851
852    #[tokio::test]
853    async fn workflow_isolation() {
854        let backend = Arc::new(TestStateBackend::new());
855        let wf1_id = InvocationId::from_string("workflow-1".to_string());
856        let wf2_id = InvocationId::from_string("workflow-2".to_string());
857
858        let mut exec1 = DeterministicExecutor::new(wf1_id.clone(), backend.clone());
859        let mut exec2 = DeterministicExecutor::new(wf2_id, backend.clone());
860
861        let randoms1: Vec<f64> = {
862            let mut v = Vec::new();
863            for _ in 0..3 {
864                v.push(exec1.random().await.unwrap());
865            }
866            v
867        };
868        let randoms2: Vec<f64> = {
869            let mut v = Vec::new();
870            for _ in 0..3 {
871                v.push(exec2.random().await.unwrap());
872            }
873            v
874        };
875
876        // Different workflows produce different values
877        assert_ne!(randoms1, randoms2);
878
879        // Same workflow replays same values
880        let mut exec1_replay = DeterministicExecutor::new(wf1_id, backend);
881        let replayed: Vec<f64> = {
882            let mut v = Vec::new();
883            for _ in 0..3 {
884                v.push(exec1_replay.random().await.unwrap());
885            }
886            v
887        };
888        assert_eq!(randoms1, replayed);
889    }
890
891    #[tokio::test]
892    async fn state_backend_basic_operations() {
893        let wf_id = InvocationId::from_string("test-wf-basic".to_string());
894        let backend = Arc::new(TestStateBackend::new());
895
896        backend
897            .set_workflow_data(&wf_id, "test_key", "test_value")
898            .await
899            .unwrap();
900
901        let retrieved = backend.get_workflow_data(&wf_id, "test_key").await.unwrap();
902        assert_eq!(retrieved, Some("test_value".to_string()));
903
904        let non_existent = backend
905            .get_workflow_data(&wf_id, "non_existent")
906            .await
907            .unwrap();
908        assert_eq!(non_existent, None);
909    }
910}