1use std::collections::{HashMap, HashSet};
8
9use chrono::{DateTime, Utc};
10use uuid::Uuid;
11
12use durable_execution_sdk::{
13 Operation, OperationAction, OperationStatus, OperationType, OperationUpdate,
14};
15
16use super::callback_manager::CallbackManager;
17use super::event_processor::{EventProcessor, EventType, HistoryEvent};
18use super::nodejs_event_types::{
19 ErrorWrapper, ExecutionStartedDetails, ExecutionStartedDetailsWrapper,
20 InvocationCompletedDetails, InvocationCompletedDetailsWrapper, NodeJsEventDetails,
21 NodeJsEventType, NodeJsHistoryEvent, PayloadWrapper,
22};
23use super::types::{ExecutionId, InvocationId};
24use crate::error::TestError;
25
/// An operation snapshot together with the checkpoint update stored with it.
///
/// Entries of this type are what `CheckpointManager` keeps per operation id.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperationEvents {
    /// Current state of the operation.
    pub operation: Operation,
    /// The `OperationUpdate` stored when the entry was last written by
    /// checkpoint processing; `None` when the entry was created without one
    /// (e.g. by `initialize` or by inserting a new operation directly).
    pub update: Option<OperationUpdate>,
}
34
/// An [`OperationEvents`] entry paired with a pending-update flag.
///
/// NOTE(review): this type is not referenced elsewhere in this file; the
/// exact meaning of `has_pending_update` should be confirmed against callers.
#[derive(Debug, Clone)]
pub struct CheckpointOperation {
    /// The operation and its associated update.
    pub events: OperationEvents,
    /// Presumably `true` while an update for this operation is still
    /// outstanding — TODO confirm.
    pub has_pending_update: bool,
}
43
/// Tracks the operations, invocations and history events of one execution.
#[derive(Debug)]
pub struct CheckpointManager {
    /// Identifier supplied to `new`; exposed through `execution_id()`.
    execution_id: ExecutionId,
    /// Latest stored entry for every known operation, keyed by operation id.
    operation_data_map: HashMap<String, OperationEvents>,
    /// Operation ids in the order they were first inserted; drives the
    /// ordering of `get_state` and `start_invocation`.
    operation_order: Vec<String>,
    /// Receives a registration for every newly created callback operation.
    callback_manager: CallbackManager,
    /// Records history events (both the generic and the Node.js flavour).
    event_processor: EventProcessor,
    /// Start time of each invocation that has been started but not yet
    /// completed (entries are removed by `complete_invocation`).
    invocations_map: HashMap<InvocationId, DateTime<Utc>>,
    /// Ids of operations changed since the dirty set was last cleared.
    dirty_operation_ids: HashSet<String>,
    /// Set once an `Execution` operation is stored as `Succeeded` or `Failed`.
    is_execution_completed: bool,
}
64
65impl CheckpointManager {
66 pub fn new(execution_id: &str) -> Self {
68 Self {
69 execution_id: execution_id.to_string(),
70 operation_data_map: HashMap::new(),
71 operation_order: Vec::new(),
72 callback_manager: CallbackManager::new(execution_id),
73 event_processor: EventProcessor::new(),
74 invocations_map: HashMap::new(),
75 dirty_operation_ids: HashSet::new(),
76 is_execution_completed: false,
77 }
78 }
79
80 pub fn initialize(&mut self, payload: &str) -> OperationEvents {
82 let initial_id = Uuid::new_v4().to_string();
83 let now = Utc::now().timestamp_millis();
84
85 let initial_operation = Operation {
86 operation_id: initial_id.clone(),
87 operation_type: OperationType::Execution,
88 status: OperationStatus::Started,
89 result: Some(payload.to_string()),
90 error: None,
91 parent_id: None,
92 name: None,
93 sub_type: None,
94 start_timestamp: Some(now),
95 end_timestamp: None,
96 wait_details: None,
97 step_details: None,
98 callback_details: None,
99 chained_invoke_details: None,
100 context_details: None,
101 execution_details: None,
102 };
103
104 let events = OperationEvents {
105 operation: initial_operation.clone(),
106 update: None,
107 };
108
109 self.operation_data_map
110 .insert(initial_id.clone(), events.clone());
111 self.operation_order.push(initial_id.clone());
112 self.dirty_operation_ids.insert(initial_id);
113
114 self.event_processor.create_history_event(
116 EventType::OperationStarted,
117 None,
118 "ExecutionStartedDetails",
119 serde_json::json!({ "input_payload": payload }),
120 );
121
122 let nodejs_details = NodeJsEventDetails::ExecutionStarted(ExecutionStartedDetailsWrapper {
124 execution_started_details: ExecutionStartedDetails {
125 input: PayloadWrapper::new(payload.to_string()),
126 execution_timeout: None,
127 },
128 });
129 self.event_processor.create_nodejs_event(
130 NodeJsEventType::ExecutionStarted,
131 Some(&events.operation),
132 nodejs_details,
133 );
134
135 events
136 }
137
138 pub fn get_operation_data(&self, operation_id: &str) -> Option<&OperationEvents> {
140 self.operation_data_map.get(operation_id)
141 }
142
143 pub fn get_all_operation_data(&self) -> &HashMap<String, OperationEvents> {
145 &self.operation_data_map
146 }
147
148 pub fn has_dirty_operations(&self) -> bool {
150 !self.dirty_operation_ids.is_empty()
151 }
152
153 pub fn get_dirty_operations(&mut self) -> Vec<Operation> {
155 let dirty_ops: Vec<Operation> = self
156 .dirty_operation_ids
157 .iter()
158 .filter_map(|id| self.operation_data_map.get(id).map(|e| e.operation.clone()))
159 .collect();
160
161 self.dirty_operation_ids.clear();
162 dirty_ops
163 }
164
165 pub fn is_execution_completed(&self) -> bool {
167 self.is_execution_completed
168 }
169
170 pub fn start_invocation(&mut self, invocation_id: &str) -> Vec<OperationEvents> {
172 self.invocations_map
173 .insert(invocation_id.to_string(), Utc::now());
174
175 self.dirty_operation_ids.clear();
177
178 self.operation_order
180 .iter()
181 .filter_map(|id| self.operation_data_map.get(id).cloned())
182 .collect()
183 }
184
185 pub fn complete_invocation(
187 &mut self,
188 invocation_id: &str,
189 ) -> Result<InvocationTimestamps, TestError> {
190 let start_timestamp = self
191 .invocations_map
192 .remove(invocation_id)
193 .ok_or_else(|| TestError::InvocationNotFound(invocation_id.to_string()))?;
194
195 let end_timestamp = Utc::now();
196
197 self.event_processor.create_history_event(
199 EventType::InvocationCompleted,
200 None,
201 "InvocationCompletedDetails",
202 serde_json::json!({
203 "start_timestamp": start_timestamp,
204 "end_timestamp": end_timestamp,
205 "request_id": invocation_id,
206 }),
207 );
208
209 let nodejs_details =
211 NodeJsEventDetails::InvocationCompleted(InvocationCompletedDetailsWrapper {
212 invocation_completed_details: InvocationCompletedDetails {
213 start_timestamp: EventProcessor::format_timestamp(start_timestamp),
214 end_timestamp: EventProcessor::format_timestamp(end_timestamp),
215 request_id: invocation_id.to_string(),
216 error: ErrorWrapper::empty(),
217 },
218 });
219 self.event_processor.create_nodejs_event(
220 NodeJsEventType::InvocationCompleted,
221 None,
222 nodejs_details,
223 );
224
225 Ok(InvocationTimestamps {
226 start_timestamp,
227 end_timestamp,
228 })
229 }
230
231 pub fn get_state(&self) -> Vec<Operation> {
233 self.operation_order
234 .iter()
235 .filter_map(|id| self.operation_data_map.get(id).map(|e| e.operation.clone()))
236 .collect()
237 }
238
239 pub fn process_checkpoint(
241 &mut self,
242 updates: Vec<OperationUpdate>,
243 ) -> Result<Vec<Operation>, TestError> {
244 for update in updates {
245 self.process_operation_update(update)?;
246 }
247
248 Ok(self.get_dirty_operations())
249 }
250
251 fn process_operation_update(&mut self, update: OperationUpdate) -> Result<(), TestError> {
253 let operation_id = update.operation_id.clone();
254 let now = Utc::now().timestamp_millis();
255
256 let operation = if let Some(existing) = self.operation_data_map.get(&operation_id) {
258 let mut op = existing.operation.clone();
259 self.merge_operation_update(&mut op, &update, now);
261 op
262 } else {
263 let op = self.create_operation_from_update(&update, now);
265
266 if op.operation_type == OperationType::Callback {
269 if let Some(ref cb_details) = op.callback_details {
270 if let Some(ref callback_id) = cb_details.callback_id {
271 let timeout = update
272 .callback_options
273 .as_ref()
274 .and_then(|opts| opts.timeout_seconds)
275 .map(std::time::Duration::from_secs);
276 let _ = self
277 .callback_manager
278 .register_callback(callback_id, timeout);
279 }
280 }
281 }
282
283 op
284 };
285
286 let event_type = match update.action {
288 OperationAction::Start => EventType::OperationStarted,
289 OperationAction::Succeed | OperationAction::Fail => EventType::OperationCompleted,
290 OperationAction::Cancel | OperationAction::Retry => EventType::OperationCompleted,
291 };
292
293 self.event_processor.create_history_event(
294 event_type,
295 Some(&operation_id),
296 "OperationDetails",
297 serde_json::json!({
298 "operation_id": operation_id,
299 "action": format!("{:?}", update.action),
300 }),
301 );
302
303 self.event_processor
305 .process_operation_update(&update, &operation);
306
307 if operation.operation_type == OperationType::Execution
309 && (operation.status == OperationStatus::Succeeded
310 || operation.status == OperationStatus::Failed)
311 {
312 self.is_execution_completed = true;
313 }
314
315 let events = OperationEvents {
316 operation,
317 update: Some(update),
318 };
319
320 if !self.operation_data_map.contains_key(&operation_id) {
322 self.operation_order.push(operation_id.clone());
323 }
324
325 self.operation_data_map.insert(operation_id.clone(), events);
326 self.dirty_operation_ids.insert(operation_id);
327
328 Ok(())
329 }
330
331 fn merge_operation_update(
333 &self,
334 operation: &mut Operation,
335 update: &OperationUpdate,
336 now: i64,
337 ) {
338 operation.status = match update.action {
340 OperationAction::Start => OperationStatus::Started,
341 OperationAction::Succeed => OperationStatus::Succeeded,
342 OperationAction::Fail => OperationStatus::Failed,
343 OperationAction::Cancel => OperationStatus::Cancelled,
344 OperationAction::Retry => OperationStatus::Pending, };
346
347 if matches!(
349 update.action,
350 OperationAction::Succeed | OperationAction::Fail | OperationAction::Cancel
351 ) {
352 operation.end_timestamp = Some(now);
353 }
354
355 if update.result.is_some() {
357 operation.result = update.result.clone();
358 }
359 if update.error.is_some() {
360 operation.error = update.error.clone();
361 }
362
363 if update.action == OperationAction::Retry && update.operation_type == OperationType::Step {
365 let next_attempt_timestamp = update
366 .step_options
367 .as_ref()
368 .and_then(|opts| opts.next_attempt_delay_seconds)
369 .map(|delay| now + (delay as i64 * 1000));
370
371 let current_attempt = operation
372 .step_details
373 .as_ref()
374 .and_then(|d| d.attempt)
375 .unwrap_or(0);
376
377 operation.step_details = Some(durable_execution_sdk::StepDetails {
378 result: None,
379 attempt: Some(current_attempt + 1),
380 next_attempt_timestamp,
381 error: update.error.clone(),
382 payload: update.result.clone(), });
384 }
385 }
386
387 fn create_operation_from_update(&self, update: &OperationUpdate, now: i64) -> Operation {
389 let status = match update.action {
390 OperationAction::Start => OperationStatus::Started,
391 OperationAction::Succeed => OperationStatus::Succeeded,
392 OperationAction::Fail => OperationStatus::Failed,
393 OperationAction::Cancel => OperationStatus::Cancelled,
394 OperationAction::Retry => OperationStatus::Pending, };
396
397 let end_timestamp = if matches!(
398 update.action,
399 OperationAction::Succeed | OperationAction::Fail | OperationAction::Cancel
400 ) {
401 Some(now)
402 } else {
403 None
404 };
405
406 let callback_details = if update.operation_type == OperationType::Callback {
408 let callback_id = Uuid::new_v4().to_string();
410 Some(durable_execution_sdk::CallbackDetails {
411 callback_id: Some(callback_id),
412 result: None,
413 error: None,
414 })
415 } else {
416 None
417 };
418
419 let wait_details = if update.operation_type == OperationType::Wait {
422 if let Some(ref wait_options) = update.wait_options {
423 let scheduled_end_timestamp = now + (wait_options.wait_seconds as i64 * 1000);
425 Some(durable_execution_sdk::WaitDetails {
426 scheduled_end_timestamp: Some(scheduled_end_timestamp),
427 })
428 } else {
429 None
430 }
431 } else {
432 None
433 };
434
435 let step_details = if update.operation_type == OperationType::Step {
438 if let Some(ref step_options) = update.step_options {
439 let next_attempt_timestamp = step_options
441 .next_attempt_delay_seconds
442 .map(|delay| now + (delay as i64 * 1000));
443 Some(durable_execution_sdk::StepDetails {
444 result: None,
445 attempt: Some(1), next_attempt_timestamp,
447 error: update.error.clone(),
448 payload: update.result.clone(), })
450 } else if update.action == OperationAction::Retry {
451 Some(durable_execution_sdk::StepDetails {
453 result: None,
454 attempt: Some(1),
455 next_attempt_timestamp: None,
456 error: update.error.clone(),
457 payload: update.result.clone(),
458 })
459 } else {
460 None
461 }
462 } else {
463 None
464 };
465
466 Operation {
467 operation_id: update.operation_id.clone(),
468 operation_type: update.operation_type,
469 status,
470 result: update.result.clone(),
471 error: update.error.clone(),
472 parent_id: update.parent_id.clone(),
473 name: update.name.clone(),
474 sub_type: update.sub_type.clone(),
475 start_timestamp: Some(now),
476 end_timestamp,
477 wait_details,
478 step_details,
479 callback_details,
480 chained_invoke_details: None,
481 context_details: None,
482 execution_details: None,
483 }
484 }
485
486 pub fn callback_manager(&self) -> &CallbackManager {
488 &self.callback_manager
489 }
490
491 pub fn callback_manager_mut(&mut self) -> &mut CallbackManager {
493 &mut self.callback_manager
494 }
495
496 pub fn complete_callback_operation(
503 &mut self,
504 callback_id: &str,
505 result: Option<String>,
506 error: Option<durable_execution_sdk::ErrorObject>,
507 ) {
508 let now = chrono::Utc::now().timestamp_millis();
509 for events in self.operation_data_map.values_mut() {
510 if events.operation.operation_type == OperationType::Callback {
511 if let Some(ref cb) = events.operation.callback_details {
512 if cb.callback_id.as_deref() == Some(callback_id) {
513 if error.is_some() {
514 events.operation.status = OperationStatus::Failed;
515 events.operation.error = error;
516 } else {
517 events.operation.status = OperationStatus::Succeeded;
518 events.operation.result = result;
519 }
520 events.operation.end_timestamp = Some(now);
521 if let Some(ref mut cb_details) = events.operation.callback_details {
523 cb_details.result = events.operation.result.clone();
524 cb_details.error = events.operation.error.clone();
525 }
526 self.dirty_operation_ids
527 .insert(events.operation.operation_id.clone());
528 return;
529 }
530 }
531 }
532 }
533 }
534
535 pub fn event_processor(&self) -> &EventProcessor {
537 &self.event_processor
538 }
539
540 pub fn event_processor_mut(&mut self) -> &mut EventProcessor {
542 &mut self.event_processor
543 }
544
545 pub fn get_history_events(&self) -> Vec<HistoryEvent> {
547 self.event_processor.get_events().to_vec()
548 }
549
550 pub fn get_nodejs_history_events(&self) -> Vec<NodeJsHistoryEvent> {
555 self.event_processor.get_nodejs_events().to_vec()
556 }
557
558 pub fn execution_id(&self) -> &str {
560 &self.execution_id
561 }
562
563 pub fn update_operation_data(&mut self, operation_id: &str, updated_operation: Operation) {
568 if let Some(existing) = self.operation_data_map.get_mut(operation_id) {
569 existing.operation = updated_operation;
571
572 self.dirty_operation_ids.insert(operation_id.to_string());
574
575 if existing.operation.operation_type == OperationType::Execution
577 && (existing.operation.status == OperationStatus::Succeeded
578 || existing.operation.status == OperationStatus::Failed)
579 {
580 self.is_execution_completed = true;
581 }
582 } else {
583 let events = OperationEvents {
585 operation: updated_operation,
586 update: None,
587 };
588 self.operation_data_map
589 .insert(operation_id.to_string(), events);
590 self.operation_order.push(operation_id.to_string());
591 self.dirty_operation_ids.insert(operation_id.to_string());
592 }
593 }
594}
595
/// Start and end time of a completed invocation, as returned by
/// `CheckpointManager::complete_invocation`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct InvocationTimestamps {
    /// Time recorded when the invocation was started.
    pub start_timestamp: DateTime<Utc>,
    /// Time taken when the invocation was completed.
    pub end_timestamp: DateTime<Utc>,
}
604
// Unit tests for `CheckpointManager`. The expected Node.js event counts and
// types below also depend on `EventProcessor`, which lives in another module.
#[cfg(test)]
mod tests {
    use super::*;

