Skip to main content

durable_execution_sdk_testing/checkpoint_server/
event_processor.rs

1//! Event processor for generating history events.
2//!
3//! This module implements the EventProcessor which generates history events
4//! for execution tracking, matching the Node.js SDK's event processor.
5
6use chrono::{DateTime, SecondsFormat, Utc};
7use serde::{Deserialize, Serialize};
8
9use durable_execution_sdk::operation::{
10    Operation, OperationAction, OperationType, OperationUpdate,
11};
12
13use super::nodejs_event_types::{
14    CallbackStartedDetails, CallbackStartedDetailsWrapper, ContextFailedDetails,
15    ContextFailedDetailsWrapper, ContextStartedDetails, ContextStartedDetailsWrapper,
16    ContextSucceededDetails, ContextSucceededDetailsWrapper, ExecutionFailedDetails,
17    ExecutionFailedDetailsWrapper, ExecutionStartedDetails, ExecutionStartedDetailsWrapper,
18    ExecutionSucceededDetails, ExecutionSucceededDetailsWrapper, NodeJsEventDetails,
19    NodeJsEventType, NodeJsHistoryEvent, PayloadWrapper, RetryDetails, StepFailedDetails,
20    StepFailedDetailsWrapper, StepStartedDetails, StepStartedDetailsWrapper, StepSucceededDetails,
21    StepSucceededDetailsWrapper, WaitStartedDetails, WaitStartedDetailsWrapper,
22    WaitSucceededDetails, WaitSucceededDetailsWrapper,
23};
24
25/// Event types for history tracking.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
27pub enum EventType {
28    /// An operation has started
29    OperationStarted,
30    /// An operation has completed
31    OperationCompleted,
32    /// An invocation has started
33    InvocationStarted,
34    /// An invocation has completed
35    InvocationCompleted,
36    /// Execution has started
37    ExecutionStarted,
38    /// Execution has completed
39    ExecutionCompleted,
40}
41
42/// A history event generated during execution.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct HistoryEvent {
45    /// Unique event ID
46    pub id: u64,
47    /// The type of event
48    pub event_type: EventType,
49    /// Timestamp when the event occurred
50    pub timestamp: DateTime<Utc>,
51    /// Optional operation ID this event relates to
52    pub operation_id: Option<String>,
53    /// The details type (e.g., "InvocationCompletedDetails")
54    pub details_type: String,
55    /// The event details as JSON
56    pub details: serde_json::Value,
57}
58
59/// Processes and generates history events for execution tracking.
60#[derive(Debug, Default)]
61pub struct EventProcessor {
62    /// All generated events (legacy format)
63    events: Vec<HistoryEvent>,
64    /// Counter for generating unique event IDs (legacy format)
65    event_counter: u64,
66    /// Node.js-compatible events
67    nodejs_events: Vec<NodeJsHistoryEvent>,
68    /// Counter for generating unique Node.js event IDs (starts from 1)
69    nodejs_event_counter: u64,
70}
71
72impl EventProcessor {
73    /// Create a new event processor.
74    pub fn new() -> Self {
75        Self {
76            events: Vec::new(),
77            event_counter: 0,
78            nodejs_events: Vec::new(),
79            nodejs_event_counter: 0,
80        }
81    }
82
83    /// Format a timestamp as ISO 8601 string with millisecond precision in UTC.
84    ///
85    /// # Arguments
86    ///
87    /// * `timestamp` - The DateTime to format
88    ///
89    /// # Returns
90    ///
91    /// A string in the format "2025-12-03T22:58:35.094Z"
92    pub fn format_timestamp(timestamp: DateTime<Utc>) -> String {
93        timestamp.to_rfc3339_opts(SecondsFormat::Millis, true)
94    }
95
96    /// Get the current timestamp formatted as ISO 8601 string.
97    pub fn current_timestamp() -> String {
98        Self::format_timestamp(Utc::now())
99    }
100
101    /// Create a Node.js-compatible history event.
102    ///
103    /// This method generates events with sequential EventId starting from 1,
104    /// matching the Node.js SDK's event format.
105    ///
106    /// # Arguments
107    ///
108    /// * `event_type` - The type of event (e.g., ExecutionStarted, StepSucceeded)
109    /// * `operation` - Optional operation this event relates to
110    /// * `details` - Event-specific details
111    ///
112    /// # Returns
113    ///
114    /// The created Node.js-compatible history event.
115    pub fn create_nodejs_event(
116        &mut self,
117        event_type: NodeJsEventType,
118        operation: Option<&Operation>,
119        details: NodeJsEventDetails,
120    ) -> NodeJsHistoryEvent {
121        self.nodejs_event_counter += 1;
122
123        let event = NodeJsHistoryEvent {
124            event_type,
125            event_id: self.nodejs_event_counter,
126            id: operation.map(|op| op.operation_id.clone()),
127            event_timestamp: Self::current_timestamp(),
128            sub_type: operation.and_then(|op| op.sub_type.clone()),
129            name: operation.and_then(|op| op.name.clone()),
130            parent_id: operation.and_then(|op| op.parent_id.clone()),
131            details,
132        };
133
134        self.nodejs_events.push(event.clone());
135        event
136    }
137
138    /// Process an operation update and generate appropriate Node.js-compatible events.
139    ///
140    /// This method maps (OperationAction, OperationType) combinations to the appropriate
141    /// NodeJsEventType according to the design specification:
142    ///
143    /// | Action  | Type       | EventType           |
144    /// |---------|------------|---------------------|
145    /// | Start   | Execution  | ExecutionStarted    |
146    /// | Succeed | Execution  | ExecutionSucceeded  |
147    /// | Fail    | Execution  | ExecutionFailed     |
148    /// | Start   | Step       | StepStarted         |
149    /// | Succeed | Step       | StepSucceeded       |
150    /// | Fail    | Step       | StepFailed          |
151    /// | Start   | Wait       | WaitStarted         |
152    /// | Succeed | Wait       | WaitSucceeded       |
153    /// | Start   | Callback   | CallbackStarted     |
154    /// | Start   | Context    | ContextStarted      |
155    /// | Succeed | Context    | ContextSucceeded    |
156    /// | Fail    | Context    | ContextFailed       |
157    ///
158    /// # Arguments
159    ///
160    /// * `update` - The operation update containing action and type
161    /// * `operation` - The operation being updated
162    ///
163    /// # Returns
164    ///
165    /// A vector of generated Node.js-compatible history events.
166    pub fn process_operation_update(
167        &mut self,
168        update: &OperationUpdate,
169        operation: &Operation,
170    ) -> Vec<NodeJsHistoryEvent> {
171        let mut events = Vec::new();
172
173        match (update.action, update.operation_type) {
174            // Execution events
175            (OperationAction::Start, OperationType::Execution) => {
176                let details =
177                    NodeJsEventDetails::ExecutionStarted(ExecutionStartedDetailsWrapper {
178                        execution_started_details: ExecutionStartedDetails {
179                            input: PayloadWrapper::new(update.result.clone().unwrap_or_default()),
180                            execution_timeout: None,
181                        },
182                    });
183                events.push(self.create_nodejs_event(
184                    NodeJsEventType::ExecutionStarted,
185                    Some(operation),
186                    details,
187                ));
188            }
189            (OperationAction::Succeed, OperationType::Execution) => {
190                let details =
191                    NodeJsEventDetails::ExecutionSucceeded(ExecutionSucceededDetailsWrapper {
192                        execution_succeeded_details: ExecutionSucceededDetails {
193                            result: PayloadWrapper::new(update.result.clone().unwrap_or_default()),
194                        },
195                    });
196                events.push(self.create_nodejs_event(
197                    NodeJsEventType::ExecutionSucceeded,
198                    Some(operation),
199                    details,
200                ));
201            }
202            (OperationAction::Fail, OperationType::Execution) => {
203                let error_payload = update
204                    .error
205                    .as_ref()
206                    .map(|e| serde_json::to_string(e).unwrap_or_default())
207                    .unwrap_or_default();
208                let details = NodeJsEventDetails::ExecutionFailed(ExecutionFailedDetailsWrapper {
209                    execution_failed_details: ExecutionFailedDetails {
210                        error: PayloadWrapper::new(error_payload),
211                    },
212                });
213                events.push(self.create_nodejs_event(
214                    NodeJsEventType::ExecutionFailed,
215                    Some(operation),
216                    details,
217                ));
218            }
219
220            // Step events
221            (OperationAction::Start, OperationType::Step) => {
222                let details = NodeJsEventDetails::StepStarted(StepStartedDetailsWrapper {
223                    step_started_details: StepStartedDetails {},
224                });
225                events.push(self.create_nodejs_event(
226                    NodeJsEventType::StepStarted,
227                    Some(operation),
228                    details,
229                ));
230            }
231            (OperationAction::Succeed, OperationType::Step) => {
232                let retry_details = Self::extract_retry_details(update, operation);
233                let details = NodeJsEventDetails::StepSucceeded(StepSucceededDetailsWrapper {
234                    step_succeeded_details: StepSucceededDetails {
235                        result: PayloadWrapper::new(update.result.clone().unwrap_or_default()),
236                        retry_details,
237                    },
238                });
239                events.push(self.create_nodejs_event(
240                    NodeJsEventType::StepSucceeded,
241                    Some(operation),
242                    details,
243                ));
244            }
245            (OperationAction::Fail, OperationType::Step) => {
246                let retry_details = Self::extract_retry_details(update, operation);
247                let error_payload = update
248                    .error
249                    .as_ref()
250                    .map(|e| serde_json::to_string(e).unwrap_or_default())
251                    .unwrap_or_default();
252                let details = NodeJsEventDetails::StepFailed(StepFailedDetailsWrapper {
253                    step_failed_details: StepFailedDetails {
254                        error: PayloadWrapper::new(error_payload),
255                        retry_details,
256                    },
257                });
258                events.push(self.create_nodejs_event(
259                    NodeJsEventType::StepFailed,
260                    Some(operation),
261                    details,
262                ));
263            }
264            (OperationAction::Retry, OperationType::Step) => {
265                // Retry generates a StepFailed event with retry details
266                let retry_details = Self::extract_retry_details(update, operation);
267                let error_payload = update
268                    .error
269                    .as_ref()
270                    .map(|e| serde_json::to_string(e).unwrap_or_default())
271                    .unwrap_or_default();
272                let details = NodeJsEventDetails::StepFailed(StepFailedDetailsWrapper {
273                    step_failed_details: StepFailedDetails {
274                        error: PayloadWrapper::new(error_payload),
275                        retry_details,
276                    },
277                });
278                events.push(self.create_nodejs_event(
279                    NodeJsEventType::StepFailed,
280                    Some(operation),
281                    details,
282                ));
283            }
284
285            // Wait events
286            (OperationAction::Start, OperationType::Wait) => {
287                let (duration, scheduled_end) = Self::extract_wait_details(update);
288                let details = NodeJsEventDetails::WaitStarted(WaitStartedDetailsWrapper {
289                    wait_started_details: WaitStartedDetails {
290                        duration,
291                        scheduled_end_timestamp: scheduled_end,
292                    },
293                });
294                events.push(self.create_nodejs_event(
295                    NodeJsEventType::WaitStarted,
296                    Some(operation),
297                    details,
298                ));
299            }
300            (OperationAction::Succeed, OperationType::Wait) => {
301                let details = NodeJsEventDetails::WaitSucceeded(WaitSucceededDetailsWrapper {
302                    wait_succeeded_details: WaitSucceededDetails {},
303                });
304                events.push(self.create_nodejs_event(
305                    NodeJsEventType::WaitSucceeded,
306                    Some(operation),
307                    details,
308                ));
309            }
310
311            // Callback events
312            (OperationAction::Start, OperationType::Callback) => {
313                let (callback_id, timeout, heartbeat_timeout, input) =
314                    Self::extract_callback_details(update, operation);
315                let details = NodeJsEventDetails::CallbackStarted(CallbackStartedDetailsWrapper {
316                    callback_started_details: CallbackStartedDetails {
317                        callback_id,
318                        timeout,
319                        heartbeat_timeout,
320                        input,
321                    },
322                });
323                events.push(self.create_nodejs_event(
324                    NodeJsEventType::CallbackStarted,
325                    Some(operation),
326                    details,
327                ));
328            }
329
330            // Context events
331            (OperationAction::Start, OperationType::Context) => {
332                let details = NodeJsEventDetails::ContextStarted(ContextStartedDetailsWrapper {
333                    context_started_details: ContextStartedDetails {},
334                });
335                events.push(self.create_nodejs_event(
336                    NodeJsEventType::ContextStarted,
337                    Some(operation),
338                    details,
339                ));
340            }
341            (OperationAction::Succeed, OperationType::Context) => {
342                let details =
343                    NodeJsEventDetails::ContextSucceeded(ContextSucceededDetailsWrapper {
344                        context_succeeded_details: ContextSucceededDetails {
345                            result: PayloadWrapper::new(update.result.clone().unwrap_or_default()),
346                        },
347                    });
348                events.push(self.create_nodejs_event(
349                    NodeJsEventType::ContextSucceeded,
350                    Some(operation),
351                    details,
352                ));
353            }
354            (OperationAction::Fail, OperationType::Context) => {
355                let error_payload = update
356                    .error
357                    .as_ref()
358                    .map(|e| serde_json::to_string(e).unwrap_or_default())
359                    .unwrap_or_default();
360                let details = NodeJsEventDetails::ContextFailed(ContextFailedDetailsWrapper {
361                    context_failed_details: ContextFailedDetails {
362                        error: PayloadWrapper::new(error_payload),
363                    },
364                });
365                events.push(self.create_nodejs_event(
366                    NodeJsEventType::ContextFailed,
367                    Some(operation),
368                    details,
369                ));
370            }
371
372            // Other combinations are not mapped (e.g., Cancel actions, Invoke type)
373            _ => {}
374        }
375
376        events
377    }
378
379    /// Extract retry details from an operation update.
380    fn extract_retry_details(update: &OperationUpdate, operation: &Operation) -> RetryDetails {
381        let current_attempt = operation
382            .step_details
383            .as_ref()
384            .and_then(|d| d.attempt)
385            .map(|a| a + 1); // Convert 0-indexed to 1-indexed
386
387        let next_attempt_delay_seconds = update
388            .step_options
389            .as_ref()
390            .and_then(|o| o.next_attempt_delay_seconds);
391
392        RetryDetails {
393            current_attempt,
394            next_attempt_delay_seconds,
395        }
396    }
397
398    /// Extract wait details from an operation update.
399    fn extract_wait_details(update: &OperationUpdate) -> (Option<String>, Option<String>) {
400        let wait_seconds = update.wait_options.as_ref().map(|o| o.wait_seconds);
401
402        let duration = wait_seconds.map(|s| format!("PT{}S", s));
403
404        let scheduled_end = wait_seconds.map(|s| {
405            let end_time = Utc::now() + chrono::Duration::seconds(s as i64);
406            Self::format_timestamp(end_time)
407        });
408
409        (duration, scheduled_end)
410    }
411
412    /// Extract callback details from an operation update.
413    fn extract_callback_details(
414        update: &OperationUpdate,
415        operation: &Operation,
416    ) -> (String, Option<u64>, Option<u64>, Option<PayloadWrapper>) {
417        let callback_id = operation
418            .callback_details
419            .as_ref()
420            .and_then(|d| d.callback_id.clone())
421            .unwrap_or_else(|| operation.operation_id.clone());
422
423        let timeout = update
424            .callback_options
425            .as_ref()
426            .and_then(|o| o.timeout_seconds);
427        let heartbeat_timeout = update
428            .callback_options
429            .as_ref()
430            .and_then(|o| o.heartbeat_timeout_seconds);
431
432        let input = update
433            .result
434            .as_ref()
435            .map(|r| PayloadWrapper::new(r.clone()));
436
437        (callback_id, timeout, heartbeat_timeout, input)
438    }
439
440    /// Get all Node.js-compatible events.
441    pub fn get_nodejs_events(&self) -> &[NodeJsHistoryEvent] {
442        &self.nodejs_events
443    }
444
445    /// Get Node.js-compatible events as owned vector.
446    pub fn into_nodejs_events(self) -> Vec<NodeJsHistoryEvent> {
447        self.nodejs_events
448    }
449
450    /// Get the current Node.js event count.
451    pub fn nodejs_event_count(&self) -> u64 {
452        self.nodejs_event_counter
453    }
454
455    /// Clear all Node.js-compatible events.
456    pub fn clear_nodejs_events(&mut self) {
457        self.nodejs_events.clear();
458        self.nodejs_event_counter = 0;
459    }
460
461    /// Create a history event.
462    ///
463    /// # Arguments
464    ///
465    /// * `event_type` - The type of event
466    /// * `operation_id` - Optional operation ID this event relates to
467    /// * `details_type` - The details type name
468    /// * `details` - The event details (will be serialized to JSON)
469    ///
470    /// # Returns
471    ///
472    /// The created history event.
473    pub fn create_history_event<T: Serialize>(
474        &mut self,
475        event_type: EventType,
476        operation_id: Option<&str>,
477        details_type: &str,
478        details: T,
479    ) -> HistoryEvent {
480        self.event_counter += 1;
481
482        let event = HistoryEvent {
483            id: self.event_counter,
484            event_type,
485            timestamp: Utc::now(),
486            operation_id: operation_id.map(String::from),
487            details_type: details_type.to_string(),
488            details: serde_json::to_value(details).unwrap_or(serde_json::Value::Null),
489        };
490
491        self.events.push(event.clone());
492        event
493    }
494
495    /// Get all events.
496    pub fn get_events(&self) -> &[HistoryEvent] {
497        &self.events
498    }
499
500    /// Get events as owned vector.
501    pub fn into_events(self) -> Vec<HistoryEvent> {
502        self.events
503    }
504
505    /// Clear all events.
506    pub fn clear(&mut self) {
507        self.events.clear();
508        self.event_counter = 0;
509        self.nodejs_events.clear();
510        self.nodejs_event_counter = 0;
511    }
512
513    /// Get the current event count.
514    pub fn event_count(&self) -> u64 {
515        self.event_counter
516    }
517}
518
519#[cfg(test)]
520mod tests {
521    use super::*;
522    use durable_execution_sdk::operation::{CallbackDetails, CallbackOptions, StepDetails};
523
524    #[derive(Serialize)]
525    struct TestDetails {
526        message: String,
527    }
528
529    #[test]
530    fn test_create_history_event() {
531        let mut processor = EventProcessor::new();
532
533        let event = processor.create_history_event(
534            EventType::OperationStarted,
535            Some("op-123"),
536            "TestDetails",
537            TestDetails {
538                message: "test".to_string(),
539            },
540        );
541
542        assert_eq!(event.id, 1);
543        assert_eq!(event.event_type, EventType::OperationStarted);
544        assert_eq!(event.operation_id, Some("op-123".to_string()));
545        assert_eq!(event.details_type, "TestDetails");
546        assert_eq!(event.details["message"], "test");
547    }
548
549    #[test]
550    fn test_event_counter_increments() {
551        let mut processor = EventProcessor::new();
552
553        let event1 = processor.create_history_event(
554            EventType::OperationStarted,
555            None,
556            "Details",
557            serde_json::json!({}),
558        );
559        let event2 = processor.create_history_event(
560            EventType::OperationCompleted,
561            None,
562            "Details",
563            serde_json::json!({}),
564        );
565
566        assert_eq!(event1.id, 1);
567        assert_eq!(event2.id, 2);
568        assert_eq!(processor.event_count(), 2);
569    }
570
571    #[test]
572    fn test_get_events() {
573        let mut processor = EventProcessor::new();
574
575        processor.create_history_event(
576            EventType::OperationStarted,
577            Some("op-1"),
578            "Details",
579            serde_json::json!({}),
580        );
581        processor.create_history_event(
582            EventType::OperationCompleted,
583            Some("op-1"),
584            "Details",
585            serde_json::json!({}),
586        );
587
588        let events = processor.get_events();
589        assert_eq!(events.len(), 2);
590        assert_eq!(events[0].event_type, EventType::OperationStarted);
591        assert_eq!(events[1].event_type, EventType::OperationCompleted);
592    }
593
594    #[test]
595    fn test_clear_events() {
596        let mut processor = EventProcessor::new();
597
598        processor.create_history_event(
599            EventType::OperationStarted,
600            None,
601            "Details",
602            serde_json::json!({}),
603        );
604
605        assert_eq!(processor.event_count(), 1);
606
607        processor.clear();
608
609        assert_eq!(processor.event_count(), 0);
610        assert!(processor.get_events().is_empty());
611    }
612
613    #[test]
614    fn test_event_without_operation_id() {
615        let mut processor = EventProcessor::new();
616
617        let event = processor.create_history_event(
618            EventType::InvocationCompleted,
619            None,
620            "InvocationCompletedDetails",
621            serde_json::json!({"status": "success"}),
622        );
623
624        assert_eq!(event.operation_id, None);
625        assert_eq!(event.event_type, EventType::InvocationCompleted);
626    }
627
628    // ============================================================================
629    // Node.js-compatible event tests
630    // ============================================================================
631
632    #[test]
633    fn test_format_timestamp_iso8601() {
634        use chrono::TimeZone;
635        let timestamp = Utc.with_ymd_and_hms(2025, 12, 3, 22, 58, 35).unwrap()
636            + chrono::Duration::milliseconds(94);
637        let formatted = EventProcessor::format_timestamp(timestamp);
638        assert_eq!(formatted, "2025-12-03T22:58:35.094Z");
639    }
640
641    #[test]
642    fn test_current_timestamp_format() {
643        let timestamp = EventProcessor::current_timestamp();
644        // Verify ISO 8601 format with milliseconds and Z suffix
645        let re = regex::Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$").unwrap();
646        assert!(
647            re.is_match(&timestamp),
648            "Timestamp '{}' does not match ISO 8601 format",
649            timestamp
650        );
651    }
652
653    #[test]
654    fn test_create_nodejs_event_sequential_ids() {
655        let mut processor = EventProcessor::new();
656
657        let event1 = processor.create_nodejs_event(
658            NodeJsEventType::ExecutionStarted,
659            None,
660            NodeJsEventDetails::default(),
661        );
662        let event2 = processor.create_nodejs_event(
663            NodeJsEventType::StepStarted,
664            None,
665            NodeJsEventDetails::default(),
666        );
667        let event3 = processor.create_nodejs_event(
668            NodeJsEventType::StepSucceeded,
669            None,
670            NodeJsEventDetails::default(),
671        );
672
673        assert_eq!(event1.event_id, 1);
674        assert_eq!(event2.event_id, 2);
675        assert_eq!(event3.event_id, 3);
676        assert_eq!(processor.nodejs_event_count(), 3);
677    }
678
679    #[test]
680    fn test_create_nodejs_event_with_operation() {
681        let mut processor = EventProcessor::new();
682
683        let mut operation = Operation::new("step-123", OperationType::Step);
684        operation.name = Some("my-step".to_string());
685        operation.sub_type = Some("Step".to_string());
686        operation.parent_id = Some("exec-001".to_string());
687
688        let event = processor.create_nodejs_event(
689            NodeJsEventType::StepStarted,
690            Some(&operation),
691            NodeJsEventDetails::default(),
692        );
693
694        assert_eq!(event.event_type, NodeJsEventType::StepStarted);
695        assert_eq!(event.id, Some("step-123".to_string()));
696        assert_eq!(event.name, Some("my-step".to_string()));
697        assert_eq!(event.sub_type, Some("Step".to_string()));
698        assert_eq!(event.parent_id, Some("exec-001".to_string()));
699    }
700
701    #[test]
702    fn test_create_nodejs_event_without_operation() {
703        let mut processor = EventProcessor::new();
704
705        let event = processor.create_nodejs_event(
706            NodeJsEventType::InvocationCompleted,
707            None,
708            NodeJsEventDetails::default(),
709        );
710
711        assert_eq!(event.event_type, NodeJsEventType::InvocationCompleted);
712        assert_eq!(event.id, None);
713        assert_eq!(event.name, None);
714        assert_eq!(event.sub_type, None);
715        assert_eq!(event.parent_id, None);
716    }
717
718    #[test]
719    fn test_process_operation_update_execution_started() {
720        let mut processor = EventProcessor::new();
721
722        let operation = Operation::new("exec-001", OperationType::Execution);
723        let mut update = OperationUpdate::start("exec-001", OperationType::Execution);
724        update.result = Some(r#"{"input": "test"}"#.to_string());
725
726        let events = processor.process_operation_update(&update, &operation);
727
728        assert_eq!(events.len(), 1);
729        assert_eq!(events[0].event_type, NodeJsEventType::ExecutionStarted);
730        assert_eq!(events[0].id, Some("exec-001".to_string()));
731    }
732
733    #[test]
734    fn test_process_operation_update_execution_succeeded() {
735        let mut processor = EventProcessor::new();
736
737        let operation = Operation::new("exec-001", OperationType::Execution);
738        let update = OperationUpdate::succeed(
739            "exec-001",
740            OperationType::Execution,
741            Some("result".to_string()),
742        );
743
744        let events = processor.process_operation_update(&update, &operation);
745
746        assert_eq!(events.len(), 1);
747        assert_eq!(events[0].event_type, NodeJsEventType::ExecutionSucceeded);
748    }
749
750    #[test]
751    fn test_process_operation_update_step_started() {
752        let mut processor = EventProcessor::new();
753
754        let operation = Operation::new("step-001", OperationType::Step);
755        let update = OperationUpdate::start("step-001", OperationType::Step);
756
757        let events = processor.process_operation_update(&update, &operation);
758
759        assert_eq!(events.len(), 1);
760        assert_eq!(events[0].event_type, NodeJsEventType::StepStarted);
761    }
762
763    #[test]
764    fn test_process_operation_update_step_succeeded() {
765        let mut processor = EventProcessor::new();
766
767        let mut operation = Operation::new("step-001", OperationType::Step);
768        operation.step_details = Some(StepDetails {
769            result: None,
770            attempt: Some(0),
771            next_attempt_timestamp: None,
772            error: None,
773            payload: None,
774        });
775
776        let update =
777            OperationUpdate::succeed("step-001", OperationType::Step, Some("42".to_string()));
778
779        let events = processor.process_operation_update(&update, &operation);
780
781        assert_eq!(events.len(), 1);
782        assert_eq!(events[0].event_type, NodeJsEventType::StepSucceeded);
783    }
784
785    #[test]
786    fn test_process_operation_update_wait_started() {
787        let mut processor = EventProcessor::new();
788
789        let operation = Operation::new("wait-001", OperationType::Wait);
790        let update = OperationUpdate::start_wait("wait-001", 30);
791
792        let events = processor.process_operation_update(&update, &operation);
793
794        assert_eq!(events.len(), 1);
795        assert_eq!(events[0].event_type, NodeJsEventType::WaitStarted);
796    }
797
798    #[test]
799    fn test_process_operation_update_wait_succeeded() {
800        let mut processor = EventProcessor::new();
801
802        let operation = Operation::new("wait-001", OperationType::Wait);
803        let update = OperationUpdate::succeed("wait-001", OperationType::Wait, None);
804
805        let events = processor.process_operation_update(&update, &operation);
806
807        assert_eq!(events.len(), 1);
808        assert_eq!(events[0].event_type, NodeJsEventType::WaitSucceeded);
809    }
810
811    #[test]
812    fn test_process_operation_update_callback_started() {
813        let mut processor = EventProcessor::new();
814
815        let mut operation = Operation::new("cb-001", OperationType::Callback);
816        operation.callback_details = Some(CallbackDetails {
817            callback_id: Some("callback-token-123".to_string()),
818            result: None,
819            error: None,
820        });
821
822        let mut update = OperationUpdate::start("cb-001", OperationType::Callback);
823        update.callback_options = Some(CallbackOptions {
824            timeout_seconds: Some(60),
825            heartbeat_timeout_seconds: Some(10),
826        });
827
828        let events = processor.process_operation_update(&update, &operation);
829
830        assert_eq!(events.len(), 1);
831        assert_eq!(events[0].event_type, NodeJsEventType::CallbackStarted);
832    }
833
834    #[test]
835    fn test_process_operation_update_context_started() {
836        let mut processor = EventProcessor::new();
837
838        let operation = Operation::new("ctx-001", OperationType::Context);
839        let update = OperationUpdate::start("ctx-001", OperationType::Context);
840
841        let events = processor.process_operation_update(&update, &operation);
842
843        assert_eq!(events.len(), 1);
844        assert_eq!(events[0].event_type, NodeJsEventType::ContextStarted);
845    }
846
847    #[test]
848    fn test_process_operation_update_context_succeeded() {
849        let mut processor = EventProcessor::new();
850
851        let operation = Operation::new("ctx-001", OperationType::Context);
852        let update = OperationUpdate::succeed(
853            "ctx-001",
854            OperationType::Context,
855            Some("result".to_string()),
856        );
857
858        let events = processor.process_operation_update(&update, &operation);
859
860        assert_eq!(events.len(), 1);
861        assert_eq!(events[0].event_type, NodeJsEventType::ContextSucceeded);
862    }
863
864    #[test]
865    fn test_get_nodejs_events() {
866        let mut processor = EventProcessor::new();
867
868        processor.create_nodejs_event(
869            NodeJsEventType::ExecutionStarted,
870            None,
871            NodeJsEventDetails::default(),
872        );
873        processor.create_nodejs_event(
874            NodeJsEventType::StepStarted,
875            None,
876            NodeJsEventDetails::default(),
877        );
878
879        let events = processor.get_nodejs_events();
880        assert_eq!(events.len(), 2);
881        assert_eq!(events[0].event_type, NodeJsEventType::ExecutionStarted);
882        assert_eq!(events[1].event_type, NodeJsEventType::StepStarted);
883    }
884
885    #[test]
886    fn test_clear_nodejs_events() {
887        let mut processor = EventProcessor::new();
888
889        processor.create_nodejs_event(
890            NodeJsEventType::ExecutionStarted,
891            None,
892            NodeJsEventDetails::default(),
893        );
894
895        assert_eq!(processor.nodejs_event_count(), 1);
896
897        processor.clear_nodejs_events();
898
899        assert_eq!(processor.nodejs_event_count(), 0);
900        assert!(processor.get_nodejs_events().is_empty());
901    }
902
903    #[test]
904    fn test_clear_clears_both_event_types() {
905        let mut processor = EventProcessor::new();
906
907        // Add legacy event
908        processor.create_history_event(
909            EventType::OperationStarted,
910            None,
911            "Details",
912            serde_json::json!({}),
913        );
914
915        // Add Node.js event
916        processor.create_nodejs_event(
917            NodeJsEventType::ExecutionStarted,
918            None,
919            NodeJsEventDetails::default(),
920        );
921
922        assert_eq!(processor.event_count(), 1);
923        assert_eq!(processor.nodejs_event_count(), 1);
924
925        processor.clear();
926
927        assert_eq!(processor.event_count(), 0);
928        assert_eq!(processor.nodejs_event_count(), 0);
929        assert!(processor.get_events().is_empty());
930        assert!(processor.get_nodejs_events().is_empty());
931    }
932
933    #[test]
934    fn test_event_type_mapping_all_combinations() {
935        let mut processor = EventProcessor::new();
936
937        // Test all documented (Action, Type) -> EventType mappings
938        let test_cases = vec![
939            (
940                OperationAction::Start,
941                OperationType::Execution,
942                NodeJsEventType::ExecutionStarted,
943            ),
944            (
945                OperationAction::Succeed,
946                OperationType::Execution,
947                NodeJsEventType::ExecutionSucceeded,
948            ),
949            (
950                OperationAction::Fail,
951                OperationType::Execution,
952                NodeJsEventType::ExecutionFailed,
953            ),
954            (
955                OperationAction::Start,
956                OperationType::Step,
957                NodeJsEventType::StepStarted,
958            ),
959            (
960                OperationAction::Succeed,
961                OperationType::Step,
962                NodeJsEventType::StepSucceeded,
963            ),
964            (
965                OperationAction::Fail,
966                OperationType::Step,
967                NodeJsEventType::StepFailed,
968            ),
969            (
970                OperationAction::Start,
971                OperationType::Wait,
972                NodeJsEventType::WaitStarted,
973            ),
974            (
975                OperationAction::Succeed,
976                OperationType::Wait,
977                NodeJsEventType::WaitSucceeded,
978            ),
979            (
980                OperationAction::Start,
981                OperationType::Callback,
982                NodeJsEventType::CallbackStarted,
983            ),
984            (
985                OperationAction::Start,
986                OperationType::Context,
987                NodeJsEventType::ContextStarted,
988            ),
989            (
990                OperationAction::Succeed,
991                OperationType::Context,
992                NodeJsEventType::ContextSucceeded,
993            ),
994            (
995                OperationAction::Fail,
996                OperationType::Context,
997                NodeJsEventType::ContextFailed,
998            ),
999        ];
1000
1001        for (action, op_type, expected_event_type) in test_cases {
1002            processor.clear();
1003
1004            let operation = Operation::new("test-op", op_type);
1005            let update = match action {
1006                OperationAction::Start => {
1007                    if op_type == OperationType::Wait {
1008                        OperationUpdate::start_wait("test-op", 10)
1009                    } else {
1010                        OperationUpdate::start("test-op", op_type)
1011                    }
1012                }
1013                OperationAction::Succeed => OperationUpdate::succeed("test-op", op_type, None),
1014                OperationAction::Fail => {
1015                    use durable_execution_sdk::error::ErrorObject;
1016                    OperationUpdate::fail("test-op", op_type, ErrorObject::new("TestError", "test"))
1017                }
1018                _ => continue,
1019            };
1020
1021            let events = processor.process_operation_update(&update, &operation);
1022
1023            assert_eq!(
1024                events.len(),
1025                1,
1026                "Expected 1 event for {:?} + {:?}",
1027                action,
1028                op_type
1029            );
1030            assert_eq!(
1031                events[0].event_type, expected_event_type,
1032                "Expected {:?} for {:?} + {:?}, got {:?}",
1033                expected_event_type, action, op_type, events[0].event_type
1034            );
1035        }
1036    }
1037}