1use std::collections::HashMap;
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use sha2::{Digest, Sha256};
11
12use crate::error::RustvelloResult;
13use crate::state_backend::StateBackend;
14use rustvello_proto::identifiers::InvocationId;
15
/// Records and replays deterministic values for a single workflow.
///
/// Values are persisted through the state backend under keys of the form
/// `{operation}:{sequence}`, where the sequence comes from the in-memory
/// per-operation counters held by this executor.
pub struct DeterministicExecutor {
    /// Identifier of the workflow whose data this executor reads and writes.
    workflow_id: InvocationId,
    /// Backend used to look up and persist workflow data.
    state_backend: Arc<dyn StateBackend>,
    /// Number of sequence numbers handed out so far, keyed by operation name.
    operation_counters: HashMap<String, u64>,
}
27
28impl DeterministicExecutor {
29 pub fn new(workflow_id: InvocationId, state_backend: Arc<dyn StateBackend>) -> Self {
31 Self {
32 workflow_id,
33 state_backend,
34 operation_counters: HashMap::new(),
35 }
36 }
37
38 pub fn get_next_sequence(&mut self, operation: &str) -> u64 {
40 let counter = self
41 .operation_counters
42 .entry(operation.to_string())
43 .or_insert(0);
44 *counter += 1;
45 *counter
46 }
47
48 pub fn get_operation_count(&self, operation: &str) -> u64 {
50 self.operation_counters.get(operation).copied().unwrap_or(0)
51 }
52
53 pub async fn deterministic_operation<F>(
59 &mut self,
60 operation: &str,
61 generator: F,
62 ) -> RustvelloResult<String>
63 where
64 F: FnOnce() -> String,
65 {
66 let sequence = self.get_next_sequence(operation);
67 let operation_key = format!("{operation}:{sequence}");
68
69 if let Some(value) = self
71 .state_backend
72 .get_workflow_data(&self.workflow_id, &operation_key)
73 .await?
74 {
75 return Ok(value);
76 }
77
78 let value = generator();
80 self.state_backend
81 .set_workflow_data(&self.workflow_id, &operation_key, &value)
82 .await?;
83
84 let total_count_key = format!("counter:{operation}");
86 let current_total = self
87 .state_backend
88 .get_workflow_data(&self.workflow_id, &total_count_key)
89 .await?
90 .and_then(|s| s.parse::<u64>().ok())
91 .unwrap_or(0);
92 self.state_backend
93 .set_workflow_data(
94 &self.workflow_id,
95 &total_count_key,
96 ¤t_total.max(sequence).to_string(),
97 )
98 .await?;
99
100 Ok(value)
101 }
102
103 pub async fn get_base_time(&self) -> RustvelloResult<DateTime<Utc>> {
105 let base_time_key = "workflow:base_time";
106 if let Some(stored) = self
107 .state_backend
108 .get_workflow_data(&self.workflow_id, base_time_key)
109 .await?
110 {
111 let dt = DateTime::parse_from_rfc3339(&stored)
112 .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc));
113 return Ok(dt);
114 }
115
116 let base_time = Utc::now();
117 self.state_backend
118 .set_workflow_data(&self.workflow_id, base_time_key, &base_time.to_rfc3339())
119 .await?;
120 Ok(base_time)
121 }
122
123 pub async fn random(&mut self) -> RustvelloResult<f64> {
125 let wf_id = self.workflow_id.as_str().to_owned();
126 let seq = self.get_operation_count("random") + 1;
127
128 let value_str = self
129 .deterministic_operation("random", move || {
130 let seed_string = format!("{wf_id}:random:{seq}");
131 let hash = Sha256::digest(seed_string.as_bytes());
132 let bytes: [u8; 8] = hash[..8]
134 .try_into()
135 .expect("SHA-256 always produces ≥8 bytes");
136 let seed = u64::from_le_bytes(bytes);
137 let random_val = (seed as f64) / (u64::MAX as f64);
138 random_val.to_string()
139 })
140 .await?;
141
142 Ok(value_str.parse::<f64>().unwrap_or(0.0))
143 }
144
145 pub async fn utc_now(&mut self) -> RustvelloResult<DateTime<Utc>> {
147 let base_time = self.get_base_time().await?;
148 let seq = self.get_operation_count("time") + 1;
149
150 let value_str = self
151 .deterministic_operation("time", move || {
152 let current_time = base_time + chrono::Duration::seconds(seq as i64);
153 current_time.to_rfc3339()
154 })
155 .await?;
156
157 let dt = DateTime::parse_from_rfc3339(&value_str)
158 .map_or_else(|_| Utc::now(), |dt| dt.with_timezone(&Utc));
159 Ok(dt)
160 }
161
162 pub async fn uuid(&mut self) -> RustvelloResult<String> {
164 let wf_id = self.workflow_id.as_str().to_owned();
165 let seq = self.get_operation_count("uuid") + 1;
166
167 self.deterministic_operation("uuid", move || {
168 let seed_string = format!("{wf_id}:uuid:{seq}");
169 let hash = Sha256::digest(seed_string.as_bytes());
170 let mut bytes = [0u8; 16];
172 bytes.copy_from_slice(&hash[..16]);
173 bytes[6] = (bytes[6] & 0x0f) | 0x40; bytes[8] = (bytes[8] & 0x3f) | 0x80; let u = uuid::Uuid::from_bytes(bytes);
177 u.to_string()
178 })
179 .await
180 }
181}
182
183#[cfg(test)]
184#[allow(clippy::clone_on_ref_ptr)]
185mod tests {
186 use super::*;
187 use crate::state_backend::{StateBackendCore, StateBackendQuery, StateBackendRunner};
188
/// In-memory state backend used by the tests in this module.
///
/// Only workflow data is stored: the outer map is keyed by the workflow id
/// string, the inner map by the data key.
struct TestStateBackend {
    data: std::sync::Mutex<HashMap<String, HashMap<String, String>>>,
}
195
196 impl TestStateBackend {
197 fn new() -> Self {
198 Self {
199 data: std::sync::Mutex::new(HashMap::new()),
200 }
201 }
202 }
203
// Minimal `StateBackendCore` implementation: no invocation data is stored, so
// writes are no-ops and reads report "not found", `None`, or an empty list.
#[async_trait::async_trait]
impl StateBackendCore for TestStateBackend {
    /// No-op: the invocation and call are discarded.
    async fn upsert_invocation(
        &self,
        _inv: &rustvello_proto::invocation::InvocationDTO,
        _call: &rustvello_proto::call::CallDTO,
    ) -> RustvelloResult<()> {
        Ok(())
    }
    /// Always fails with `InvocationNotFound` for the requested id.
    async fn get_invocation(
        &self,
        id: &InvocationId,
    ) -> RustvelloResult<rustvello_proto::invocation::InvocationDTO> {
        Err(crate::error::RustvelloError::InvocationNotFound {
            invocation_id: id.clone(),
        })
    }
    /// Always fails with an `Internal` "call not found" error.
    async fn get_call(
        &self,
        id: &rustvello_proto::identifiers::CallId,
    ) -> RustvelloResult<rustvello_proto::call::CallDTO> {
        Err(crate::error::RustvelloError::Internal {
            message: format!("call not found: {id}"),
        })
    }
    /// No-op: the result is discarded.
    async fn store_result(&self, _id: &InvocationId, _r: &str) -> RustvelloResult<()> {
        Ok(())
    }
    /// Always reports that no result is stored.
    async fn get_result(&self, _id: &InvocationId) -> RustvelloResult<Option<String>> {
        Ok(None)
    }
    /// No-op: the error is discarded.
    async fn store_error(
        &self,
        _id: &InvocationId,
        _e: &crate::error::TaskError,
    ) -> RustvelloResult<()> {
        Ok(())
    }
    /// Always reports that no error is stored.
    async fn get_error(
        &self,
        _id: &InvocationId,
    ) -> RustvelloResult<Option<crate::error::TaskError>> {
        Ok(None)
    }
    /// No-op: the history entry is discarded.
    async fn add_history(
        &self,
        _h: &rustvello_proto::invocation::InvocationHistory,
    ) -> RustvelloResult<()> {
        Ok(())
    }
    /// Always returns an empty history.
    async fn get_history(
        &self,
        _id: &InvocationId,
    ) -> RustvelloResult<Vec<rustvello_proto::invocation::InvocationHistory>> {
        Ok(Vec::new())
    }
    /// Removes all stored workflow data.
    async fn purge(&self) -> RustvelloResult<()> {
        self.data.lock().unwrap().clear();
        Ok(())
    }
}
265
266 #[async_trait::async_trait]
267 impl StateBackendQuery for TestStateBackend {
268 async fn set_workflow_data(
269 &self,
270 workflow_id: &InvocationId,
271 key: &str,
272 value: &str,
273 ) -> RustvelloResult<()> {
274 self.data
275 .lock()
276 .unwrap()
277 .entry(workflow_id.as_str().to_string())
278 .or_default()
279 .insert(key.to_string(), value.to_string());
280 Ok(())
281 }
282
283 async fn get_workflow_data(
284 &self,
285 workflow_id: &InvocationId,
286 key: &str,
287 ) -> RustvelloResult<Option<String>> {
288 Ok(self
289 .data
290 .lock()
291 .unwrap()
292 .get(&workflow_id.as_str().to_string())
293 .and_then(|m| m.get(key).cloned()))
294 }
295
296 async fn get_workflow_invocations(
297 &self,
298 _workflow_id: &InvocationId,
299 ) -> RustvelloResult<Vec<InvocationId>> {
300 Err(crate::error::RustvelloError::NotSupported {
301 backend: "TestStateBackend".into(),
302 method: "get_workflow_invocations".into(),
303 })
304 }
305 async fn get_child_invocations(
306 &self,
307 _parent_invocation_id: &InvocationId,
308 ) -> RustvelloResult<Vec<InvocationId>> {
309 Err(crate::error::RustvelloError::NotSupported {
310 backend: "TestStateBackend".into(),
311 method: "get_child_invocations".into(),
312 })
313 }
314 async fn store_workflow_run(
315 &self,
316 _workflow: &rustvello_proto::invocation::WorkflowIdentity,
317 ) -> RustvelloResult<()> {
318 Err(crate::error::RustvelloError::NotSupported {
319 backend: "TestStateBackend".into(),
320 method: "store_workflow_run".into(),
321 })
322 }
323 async fn get_all_workflow_types(
324 &self,
325 ) -> RustvelloResult<Vec<rustvello_proto::identifiers::TaskId>> {
326 Err(crate::error::RustvelloError::NotSupported {
327 backend: "TestStateBackend".into(),
328 method: "get_all_workflow_types".into(),
329 })
330 }
331 async fn get_workflow_runs(
332 &self,
333 _workflow_type: &rustvello_proto::identifiers::TaskId,
334 ) -> RustvelloResult<Vec<rustvello_proto::invocation::WorkflowIdentity>> {
335 Err(crate::error::RustvelloError::NotSupported {
336 backend: "TestStateBackend".into(),
337 method: "get_workflow_runs".into(),
338 })
339 }
340 async fn store_app_info(&self, _app_id: &str, _info_json: &str) -> RustvelloResult<()> {
341 Err(crate::error::RustvelloError::NotSupported {
342 backend: "TestStateBackend".into(),
343 method: "store_app_info".into(),
344 })
345 }
346 async fn get_app_info(&self, _app_id: &str) -> RustvelloResult<Option<String>> {
347 Err(crate::error::RustvelloError::NotSupported {
348 backend: "TestStateBackend".into(),
349 method: "get_app_info".into(),
350 })
351 }
352 async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>> {
353 Err(crate::error::RustvelloError::NotSupported {
354 backend: "TestStateBackend".into(),
355 method: "get_all_app_infos".into(),
356 })
357 }
358 async fn store_workflow_sub_invocation(
359 &self,
360 _workflow_id: &InvocationId,
361 _sub_inv_id: &InvocationId,
362 ) -> RustvelloResult<()> {
363 Err(crate::error::RustvelloError::NotSupported {
364 backend: "TestStateBackend".into(),
365 method: "store_workflow_sub_invocation".into(),
366 })
367 }
368 async fn get_workflow_sub_invocations(
369 &self,
370 _workflow_id: &InvocationId,
371 ) -> RustvelloResult<Vec<InvocationId>> {
372 Err(crate::error::RustvelloError::NotSupported {
373 backend: "TestStateBackend".into(),
374 method: "get_workflow_sub_invocations".into(),
375 })
376 }
377 }
378
// `StateBackendRunner` for the test backend: every method reports
// `NotSupported`, naming this backend and the method that was called.
#[async_trait::async_trait]
impl StateBackendRunner for TestStateBackend {
    /// Not supported by the test backend.
    async fn store_runner_context(
        &self,
        _context: &crate::state_backend::StoredRunnerContext,
    ) -> RustvelloResult<()> {
        Err(crate::error::RustvelloError::NotSupported {
            backend: "TestStateBackend".into(),
            method: "store_runner_context".into(),
        })
    }
    /// Not supported by the test backend.
    async fn get_runner_context(
        &self,
        _runner_id: &str,
    ) -> RustvelloResult<Option<crate::state_backend::StoredRunnerContext>> {
        Err(crate::error::RustvelloError::NotSupported {
            backend: "TestStateBackend".into(),
            method: "get_runner_context".into(),
        })
    }
    /// Not supported by the test backend.
    async fn get_runner_contexts_by_parent(
        &self,
        _parent_runner_id: &str,
    ) -> RustvelloResult<Vec<crate::state_backend::StoredRunnerContext>> {
        Err(crate::error::RustvelloError::NotSupported {
            backend: "TestStateBackend".into(),
            method: "get_runner_contexts_by_parent".into(),
        })
    }
    /// Not supported by the test backend.
    async fn get_invocation_ids_by_runner(
        &self,
        _runner_id: &str,
        _limit: usize,
        _offset: usize,
    ) -> RustvelloResult<Vec<InvocationId>> {
        Err(crate::error::RustvelloError::NotSupported {
            backend: "TestStateBackend".into(),
            method: "get_invocation_ids_by_runner".into(),
        })
    }
    /// Not supported by the test backend.
    async fn count_invocations_by_runner(&self, _runner_id: &str) -> RustvelloResult<usize> {
        Err(crate::error::RustvelloError::NotSupported {
            backend: "TestStateBackend".into(),
            method: "count_invocations_by_runner".into(),
        })
    }
    /// Not supported by the test backend.
    async fn get_history_in_timerange(
        &self,
        _start: chrono::DateTime<chrono::Utc>,
        _end: chrono::DateTime<chrono::Utc>,
        _limit: usize,
        _offset: usize,
    ) -> RustvelloResult<Vec<rustvello_proto::invocation::InvocationHistory>> {
        Err(crate::error::RustvelloError::NotSupported {
            backend: "TestStateBackend".into(),
            method: "get_history_in_timerange".into(),
        })
    }
    /// Not supported by the test backend.
    async fn get_matching_runner_contexts(
        &self,
        _partial_id: &str,
    ) -> RustvelloResult<Vec<crate::state_backend::StoredRunnerContext>> {
        Err(crate::error::RustvelloError::NotSupported {
            backend: "TestStateBackend".into(),
            method: "get_matching_runner_contexts".into(),
        })
    }
}
447
448 fn make_executor() -> (DeterministicExecutor, InvocationId) {
449 let wf_id = InvocationId::from_string("test-workflow-001".to_string());
450 let backend = Arc::new(TestStateBackend::new());
451 let executor = DeterministicExecutor::new(wf_id.clone(), backend);
452 (executor, wf_id)
453 }
454
/// Sequence numbers start at 1 and advance independently per operation name.
#[test]
fn sequence_increments_correctly() {
    let (mut exec, _wf_id) = make_executor();
    // (operation, expected sequence) in call order; "other_op" is interleaved
    // to show it does not disturb the "test_op" counter.
    let calls = [
        ("test_op", 1),
        ("test_op", 2),
        ("other_op", 1),
        ("test_op", 3),
    ];
    for (operation, expected) in calls {
        assert_eq!(exec.get_next_sequence(operation), expected);
    }
}
465
/// The operation count is 0 before any use and tracks the sequences handed out.
#[test]
fn operation_count_retrieval() {
    let (mut exec, _wf_id) = make_executor();
    assert_eq!(exec.get_operation_count("test_op"), 0);
    for _ in 0..2 {
        exec.get_next_sequence("test_op");
    }
    assert_eq!(exec.get_operation_count("test_op"), 2);
}
474
/// Counters live in the executor instance, not in the workflow id.
#[test]
fn operation_count_per_instance() {
    let (mut first, wf_id) = make_executor();
    for _ in 0..2 {
        first.get_next_sequence("test");
    }
    assert_eq!(first.get_operation_count("test"), 2);

    // A second executor for the same workflow starts counting from scratch.
    let fresh_backend = Arc::new(TestStateBackend::new());
    let second = DeterministicExecutor::new(wf_id, fresh_backend);
    assert_eq!(second.get_operation_count("test"), 0);
}
485
/// Each operation name has its own counter.
#[test]
fn operation_count_isolated_by_type() {
    let (mut exec, _wf_id) = make_executor();
    for operation in ["random", "random", "time"] {
        exec.get_next_sequence(operation);
    }
    for (operation, expected) in [("random", 2), ("time", 1), ("uuid", 0)] {
        assert_eq!(exec.get_operation_count(operation), expected);
    }
}
496
497 #[tokio::test]
500 async fn stores_and_retrieves_values() {
501 let (mut executor, wf_id) = make_executor();
502 let backend = executor.state_backend.clone();
503
504 let result = executor
505 .deterministic_operation("test", || "generated_value_1".to_string())
506 .await
507 .unwrap();
508 assert_eq!(result, "generated_value_1");
509
510 let stored = backend.get_workflow_data(&wf_id, "test:1").await.unwrap();
511 assert_eq!(stored, Some("generated_value_1".to_string()));
512 }
513
514 #[tokio::test]
515 async fn creates_unique_sequences() {
516 let (mut executor, wf_id) = make_executor();
517 let backend = executor.state_backend.clone();
518 let mut counter = 0u32;
519
520 let r1 = executor
521 .deterministic_operation("test", || {
522 counter += 1;
523 format!("value_{counter}")
524 })
525 .await
526 .unwrap();
527 let r2 = executor
528 .deterministic_operation("test", || {
529 counter += 1;
530 format!("value_{counter}")
531 })
532 .await
533 .unwrap();
534 let r3 = executor
535 .deterministic_operation("test", || {
536 counter += 1;
537 format!("value_{counter}")
538 })
539 .await
540 .unwrap();
541
542 assert_eq!(r1, "value_1");
543 assert_eq!(r2, "value_2");
544 assert_eq!(r3, "value_3");
545
546 assert_eq!(
547 backend.get_workflow_data(&wf_id, "test:1").await.unwrap(),
548 Some("value_1".to_string())
549 );
550 assert_eq!(
551 backend.get_workflow_data(&wf_id, "test:2").await.unwrap(),
552 Some("value_2".to_string())
553 );
554 assert_eq!(
555 backend.get_workflow_data(&wf_id, "test:3").await.unwrap(),
556 Some("value_3".to_string())
557 );
558 }
559
560 #[tokio::test]
561 async fn replays_stored_values() {
562 let wf_id = InvocationId::from_string("test-wf-replay".to_string());
563 let backend = Arc::new(TestStateBackend::new());
564
565 let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
567 let r1 = exec1
568 .deterministic_operation("test", || "fresh_1".to_string())
569 .await
570 .unwrap();
571 let r2 = exec1
572 .deterministic_operation("test", || "fresh_2".to_string())
573 .await
574 .unwrap();
575 assert_eq!(r1, "fresh_1");
576 assert_eq!(r2, "fresh_2");
577
578 let mut exec2 = DeterministicExecutor::new(wf_id, backend);
580 let mut gen_called = false;
581 let replay1 = exec2
582 .deterministic_operation("test", || {
583 gen_called = true;
584 "should_not_appear".to_string()
585 })
586 .await
587 .unwrap();
588 assert_eq!(replay1, "fresh_1");
589 assert!(!gen_called, "Generator should not be called during replay");
590
591 let replay2 = exec2
592 .deterministic_operation("test", || {
593 gen_called = true;
594 "should_not_appear".to_string()
595 })
596 .await
597 .unwrap();
598 assert_eq!(replay2, "fresh_2");
599 assert!(!gen_called);
600 }
601
602 #[tokio::test]
603 async fn handles_partial_replay_then_generation() {
604 let wf_id = InvocationId::from_string("test-wf-partial".to_string());
605 let backend = Arc::new(TestStateBackend::new());
606
607 let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
609 let r1 = exec1
610 .deterministic_operation("test", || "value_1".to_string())
611 .await
612 .unwrap();
613 let r2 = exec1
614 .deterministic_operation("test", || "value_2".to_string())
615 .await
616 .unwrap();
617
618 let mut exec2 = DeterministicExecutor::new(wf_id, backend);
620 let replay1 = exec2
621 .deterministic_operation("test", || "new_1".to_string())
622 .await
623 .unwrap();
624 let replay2 = exec2
625 .deterministic_operation("test", || "new_2".to_string())
626 .await
627 .unwrap();
628 let new_val = exec2
629 .deterministic_operation("test", || "value_3".to_string())
630 .await
631 .unwrap();
632
633 assert_eq!(replay1, r1);
634 assert_eq!(replay2, r2);
635 assert_eq!(new_val, "value_3");
636 }
637
638 #[tokio::test]
639 async fn isolated_by_operation_type() {
640 let (mut executor, wf_id) = make_executor();
641 let backend = executor.state_backend.clone();
642
643 executor
644 .deterministic_operation("type_a", || "a_value".to_string())
645 .await
646 .unwrap();
647 executor
648 .deterministic_operation("type_b", || "b_value".to_string())
649 .await
650 .unwrap();
651 executor
652 .deterministic_operation("type_a", || "a_value_2".to_string())
653 .await
654 .unwrap();
655
656 assert_eq!(
657 backend.get_workflow_data(&wf_id, "type_a:1").await.unwrap(),
658 Some("a_value".to_string())
659 );
660 assert_eq!(
661 backend.get_workflow_data(&wf_id, "type_b:1").await.unwrap(),
662 Some("b_value".to_string())
663 );
664 assert_eq!(
665 backend.get_workflow_data(&wf_id, "type_a:2").await.unwrap(),
666 Some("a_value_2".to_string())
667 );
668 }
669
670 #[tokio::test]
673 async fn base_time_establishment() {
674 let (executor, _) = make_executor();
675 let base1 = executor.get_base_time().await.unwrap();
676 assert!(base1.timezone() == Utc);
677 let base2 = executor.get_base_time().await.unwrap();
678 assert_eq!(base1, base2);
679 }
680
681 #[tokio::test]
682 async fn deterministic_random_generation() {
683 let wf_id = InvocationId::from_string("test-wf-random".to_string());
684 let backend = Arc::new(TestStateBackend::new());
685
686 let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
687 let randoms: Vec<f64> = {
688 let mut v = Vec::new();
689 for _ in 0..5 {
690 v.push(exec1.random().await.unwrap());
691 }
692 v
693 };
694
695 assert!(randoms.iter().all(|&r| (0.0..=1.0).contains(&r)));
697 let unique: std::collections::HashSet<u64> = randoms.iter().map(|r| r.to_bits()).collect();
699 assert_eq!(unique.len(), 5);
700
701 let mut exec2 = DeterministicExecutor::new(wf_id, backend);
703 for (i, &original) in randoms.iter().enumerate() {
704 let replayed = exec2.random().await.unwrap();
705 assert_eq!(original, replayed, "random mismatch at index {i}");
706 }
707 }
708
709 #[tokio::test]
710 async fn deterministic_time_progression() {
711 let wf_id = InvocationId::from_string("test-wf-time".to_string());
712 let backend = Arc::new(TestStateBackend::new());
713
714 let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
715 let times: Vec<DateTime<Utc>> = {
716 let mut v = Vec::new();
717 for _ in 0..3 {
718 v.push(exec1.utc_now().await.unwrap());
719 }
720 v
721 };
722
723 assert!(times.iter().all(|t| t.timezone() == Utc));
725 assert!(times.windows(2).all(|w| w[0] < w[1]));
727 let base = exec1.get_base_time().await.unwrap();
729 assert!(times[0] >= base);
730
731 let mut exec2 = DeterministicExecutor::new(wf_id, backend);
733 for (i, &original) in times.iter().enumerate() {
734 let replayed = exec2.utc_now().await.unwrap();
735 assert_eq!(original, replayed, "time mismatch at index {i}");
736 }
737 }
738
739 #[tokio::test]
740 async fn deterministic_uuid_generation() {
741 let wf_id = InvocationId::from_string("test-wf-uuid".to_string());
742 let backend = Arc::new(TestStateBackend::new());
743
744 let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
745 let uuids: Vec<String> = {
746 let mut v = Vec::new();
747 for _ in 0..3 {
748 v.push(exec1.uuid().await.unwrap());
749 }
750 v
751 };
752
753 assert!(uuids
755 .iter()
756 .all(|u| u.len() == 36 && u.chars().filter(|&c| c == '-').count() == 4));
757 let unique: std::collections::HashSet<&String> = uuids.iter().collect();
759 assert_eq!(unique.len(), 3);
760
761 let mut exec2 = DeterministicExecutor::new(wf_id, backend);
763 for (i, original) in uuids.iter().enumerate() {
764 let replayed = exec2.uuid().await.unwrap();
765 assert_eq!(original, &replayed, "uuid mismatch at index {i}");
766 }
767 }
768
769 #[tokio::test]
770 async fn mixed_deterministic_operations_sequence() {
771 let wf_id = InvocationId::from_string("test-wf-mixed".to_string());
772 let backend = Arc::new(TestStateBackend::new());
773
774 let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
775 let random1 = exec1.random().await.unwrap();
776 let time1 = exec1.utc_now().await.unwrap();
777 let uuid1 = exec1.uuid().await.unwrap();
778 let random2 = exec1.random().await.unwrap();
779 let time2 = exec1.utc_now().await.unwrap();
780
781 assert_ne!(random1, random2);
782 assert_ne!(time1, time2);
783
784 let mut exec2 = DeterministicExecutor::new(wf_id, backend);
786 assert_eq!(random1, exec2.random().await.unwrap());
787 assert_eq!(time1, exec2.utc_now().await.unwrap());
788 assert_eq!(uuid1, exec2.uuid().await.unwrap());
789 assert_eq!(random2, exec2.random().await.unwrap());
790 assert_eq!(time2, exec2.utc_now().await.unwrap());
791 }
792
793 #[tokio::test]
796 async fn complete_workflow_replay() {
797 let wf_id = InvocationId::from_string("test-wf-complete".to_string());
798 let backend = Arc::new(TestStateBackend::new());
799
800 let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
801 let original_random = exec1.random().await.unwrap();
802 let original_time = exec1.utc_now().await.unwrap();
803 let original_uuid = exec1.uuid().await.unwrap();
804 let original_custom = exec1
805 .deterministic_operation("custom", || "custom_1".to_string())
806 .await
807 .unwrap();
808
809 let mut exec2 = DeterministicExecutor::new(wf_id, backend);
810 assert_eq!(original_random, exec2.random().await.unwrap());
811 assert_eq!(original_time, exec2.utc_now().await.unwrap());
812 assert_eq!(original_uuid, exec2.uuid().await.unwrap());
813 let replay_custom = exec2
814 .deterministic_operation("custom", || "should_not_appear".to_string())
815 .await
816 .unwrap();
817 assert_eq!(original_custom, replay_custom);
818 }
819
820 #[tokio::test]
821 async fn counter_consistency_across_replay() {
822 let wf_id = InvocationId::from_string("test-wf-counter".to_string());
823 let backend = Arc::new(TestStateBackend::new());
824
825 let mut exec1 = DeterministicExecutor::new(wf_id.clone(), backend.clone());
826 for _ in 0..3 {
827 exec1
828 .deterministic_operation("test", || "val".to_string())
829 .await
830 .unwrap();
831 }
832 assert_eq!(exec1.get_operation_count("test"), 3);
833
834 let mut exec2 = DeterministicExecutor::new(wf_id, backend);
835 for _ in 0..3 {
836 exec2
837 .deterministic_operation("test", || "val".to_string())
838 .await
839 .unwrap();
840 }
841 assert_eq!(exec2.get_operation_count("test"), 3);
842
843 exec2
844 .deterministic_operation("test", || "val_4".to_string())
845 .await
846 .unwrap();
847 assert_eq!(exec2.get_operation_count("test"), 4);
848 }
849
850 #[tokio::test]
853 async fn workflow_isolation() {
854 let backend = Arc::new(TestStateBackend::new());
855 let wf1_id = InvocationId::from_string("workflow-1".to_string());
856 let wf2_id = InvocationId::from_string("workflow-2".to_string());
857
858 let mut exec1 = DeterministicExecutor::new(wf1_id.clone(), backend.clone());
859 let mut exec2 = DeterministicExecutor::new(wf2_id, backend.clone());
860
861 let randoms1: Vec<f64> = {
862 let mut v = Vec::new();
863 for _ in 0..3 {
864 v.push(exec1.random().await.unwrap());
865 }
866 v
867 };
868 let randoms2: Vec<f64> = {
869 let mut v = Vec::new();
870 for _ in 0..3 {
871 v.push(exec2.random().await.unwrap());
872 }
873 v
874 };
875
876 assert_ne!(randoms1, randoms2);
878
879 let mut exec1_replay = DeterministicExecutor::new(wf1_id, backend);
881 let replayed: Vec<f64> = {
882 let mut v = Vec::new();
883 for _ in 0..3 {
884 v.push(exec1_replay.random().await.unwrap());
885 }
886 v
887 };
888 assert_eq!(randoms1, replayed);
889 }
890
891 #[tokio::test]
892 async fn state_backend_basic_operations() {
893 let wf_id = InvocationId::from_string("test-wf-basic".to_string());
894 let backend = Arc::new(TestStateBackend::new());
895
896 backend
897 .set_workflow_data(&wf_id, "test_key", "test_value")
898 .await
899 .unwrap();
900
901 let retrieved = backend.get_workflow_data(&wf_id, "test_key").await.unwrap();
902 assert_eq!(retrieved, Some("test_value".to_string()));
903
904 let non_existent = backend
905 .get_workflow_data(&wf_id, "non_existent")
906 .await
907 .unwrap();
908 assert_eq!(non_existent, None);
909 }
910}