1use chrono::{DateTime, SecondsFormat, Utc};
7use serde::{Deserialize, Serialize};
8
9use durable_execution_sdk::operation::{
10 Operation, OperationAction, OperationType, OperationUpdate,
11};
12
13use super::nodejs_event_types::{
14 CallbackStartedDetails, CallbackStartedDetailsWrapper, ContextFailedDetails,
15 ContextFailedDetailsWrapper, ContextStartedDetails, ContextStartedDetailsWrapper,
16 ContextSucceededDetails, ContextSucceededDetailsWrapper, ExecutionFailedDetails,
17 ExecutionFailedDetailsWrapper, ExecutionStartedDetails, ExecutionStartedDetailsWrapper,
18 ExecutionSucceededDetails, ExecutionSucceededDetailsWrapper, NodeJsEventDetails,
19 NodeJsEventType, NodeJsHistoryEvent, PayloadWrapper, RetryDetails, StepFailedDetails,
20 StepFailedDetailsWrapper, StepStartedDetails, StepStartedDetailsWrapper, StepSucceededDetails,
21 StepSucceededDetailsWrapper, WaitStartedDetails, WaitStartedDetailsWrapper,
22 WaitSucceededDetails, WaitSucceededDetailsWrapper,
23};
24
/// Category tag stored in [`HistoryEvent::event_type`].
///
/// A plain set of unit variants: the type is `Copy`, comparable with `==`,
/// and (de)serializable through serde.
// NOTE(review): the per-variant descriptions below are read off the variant
// names; nothing in this file produces them outside of tests — confirm
// against the callers of `EventProcessor::create_history_event`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EventType {
    /// An operation started.
    OperationStarted,
    /// An operation completed.
    OperationCompleted,
    /// An invocation started.
    InvocationStarted,
    /// An invocation completed.
    InvocationCompleted,
    /// An execution started.
    ExecutionStarted,
    /// An execution completed.
    ExecutionCompleted,
}
41
/// A generic history record produced by `EventProcessor::create_history_event`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoryEvent {
    /// Sequence number taken from the processor's counter; the first event
    /// after construction or `clear` gets `1`.
    pub id: u64,
    /// Category of the event.
    pub event_type: EventType,
    /// UTC time at which the event was created.
    pub timestamp: DateTime<Utc>,
    /// Id of the operation the event refers to, if the caller supplied one.
    pub operation_id: Option<String>,
    /// Caller-supplied label naming the kind of value stored in `details`.
    pub details_type: String,
    /// The caller's details serialized to JSON; `Null` when serialization
    /// failed.
    pub details: serde_json::Value,
}
58
/// Accumulates history events in two independent streams: generic
/// [`HistoryEvent`]s and Node.js-style `NodeJsHistoryEvent`s.
///
/// Each stream has its own storage and its own counter, which supplies the
/// sequential id of the next event in that stream.
#[derive(Debug, Default)]
pub struct EventProcessor {
    /// Generic history events, in creation order.
    events: Vec<HistoryEvent>,
    /// Id given to the most recently created generic event (0 when none has
    /// been created since construction or the last `clear`).
    event_counter: u64,
    /// Node.js-style events, in creation order.
    nodejs_events: Vec<NodeJsHistoryEvent>,
    /// Id given to the most recently created Node.js-style event (0 when none
    /// has been created since construction or the last clear).
    nodejs_event_counter: u64,
}
71
72impl EventProcessor {
73 pub fn new() -> Self {
75 Self {
76 events: Vec::new(),
77 event_counter: 0,
78 nodejs_events: Vec::new(),
79 nodejs_event_counter: 0,
80 }
81 }
82
83 pub fn format_timestamp(timestamp: DateTime<Utc>) -> String {
93 timestamp.to_rfc3339_opts(SecondsFormat::Millis, true)
94 }
95
96 pub fn current_timestamp() -> String {
98 Self::format_timestamp(Utc::now())
99 }
100
101 pub fn create_nodejs_event(
116 &mut self,
117 event_type: NodeJsEventType,
118 operation: Option<&Operation>,
119 details: NodeJsEventDetails,
120 ) -> NodeJsHistoryEvent {
121 self.nodejs_event_counter += 1;
122
123 let event = NodeJsHistoryEvent {
124 event_type,
125 event_id: self.nodejs_event_counter,
126 id: operation.map(|op| op.operation_id.clone()),
127 event_timestamp: Self::current_timestamp(),
128 sub_type: operation.and_then(|op| op.sub_type.clone()),
129 name: operation.and_then(|op| op.name.clone()),
130 parent_id: operation.and_then(|op| op.parent_id.clone()),
131 details,
132 };
133
134 self.nodejs_events.push(event.clone());
135 event
136 }
137
138 pub fn process_operation_update(
167 &mut self,
168 update: &OperationUpdate,
169 operation: &Operation,
170 ) -> Vec<NodeJsHistoryEvent> {
171 let mut events = Vec::new();
172
173 match (update.action, update.operation_type) {
174 (OperationAction::Start, OperationType::Execution) => {
176 let details =
177 NodeJsEventDetails::ExecutionStarted(ExecutionStartedDetailsWrapper {
178 execution_started_details: ExecutionStartedDetails {
179 input: PayloadWrapper::new(update.result.clone().unwrap_or_default()),
180 execution_timeout: None,
181 },
182 });
183 events.push(self.create_nodejs_event(
184 NodeJsEventType::ExecutionStarted,
185 Some(operation),
186 details,
187 ));
188 }
189 (OperationAction::Succeed, OperationType::Execution) => {
190 let details =
191 NodeJsEventDetails::ExecutionSucceeded(ExecutionSucceededDetailsWrapper {
192 execution_succeeded_details: ExecutionSucceededDetails {
193 result: PayloadWrapper::new(update.result.clone().unwrap_or_default()),
194 },
195 });
196 events.push(self.create_nodejs_event(
197 NodeJsEventType::ExecutionSucceeded,
198 Some(operation),
199 details,
200 ));
201 }
202 (OperationAction::Fail, OperationType::Execution) => {
203 let error_payload = update
204 .error
205 .as_ref()
206 .map(|e| serde_json::to_string(e).unwrap_or_default())
207 .unwrap_or_default();
208 let details = NodeJsEventDetails::ExecutionFailed(ExecutionFailedDetailsWrapper {
209 execution_failed_details: ExecutionFailedDetails {
210 error: PayloadWrapper::new(error_payload),
211 },
212 });
213 events.push(self.create_nodejs_event(
214 NodeJsEventType::ExecutionFailed,
215 Some(operation),
216 details,
217 ));
218 }
219
220 (OperationAction::Start, OperationType::Step) => {
222 let details = NodeJsEventDetails::StepStarted(StepStartedDetailsWrapper {
223 step_started_details: StepStartedDetails {},
224 });
225 events.push(self.create_nodejs_event(
226 NodeJsEventType::StepStarted,
227 Some(operation),
228 details,
229 ));
230 }
231 (OperationAction::Succeed, OperationType::Step) => {
232 let retry_details = Self::extract_retry_details(update, operation);
233 let details = NodeJsEventDetails::StepSucceeded(StepSucceededDetailsWrapper {
234 step_succeeded_details: StepSucceededDetails {
235 result: PayloadWrapper::new(update.result.clone().unwrap_or_default()),
236 retry_details,
237 },
238 });
239 events.push(self.create_nodejs_event(
240 NodeJsEventType::StepSucceeded,
241 Some(operation),
242 details,
243 ));
244 }
245 (OperationAction::Fail, OperationType::Step) => {
246 let retry_details = Self::extract_retry_details(update, operation);
247 let error_payload = update
248 .error
249 .as_ref()
250 .map(|e| serde_json::to_string(e).unwrap_or_default())
251 .unwrap_or_default();
252 let details = NodeJsEventDetails::StepFailed(StepFailedDetailsWrapper {
253 step_failed_details: StepFailedDetails {
254 error: PayloadWrapper::new(error_payload),
255 retry_details,
256 },
257 });
258 events.push(self.create_nodejs_event(
259 NodeJsEventType::StepFailed,
260 Some(operation),
261 details,
262 ));
263 }
264 (OperationAction::Retry, OperationType::Step) => {
265 let retry_details = Self::extract_retry_details(update, operation);
267 let error_payload = update
268 .error
269 .as_ref()
270 .map(|e| serde_json::to_string(e).unwrap_or_default())
271 .unwrap_or_default();
272 let details = NodeJsEventDetails::StepFailed(StepFailedDetailsWrapper {
273 step_failed_details: StepFailedDetails {
274 error: PayloadWrapper::new(error_payload),
275 retry_details,
276 },
277 });
278 events.push(self.create_nodejs_event(
279 NodeJsEventType::StepFailed,
280 Some(operation),
281 details,
282 ));
283 }
284
285 (OperationAction::Start, OperationType::Wait) => {
287 let (duration, scheduled_end) = Self::extract_wait_details(update);
288 let details = NodeJsEventDetails::WaitStarted(WaitStartedDetailsWrapper {
289 wait_started_details: WaitStartedDetails {
290 duration,
291 scheduled_end_timestamp: scheduled_end,
292 },
293 });
294 events.push(self.create_nodejs_event(
295 NodeJsEventType::WaitStarted,
296 Some(operation),
297 details,
298 ));
299 }
300 (OperationAction::Succeed, OperationType::Wait) => {
301 let details = NodeJsEventDetails::WaitSucceeded(WaitSucceededDetailsWrapper {
302 wait_succeeded_details: WaitSucceededDetails {},
303 });
304 events.push(self.create_nodejs_event(
305 NodeJsEventType::WaitSucceeded,
306 Some(operation),
307 details,
308 ));
309 }
310
311 (OperationAction::Start, OperationType::Callback) => {
313 let (callback_id, timeout, heartbeat_timeout, input) =
314 Self::extract_callback_details(update, operation);
315 let details = NodeJsEventDetails::CallbackStarted(CallbackStartedDetailsWrapper {
316 callback_started_details: CallbackStartedDetails {
317 callback_id,
318 timeout,
319 heartbeat_timeout,
320 input,
321 },
322 });
323 events.push(self.create_nodejs_event(
324 NodeJsEventType::CallbackStarted,
325 Some(operation),
326 details,
327 ));
328 }
329
330 (OperationAction::Start, OperationType::Context) => {
332 let details = NodeJsEventDetails::ContextStarted(ContextStartedDetailsWrapper {
333 context_started_details: ContextStartedDetails {},
334 });
335 events.push(self.create_nodejs_event(
336 NodeJsEventType::ContextStarted,
337 Some(operation),
338 details,
339 ));
340 }
341 (OperationAction::Succeed, OperationType::Context) => {
342 let details =
343 NodeJsEventDetails::ContextSucceeded(ContextSucceededDetailsWrapper {
344 context_succeeded_details: ContextSucceededDetails {
345 result: PayloadWrapper::new(update.result.clone().unwrap_or_default()),
346 },
347 });
348 events.push(self.create_nodejs_event(
349 NodeJsEventType::ContextSucceeded,
350 Some(operation),
351 details,
352 ));
353 }
354 (OperationAction::Fail, OperationType::Context) => {
355 let error_payload = update
356 .error
357 .as_ref()
358 .map(|e| serde_json::to_string(e).unwrap_or_default())
359 .unwrap_or_default();
360 let details = NodeJsEventDetails::ContextFailed(ContextFailedDetailsWrapper {
361 context_failed_details: ContextFailedDetails {
362 error: PayloadWrapper::new(error_payload),
363 },
364 });
365 events.push(self.create_nodejs_event(
366 NodeJsEventType::ContextFailed,
367 Some(operation),
368 details,
369 ));
370 }
371
372 _ => {}
374 }
375
376 events
377 }
378
379 fn extract_retry_details(update: &OperationUpdate, operation: &Operation) -> RetryDetails {
381 let current_attempt = operation
382 .step_details
383 .as_ref()
384 .and_then(|d| d.attempt)
385 .map(|a| a + 1); let next_attempt_delay_seconds = update
388 .step_options
389 .as_ref()
390 .and_then(|o| o.next_attempt_delay_seconds);
391
392 RetryDetails {
393 current_attempt,
394 next_attempt_delay_seconds,
395 }
396 }
397
398 fn extract_wait_details(update: &OperationUpdate) -> (Option<String>, Option<String>) {
400 let wait_seconds = update.wait_options.as_ref().map(|o| o.wait_seconds);
401
402 let duration = wait_seconds.map(|s| format!("PT{}S", s));
403
404 let scheduled_end = wait_seconds.map(|s| {
405 let end_time = Utc::now() + chrono::Duration::seconds(s as i64);
406 Self::format_timestamp(end_time)
407 });
408
409 (duration, scheduled_end)
410 }
411
412 fn extract_callback_details(
414 update: &OperationUpdate,
415 operation: &Operation,
416 ) -> (String, Option<u64>, Option<u64>, Option<PayloadWrapper>) {
417 let callback_id = operation
418 .callback_details
419 .as_ref()
420 .and_then(|d| d.callback_id.clone())
421 .unwrap_or_else(|| operation.operation_id.clone());
422
423 let timeout = update
424 .callback_options
425 .as_ref()
426 .and_then(|o| o.timeout_seconds);
427 let heartbeat_timeout = update
428 .callback_options
429 .as_ref()
430 .and_then(|o| o.heartbeat_timeout_seconds);
431
432 let input = update
433 .result
434 .as_ref()
435 .map(|r| PayloadWrapper::new(r.clone()));
436
437 (callback_id, timeout, heartbeat_timeout, input)
438 }
439
440 pub fn get_nodejs_events(&self) -> &[NodeJsHistoryEvent] {
442 &self.nodejs_events
443 }
444
445 pub fn into_nodejs_events(self) -> Vec<NodeJsHistoryEvent> {
447 self.nodejs_events
448 }
449
450 pub fn nodejs_event_count(&self) -> u64 {
452 self.nodejs_event_counter
453 }
454
455 pub fn clear_nodejs_events(&mut self) {
457 self.nodejs_events.clear();
458 self.nodejs_event_counter = 0;
459 }
460
461 pub fn create_history_event<T: Serialize>(
474 &mut self,
475 event_type: EventType,
476 operation_id: Option<&str>,
477 details_type: &str,
478 details: T,
479 ) -> HistoryEvent {
480 self.event_counter += 1;
481
482 let event = HistoryEvent {
483 id: self.event_counter,
484 event_type,
485 timestamp: Utc::now(),
486 operation_id: operation_id.map(String::from),
487 details_type: details_type.to_string(),
488 details: serde_json::to_value(details).unwrap_or(serde_json::Value::Null),
489 };
490
491 self.events.push(event.clone());
492 event
493 }
494
495 pub fn get_events(&self) -> &[HistoryEvent] {
497 &self.events
498 }
499
500 pub fn into_events(self) -> Vec<HistoryEvent> {
502 self.events
503 }
504
505 pub fn clear(&mut self) {
507 self.events.clear();
508 self.event_counter = 0;
509 self.nodejs_events.clear();
510 self.nodejs_event_counter = 0;
511 }
512
513 pub fn event_count(&self) -> u64 {
515 self.event_counter
516 }
517}
518
519#[cfg(test)]
520mod tests {
521 use super::*;
522 use durable_execution_sdk::operation::{CallbackDetails, CallbackOptions, StepDetails};
523
/// Minimal serializable payload used to exercise `create_history_event`.
#[derive(Serialize)]
struct TestDetails {
    /// Arbitrary text; one test reads it back from the serialized details.
    message: String,
}
528
/// A created history event carries the id, type, operation id, details label
/// and serialized details it was built from.
#[test]
fn test_create_history_event() {
    let mut recorder = EventProcessor::new();
    let payload = TestDetails {
        message: "test".to_string(),
    };

    let created = recorder.create_history_event(
        EventType::OperationStarted,
        Some("op-123"),
        "TestDetails",
        payload,
    );

    assert_eq!(created.id, 1);
    assert_eq!(created.event_type, EventType::OperationStarted);
    assert_eq!(created.operation_id.as_deref(), Some("op-123"));
    assert_eq!(created.details_type, "TestDetails");
    assert_eq!(created.details["message"], "test");
}
548
/// History event ids are handed out sequentially starting at 1, and the
/// counter reflects how many were created.
#[test]
fn test_event_counter_increments() {
    let mut recorder = EventProcessor::new();

    let mut assigned_ids = Vec::new();
    for kind in [EventType::OperationStarted, EventType::OperationCompleted] {
        let created = recorder.create_history_event(kind, None, "Details", serde_json::json!({}));
        assigned_ids.push(created.id);
    }

    assert_eq!(assigned_ids, [1, 2]);
    assert_eq!(recorder.event_count(), 2);
}
570
/// `get_events` exposes every recorded history event in creation order.
#[test]
fn test_get_events() {
    let mut recorder = EventProcessor::new();
    for kind in [EventType::OperationStarted, EventType::OperationCompleted] {
        recorder.create_history_event(kind, Some("op-1"), "Details", serde_json::json!({}));
    }

    let recorded_types: Vec<EventType> = recorder
        .get_events()
        .iter()
        .map(|event| event.event_type)
        .collect();

    assert_eq!(
        recorded_types,
        [EventType::OperationStarted, EventType::OperationCompleted]
    );
}
593
/// `clear` empties the history event list and resets its counter.
#[test]
fn test_clear_events() {
    let mut recorder = EventProcessor::new();
    let _ = recorder.create_history_event(
        EventType::OperationStarted,
        None,
        "Details",
        serde_json::json!({}),
    );
    assert_eq!(recorder.event_count(), 1);

    recorder.clear();

    assert!(recorder.get_events().is_empty());
    assert_eq!(recorder.event_count(), 0);
}
612
/// A history event created without an operation id stores `None` for it.
#[test]
fn test_event_without_operation_id() {
    let details = serde_json::json!({"status": "success"});

    let created = EventProcessor::new().create_history_event(
        EventType::InvocationCompleted,
        None,
        "InvocationCompletedDetails",
        details,
    );

    assert!(created.operation_id.is_none());
    assert_eq!(created.event_type, EventType::InvocationCompleted);
}
627
/// A known instant is rendered with millisecond precision and a `Z` suffix.
#[test]
fn test_format_timestamp_iso8601() {
    use chrono::TimeZone;

    let whole_second = Utc.with_ymd_and_hms(2025, 12, 3, 22, 58, 35).unwrap();
    let instant = whole_second + chrono::Duration::milliseconds(94);

    assert_eq!(
        EventProcessor::format_timestamp(instant),
        "2025-12-03T22:58:35.094Z"
    );
}
640
/// `current_timestamp` produces `YYYY-MM-DDTHH:MM:SS.mmmZ`.
#[test]
fn test_current_timestamp_format() {
    let timestamp = EventProcessor::current_timestamp();
    let re = regex::Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$").unwrap();
    assert!(
        // The argument had been mangled into `×tamp` by an HTML-entity
        // mis-decoding of `&timestamp`; restored so the test compiles.
        re.is_match(&timestamp),
        "Timestamp '{}' does not match ISO 8601 format",
        timestamp
    );
}
652
/// Node.js-style event ids are handed out sequentially starting at 1.
#[test]
fn test_create_nodejs_event_sequential_ids() {
    let mut recorder = EventProcessor::new();

    let mut assigned_ids = Vec::new();
    for kind in [
        NodeJsEventType::ExecutionStarted,
        NodeJsEventType::StepStarted,
        NodeJsEventType::StepSucceeded,
    ] {
        let created = recorder.create_nodejs_event(kind, None, NodeJsEventDetails::default());
        assigned_ids.push(created.event_id);
    }

    assert_eq!(assigned_ids, [1, 2, 3]);
    assert_eq!(recorder.nodejs_event_count(), 3);
}
678
/// When an operation is supplied, its id, name, sub-type and parent id are
/// copied onto the event.
#[test]
fn test_create_nodejs_event_with_operation() {
    let mut step = Operation::new("step-123", OperationType::Step);
    step.parent_id = Some("exec-001".to_string());
    step.sub_type = Some("Step".to_string());
    step.name = Some("my-step".to_string());

    let mut recorder = EventProcessor::new();
    let created = recorder.create_nodejs_event(
        NodeJsEventType::StepStarted,
        Some(&step),
        NodeJsEventDetails::default(),
    );

    assert_eq!(created.event_type, NodeJsEventType::StepStarted);
    assert_eq!(created.id.as_deref(), Some("step-123"));
    assert_eq!(created.name.as_deref(), Some("my-step"));
    assert_eq!(created.sub_type.as_deref(), Some("Step"));
    assert_eq!(created.parent_id.as_deref(), Some("exec-001"));
}
700
/// Without an operation, all operation-derived fields of the event are `None`.
#[test]
fn test_create_nodejs_event_without_operation() {
    let created = EventProcessor::new().create_nodejs_event(
        NodeJsEventType::InvocationCompleted,
        None,
        NodeJsEventDetails::default(),
    );

    assert_eq!(created.event_type, NodeJsEventType::InvocationCompleted);
    assert!(created.id.is_none());
    assert!(created.name.is_none());
    assert!(created.sub_type.is_none());
    assert!(created.parent_id.is_none());
}
717
/// Starting an execution yields one `ExecutionStarted` event tagged with the
/// operation id.
#[test]
fn test_process_operation_update_execution_started() {
    let execution = Operation::new("exec-001", OperationType::Execution);
    let mut started = OperationUpdate::start("exec-001", OperationType::Execution);
    started.result = Some(String::from(r#"{"input": "test"}"#));

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&started, &execution);

    assert_eq!(produced.len(), 1);
    let event = &produced[0];
    assert_eq!(event.event_type, NodeJsEventType::ExecutionStarted);
    assert_eq!(event.id.as_deref(), Some("exec-001"));
}
732
/// A succeeding execution yields one `ExecutionSucceeded` event.
#[test]
fn test_process_operation_update_execution_succeeded() {
    let execution = Operation::new("exec-001", OperationType::Execution);
    let outcome = Some("result".to_string());
    let succeeded = OperationUpdate::succeed("exec-001", OperationType::Execution, outcome);

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&succeeded, &execution);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].event_type, NodeJsEventType::ExecutionSucceeded);
}
749
/// Starting a step yields one `StepStarted` event.
#[test]
fn test_process_operation_update_step_started() {
    let step = Operation::new("step-001", OperationType::Step);
    let started = OperationUpdate::start("step-001", OperationType::Step);

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&started, &step);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].event_type, NodeJsEventType::StepStarted);
}
762
/// A succeeding step (with recorded step details) yields one `StepSucceeded`
/// event.
#[test]
fn test_process_operation_update_step_succeeded() {
    let first_attempt = StepDetails {
        result: None,
        attempt: Some(0),
        next_attempt_timestamp: None,
        error: None,
        payload: None,
    };
    let mut step = Operation::new("step-001", OperationType::Step);
    step.step_details = Some(first_attempt);

    let outcome = Some("42".to_string());
    let succeeded = OperationUpdate::succeed("step-001", OperationType::Step, outcome);

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&succeeded, &step);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].event_type, NodeJsEventType::StepSucceeded);
}
784
/// Starting a wait yields one `WaitStarted` event.
#[test]
fn test_process_operation_update_wait_started() {
    let wait = Operation::new("wait-001", OperationType::Wait);
    let started = OperationUpdate::start_wait("wait-001", 30);

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&started, &wait);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].event_type, NodeJsEventType::WaitStarted);
}
797
/// A completed wait yields one `WaitSucceeded` event.
#[test]
fn test_process_operation_update_wait_succeeded() {
    let wait = Operation::new("wait-001", OperationType::Wait);
    let succeeded = OperationUpdate::succeed("wait-001", OperationType::Wait, None);

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&succeeded, &wait);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].event_type, NodeJsEventType::WaitSucceeded);
}
810
/// Starting a callback (with callback details and options) yields one
/// `CallbackStarted` event.
#[test]
fn test_process_operation_update_callback_started() {
    let mut callback = Operation::new("cb-001", OperationType::Callback);
    callback.callback_details = Some(CallbackDetails {
        callback_id: Some("callback-token-123".to_string()),
        result: None,
        error: None,
    });

    let timeouts = CallbackOptions {
        timeout_seconds: Some(60),
        heartbeat_timeout_seconds: Some(10),
    };
    let mut started = OperationUpdate::start("cb-001", OperationType::Callback);
    started.callback_options = Some(timeouts);

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&started, &callback);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].event_type, NodeJsEventType::CallbackStarted);
}
833
/// Starting a context yields one `ContextStarted` event.
#[test]
fn test_process_operation_update_context_started() {
    let context = Operation::new("ctx-001", OperationType::Context);
    let started = OperationUpdate::start("ctx-001", OperationType::Context);

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&started, &context);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].event_type, NodeJsEventType::ContextStarted);
}
846
/// A succeeding context yields one `ContextSucceeded` event.
#[test]
fn test_process_operation_update_context_succeeded() {
    let context = Operation::new("ctx-001", OperationType::Context);
    let outcome = Some("result".to_string());
    let succeeded = OperationUpdate::succeed("ctx-001", OperationType::Context, outcome);

    let mut recorder = EventProcessor::new();
    let produced = recorder.process_operation_update(&succeeded, &context);

    assert_eq!(produced.len(), 1);
    assert_eq!(produced[0].event_type, NodeJsEventType::ContextSucceeded);
}
863
/// `get_nodejs_events` exposes every recorded Node.js-style event in creation
/// order.
#[test]
fn test_get_nodejs_events() {
    let mut recorder = EventProcessor::new();
    for kind in [
        NodeJsEventType::ExecutionStarted,
        NodeJsEventType::StepStarted,
    ] {
        recorder.create_nodejs_event(kind, None, NodeJsEventDetails::default());
    }

    let recorded = recorder.get_nodejs_events();

    assert_eq!(recorded.len(), 2);
    assert_eq!(recorded[0].event_type, NodeJsEventType::ExecutionStarted);
    assert_eq!(recorded[1].event_type, NodeJsEventType::StepStarted);
}
884
/// `clear_nodejs_events` empties the Node.js-style list and resets its counter.
#[test]
fn test_clear_nodejs_events() {
    let mut recorder = EventProcessor::new();
    let _ = recorder.create_nodejs_event(
        NodeJsEventType::ExecutionStarted,
        None,
        NodeJsEventDetails::default(),
    );
    assert_eq!(recorder.nodejs_event_count(), 1);

    recorder.clear_nodejs_events();

    assert!(recorder.get_nodejs_events().is_empty());
    assert_eq!(recorder.nodejs_event_count(), 0);
}
902
/// `clear` resets both event streams: lists emptied, counters back to zero.
#[test]
fn test_clear_clears_both_event_types() {
    let mut recorder = EventProcessor::new();

    // One event in each stream.
    let _ = recorder.create_history_event(
        EventType::OperationStarted,
        None,
        "Details",
        serde_json::json!({}),
    );
    let _ = recorder.create_nodejs_event(
        NodeJsEventType::ExecutionStarted,
        None,
        NodeJsEventDetails::default(),
    );
    assert_eq!(recorder.event_count(), 1);
    assert_eq!(recorder.nodejs_event_count(), 1);

    recorder.clear();

    assert_eq!(recorder.event_count(), 0);
    assert_eq!(recorder.nodejs_event_count(), 0);
    assert!(recorder.get_events().is_empty());
    assert!(recorder.get_nodejs_events().is_empty());
}
932
/// Table-driven check that each listed `(action, operation type)` pair is
/// translated into exactly one event of the expected type.
#[test]
fn test_event_type_mapping_all_combinations() {
    use durable_execution_sdk::error::ErrorObject;

    let expectations = [
        (
            OperationAction::Start,
            OperationType::Execution,
            NodeJsEventType::ExecutionStarted,
        ),
        (
            OperationAction::Succeed,
            OperationType::Execution,
            NodeJsEventType::ExecutionSucceeded,
        ),
        (
            OperationAction::Fail,
            OperationType::Execution,
            NodeJsEventType::ExecutionFailed,
        ),
        (
            OperationAction::Start,
            OperationType::Step,
            NodeJsEventType::StepStarted,
        ),
        (
            OperationAction::Succeed,
            OperationType::Step,
            NodeJsEventType::StepSucceeded,
        ),
        (
            OperationAction::Fail,
            OperationType::Step,
            NodeJsEventType::StepFailed,
        ),
        (
            OperationAction::Start,
            OperationType::Wait,
            NodeJsEventType::WaitStarted,
        ),
        (
            OperationAction::Succeed,
            OperationType::Wait,
            NodeJsEventType::WaitSucceeded,
        ),
        (
            OperationAction::Start,
            OperationType::Callback,
            NodeJsEventType::CallbackStarted,
        ),
        (
            OperationAction::Start,
            OperationType::Context,
            NodeJsEventType::ContextStarted,
        ),
        (
            OperationAction::Succeed,
            OperationType::Context,
            NodeJsEventType::ContextSucceeded,
        ),
        (
            OperationAction::Fail,
            OperationType::Context,
            NodeJsEventType::ContextFailed,
        ),
    ];

    let mut recorder = EventProcessor::new();
    for (action, op_type, expected_event_type) in expectations {
        recorder.clear();

        let operation = Operation::new("test-op", op_type);
        // Waits need their dedicated constructor; everything else uses the
        // generic one for its action.
        let update = match (action, op_type) {
            (OperationAction::Start, OperationType::Wait) => {
                OperationUpdate::start_wait("test-op", 10)
            }
            (OperationAction::Start, _) => OperationUpdate::start("test-op", op_type),
            (OperationAction::Succeed, _) => OperationUpdate::succeed("test-op", op_type, None),
            (OperationAction::Fail, _) => {
                OperationUpdate::fail("test-op", op_type, ErrorObject::new("TestError", "test"))
            }
            _ => continue,
        };

        let produced = recorder.process_operation_update(&update, &operation);

        assert_eq!(
            produced.len(),
            1,
            "Expected 1 event for {:?} + {:?}",
            action,
            op_type
        );
        assert_eq!(
            produced[0].event_type, expected_event_type,
            "Expected {:?} for {:?} + {:?}, got {:?}",
            expected_event_type, action, op_type, produced[0].event_type
        );
    }
}
1037}