Skip to main content

durable_execution_sdk_testing/checkpoint_server/
checkpoint_manager.rs

1//! Checkpoint manager for managing checkpoints of a single execution.
2//!
3//! This module implements the CheckpointManager which manages checkpoints for
4//! a single execution, including operation state, callback lifecycle, and
5//! history events, matching the Node.js SDK's checkpoint manager.
6
7use std::collections::{HashMap, HashSet};
8
9use chrono::{DateTime, Utc};
10use uuid::Uuid;
11
12use durable_execution_sdk::{
13    Operation, OperationAction, OperationStatus, OperationType, OperationUpdate,
14};
15
16use super::callback_manager::CallbackManager;
17use super::event_processor::{EventProcessor, EventType, HistoryEvent};
18use super::nodejs_event_types::{
19    ErrorWrapper, ExecutionStartedDetails, ExecutionStartedDetailsWrapper,
20    InvocationCompletedDetails, InvocationCompletedDetailsWrapper, NodeJsEventDetails,
21    NodeJsEventType, NodeJsHistoryEvent, PayloadWrapper,
22};
23use super::types::{ExecutionId, InvocationId};
24use crate::error::TestError;
25
26/// Operation with associated event data.
27#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
28pub struct OperationEvents {
29    /// The operation data
30    pub operation: Operation,
31    /// The original update that created/modified this operation
32    pub update: Option<OperationUpdate>,
33}
34
35/// A checkpoint operation with update tracking.
36#[derive(Debug, Clone)]
37pub struct CheckpointOperation {
38    /// The operation events
39    pub events: OperationEvents,
40    /// Whether this operation has pending updates
41    pub has_pending_update: bool,
42}
43
44/// Manages checkpoints for a single execution.
45#[derive(Debug)]
46pub struct CheckpointManager {
47    /// The execution ID
48    execution_id: ExecutionId,
49    /// Map of operation ID to operation events
50    operation_data_map: HashMap<String, OperationEvents>,
51    /// Order of operation IDs (for preserving insertion order)
52    operation_order: Vec<String>,
53    /// Callback manager for this execution
54    callback_manager: CallbackManager,
55    /// Event processor for generating history events
56    event_processor: EventProcessor,
57    /// Map of invocation ID to start timestamp
58    invocations_map: HashMap<InvocationId, DateTime<Utc>>,
59    /// Set of operation IDs that have been modified since last checkpoint
60    dirty_operation_ids: HashSet<String>,
61    /// Whether the execution has completed
62    is_execution_completed: bool,
63}
64
65impl CheckpointManager {
66    /// Create a new checkpoint manager for an execution.
67    pub fn new(execution_id: &str) -> Self {
68        Self {
69            execution_id: execution_id.to_string(),
70            operation_data_map: HashMap::new(),
71            operation_order: Vec::new(),
72            callback_manager: CallbackManager::new(execution_id),
73            event_processor: EventProcessor::new(),
74            invocations_map: HashMap::new(),
75            dirty_operation_ids: HashSet::new(),
76            is_execution_completed: false,
77        }
78    }
79
80    /// Initialize with the first operation (EXECUTION type).
81    pub fn initialize(&mut self, payload: &str) -> OperationEvents {
82        let initial_id = Uuid::new_v4().to_string();
83        let now = Utc::now().timestamp_millis();
84
85        let initial_operation = Operation {
86            operation_id: initial_id.clone(),
87            operation_type: OperationType::Execution,
88            status: OperationStatus::Started,
89            result: Some(payload.to_string()),
90            error: None,
91            parent_id: None,
92            name: None,
93            sub_type: None,
94            start_timestamp: Some(now),
95            end_timestamp: None,
96            wait_details: None,
97            step_details: None,
98            callback_details: None,
99            chained_invoke_details: None,
100            context_details: None,
101            execution_details: None,
102        };
103
104        let events = OperationEvents {
105            operation: initial_operation.clone(),
106            update: None,
107        };
108
109        self.operation_data_map
110            .insert(initial_id.clone(), events.clone());
111        self.operation_order.push(initial_id.clone());
112        self.dirty_operation_ids.insert(initial_id);
113
114        // Generate legacy history event
115        self.event_processor.create_history_event(
116            EventType::OperationStarted,
117            None,
118            "ExecutionStartedDetails",
119            serde_json::json!({ "input_payload": payload }),
120        );
121
122        // Generate Node.js-compatible ExecutionStarted event
123        let nodejs_details = NodeJsEventDetails::ExecutionStarted(ExecutionStartedDetailsWrapper {
124            execution_started_details: ExecutionStartedDetails {
125                input: PayloadWrapper::new(payload.to_string()),
126                execution_timeout: None,
127            },
128        });
129        self.event_processor.create_nodejs_event(
130            NodeJsEventType::ExecutionStarted,
131            Some(&events.operation),
132            nodejs_details,
133        );
134
135        events
136    }
137
138    /// Get operation data by ID.
139    pub fn get_operation_data(&self, operation_id: &str) -> Option<&OperationEvents> {
140        self.operation_data_map.get(operation_id)
141    }
142
143    /// Get all operation data.
144    pub fn get_all_operation_data(&self) -> &HashMap<String, OperationEvents> {
145        &self.operation_data_map
146    }
147
148    /// Check if there are dirty operations.
149    pub fn has_dirty_operations(&self) -> bool {
150        !self.dirty_operation_ids.is_empty()
151    }
152
153    /// Get and clear dirty operations.
154    pub fn get_dirty_operations(&mut self) -> Vec<Operation> {
155        let dirty_ops: Vec<Operation> = self
156            .dirty_operation_ids
157            .iter()
158            .filter_map(|id| self.operation_data_map.get(id).map(|e| e.operation.clone()))
159            .collect();
160
161        self.dirty_operation_ids.clear();
162        dirty_ops
163    }
164
165    /// Check if execution is completed.
166    pub fn is_execution_completed(&self) -> bool {
167        self.is_execution_completed
168    }
169
170    /// Start an invocation.
171    pub fn start_invocation(&mut self, invocation_id: &str) -> Vec<OperationEvents> {
172        self.invocations_map
173            .insert(invocation_id.to_string(), Utc::now());
174
175        // Clear dirty operations since client should know state from operation data
176        self.dirty_operation_ids.clear();
177
178        // Return operations in insertion order
179        self.operation_order
180            .iter()
181            .filter_map(|id| self.operation_data_map.get(id).cloned())
182            .collect()
183    }
184
185    /// Complete an invocation.
186    pub fn complete_invocation(
187        &mut self,
188        invocation_id: &str,
189    ) -> Result<InvocationTimestamps, TestError> {
190        let start_timestamp = self
191            .invocations_map
192            .remove(invocation_id)
193            .ok_or_else(|| TestError::InvocationNotFound(invocation_id.to_string()))?;
194
195        let end_timestamp = Utc::now();
196
197        // Generate legacy history event
198        self.event_processor.create_history_event(
199            EventType::InvocationCompleted,
200            None,
201            "InvocationCompletedDetails",
202            serde_json::json!({
203                "start_timestamp": start_timestamp,
204                "end_timestamp": end_timestamp,
205                "request_id": invocation_id,
206            }),
207        );
208
209        // Generate Node.js-compatible InvocationCompleted event
210        let nodejs_details =
211            NodeJsEventDetails::InvocationCompleted(InvocationCompletedDetailsWrapper {
212                invocation_completed_details: InvocationCompletedDetails {
213                    start_timestamp: EventProcessor::format_timestamp(start_timestamp),
214                    end_timestamp: EventProcessor::format_timestamp(end_timestamp),
215                    request_id: invocation_id.to_string(),
216                    error: ErrorWrapper::empty(),
217                },
218            });
219        self.event_processor.create_nodejs_event(
220            NodeJsEventType::InvocationCompleted,
221            None,
222            nodejs_details,
223        );
224
225        Ok(InvocationTimestamps {
226            start_timestamp,
227            end_timestamp,
228        })
229    }
230
231    /// Get current state (all operations in insertion order).
232    pub fn get_state(&self) -> Vec<Operation> {
233        self.operation_order
234            .iter()
235            .filter_map(|id| self.operation_data_map.get(id).map(|e| e.operation.clone()))
236            .collect()
237    }
238
239    /// Process a checkpoint with operation updates.
240    pub fn process_checkpoint(
241        &mut self,
242        updates: Vec<OperationUpdate>,
243    ) -> Result<Vec<Operation>, TestError> {
244        for update in updates {
245            self.process_operation_update(update)?;
246        }
247
248        Ok(self.get_dirty_operations())
249    }
250
251    /// Process a single operation update.
252    fn process_operation_update(&mut self, update: OperationUpdate) -> Result<(), TestError> {
253        let operation_id = update.operation_id.clone();
254        let now = Utc::now().timestamp_millis();
255
256        // Get or create operation
257        let operation = if let Some(existing) = self.operation_data_map.get(&operation_id) {
258            let mut op = existing.operation.clone();
259            // Merge update into existing operation
260            self.merge_operation_update(&mut op, &update, now);
261            op
262        } else {
263            // Create new operation from update
264            let op = self.create_operation_from_update(&update, now);
265
266            // Register callback in the CallbackManager so external systems can
267            // send success/failure/heartbeat responses via the callback_id.
268            if op.operation_type == OperationType::Callback {
269                if let Some(ref cb_details) = op.callback_details {
270                    if let Some(ref callback_id) = cb_details.callback_id {
271                        let timeout = update
272                            .callback_options
273                            .as_ref()
274                            .and_then(|opts| opts.timeout_seconds)
275                            .map(std::time::Duration::from_secs);
276                        let _ = self
277                            .callback_manager
278                            .register_callback(callback_id, timeout);
279                    }
280                }
281            }
282
283            op
284        };
285
286        // Generate appropriate legacy history event based on action
287        let event_type = match update.action {
288            OperationAction::Start => EventType::OperationStarted,
289            OperationAction::Succeed | OperationAction::Fail => EventType::OperationCompleted,
290            OperationAction::Cancel | OperationAction::Retry => EventType::OperationCompleted,
291        };
292
293        self.event_processor.create_history_event(
294            event_type,
295            Some(&operation_id),
296            "OperationDetails",
297            serde_json::json!({
298                "operation_id": operation_id,
299                "action": format!("{:?}", update.action),
300            }),
301        );
302
303        // Generate Node.js-compatible events using the EventProcessor's process_operation_update method
304        self.event_processor
305            .process_operation_update(&update, &operation);
306
307        // Check if this completes the execution
308        if operation.operation_type == OperationType::Execution
309            && (operation.status == OperationStatus::Succeeded
310                || operation.status == OperationStatus::Failed)
311        {
312            self.is_execution_completed = true;
313        }
314
315        let events = OperationEvents {
316            operation,
317            update: Some(update),
318        };
319
320        // Track order for new operations
321        if !self.operation_data_map.contains_key(&operation_id) {
322            self.operation_order.push(operation_id.clone());
323        }
324
325        self.operation_data_map.insert(operation_id.clone(), events);
326        self.dirty_operation_ids.insert(operation_id);
327
328        Ok(())
329    }
330
331    /// Merge an operation update into an existing operation.
332    fn merge_operation_update(
333        &self,
334        operation: &mut Operation,
335        update: &OperationUpdate,
336        now: i64,
337    ) {
338        // Update status based on action
339        operation.status = match update.action {
340            OperationAction::Start => OperationStatus::Started,
341            OperationAction::Succeed => OperationStatus::Succeeded,
342            OperationAction::Fail => OperationStatus::Failed,
343            OperationAction::Cancel => OperationStatus::Cancelled,
344            OperationAction::Retry => OperationStatus::Pending, // Retry sets status to Pending
345        };
346
347        // Set end timestamp for completion actions
348        if matches!(
349            update.action,
350            OperationAction::Succeed | OperationAction::Fail | OperationAction::Cancel
351        ) {
352            operation.end_timestamp = Some(now);
353        }
354
355        // Merge result/error
356        if update.result.is_some() {
357            operation.result = update.result.clone();
358        }
359        if update.error.is_some() {
360            operation.error = update.error.clone();
361        }
362
363        // Handle step retry - update step_details with next_attempt_timestamp
364        if update.action == OperationAction::Retry && update.operation_type == OperationType::Step {
365            let next_attempt_timestamp = update
366                .step_options
367                .as_ref()
368                .and_then(|opts| opts.next_attempt_delay_seconds)
369                .map(|delay| now + (delay as i64 * 1000));
370
371            let current_attempt = operation
372                .step_details
373                .as_ref()
374                .and_then(|d| d.attempt)
375                .unwrap_or(0);
376
377            operation.step_details = Some(durable_execution_sdk::StepDetails {
378                result: None,
379                attempt: Some(current_attempt + 1),
380                next_attempt_timestamp,
381                error: update.error.clone(),
382                payload: update.result.clone(), // For wait-for-condition, payload is in result field
383            });
384        }
385    }
386
387    /// Create a new operation from an update.
388    fn create_operation_from_update(&self, update: &OperationUpdate, now: i64) -> Operation {
389        let status = match update.action {
390            OperationAction::Start => OperationStatus::Started,
391            OperationAction::Succeed => OperationStatus::Succeeded,
392            OperationAction::Fail => OperationStatus::Failed,
393            OperationAction::Cancel => OperationStatus::Cancelled,
394            OperationAction::Retry => OperationStatus::Pending, // Retry sets status to Pending
395        };
396
397        let end_timestamp = if matches!(
398            update.action,
399            OperationAction::Succeed | OperationAction::Fail | OperationAction::Cancel
400        ) {
401            Some(now)
402        } else {
403            None
404        };
405
406        // Generate callback_details for callback operations
407        let callback_details = if update.operation_type == OperationType::Callback {
408            // Generate a unique callback_id for the callback operation
409            let callback_id = Uuid::new_v4().to_string();
410            Some(durable_execution_sdk::CallbackDetails {
411                callback_id: Some(callback_id),
412                result: None,
413                error: None,
414            })
415        } else {
416            None
417        };
418
419        // Generate wait_details for wait operations from wait_options
420        // This converts wait_seconds to scheduled_end_timestamp for time-skipping support
421        let wait_details = if update.operation_type == OperationType::Wait {
422            if let Some(ref wait_options) = update.wait_options {
423                // Calculate scheduled_end_timestamp as now + (wait_seconds * 1000) milliseconds
424                let scheduled_end_timestamp = now + (wait_options.wait_seconds as i64 * 1000);
425                Some(durable_execution_sdk::WaitDetails {
426                    scheduled_end_timestamp: Some(scheduled_end_timestamp),
427                })
428            } else {
429                None
430            }
431        } else {
432            None
433        };
434
435        // Generate step_details for step operations with retry
436        // This converts next_attempt_delay_seconds to next_attempt_timestamp for time-skipping support
437        let step_details = if update.operation_type == OperationType::Step {
438            if let Some(ref step_options) = update.step_options {
439                // Calculate next_attempt_timestamp as now + (delay_seconds * 1000) milliseconds
440                let next_attempt_timestamp = step_options
441                    .next_attempt_delay_seconds
442                    .map(|delay| now + (delay as i64 * 1000));
443                Some(durable_execution_sdk::StepDetails {
444                    result: None,
445                    attempt: Some(1), // Start with attempt 1
446                    next_attempt_timestamp,
447                    error: update.error.clone(),
448                    payload: update.result.clone(), // For wait-for-condition, payload is in result field
449                })
450            } else if update.action == OperationAction::Retry {
451                // Retry without step_options - still create step_details
452                Some(durable_execution_sdk::StepDetails {
453                    result: None,
454                    attempt: Some(1),
455                    next_attempt_timestamp: None,
456                    error: update.error.clone(),
457                    payload: update.result.clone(),
458                })
459            } else {
460                None
461            }
462        } else {
463            None
464        };
465
466        Operation {
467            operation_id: update.operation_id.clone(),
468            operation_type: update.operation_type,
469            status,
470            result: update.result.clone(),
471            error: update.error.clone(),
472            parent_id: update.parent_id.clone(),
473            name: update.name.clone(),
474            sub_type: update.sub_type.clone(),
475            start_timestamp: Some(now),
476            end_timestamp,
477            wait_details,
478            step_details,
479            callback_details,
480            chained_invoke_details: None,
481            context_details: None,
482            execution_details: None,
483        }
484    }
485
486    /// Get the callback manager.
487    pub fn callback_manager(&self) -> &CallbackManager {
488        &self.callback_manager
489    }
490
491    /// Get mutable callback manager.
492    pub fn callback_manager_mut(&mut self) -> &mut CallbackManager {
493        &mut self.callback_manager
494    }
495
496    /// Complete a callback operation by updating its status and result/error.
497    ///
498    /// Finds the operation whose `callback_details.callback_id` matches the given
499    /// `callback_id` and transitions it to `Succeeded` (with result) or `Failed`
500    /// (with error). This is called when an external system sends a callback
501    /// response via the `CallbackManager`.
502    pub fn complete_callback_operation(
503        &mut self,
504        callback_id: &str,
505        result: Option<String>,
506        error: Option<durable_execution_sdk::ErrorObject>,
507    ) {
508        let now = chrono::Utc::now().timestamp_millis();
509        for events in self.operation_data_map.values_mut() {
510            if events.operation.operation_type == OperationType::Callback {
511                if let Some(ref cb) = events.operation.callback_details {
512                    if cb.callback_id.as_deref() == Some(callback_id) {
513                        if error.is_some() {
514                            events.operation.status = OperationStatus::Failed;
515                            events.operation.error = error;
516                        } else {
517                            events.operation.status = OperationStatus::Succeeded;
518                            events.operation.result = result;
519                        }
520                        events.operation.end_timestamp = Some(now);
521                        // Update callback_details with the result
522                        if let Some(ref mut cb_details) = events.operation.callback_details {
523                            cb_details.result = events.operation.result.clone();
524                            cb_details.error = events.operation.error.clone();
525                        }
526                        self.dirty_operation_ids
527                            .insert(events.operation.operation_id.clone());
528                        return;
529                    }
530                }
531            }
532        }
533    }
534
535    /// Get the event processor.
536    pub fn event_processor(&self) -> &EventProcessor {
537        &self.event_processor
538    }
539
540    /// Get mutable event processor.
541    pub fn event_processor_mut(&mut self) -> &mut EventProcessor {
542        &mut self.event_processor
543    }
544
545    /// Get all history events.
546    pub fn get_history_events(&self) -> Vec<HistoryEvent> {
547        self.event_processor.get_events().to_vec()
548    }
549
550    /// Get all Node.js-compatible history events.
551    ///
552    /// Returns a vector of events in the Node.js SDK compatible format,
553    /// suitable for cross-SDK history comparison.
554    pub fn get_nodejs_history_events(&self) -> Vec<NodeJsHistoryEvent> {
555        self.event_processor.get_nodejs_events().to_vec()
556    }
557
558    /// Get the execution ID.
559    pub fn execution_id(&self) -> &str {
560        &self.execution_id
561    }
562
563    /// Update operation data directly.
564    ///
565    /// This method is used by the orchestrator to update operation state
566    /// (e.g., marking wait operations as SUCCEEDED after time advancement).
567    pub fn update_operation_data(&mut self, operation_id: &str, updated_operation: Operation) {
568        if let Some(existing) = self.operation_data_map.get_mut(operation_id) {
569            // Update the operation
570            existing.operation = updated_operation;
571
572            // Mark as dirty so it's returned in the next get_state call
573            self.dirty_operation_ids.insert(operation_id.to_string());
574
575            // Check if this completes the execution
576            if existing.operation.operation_type == OperationType::Execution
577                && (existing.operation.status == OperationStatus::Succeeded
578                    || existing.operation.status == OperationStatus::Failed)
579            {
580                self.is_execution_completed = true;
581            }
582        } else {
583            // Operation doesn't exist, add it
584            let events = OperationEvents {
585                operation: updated_operation,
586                update: None,
587            };
588            self.operation_data_map
589                .insert(operation_id.to_string(), events);
590            self.operation_order.push(operation_id.to_string());
591            self.dirty_operation_ids.insert(operation_id.to_string());
592        }
593    }
594}
595
596/// Timestamps for an invocation.
597#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
598pub struct InvocationTimestamps {
599    /// When the invocation started
600    pub start_timestamp: DateTime<Utc>,
601    /// When the invocation ended
602    pub end_timestamp: DateTime<Utc>,
603}
604
605#[cfg(test)]
606mod tests {
607    use super::*;
608
609    #[test]
610    fn test_initialize() {
611        let mut manager = CheckpointManager::new("exec-1");
612        let events = manager.initialize(r#"{"input": "test"}"#);
613
614        assert!(!events.operation.operation_id.is_empty());
615        assert_eq!(events.operation.operation_type, OperationType::Execution);
616        assert_eq!(events.operation.status, OperationStatus::Started);
617        assert!(manager.has_dirty_operations());
618    }
619
620    #[test]
621    fn test_start_invocation() {
622        let mut manager = CheckpointManager::new("exec-1");
623        manager.initialize("{}");
624
625        let ops = manager.start_invocation("inv-1");
626        assert_eq!(ops.len(), 1);
627        assert!(!manager.has_dirty_operations()); // Cleared on start
628    }
629
630    #[test]
631    fn test_complete_invocation() {
632        let mut manager = CheckpointManager::new("exec-1");
633        manager.initialize("{}");
634        manager.start_invocation("inv-1");
635
636        let timestamps = manager.complete_invocation("inv-1").unwrap();
637        assert!(timestamps.end_timestamp >= timestamps.start_timestamp);
638    }
639
640    #[test]
641    fn test_complete_unknown_invocation_fails() {
642        let mut manager = CheckpointManager::new("exec-1");
643        let result = manager.complete_invocation("unknown");
644        assert!(matches!(result, Err(TestError::InvocationNotFound(_))));
645    }
646
647    #[test]
648    fn test_process_checkpoint() {
649        let mut manager = CheckpointManager::new("exec-1");
650        manager.initialize("{}");
651        manager.get_dirty_operations(); // Clear dirty
652
653        let update = OperationUpdate::start("op-1", OperationType::Step).with_name("test-step");
654
655        let dirty = manager.process_checkpoint(vec![update]).unwrap();
656        assert_eq!(dirty.len(), 1);
657        assert_eq!(dirty[0].name, Some("test-step".to_string()));
658    }
659
660    #[test]
661    fn test_execution_completed_on_success() {
662        let mut manager = CheckpointManager::new("exec-1");
663        let events = manager.initialize("{}");
664        let exec_id = events.operation.operation_id.clone();
665
666        assert!(!manager.is_execution_completed());
667
668        let update = OperationUpdate {
669            operation_id: exec_id,
670            action: OperationAction::Succeed,
671            operation_type: OperationType::Execution,
672            result: Some("done".to_string()),
673            error: None,
674            parent_id: None,
675            name: None,
676            sub_type: None,
677            wait_options: None,
678            step_options: None,
679            callback_options: None,
680            chained_invoke_options: None,
681            context_options: None,
682        };
683
684        manager.process_checkpoint(vec![update]).unwrap();
685        assert!(manager.is_execution_completed());
686    }
687
688    #[test]
689    fn test_get_state() {
690        let mut manager = CheckpointManager::new("exec-1");
691        manager.initialize("{}");
692
693        let update = OperationUpdate::start("op-1", OperationType::Step);
694        manager.process_checkpoint(vec![update]).unwrap();
695
696        let state = manager.get_state();
697        assert_eq!(state.len(), 2); // Initial execution + step
698    }
699
700    // ============================================================================
701    // Node.js-compatible event generation tests
702    // ============================================================================
703
704    #[test]
705    fn test_initialize_generates_nodejs_execution_started_event() {
706        let mut manager = CheckpointManager::new("exec-1");
707        let events = manager.initialize(r#"{"input": "test"}"#);
708
709        let nodejs_events = manager.get_nodejs_history_events();
710        assert_eq!(nodejs_events.len(), 1);
711        assert_eq!(
712            nodejs_events[0].event_type,
713            NodeJsEventType::ExecutionStarted
714        );
715        assert_eq!(nodejs_events[0].event_id, 1);
716        assert_eq!(
717            nodejs_events[0].id,
718            Some(events.operation.operation_id.clone())
719        );
720    }
721
722    #[test]
723    fn test_process_operation_update_generates_nodejs_events() {
724        let mut manager = CheckpointManager::new("exec-1");
725        manager.initialize("{}");
726
727        // Process a step start
728        let update = OperationUpdate::start("step-1", OperationType::Step).with_name("my-step");
729        manager.process_checkpoint(vec![update]).unwrap();
730
731        let nodejs_events = manager.get_nodejs_history_events();
732        assert_eq!(nodejs_events.len(), 2); // ExecutionStarted + StepStarted
733        assert_eq!(
734            nodejs_events[0].event_type,
735            NodeJsEventType::ExecutionStarted
736        );
737        assert_eq!(nodejs_events[1].event_type, NodeJsEventType::StepStarted);
738        assert_eq!(nodejs_events[1].name, Some("my-step".to_string()));
739    }
740
741    #[test]
742    fn test_complete_invocation_generates_nodejs_event() {
743        let mut manager = CheckpointManager::new("exec-1");
744        manager.initialize("{}");
745        manager.start_invocation("inv-1");
746
747        manager.complete_invocation("inv-1").unwrap();
748
749        let nodejs_events = manager.get_nodejs_history_events();
750        assert_eq!(nodejs_events.len(), 2); // ExecutionStarted + InvocationCompleted
751        assert_eq!(
752            nodejs_events[1].event_type,
753            NodeJsEventType::InvocationCompleted
754        );
755    }
756
757    #[test]
758    fn test_step_lifecycle_generates_correct_nodejs_events() {
759        let mut manager = CheckpointManager::new("exec-1");
760        manager.initialize("{}");
761
762        // Start step
763        let start_update =
764            OperationUpdate::start("step-1", OperationType::Step).with_name("compute");
765        manager.process_checkpoint(vec![start_update]).unwrap();
766
767        // Succeed step
768        let succeed_update =
769            OperationUpdate::succeed("step-1", OperationType::Step, Some("42".to_string()));
770        manager.process_checkpoint(vec![succeed_update]).unwrap();
771
772        let nodejs_events = manager.get_nodejs_history_events();
773        assert_eq!(nodejs_events.len(), 3); // ExecutionStarted + StepStarted + StepSucceeded
774        assert_eq!(
775            nodejs_events[0].event_type,
776            NodeJsEventType::ExecutionStarted
777        );
778        assert_eq!(nodejs_events[1].event_type, NodeJsEventType::StepStarted);
779        assert_eq!(nodejs_events[2].event_type, NodeJsEventType::StepSucceeded);
780    }
781
782    #[test]
783    fn test_execution_lifecycle_generates_correct_nodejs_events() {
784        let mut manager = CheckpointManager::new("exec-1");
785        let init_events = manager.initialize("{}");
786        let exec_id = init_events.operation.operation_id.clone();
787
788        // Succeed execution
789        let succeed_update = OperationUpdate::succeed(
790            &exec_id,
791            OperationType::Execution,
792            Some("result".to_string()),
793        );
794        manager.process_checkpoint(vec![succeed_update]).unwrap();
795
796        let nodejs_events = manager.get_nodejs_history_events();
797        assert_eq!(nodejs_events.len(), 2); // ExecutionStarted + ExecutionSucceeded
798        assert_eq!(
799            nodejs_events[0].event_type,
800            NodeJsEventType::ExecutionStarted
801        );
802        assert_eq!(
803            nodejs_events[1].event_type,
804            NodeJsEventType::ExecutionSucceeded
805        );
806    }
807
808    #[test]
809    fn test_wait_lifecycle_generates_correct_nodejs_events() {
810        let mut manager = CheckpointManager::new("exec-1");
811        manager.initialize("{}");
812
813        // Start wait
814        let start_update = OperationUpdate::start_wait("wait-1", 30);
815        manager.process_checkpoint(vec![start_update]).unwrap();
816
817        // Succeed wait
818        let succeed_update = OperationUpdate::succeed("wait-1", OperationType::Wait, None);
819        manager.process_checkpoint(vec![succeed_update]).unwrap();
820
821        let nodejs_events = manager.get_nodejs_history_events();
822        assert_eq!(nodejs_events.len(), 3); // ExecutionStarted + WaitStarted + WaitSucceeded
823        assert_eq!(nodejs_events[1].event_type, NodeJsEventType::WaitStarted);
824        assert_eq!(nodejs_events[2].event_type, NodeJsEventType::WaitSucceeded);
825    }
826
827    #[test]
828    fn test_retry_operation_generates_step_failed_event() {
829        use durable_execution_sdk::error::ErrorObject;
830        use durable_execution_sdk::StepOptions;
831
832        let mut manager = CheckpointManager::new("exec-1");
833        manager.initialize("{}");
834
835        // Start step
836        let start_update = OperationUpdate::start("step-1", OperationType::Step);
837        manager.process_checkpoint(vec![start_update]).unwrap();
838
839        // Retry step (generates StepFailed event with retry details)
840        let retry_update = OperationUpdate {
841            operation_id: "step-1".to_string(),
842            action: OperationAction::Retry,
843            operation_type: OperationType::Step,
844            result: None,
845            error: Some(ErrorObject::new("RetryError", "Temporary failure")),
846            parent_id: None,
847            name: None,
848            sub_type: None,
849            wait_options: None,
850            step_options: Some(StepOptions {
851                next_attempt_delay_seconds: Some(5),
852            }),
853            callback_options: None,
854            chained_invoke_options: None,
855            context_options: None,
856        };
857        manager.process_checkpoint(vec![retry_update]).unwrap();
858
859        let nodejs_events = manager.get_nodejs_history_events();
860        assert_eq!(nodejs_events.len(), 3); // ExecutionStarted + StepStarted + StepFailed
861        assert_eq!(nodejs_events[2].event_type, NodeJsEventType::StepFailed);
862    }
863
864    #[test]
865    fn test_nodejs_event_ids_are_sequential() {
866        let mut manager = CheckpointManager::new("exec-1");
867        manager.initialize("{}");
868
869        // Add multiple operations
870        let step1 = OperationUpdate::start("step-1", OperationType::Step);
871        let step2 = OperationUpdate::start("step-2", OperationType::Step);
872        manager.process_checkpoint(vec![step1, step2]).unwrap();
873
874        let nodejs_events = manager.get_nodejs_history_events();
875        assert_eq!(nodejs_events.len(), 3);
876        assert_eq!(nodejs_events[0].event_id, 1);
877        assert_eq!(nodejs_events[1].event_id, 2);
878        assert_eq!(nodejs_events[2].event_id, 3);
879    }
880}