    /// `initialize` creates a started `Execution` operation and marks it dirty.
    #[test]
    fn test_initialize() {
        let mut manager = CheckpointManager::new("exec-1");
        let events = manager.initialize(r#"{"input": "test"}"#);

        assert!(!events.operation.operation_id.is_empty());
        assert_eq!(events.operation.operation_type, OperationType::Execution);
        assert_eq!(events.operation.status, OperationStatus::Started);
        assert!(manager.has_dirty_operations());
    }

    /// Starting an invocation returns all operations and clears the dirty set.
    #[test]
    fn test_start_invocation() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");

        let ops = manager.start_invocation("inv-1");
        assert_eq!(ops.len(), 1);
        assert!(!manager.has_dirty_operations());
    }

    /// Completing a started invocation yields ordered timestamps.
    #[test]
    fn test_complete_invocation() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");
        manager.start_invocation("inv-1");

        let timestamps = manager.complete_invocation("inv-1").unwrap();
        assert!(timestamps.end_timestamp >= timestamps.start_timestamp);
    }

    /// Completing an invocation that was never started is an error.
    #[test]
    fn test_complete_unknown_invocation_fails() {
        let mut manager = CheckpointManager::new("exec-1");
        let result = manager.complete_invocation("unknown");
        assert!(matches!(result, Err(TestError::InvocationNotFound(_))));
    }

    /// A checkpoint with one new step returns exactly that step as dirty.
    #[test]
    fn test_process_checkpoint() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");
        // Drain the dirty entry left behind by `initialize`.
        manager.get_dirty_operations();
        let update = OperationUpdate::start("op-1", OperationType::Step).with_name("test-step");

        let dirty = manager.process_checkpoint(vec![update]).unwrap();
        assert_eq!(dirty.len(), 1);
        assert_eq!(dirty[0].name, Some("test-step".to_string()));
    }

    /// Succeeding the execution operation sets the completion flag.
    #[test]
    fn test_execution_completed_on_success() {
        let mut manager = CheckpointManager::new("exec-1");
        let events = manager.initialize("{}");
        let exec_id = events.operation.operation_id.clone();

        assert!(!manager.is_execution_completed());

        let update = OperationUpdate {
            operation_id: exec_id,
            action: OperationAction::Succeed,
            operation_type: OperationType::Execution,
            result: Some("done".to_string()),
            error: None,
            parent_id: None,
            name: None,
            sub_type: None,
            wait_options: None,
            step_options: None,
            callback_options: None,
            chained_invoke_options: None,
            context_options: None,
        };

        manager.process_checkpoint(vec![update]).unwrap();
        assert!(manager.is_execution_completed());
    }

    /// `get_state` returns the execution operation plus the added step.
    #[test]
    fn test_get_state() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");

        let update = OperationUpdate::start("op-1", OperationType::Step);
        manager.process_checkpoint(vec![update]).unwrap();

        let state = manager.get_state();
        // Execution operation + the step.
        assert_eq!(state.len(), 2);
    }

    /// `initialize` emits a single Node.js `ExecutionStarted` event with
    /// event id 1, tied to the execution operation's id.
    #[test]
    fn test_initialize_generates_nodejs_execution_started_event() {
        let mut manager = CheckpointManager::new("exec-1");
        let events = manager.initialize(r#"{"input": "test"}"#);

        let nodejs_events = manager.get_nodejs_history_events();
        assert_eq!(nodejs_events.len(), 1);
        assert_eq!(
            nodejs_events[0].event_type,
            NodeJsEventType::ExecutionStarted
        );
        assert_eq!(nodejs_events[0].event_id, 1);
        assert_eq!(
            nodejs_events[0].id,
            Some(events.operation.operation_id.clone())
        );
    }

    /// Starting a named step adds a `StepStarted` Node.js event carrying the name.
    #[test]
    fn test_process_operation_update_generates_nodejs_events() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");

        let update = OperationUpdate::start("step-1", OperationType::Step).with_name("my-step");
        manager.process_checkpoint(vec![update]).unwrap();

        let nodejs_events = manager.get_nodejs_history_events();
        // ExecutionStarted + StepStarted.
        assert_eq!(nodejs_events.len(), 2);
        assert_eq!(
            nodejs_events[0].event_type,
            NodeJsEventType::ExecutionStarted
        );
        assert_eq!(nodejs_events[1].event_type, NodeJsEventType::StepStarted);
        assert_eq!(nodejs_events[1].name, Some("my-step".to_string()));
    }

    /// Completing an invocation adds an `InvocationCompleted` Node.js event.
    #[test]
    fn test_complete_invocation_generates_nodejs_event() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");
        manager.start_invocation("inv-1");

        manager.complete_invocation("inv-1").unwrap();

        let nodejs_events = manager.get_nodejs_history_events();
        // ExecutionStarted + InvocationCompleted.
        assert_eq!(nodejs_events.len(), 2);
        assert_eq!(
            nodejs_events[1].event_type,
            NodeJsEventType::InvocationCompleted
        );
    }

    /// Start followed by succeed on a step yields StepStarted then StepSucceeded.
    #[test]
    fn test_step_lifecycle_generates_correct_nodejs_events() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");

        let start_update =
            OperationUpdate::start("step-1", OperationType::Step).with_name("compute");
        manager.process_checkpoint(vec![start_update]).unwrap();

        let succeed_update =
            OperationUpdate::succeed("step-1", OperationType::Step, Some("42".to_string()));
        manager.process_checkpoint(vec![succeed_update]).unwrap();

        let nodejs_events = manager.get_nodejs_history_events();
        // ExecutionStarted + StepStarted + StepSucceeded.
        assert_eq!(nodejs_events.len(), 3);
        assert_eq!(
            nodejs_events[0].event_type,
            NodeJsEventType::ExecutionStarted
        );
        assert_eq!(nodejs_events[1].event_type, NodeJsEventType::StepStarted);
        assert_eq!(nodejs_events[2].event_type, NodeJsEventType::StepSucceeded);
    }

    /// Succeeding the execution operation yields an `ExecutionSucceeded` event.
    #[test]
    fn test_execution_lifecycle_generates_correct_nodejs_events() {
        let mut manager = CheckpointManager::new("exec-1");
        let init_events = manager.initialize("{}");
        let exec_id = init_events.operation.operation_id.clone();

        let succeed_update = OperationUpdate::succeed(
            &exec_id,
            OperationType::Execution,
            Some("result".to_string()),
        );
        manager.process_checkpoint(vec![succeed_update]).unwrap();

        let nodejs_events = manager.get_nodejs_history_events();
        // ExecutionStarted + ExecutionSucceeded.
        assert_eq!(nodejs_events.len(), 2);
        assert_eq!(
            nodejs_events[0].event_type,
            NodeJsEventType::ExecutionStarted
        );
        assert_eq!(
            nodejs_events[1].event_type,
            NodeJsEventType::ExecutionSucceeded
        );
    }

    /// Start followed by succeed on a wait yields WaitStarted then WaitSucceeded.
    #[test]
    fn test_wait_lifecycle_generates_correct_nodejs_events() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");

        let start_update = OperationUpdate::start_wait("wait-1", 30);
        manager.process_checkpoint(vec![start_update]).unwrap();

        let succeed_update = OperationUpdate::succeed("wait-1", OperationType::Wait, None);
        manager.process_checkpoint(vec![succeed_update]).unwrap();

        let nodejs_events = manager.get_nodejs_history_events();
        // ExecutionStarted + WaitStarted + WaitSucceeded.
        assert_eq!(nodejs_events.len(), 3);
        assert_eq!(nodejs_events[1].event_type, NodeJsEventType::WaitStarted);
        assert_eq!(nodejs_events[2].event_type, NodeJsEventType::WaitSucceeded);
    }

    /// A `Retry` update on a started step is reported as a `StepFailed` event.
    #[test]
    fn test_retry_operation_generates_step_failed_event() {
        use durable_execution_sdk::error::ErrorObject;
        use durable_execution_sdk::StepOptions;

        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");

        let start_update = OperationUpdate::start("step-1", OperationType::Step);
        manager.process_checkpoint(vec![start_update]).unwrap();

        let retry_update = OperationUpdate {
            operation_id: "step-1".to_string(),
            action: OperationAction::Retry,
            operation_type: OperationType::Step,
            result: None,
            error: Some(ErrorObject::new("RetryError", "Temporary failure")),
            parent_id: None,
            name: None,
            sub_type: None,
            wait_options: None,
            step_options: Some(StepOptions {
                next_attempt_delay_seconds: Some(5),
            }),
            callback_options: None,
            chained_invoke_options: None,
            context_options: None,
        };
        manager.process_checkpoint(vec![retry_update]).unwrap();

        let nodejs_events = manager.get_nodejs_history_events();
        // ExecutionStarted + StepStarted + StepFailed.
        assert_eq!(nodejs_events.len(), 3);
        assert_eq!(nodejs_events[2].event_type, NodeJsEventType::StepFailed);
    }

    /// Node.js event ids count up from 1 in emission order.
    #[test]
    fn test_nodejs_event_ids_are_sequential() {
        let mut manager = CheckpointManager::new("exec-1");
        manager.initialize("{}");

        let step1 = OperationUpdate::start("step-1", OperationType::Step);
        let step2 = OperationUpdate::start("step-2", OperationType::Step);
        manager.process_checkpoint(vec![step1, step2]).unwrap();

        let nodejs_events = manager.get_nodejs_history_events();
        assert_eq!(nodejs_events.len(), 3);
        assert_eq!(nodejs_events[0].event_id, 1);
        assert_eq!(nodejs_events[1].event_id, 2);
        assert_eq!(nodejs_events[2].event_id, 3);
    }
}