1use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use blake2::{Blake2b512, Digest};
10
11use crate::error::DurableResult;
12use crate::sealed::Sealed;
13use crate::state::ExecutionState;
14use crate::traits::{DurableValue, StepFn};
15use crate::types::OperationId;
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23pub struct OperationIdentifier {
24 pub operation_id: String,
26 pub parent_id: Option<String>,
28 pub name: Option<String>,
30}
31
32impl OperationIdentifier {
33 pub fn new(
35 operation_id: impl Into<String>,
36 parent_id: Option<String>,
37 name: Option<String>,
38 ) -> Self {
39 Self {
40 operation_id: operation_id.into(),
41 parent_id,
42 name,
43 }
44 }
45
46 pub fn root(operation_id: impl Into<String>) -> Self {
48 Self {
49 operation_id: operation_id.into(),
50 parent_id: None,
51 name: None,
52 }
53 }
54
55 pub fn with_parent(operation_id: impl Into<String>, parent_id: impl Into<String>) -> Self {
57 Self {
58 operation_id: operation_id.into(),
59 parent_id: Some(parent_id.into()),
60 name: None,
61 }
62 }
63
64 pub fn with_name(mut self, name: impl Into<String>) -> Self {
66 self.name = Some(name.into());
67 self
68 }
69
70 #[inline]
72 pub fn operation_id_typed(&self) -> OperationId {
73 OperationId::from(self.operation_id.clone())
74 }
75
76 #[inline]
78 pub fn parent_id_typed(&self) -> Option<OperationId> {
79 self.parent_id
80 .as_ref()
81 .map(|id| OperationId::from(id.clone()))
82 }
83
84 pub fn apply_to(
89 &self,
90 mut update: crate::operation::OperationUpdate,
91 ) -> crate::operation::OperationUpdate {
92 if let Some(ref parent_id) = self.parent_id {
93 update = update.with_parent_id(parent_id);
94 }
95 if let Some(ref name) = self.name {
96 update = update.with_name(name);
97 }
98 update
99 }
100}
101
102impl std::fmt::Display for OperationIdentifier {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 if let Some(ref name) = self.name {
105 write!(f, "{}({})", name, self.operation_id)
106 } else {
107 write!(f, "{}", self.operation_id)
108 }
109 }
110}
111
112#[derive(Debug)]
120pub struct OperationIdGenerator {
121 base_id: String,
123 step_counter: AtomicU64,
125}
126
127impl OperationIdGenerator {
128 pub fn new(base_id: impl Into<String>) -> Self {
134 Self {
135 base_id: base_id.into(),
136 step_counter: AtomicU64::new(0),
137 }
138 }
139
140 pub fn with_counter(base_id: impl Into<String>, initial_counter: u64) -> Self {
147 Self {
148 base_id: base_id.into(),
149 step_counter: AtomicU64::new(initial_counter),
150 }
151 }
152
153 pub fn next_id(&self) -> String {
171 let counter = self.step_counter.fetch_add(1, Ordering::Relaxed);
175 generate_operation_id(&self.base_id, counter)
176 }
177
178 pub fn id_for_counter(&self, counter: u64) -> String {
191 generate_operation_id(&self.base_id, counter)
192 }
193
194 pub fn current_counter(&self) -> u64 {
204 self.step_counter.load(Ordering::Relaxed)
208 }
209
210 pub fn base_id(&self) -> &str {
212 &self.base_id
213 }
214
215 pub fn create_child(&self, parent_operation_id: impl Into<String>) -> Self {
224 Self::new(parent_operation_id)
225 }
226}
227
228impl Clone for OperationIdGenerator {
229 fn clone(&self) -> Self {
230 Self {
231 base_id: self.base_id.clone(),
232 step_counter: AtomicU64::new(self.step_counter.load(Ordering::Relaxed)),
236 }
237 }
238}
239
240pub fn generate_operation_id(base_id: &str, counter: u64) -> String {
251 let mut hasher = Blake2b512::new();
252 hasher.update(base_id.as_bytes());
253 hasher.update(counter.to_le_bytes());
254 let result = hasher.finalize();
255
256 hex::encode(&result[..16])
258}
259
260#[allow(private_bounds)]
272pub trait Logger: Sealed + Send + Sync {
273 fn debug(&self, message: &str, info: &LogInfo);
275 fn info(&self, message: &str, info: &LogInfo);
277 fn warn(&self, message: &str, info: &LogInfo);
279 fn error(&self, message: &str, info: &LogInfo);
281}
282
283#[derive(Debug, Clone, Default)]
289pub struct LogInfo {
290 pub durable_execution_arn: Option<String>,
292 pub operation_id: Option<String>,
294 pub parent_id: Option<String>,
296 pub is_replay: bool,
302 pub extra: Vec<(String, String)>,
304}
305
306impl LogInfo {
307 pub fn new(durable_execution_arn: impl Into<String>) -> Self {
309 Self {
310 durable_execution_arn: Some(durable_execution_arn.into()),
311 ..Default::default()
312 }
313 }
314
315 pub fn with_operation_id(mut self, operation_id: impl Into<String>) -> Self {
317 self.operation_id = Some(operation_id.into());
318 self
319 }
320
321 pub fn with_parent_id(mut self, parent_id: impl Into<String>) -> Self {
323 self.parent_id = Some(parent_id.into());
324 self
325 }
326
327 pub fn with_replay(mut self, is_replay: bool) -> Self {
336 self.is_replay = is_replay;
337 self
338 }
339
340 pub fn with_extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
342 self.extra.push((key.into(), value.into()));
343 self
344 }
345}
346
347pub fn create_operation_span(
371 operation_type: &str,
372 op_id: &OperationIdentifier,
373 durable_execution_arn: &str,
374) -> tracing::Span {
375 tracing::info_span!(
376 "durable_operation",
377 operation_type = %operation_type,
378 operation_id = %op_id.operation_id,
379 parent_id = ?op_id.parent_id,
380 name = ?op_id.name,
381 durable_execution_arn = %durable_execution_arn,
382 status = tracing::field::Empty,
383 )
384}
385
386#[derive(Debug, Clone, Default)]
394pub struct TracingLogger;
395
396impl Sealed for TracingLogger {}
398
399impl TracingLogger {
400 fn format_extra_fields(extra: &[(String, String)]) -> String {
405 if extra.is_empty() {
406 String::new()
407 } else {
408 extra
409 .iter()
410 .map(|(k, v)| format!("{}={}", k, v))
411 .collect::<Vec<_>>()
412 .join(", ")
413 }
414 }
415}
416
417impl Logger for TracingLogger {
418 fn debug(&self, message: &str, info: &LogInfo) {
419 let extra_fields = Self::format_extra_fields(&info.extra);
420 tracing::debug!(
421 durable_execution_arn = ?info.durable_execution_arn,
422 operation_id = ?info.operation_id,
423 parent_id = ?info.parent_id,
424 is_replay = info.is_replay,
425 extra = %extra_fields,
426 "{}",
427 message
428 );
429 }
430
431 fn info(&self, message: &str, info: &LogInfo) {
432 let extra_fields = Self::format_extra_fields(&info.extra);
433 tracing::info!(
434 durable_execution_arn = ?info.durable_execution_arn,
435 operation_id = ?info.operation_id,
436 parent_id = ?info.parent_id,
437 is_replay = info.is_replay,
438 extra = %extra_fields,
439 "{}",
440 message
441 );
442 }
443
444 fn warn(&self, message: &str, info: &LogInfo) {
445 let extra_fields = Self::format_extra_fields(&info.extra);
446 tracing::warn!(
447 durable_execution_arn = ?info.durable_execution_arn,
448 operation_id = ?info.operation_id,
449 parent_id = ?info.parent_id,
450 is_replay = info.is_replay,
451 extra = %extra_fields,
452 "{}",
453 message
454 );
455 }
456
457 fn error(&self, message: &str, info: &LogInfo) {
458 let extra_fields = Self::format_extra_fields(&info.extra);
459 tracing::error!(
460 durable_execution_arn = ?info.durable_execution_arn,
461 operation_id = ?info.operation_id,
462 parent_id = ?info.parent_id,
463 is_replay = info.is_replay,
464 extra = %extra_fields,
465 "{}",
466 message
467 );
468 }
469}
470
471#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
477pub enum ReplayLoggingConfig {
478 #[default]
483 SuppressAll,
484
485 AllowAll,
490
491 ErrorsOnly,
496
497 WarningsAndErrors,
502}
503
504pub struct ReplayAwareLogger {
529 inner: Arc<dyn Logger>,
531 config: ReplayLoggingConfig,
533}
534
535impl Sealed for ReplayAwareLogger {}
537
538impl ReplayAwareLogger {
539 pub fn new(inner: Arc<dyn Logger>, config: ReplayLoggingConfig) -> Self {
558 Self { inner, config }
559 }
560
561 pub fn suppress_replay(inner: Arc<dyn Logger>) -> Self {
569 Self::new(inner, ReplayLoggingConfig::SuppressAll)
570 }
571
572 pub fn allow_all(inner: Arc<dyn Logger>) -> Self {
578 Self::new(inner, ReplayLoggingConfig::AllowAll)
579 }
580
581 pub fn config(&self) -> ReplayLoggingConfig {
583 self.config
584 }
585
586 pub fn inner(&self) -> &Arc<dyn Logger> {
588 &self.inner
589 }
590
591 fn should_suppress(&self, info: &LogInfo, level: LogLevel) -> bool {
593 if !info.is_replay {
594 return false;
595 }
596
597 match self.config {
598 ReplayLoggingConfig::SuppressAll => true,
599 ReplayLoggingConfig::AllowAll => false,
600 ReplayLoggingConfig::ErrorsOnly => level != LogLevel::Error,
601 ReplayLoggingConfig::WarningsAndErrors => {
602 level != LogLevel::Error && level != LogLevel::Warn
603 }
604 }
605 }
606}
607
608#[derive(Debug, Clone, Copy, PartialEq, Eq)]
610enum LogLevel {
611 Debug,
612 Info,
613 Warn,
614 Error,
615}
616
617impl std::fmt::Debug for ReplayAwareLogger {
618 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
619 f.debug_struct("ReplayAwareLogger")
620 .field("config", &self.config)
621 .finish()
622 }
623}
624
625impl Logger for ReplayAwareLogger {
626 fn debug(&self, message: &str, info: &LogInfo) {
627 if !self.should_suppress(info, LogLevel::Debug) {
628 self.inner.debug(message, info);
629 }
630 }
631
632 fn info(&self, message: &str, info: &LogInfo) {
633 if !self.should_suppress(info, LogLevel::Info) {
634 self.inner.info(message, info);
635 }
636 }
637
638 fn warn(&self, message: &str, info: &LogInfo) {
639 if !self.should_suppress(info, LogLevel::Warn) {
640 self.inner.warn(message, info);
641 }
642 }
643
644 fn error(&self, message: &str, info: &LogInfo) {
645 if !self.should_suppress(info, LogLevel::Error) {
646 self.inner.error(message, info);
647 }
648 }
649}
650
651pub struct CustomLogger<D, I, W, E>
672where
673 D: Fn(&str, &LogInfo) + Send + Sync,
674 I: Fn(&str, &LogInfo) + Send + Sync,
675 W: Fn(&str, &LogInfo) + Send + Sync,
676 E: Fn(&str, &LogInfo) + Send + Sync,
677{
678 debug_fn: D,
679 info_fn: I,
680 warn_fn: W,
681 error_fn: E,
682}
683
684impl<D, I, W, E> Sealed for CustomLogger<D, I, W, E>
686where
687 D: Fn(&str, &LogInfo) + Send + Sync,
688 I: Fn(&str, &LogInfo) + Send + Sync,
689 W: Fn(&str, &LogInfo) + Send + Sync,
690 E: Fn(&str, &LogInfo) + Send + Sync,
691{
692}
693
694impl<D, I, W, E> Logger for CustomLogger<D, I, W, E>
695where
696 D: Fn(&str, &LogInfo) + Send + Sync,
697 I: Fn(&str, &LogInfo) + Send + Sync,
698 W: Fn(&str, &LogInfo) + Send + Sync,
699 E: Fn(&str, &LogInfo) + Send + Sync,
700{
701 fn debug(&self, message: &str, info: &LogInfo) {
702 (self.debug_fn)(message, info);
703 }
704
705 fn info(&self, message: &str, info: &LogInfo) {
706 (self.info_fn)(message, info);
707 }
708
709 fn warn(&self, message: &str, info: &LogInfo) {
710 (self.warn_fn)(message, info);
711 }
712
713 fn error(&self, message: &str, info: &LogInfo) {
714 (self.error_fn)(message, info);
715 }
716}
717
718impl<D, I, W, E> std::fmt::Debug for CustomLogger<D, I, W, E>
719where
720 D: Fn(&str, &LogInfo) + Send + Sync,
721 I: Fn(&str, &LogInfo) + Send + Sync,
722 W: Fn(&str, &LogInfo) + Send + Sync,
723 E: Fn(&str, &LogInfo) + Send + Sync,
724{
725 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726 f.debug_struct("CustomLogger").finish()
727 }
728}
729
730pub fn custom_logger<D, I, W, E>(
761 debug_fn: D,
762 info_fn: I,
763 warn_fn: W,
764 error_fn: E,
765) -> CustomLogger<D, I, W, E>
766where
767 D: Fn(&str, &LogInfo) + Send + Sync,
768 I: Fn(&str, &LogInfo) + Send + Sync,
769 W: Fn(&str, &LogInfo) + Send + Sync,
770 E: Fn(&str, &LogInfo) + Send + Sync,
771{
772 CustomLogger {
773 debug_fn,
774 info_fn,
775 warn_fn,
776 error_fn,
777 }
778}
779
780pub fn simple_custom_logger<F>(log_fn: F) -> impl Logger
805where
806 F: Fn(&str, &str, &LogInfo) + Send + Sync + Clone + 'static,
807{
808 let debug_fn = log_fn.clone();
809 let info_fn = log_fn.clone();
810 let warn_fn = log_fn.clone();
811 let error_fn = log_fn;
812
813 custom_logger(
814 move |msg, info| debug_fn("DEBUG", msg, info),
815 move |msg, info| info_fn("INFO", msg, info),
816 move |msg, info| warn_fn("WARN", msg, info),
817 move |msg, info| error_fn("ERROR", msg, info),
818 )
819}
820
821pub struct DurableContext {
847 state: Arc<ExecutionState>,
849 lambda_context: Option<lambda_runtime::Context>,
851 parent_id: Option<String>,
853 id_generator: Arc<OperationIdGenerator>,
855 logger: Arc<RwLock<Arc<dyn Logger>>>,
857}
858
859static_assertions::assert_impl_all!(DurableContext: Send, Sync);
861
862impl DurableContext {
863 pub fn new(state: Arc<ExecutionState>) -> Self {
869 let base_id = state.durable_execution_arn().to_string();
870 Self {
871 state,
872 lambda_context: None,
873 parent_id: None,
874 id_generator: Arc::new(OperationIdGenerator::new(base_id)),
875 logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
876 }
877 }
878
879 pub fn from_lambda_context(
888 state: Arc<ExecutionState>,
889 lambda_context: lambda_runtime::Context,
890 ) -> Self {
891 let base_id = state.durable_execution_arn().to_string();
892 Self {
893 state,
894 lambda_context: Some(lambda_context),
895 parent_id: None,
896 id_generator: Arc::new(OperationIdGenerator::new(base_id)),
897 logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
898 }
899 }
900
901 pub fn create_child_context(&self, parent_operation_id: impl Into<String>) -> Self {
911 let parent_id = parent_operation_id.into();
912 Self {
913 state: self.state.clone(),
914 lambda_context: self.lambda_context.clone(),
915 parent_id: Some(parent_id.clone()),
916 id_generator: Arc::new(OperationIdGenerator::new(parent_id)),
917 logger: self.logger.clone(),
918 }
919 }
920
921 pub fn set_logger(&mut self, logger: Arc<dyn Logger>) {
927 *self.logger.write().unwrap() = logger;
928 }
929
930 pub fn with_logger(self, logger: Arc<dyn Logger>) -> Self {
936 *self.logger.write().unwrap() = logger;
937 self
938 }
939
940 pub fn configure_logger(&self, logger: Arc<dyn Logger>) {
949 *self.logger.write().unwrap() = logger;
950 }
951
952 pub fn state(&self) -> &Arc<ExecutionState> {
954 &self.state
955 }
956
957 pub fn durable_execution_arn(&self) -> &str {
959 self.state.durable_execution_arn()
960 }
961
962 pub fn parent_id(&self) -> Option<&str> {
964 self.parent_id.as_deref()
965 }
966
967 pub fn lambda_context(&self) -> Option<&lambda_runtime::Context> {
969 self.lambda_context.as_ref()
970 }
971
972 pub fn logger(&self) -> Arc<dyn Logger> {
974 self.logger.read().unwrap().clone()
975 }
976
977 pub fn next_operation_id(&self) -> String {
982 self.id_generator.next_id()
983 }
984
985 pub fn next_operation_identifier(&self, name: Option<String>) -> OperationIdentifier {
991 OperationIdentifier::new(self.next_operation_id(), self.parent_id.clone(), name)
992 }
993
994 pub fn current_step_counter(&self) -> u64 {
996 self.id_generator.current_counter()
997 }
998
999 pub fn create_log_info(&self) -> LogInfo {
1005 let mut info = LogInfo::new(self.durable_execution_arn());
1006 if let Some(ref parent_id) = self.parent_id {
1007 info = info.with_parent_id(parent_id);
1008 }
1009 info = info.with_replay(self.state.is_replay());
1011 info
1012 }
1013
1014 pub fn create_log_info_with_operation(&self, operation_id: &str) -> LogInfo {
1020 self.create_log_info().with_operation_id(operation_id)
1021 }
1022
1023 pub fn create_log_info_with_replay(&self, operation_id: &str, is_replay: bool) -> LogInfo {
1034 let mut info = LogInfo::new(self.durable_execution_arn());
1035 if let Some(ref parent_id) = self.parent_id {
1036 info = info.with_parent_id(parent_id);
1037 }
1038 info.with_operation_id(operation_id).with_replay(is_replay)
1039 }
1040
1041 pub fn log_info(&self, message: &str) {
1060 self.log_with_level(LogLevel::Info, message, &[]);
1061 }
1062
1063 pub fn log_info_with(&self, message: &str, fields: &[(&str, &str)]) {
1079 self.log_with_level(LogLevel::Info, message, fields);
1080 }
1081
1082 pub fn log_debug(&self, message: &str) {
1097 self.log_with_level(LogLevel::Debug, message, &[]);
1098 }
1099
1100 pub fn log_debug_with(&self, message: &str, fields: &[(&str, &str)]) {
1116 self.log_with_level(LogLevel::Debug, message, fields);
1117 }
1118
1119 pub fn log_warn(&self, message: &str) {
1134 self.log_with_level(LogLevel::Warn, message, &[]);
1135 }
1136
1137 pub fn log_warn_with(&self, message: &str, fields: &[(&str, &str)]) {
1153 self.log_with_level(LogLevel::Warn, message, fields);
1154 }
1155
1156 pub fn log_error(&self, message: &str) {
1171 self.log_with_level(LogLevel::Error, message, &[]);
1172 }
1173
1174 pub fn log_error_with(&self, message: &str, fields: &[(&str, &str)]) {
1190 self.log_with_level(LogLevel::Error, message, fields);
1191 }
1192
1193 fn log_with_level(&self, level: LogLevel, message: &str, extra: &[(&str, &str)]) {
1204 let mut log_info = self.create_log_info();
1205
1206 for (key, value) in extra {
1207 log_info = log_info.with_extra(*key, *value);
1208 }
1209
1210 let logger = self.logger.read().unwrap();
1211 match level {
1212 LogLevel::Debug => logger.debug(message, &log_info),
1213 LogLevel::Info => logger.info(message, &log_info),
1214 LogLevel::Warn => logger.warn(message, &log_info),
1215 LogLevel::Error => logger.error(message, &log_info),
1216 }
1217 }
1218
1219 pub fn get_original_input<T>(&self) -> DurableResult<T>
1252 where
1253 T: serde::de::DeserializeOwned,
1254 {
1255 let input_payload = self.state.get_original_input_raw().ok_or_else(|| {
1257 crate::error::DurableError::Validation {
1258 message: "No original input available. The EXECUTION operation may not exist or has no input payload.".to_string(),
1259 }
1260 })?;
1261
1262 serde_json::from_str(input_payload).map_err(|e| crate::error::DurableError::SerDes {
1264 message: format!("Failed to deserialize original input: {}", e),
1265 })
1266 }
1267
1268 pub fn get_original_input_raw(&self) -> Option<&str> {
1277 self.state.get_original_input_raw()
1278 }
1279
1280 pub async fn complete_execution_success<T>(&self, result: &T) -> DurableResult<()>
1315 where
1316 T: serde::Serialize,
1317 {
1318 let serialized =
1319 serde_json::to_string(result).map_err(|e| crate::error::DurableError::SerDes {
1320 message: format!("Failed to serialize execution result: {}", e),
1321 })?;
1322
1323 self.state
1324 .complete_execution_success(Some(serialized))
1325 .await
1326 }
1327
1328 pub async fn complete_execution_failure(
1356 &self,
1357 error: crate::error::ErrorObject,
1358 ) -> DurableResult<()> {
1359 self.state.complete_execution_failure(error).await
1360 }
1361
1362 pub async fn complete_execution_if_large<T>(&self, result: &T) -> DurableResult<bool>
1397 where
1398 T: serde::Serialize,
1399 {
1400 if crate::lambda::DurableExecutionInvocationOutput::would_exceed_max_size(result) {
1401 self.complete_execution_success(result).await?;
1402 Ok(true)
1403 } else {
1404 Ok(false)
1405 }
1406 }
1407
1408 pub async fn step<T, F>(
1435 &self,
1436 func: F,
1437 config: Option<crate::config::StepConfig>,
1438 ) -> DurableResult<T>
1439 where
1440 T: DurableValue,
1441 F: StepFn<T>,
1442 {
1443 let op_id = self.next_operation_identifier(None);
1444 let config = config.unwrap_or_default();
1445
1446 let logger = self.logger.read().unwrap().clone();
1447 let result =
1448 crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1449
1450 if result.is_ok() {
1452 self.state.track_replay(&op_id.operation_id).await;
1453 }
1454
1455 result
1456 }
1457
1458 pub async fn step_named<T, F>(
1476 &self,
1477 name: &str,
1478 func: F,
1479 config: Option<crate::config::StepConfig>,
1480 ) -> DurableResult<T>
1481 where
1482 T: DurableValue,
1483 F: StepFn<T>,
1484 {
1485 let op_id = self.next_operation_identifier(Some(name.to_string()));
1486 let config = config.unwrap_or_default();
1487
1488 let logger = self.logger.read().unwrap().clone();
1489 let result =
1490 crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1491
1492 if result.is_ok() {
1494 self.state.track_replay(&op_id.operation_id).await;
1495 }
1496
1497 result
1498 }
1499
1500 pub async fn wait(
1525 &self,
1526 duration: crate::duration::Duration,
1527 name: Option<&str>,
1528 ) -> DurableResult<()> {
1529 let op_id = self.next_operation_identifier(name.map(|s| s.to_string()));
1530
1531 let logger = self.logger.read().unwrap().clone();
1532 let result = crate::handlers::wait_handler(duration, &self.state, &op_id, &logger).await;
1533
1534 if result.is_ok() {
1536 self.state.track_replay(&op_id.operation_id).await;
1537 }
1538
1539 result
1540 }
1541
1542 pub async fn cancel_wait(&self, operation_id: &str) -> DurableResult<()> {
1569 let logger = self.logger.read().unwrap().clone();
1570 crate::handlers::wait_cancel_handler(&self.state, operation_id, &logger).await
1571 }
1572
1573 pub async fn create_callback<T>(
1599 &self,
1600 config: Option<crate::config::CallbackConfig>,
1601 ) -> DurableResult<crate::handlers::Callback<T>>
1602 where
1603 T: serde::Serialize + serde::de::DeserializeOwned,
1604 {
1605 let op_id = self.next_operation_identifier(None);
1606 let config = config.unwrap_or_default();
1607
1608 let logger = self.logger.read().unwrap().clone();
1609 let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1610
1611 if result.is_ok() {
1613 self.state.track_replay(&op_id.operation_id).await;
1614 }
1615
1616 result
1617 }
1618
1619 pub async fn create_callback_named<T>(
1623 &self,
1624 name: &str,
1625 config: Option<crate::config::CallbackConfig>,
1626 ) -> DurableResult<crate::handlers::Callback<T>>
1627 where
1628 T: serde::Serialize + serde::de::DeserializeOwned,
1629 {
1630 let op_id = self.next_operation_identifier(Some(name.to_string()));
1631 let config = config.unwrap_or_default();
1632
1633 let logger = self.logger.read().unwrap().clone();
1634 let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1635
1636 if result.is_ok() {
1638 self.state.track_replay(&op_id.operation_id).await;
1639 }
1640
1641 result
1642 }
1643
1644 pub async fn invoke<P, R>(
1670 &self,
1671 function_name: &str,
1672 payload: P,
1673 config: Option<crate::config::InvokeConfig<P, R>>,
1674 ) -> DurableResult<R>
1675 where
1676 P: serde::Serialize + serde::de::DeserializeOwned + Send,
1677 R: serde::Serialize + serde::de::DeserializeOwned + Send,
1678 {
1679 let op_id = self.next_operation_identifier(Some(format!("invoke:{}", function_name)));
1680 let config = config.unwrap_or_default();
1681
1682 let logger = self.logger.read().unwrap().clone();
1683 let result = crate::handlers::invoke_handler(
1684 function_name,
1685 payload,
1686 &self.state,
1687 &op_id,
1688 &config,
1689 &logger,
1690 )
1691 .await;
1692
1693 if result.is_ok() {
1695 self.state.track_replay(&op_id.operation_id).await;
1696 }
1697
1698 result
1699 }
1700
1701 pub async fn map<T, U, F, Fut>(
1731 &self,
1732 items: Vec<T>,
1733 func: F,
1734 config: Option<crate::config::MapConfig>,
1735 ) -> DurableResult<crate::concurrency::BatchResult<U>>
1736 where
1737 T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + Clone + 'static,
1738 U: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1739 F: Fn(DurableContext, T, usize) -> Fut + Send + Sync + Clone + 'static,
1740 Fut: std::future::Future<Output = DurableResult<U>> + Send + 'static,
1741 {
1742 let op_id = self.next_operation_identifier(Some("map".to_string()));
1743 let config = config.unwrap_or_default();
1744
1745 let logger = self.logger.read().unwrap().clone();
1746 let result =
1747 crate::handlers::map_handler(items, func, &self.state, &op_id, self, &config, &logger)
1748 .await;
1749
1750 if result.is_ok() {
1752 self.state.track_replay(&op_id.operation_id).await;
1753 }
1754
1755 result
1756 }
1757
1758 pub async fn parallel<T, F, Fut>(
1785 &self,
1786 branches: Vec<F>,
1787 config: Option<crate::config::ParallelConfig>,
1788 ) -> DurableResult<crate::concurrency::BatchResult<T>>
1789 where
1790 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1791 F: FnOnce(DurableContext) -> Fut + Send + 'static,
1792 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
1793 {
1794 let op_id = self.next_operation_identifier(Some("parallel".to_string()));
1795 let config = config.unwrap_or_default();
1796
1797 let logger = self.logger.read().unwrap().clone();
1798 let result = crate::handlers::parallel_handler(
1799 branches,
1800 &self.state,
1801 &op_id,
1802 self,
1803 &config,
1804 &logger,
1805 )
1806 .await;
1807
1808 if result.is_ok() {
1810 self.state.track_replay(&op_id.operation_id).await;
1811 }
1812
1813 result
1814 }
1815
1816 pub async fn run_in_child_context<T, F, Fut>(
1840 &self,
1841 func: F,
1842 config: Option<crate::config::ChildConfig>,
1843 ) -> DurableResult<T>
1844 where
1845 T: serde::Serialize + serde::de::DeserializeOwned + Send,
1846 F: FnOnce(DurableContext) -> Fut + Send,
1847 Fut: std::future::Future<Output = DurableResult<T>> + Send,
1848 {
1849 let op_id = self.next_operation_identifier(Some("child_context".to_string()));
1850 let config = config.unwrap_or_default();
1851
1852 let logger = self.logger.read().unwrap().clone();
1853 let result =
1854 crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
1855
1856 if result.is_ok() {
1858 self.state.track_replay(&op_id.operation_id).await;
1859 }
1860
1861 result
1862 }
1863
1864 pub async fn run_in_child_context_named<T, F, Fut>(
1868 &self,
1869 name: &str,
1870 func: F,
1871 config: Option<crate::config::ChildConfig>,
1872 ) -> DurableResult<T>
1873 where
1874 T: serde::Serialize + serde::de::DeserializeOwned + Send,
1875 F: FnOnce(DurableContext) -> Fut + Send,
1876 Fut: std::future::Future<Output = DurableResult<T>> + Send,
1877 {
1878 let op_id = self.next_operation_identifier(Some(name.to_string()));
1879 let config = config.unwrap_or_default();
1880
1881 let logger = self.logger.read().unwrap().clone();
1882 let result =
1883 crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
1884
1885 if result.is_ok() {
1887 self.state.track_replay(&op_id.operation_id).await;
1888 }
1889
1890 result
1891 }
1892
1893 pub async fn wait_for_condition<T, S, F>(
1936 &self,
1937 check: F,
1938 config: WaitForConditionConfig<S>,
1939 ) -> DurableResult<T>
1940 where
1941 T: serde::Serialize + serde::de::DeserializeOwned + Send,
1942 S: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
1943 F: Fn(&S, &WaitForConditionContext) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
1944 + Send
1945 + Sync,
1946 {
1947 let op_id = self.next_operation_identifier(Some("wait_for_condition".to_string()));
1948
1949 let logger = self.logger.read().unwrap().clone();
1950 let result = crate::handlers::wait_for_condition_handler(
1953 check,
1954 config,
1955 &self.state,
1956 &op_id,
1957 &logger,
1958 )
1959 .await;
1960
1961 if result.is_ok() {
1963 self.state.track_replay(&op_id.operation_id).await;
1964 }
1965
1966 result
1967 }
1968
1969 pub async fn wait_for_callback<T, F, Fut>(
2000 &self,
2001 submitter: F,
2002 config: Option<crate::config::CallbackConfig>,
2003 ) -> DurableResult<T>
2004 where
2005 T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync,
2006 F: FnOnce(String) -> Fut + Send + 'static,
2007 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
2008 + Send
2009 + 'static,
2010 {
2011 let callback: crate::handlers::Callback<T> = self.create_callback(config).await?;
2013 let callback_id = callback.callback_id.clone();
2014
2015 let op_id = self.next_operation_identifier(Some("wait_for_callback_submitter".to_string()));
2017
2018 let child_config = crate::config::ChildConfig::default();
2021
2022 let logger = self.logger.read().unwrap().clone();
2023 crate::handlers::child_handler(
2024 |child_ctx| {
2025 let callback_id = callback_id.clone();
2026 async move {
2027 child_ctx
2030 .step_named(
2031 "execute_submitter",
2032 move |_| {
2033 Ok(())
2036 },
2037 None,
2038 )
2039 .await?;
2040
2041 submitter(callback_id).await.map_err(|e| {
2045 crate::error::DurableError::UserCode {
2046 message: e.to_string(),
2047 error_type: "SubmitterError".to_string(),
2048 stack_trace: None,
2049 }
2050 })?;
2051
2052 Ok(())
2053 }
2054 },
2055 &self.state,
2056 &op_id,
2057 self,
2058 &child_config,
2059 &logger,
2060 )
2061 .await?;
2062
2063 self.state.track_replay(&op_id.operation_id).await;
2065
2066 callback.result().await
2068 }
2069
2070 pub async fn all<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<Vec<T>>
2098 where
2099 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2100 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2101 {
2102 let op_id = self.next_operation_identifier(Some("all".to_string()));
2103
2104 let logger = self.logger.read().unwrap().clone();
2105 let result = crate::handlers::all_handler(futures, &self.state, &op_id, &logger).await;
2106
2107 if result.is_ok() {
2109 self.state.track_replay(&op_id.operation_id).await;
2110 }
2111
2112 result
2113 }
2114
2115 pub async fn all_settled<T, Fut>(
2140 &self,
2141 futures: Vec<Fut>,
2142 ) -> DurableResult<crate::concurrency::BatchResult<T>>
2143 where
2144 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2145 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2146 {
2147 let op_id = self.next_operation_identifier(Some("all_settled".to_string()));
2148
2149 let logger = self.logger.read().unwrap().clone();
2150 let result =
2151 crate::handlers::all_settled_handler(futures, &self.state, &op_id, &logger).await;
2152
2153 if result.is_ok() {
2155 self.state.track_replay(&op_id.operation_id).await;
2156 }
2157
2158 result
2159 }
2160
2161 pub async fn race<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2184 where
2185 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2186 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2187 {
2188 let op_id = self.next_operation_identifier(Some("race".to_string()));
2189
2190 let logger = self.logger.read().unwrap().clone();
2191 let result = crate::handlers::race_handler(futures, &self.state, &op_id, &logger).await;
2192
2193 if result.is_ok() {
2195 self.state.track_replay(&op_id.operation_id).await;
2196 }
2197
2198 result
2199 }
2200
2201 pub async fn any<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2226 where
2227 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2228 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2229 {
2230 let op_id = self.next_operation_identifier(Some("any".to_string()));
2231
2232 let logger = self.logger.read().unwrap().clone();
2233 let result = crate::handlers::any_handler(futures, &self.state, &op_id, &logger).await;
2234
2235 if result.is_ok() {
2237 self.state.track_replay(&op_id.operation_id).await;
2238 }
2239
2240 result
2241 }
2242}
2243
2244#[allow(clippy::type_complexity)]
2246pub struct WaitForConditionConfig<S> {
2247 pub initial_state: S,
2249 pub wait_strategy: Box<dyn Fn(&S, usize) -> crate::config::WaitDecision + Send + Sync>,
2254 pub timeout: Option<crate::duration::Duration>,
2256 pub serdes: Option<std::sync::Arc<dyn crate::config::SerDesAny>>,
2258}
2259
2260impl<S> std::fmt::Debug for WaitForConditionConfig<S>
2261where
2262 S: std::fmt::Debug,
2263{
2264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2265 f.debug_struct("WaitForConditionConfig")
2266 .field("initial_state", &self.initial_state)
2267 .field("wait_strategy", &"<fn>")
2268 .field("timeout", &self.timeout)
2269 .field("serdes", &self.serdes.is_some())
2270 .finish()
2271 }
2272}
2273
2274impl<S> WaitForConditionConfig<S> {
2275 pub fn from_interval(
2298 initial_state: S,
2299 interval: crate::duration::Duration,
2300 max_attempts: Option<usize>,
2301 ) -> Self
2302 where
2303 S: Send + Sync + 'static,
2304 {
2305 let interval_secs = interval.to_seconds();
2306 let max = max_attempts.unwrap_or(usize::MAX);
2307
2308 Self {
2309 initial_state,
2310 wait_strategy: Box::new(move |_state: &S, attempts_made: usize| {
2311 if attempts_made >= max {
2317 return crate::config::WaitDecision::Done;
2318 }
2319 crate::config::WaitDecision::Continue {
2320 delay: crate::duration::Duration::from_seconds(interval_secs),
2321 }
2322 }),
2323 timeout: None,
2324 serdes: None,
2325 }
2326 }
2327}
2328
2329impl<S: Default + Send + Sync + 'static> Default for WaitForConditionConfig<S> {
2330 fn default() -> Self {
2331 Self::from_interval(
2332 S::default(),
2333 crate::duration::Duration::from_seconds(5),
2334 None,
2335 )
2336 }
2337}
2338
2339#[derive(Debug, Clone)]
2341pub struct WaitForConditionContext {
2342 pub attempt: usize,
2344 pub max_attempts: Option<usize>,
2346}
2347
2348impl Clone for DurableContext {
2349 fn clone(&self) -> Self {
2350 Self {
2351 state: self.state.clone(),
2352 lambda_context: self.lambda_context.clone(),
2353 parent_id: self.parent_id.clone(),
2354 id_generator: self.id_generator.clone(),
2355 logger: self.logger.clone(),
2356 }
2357 }
2358}
2359
2360impl std::fmt::Debug for DurableContext {
2361 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2362 f.debug_struct("DurableContext")
2363 .field("durable_execution_arn", &self.durable_execution_arn())
2364 .field("parent_id", &self.parent_id)
2365 .field("step_counter", &self.current_step_counter())
2366 .finish_non_exhaustive()
2367 }
2368}
2369
2370mod hex {
2372 const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";
2373
2374 pub fn encode(bytes: &[u8]) -> String {
2375 let mut result = String::with_capacity(bytes.len() * 2);
2376 for &byte in bytes {
2377 result.push(HEX_CHARS[(byte >> 4) as usize] as char);
2378 result.push(HEX_CHARS[(byte & 0x0f) as usize] as char);
2379 }
2380 result
2381 }
2382}
2383
2384#[cfg(test)]
2385mod tests {
2386 use super::*;
2387
2388 #[test]
2389 fn test_operation_identifier_new() {
2390 let id = OperationIdentifier::new(
2391 "op-123",
2392 Some("parent-456".to_string()),
2393 Some("my-step".to_string()),
2394 );
2395 assert_eq!(id.operation_id, "op-123");
2396 assert_eq!(id.parent_id, Some("parent-456".to_string()));
2397 assert_eq!(id.name, Some("my-step".to_string()));
2398 }
2399
2400 #[test]
2401 fn test_operation_identifier_root() {
2402 let id = OperationIdentifier::root("op-123");
2403 assert_eq!(id.operation_id, "op-123");
2404 assert!(id.parent_id.is_none());
2405 assert!(id.name.is_none());
2406 }
2407
2408 #[test]
2409 fn test_operation_identifier_with_parent() {
2410 let id = OperationIdentifier::with_parent("op-123", "parent-456");
2411 assert_eq!(id.operation_id, "op-123");
2412 assert_eq!(id.parent_id, Some("parent-456".to_string()));
2413 assert!(id.name.is_none());
2414 }
2415
2416 #[test]
2417 fn test_operation_identifier_with_name() {
2418 let id = OperationIdentifier::root("op-123").with_name("my-step");
2419 assert_eq!(id.name, Some("my-step".to_string()));
2420 }
2421
2422 #[test]
2423 fn test_operation_identifier_display() {
2424 let id = OperationIdentifier::root("op-123");
2425 assert_eq!(format!("{}", id), "op-123");
2426
2427 let id_with_name = OperationIdentifier::root("op-123").with_name("my-step");
2428 assert_eq!(format!("{}", id_with_name), "my-step(op-123)");
2429 }
2430
2431 #[test]
2432 fn test_generate_operation_id_deterministic() {
2433 let id1 = generate_operation_id("base-123", 0);
2434 let id2 = generate_operation_id("base-123", 0);
2435 assert_eq!(id1, id2);
2436 }
2437
2438 #[test]
2439 fn test_generate_operation_id_different_counters() {
2440 let id1 = generate_operation_id("base-123", 0);
2441 let id2 = generate_operation_id("base-123", 1);
2442 assert_ne!(id1, id2);
2443 }
2444
2445 #[test]
2446 fn test_generate_operation_id_different_bases() {
2447 let id1 = generate_operation_id("base-123", 0);
2448 let id2 = generate_operation_id("base-456", 0);
2449 assert_ne!(id1, id2);
2450 }
2451
2452 #[test]
2453 fn test_generate_operation_id_length() {
2454 let id = generate_operation_id("base-123", 0);
2455 assert_eq!(id.len(), 32); }
2457
2458 #[test]
2459 fn test_operation_id_generator_new() {
2460 let gen = OperationIdGenerator::new("base-123");
2461 assert_eq!(gen.base_id(), "base-123");
2462 assert_eq!(gen.current_counter(), 0);
2463 }
2464
2465 #[test]
2466 fn test_operation_id_generator_with_counter() {
2467 let gen = OperationIdGenerator::with_counter("base-123", 10);
2468 assert_eq!(gen.current_counter(), 10);
2469 }
2470
2471 #[test]
2472 fn test_operation_id_generator_next_id() {
2473 let gen = OperationIdGenerator::new("base-123");
2474
2475 let id1 = gen.next_id();
2476 assert_eq!(gen.current_counter(), 1);
2477
2478 let id2 = gen.next_id();
2479 assert_eq!(gen.current_counter(), 2);
2480
2481 assert_ne!(id1, id2);
2482 }
2483
2484 #[test]
2485 fn test_operation_id_generator_id_for_counter() {
2486 let gen = OperationIdGenerator::new("base-123");
2487
2488 let id_0 = gen.id_for_counter(0);
2489 let id_1 = gen.id_for_counter(1);
2490
2491 assert_eq!(gen.current_counter(), 0);
2493
2494 let next = gen.next_id();
2496 assert_eq!(next, id_0);
2497
2498 let next = gen.next_id();
2499 assert_eq!(next, id_1);
2500 }
2501
2502 #[test]
2503 fn test_operation_id_generator_create_child() {
2504 let gen = OperationIdGenerator::new("base-123");
2505 gen.next_id(); let child = gen.create_child("child-op-id");
2508 assert_eq!(child.base_id(), "child-op-id");
2509 assert_eq!(child.current_counter(), 0);
2510 }
2511
2512 #[test]
2513 fn test_operation_id_generator_clone() {
2514 let gen = OperationIdGenerator::new("base-123");
2515 gen.next_id();
2516 gen.next_id();
2517
2518 let cloned = gen.clone();
2519 assert_eq!(cloned.base_id(), gen.base_id());
2520 assert_eq!(cloned.current_counter(), gen.current_counter());
2521 }
2522
2523 #[test]
2524 fn test_operation_id_generator_thread_safety() {
2525 use std::thread;
2526
2527 let gen = Arc::new(OperationIdGenerator::new("base-123"));
2528 let mut handles = vec![];
2529
2530 for _ in 0..10 {
2531 let gen_clone = gen.clone();
2532 handles.push(thread::spawn(move || {
2533 let mut ids = vec![];
2534 for _ in 0..100 {
2535 ids.push(gen_clone.next_id());
2536 }
2537 ids
2538 }));
2539 }
2540
2541 let mut all_ids = vec![];
2542 for handle in handles {
2543 all_ids.extend(handle.join().unwrap());
2544 }
2545
2546 let unique_count = {
2548 let mut set = std::collections::HashSet::new();
2549 for id in &all_ids {
2550 set.insert(id.clone());
2551 }
2552 set.len()
2553 };
2554
2555 assert_eq!(unique_count, 1000);
2556 assert_eq!(gen.current_counter(), 1000);
2557 }
2558
2559 #[test]
2560 fn test_log_info_new() {
2561 let info = LogInfo::new("arn:test");
2562 assert_eq!(info.durable_execution_arn, Some("arn:test".to_string()));
2563 assert!(info.operation_id.is_none());
2564 assert!(info.parent_id.is_none());
2565 }
2566
2567 #[test]
2568 fn test_log_info_with_operation_id() {
2569 let info = LogInfo::new("arn:test").with_operation_id("op-123");
2570 assert_eq!(info.operation_id, Some("op-123".to_string()));
2571 }
2572
2573 #[test]
2574 fn test_log_info_with_parent_id() {
2575 let info = LogInfo::new("arn:test").with_parent_id("parent-456");
2576 assert_eq!(info.parent_id, Some("parent-456".to_string()));
2577 }
2578
2579 #[test]
2580 fn test_log_info_with_extra() {
2581 let info = LogInfo::new("arn:test")
2582 .with_extra("key1", "value1")
2583 .with_extra("key2", "value2");
2584 assert_eq!(info.extra.len(), 2);
2585 assert_eq!(info.extra[0], ("key1".to_string(), "value1".to_string()));
2586 }
2587
2588 #[test]
2589 fn test_hex_encode() {
2590 assert_eq!(hex::encode(&[0x00]), "00");
2591 assert_eq!(hex::encode(&[0xff]), "ff");
2592 assert_eq!(hex::encode(&[0x12, 0x34, 0xab, 0xcd]), "1234abcd");
2593 }
2594}
2595
2596#[cfg(test)]
2597mod durable_context_tests {
2598 use super::*;
2599 use crate::client::SharedDurableServiceClient;
2600 use crate::lambda::InitialExecutionState;
2601 use std::sync::Arc;
2602
2603 #[cfg(test)]
2604 fn create_mock_client() -> SharedDurableServiceClient {
2605 use crate::client::MockDurableServiceClient;
2606 Arc::new(MockDurableServiceClient::new())
2607 }
2608
2609 fn create_test_state() -> Arc<ExecutionState> {
2610 let client = create_mock_client();
2611 Arc::new(ExecutionState::new(
2612 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
2613 "token-123",
2614 InitialExecutionState::new(),
2615 client,
2616 ))
2617 }
2618
2619 #[test]
2620 fn test_durable_context_new() {
2621 let state = create_test_state();
2622 let ctx = DurableContext::new(state.clone());
2623
2624 assert_eq!(
2625 ctx.durable_execution_arn(),
2626 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123"
2627 );
2628 assert!(ctx.parent_id().is_none());
2629 assert!(ctx.lambda_context().is_none());
2630 assert_eq!(ctx.current_step_counter(), 0);
2631 }
2632
2633 #[test]
2634 fn test_durable_context_next_operation_id() {
2635 let state = create_test_state();
2636 let ctx = DurableContext::new(state);
2637
2638 let id1 = ctx.next_operation_id();
2639 let id2 = ctx.next_operation_id();
2640
2641 assert_ne!(id1, id2);
2642 assert_eq!(ctx.current_step_counter(), 2);
2643 }
2644
2645 #[test]
2646 fn test_durable_context_next_operation_identifier() {
2647 let state = create_test_state();
2648 let ctx = DurableContext::new(state);
2649
2650 let op_id = ctx.next_operation_identifier(Some("my-step".to_string()));
2651
2652 assert!(op_id.parent_id.is_none());
2653 assert_eq!(op_id.name, Some("my-step".to_string()));
2654 assert!(!op_id.operation_id.is_empty());
2655 }
2656
2657 #[test]
2658 fn test_durable_context_create_child_context() {
2659 let state = create_test_state();
2660 let ctx = DurableContext::new(state);
2661
2662 let parent_op_id = ctx.next_operation_id();
2664
2665 let child_ctx = ctx.create_child_context(&parent_op_id);
2667
2668 assert_eq!(child_ctx.parent_id(), Some(parent_op_id.as_str()));
2669 assert_eq!(child_ctx.current_step_counter(), 0);
2670 assert_eq!(
2671 child_ctx.durable_execution_arn(),
2672 ctx.durable_execution_arn()
2673 );
2674 }
2675
2676 #[test]
2677 fn test_durable_context_child_generates_different_ids() {
2678 let state = create_test_state();
2679 let ctx = DurableContext::new(state);
2680
2681 let parent_op_id = ctx.next_operation_id();
2682 let child_ctx = ctx.create_child_context(&parent_op_id);
2683
2684 let child_id = child_ctx.next_operation_id();
2686 let parent_id = ctx.next_operation_id();
2687
2688 assert_ne!(child_id, parent_id);
2689 }
2690
2691 #[test]
2692 fn test_durable_context_child_operation_identifier_has_parent() {
2693 let state = create_test_state();
2694 let ctx = DurableContext::new(state);
2695
2696 let parent_op_id = ctx.next_operation_id();
2697 let child_ctx = ctx.create_child_context(&parent_op_id);
2698
2699 let child_op_id = child_ctx.next_operation_identifier(None);
2700
2701 assert_eq!(child_op_id.parent_id, Some(parent_op_id));
2702 }
2703
2704 #[test]
2705 fn test_durable_context_set_logger() {
2706 let state = create_test_state();
2707 let mut ctx = DurableContext::new(state);
2708
2709 let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2711 ctx.set_logger(custom_logger);
2712
2713 let _ = ctx.logger();
2715 }
2716
2717 #[test]
2718 fn test_durable_context_with_logger() {
2719 let state = create_test_state();
2720 let ctx = DurableContext::new(state);
2721
2722 let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2723 let ctx_with_logger = ctx.with_logger(custom_logger);
2724
2725 let _ = ctx_with_logger.logger();
2727 }
2728
2729 #[test]
2730 fn test_durable_context_create_log_info() {
2731 let state = create_test_state();
2732 let ctx = DurableContext::new(state);
2733
2734 let info = ctx.create_log_info();
2735
2736 assert_eq!(
2737 info.durable_execution_arn,
2738 Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
2739 );
2740 assert!(info.parent_id.is_none());
2741 }
2742
2743 #[test]
2744 fn test_durable_context_create_log_info_with_parent() {
2745 let state = create_test_state();
2746 let ctx = DurableContext::new(state);
2747
2748 let parent_op_id = ctx.next_operation_id();
2749 let child_ctx = ctx.create_child_context(&parent_op_id);
2750
2751 let info = child_ctx.create_log_info();
2752
2753 assert_eq!(info.parent_id, Some(parent_op_id));
2754 }
2755
2756 #[test]
2757 fn test_durable_context_create_log_info_with_operation() {
2758 let state = create_test_state();
2759 let ctx = DurableContext::new(state);
2760
2761 let info = ctx.create_log_info_with_operation("op-123");
2762
2763 assert_eq!(info.operation_id, Some("op-123".to_string()));
2764 }
2765
2766 #[test]
2767 fn test_log_info_with_replay() {
2768 let info = LogInfo::new("arn:test")
2769 .with_operation_id("op-123")
2770 .with_replay(true);
2771
2772 assert!(info.is_replay);
2773 assert_eq!(info.operation_id, Some("op-123".to_string()));
2774 }
2775
2776 #[test]
2777 fn test_log_info_default_not_replay() {
2778 let info = LogInfo::default();
2779 assert!(!info.is_replay);
2780 }
2781
2782 #[test]
2783 fn test_replay_logging_config_default() {
2784 let config = ReplayLoggingConfig::default();
2785 assert_eq!(config, ReplayLoggingConfig::SuppressAll);
2786 }
2787
2788 #[test]
2789 fn test_replay_aware_logger_suppress_all() {
2790 use std::sync::atomic::{AtomicUsize, Ordering};
2791
2792 let debug_count = Arc::new(AtomicUsize::new(0));
2794 let info_count = Arc::new(AtomicUsize::new(0));
2795 let warn_count = Arc::new(AtomicUsize::new(0));
2796 let error_count = Arc::new(AtomicUsize::new(0));
2797
2798 let inner = Arc::new(custom_logger(
2799 {
2800 let count = debug_count.clone();
2801 move |_, _| {
2802 count.fetch_add(1, Ordering::SeqCst);
2803 }
2804 },
2805 {
2806 let count = info_count.clone();
2807 move |_, _| {
2808 count.fetch_add(1, Ordering::SeqCst);
2809 }
2810 },
2811 {
2812 let count = warn_count.clone();
2813 move |_, _| {
2814 count.fetch_add(1, Ordering::SeqCst);
2815 }
2816 },
2817 {
2818 let count = error_count.clone();
2819 move |_, _| {
2820 count.fetch_add(1, Ordering::SeqCst);
2821 }
2822 },
2823 ));
2824
2825 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
2826
2827 let non_replay_info = LogInfo::new("arn:test").with_replay(false);
2829 logger.debug("test", &non_replay_info);
2830 logger.info("test", &non_replay_info);
2831 logger.warn("test", &non_replay_info);
2832 logger.error("test", &non_replay_info);
2833
2834 assert_eq!(debug_count.load(Ordering::SeqCst), 1);
2835 assert_eq!(info_count.load(Ordering::SeqCst), 1);
2836 assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2837 assert_eq!(error_count.load(Ordering::SeqCst), 1);
2838
2839 let replay_info = LogInfo::new("arn:test").with_replay(true);
2841 logger.debug("test", &replay_info);
2842 logger.info("test", &replay_info);
2843 logger.warn("test", &replay_info);
2844 logger.error("test", &replay_info);
2845
2846 assert_eq!(debug_count.load(Ordering::SeqCst), 1);
2848 assert_eq!(info_count.load(Ordering::SeqCst), 1);
2849 assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2850 assert_eq!(error_count.load(Ordering::SeqCst), 1);
2851 }
2852
2853 #[test]
2854 fn test_replay_aware_logger_allow_all() {
2855 use std::sync::atomic::{AtomicUsize, Ordering};
2856
2857 let call_count = Arc::new(AtomicUsize::new(0));
2858
2859 let inner = Arc::new(custom_logger(
2860 {
2861 let count = call_count.clone();
2862 move |_, _| {
2863 count.fetch_add(1, Ordering::SeqCst);
2864 }
2865 },
2866 {
2867 let count = call_count.clone();
2868 move |_, _| {
2869 count.fetch_add(1, Ordering::SeqCst);
2870 }
2871 },
2872 {
2873 let count = call_count.clone();
2874 move |_, _| {
2875 count.fetch_add(1, Ordering::SeqCst);
2876 }
2877 },
2878 {
2879 let count = call_count.clone();
2880 move |_, _| {
2881 count.fetch_add(1, Ordering::SeqCst);
2882 }
2883 },
2884 ));
2885
2886 let logger = ReplayAwareLogger::allow_all(inner);
2887
2888 let replay_info = LogInfo::new("arn:test").with_replay(true);
2890 logger.debug("test", &replay_info);
2891 logger.info("test", &replay_info);
2892 logger.warn("test", &replay_info);
2893 logger.error("test", &replay_info);
2894
2895 assert_eq!(call_count.load(Ordering::SeqCst), 4);
2896 }
2897
2898 #[test]
2899 fn test_replay_aware_logger_errors_only() {
2900 use std::sync::atomic::{AtomicUsize, Ordering};
2901
2902 let debug_count = Arc::new(AtomicUsize::new(0));
2903 let info_count = Arc::new(AtomicUsize::new(0));
2904 let warn_count = Arc::new(AtomicUsize::new(0));
2905 let error_count = Arc::new(AtomicUsize::new(0));
2906
2907 let inner = Arc::new(custom_logger(
2908 {
2909 let count = debug_count.clone();
2910 move |_, _| {
2911 count.fetch_add(1, Ordering::SeqCst);
2912 }
2913 },
2914 {
2915 let count = info_count.clone();
2916 move |_, _| {
2917 count.fetch_add(1, Ordering::SeqCst);
2918 }
2919 },
2920 {
2921 let count = warn_count.clone();
2922 move |_, _| {
2923 count.fetch_add(1, Ordering::SeqCst);
2924 }
2925 },
2926 {
2927 let count = error_count.clone();
2928 move |_, _| {
2929 count.fetch_add(1, Ordering::SeqCst);
2930 }
2931 },
2932 ));
2933
2934 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::ErrorsOnly);
2935
2936 let replay_info = LogInfo::new("arn:test").with_replay(true);
2938 logger.debug("test", &replay_info);
2939 logger.info("test", &replay_info);
2940 logger.warn("test", &replay_info);
2941 logger.error("test", &replay_info);
2942
2943 assert_eq!(debug_count.load(Ordering::SeqCst), 0);
2944 assert_eq!(info_count.load(Ordering::SeqCst), 0);
2945 assert_eq!(warn_count.load(Ordering::SeqCst), 0);
2946 assert_eq!(error_count.load(Ordering::SeqCst), 1);
2947 }
2948
2949 #[test]
2950 fn test_replay_aware_logger_warnings_and_errors() {
2951 use std::sync::atomic::{AtomicUsize, Ordering};
2952
2953 let debug_count = Arc::new(AtomicUsize::new(0));
2954 let info_count = Arc::new(AtomicUsize::new(0));
2955 let warn_count = Arc::new(AtomicUsize::new(0));
2956 let error_count = Arc::new(AtomicUsize::new(0));
2957
2958 let inner = Arc::new(custom_logger(
2959 {
2960 let count = debug_count.clone();
2961 move |_, _| {
2962 count.fetch_add(1, Ordering::SeqCst);
2963 }
2964 },
2965 {
2966 let count = info_count.clone();
2967 move |_, _| {
2968 count.fetch_add(1, Ordering::SeqCst);
2969 }
2970 },
2971 {
2972 let count = warn_count.clone();
2973 move |_, _| {
2974 count.fetch_add(1, Ordering::SeqCst);
2975 }
2976 },
2977 {
2978 let count = error_count.clone();
2979 move |_, _| {
2980 count.fetch_add(1, Ordering::SeqCst);
2981 }
2982 },
2983 ));
2984
2985 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::WarningsAndErrors);
2986
2987 let replay_info = LogInfo::new("arn:test").with_replay(true);
2989 logger.debug("test", &replay_info);
2990 logger.info("test", &replay_info);
2991 logger.warn("test", &replay_info);
2992 logger.error("test", &replay_info);
2993
2994 assert_eq!(debug_count.load(Ordering::SeqCst), 0);
2995 assert_eq!(info_count.load(Ordering::SeqCst), 0);
2996 assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2997 assert_eq!(error_count.load(Ordering::SeqCst), 1);
2998 }
2999
3000 #[test]
3001 fn test_replay_aware_logger_suppress_replay_constructor() {
3002 let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3003 let logger = ReplayAwareLogger::suppress_replay(inner);
3004
3005 assert_eq!(logger.config(), ReplayLoggingConfig::SuppressAll);
3006 }
3007
3008 #[test]
3009 fn test_replay_aware_logger_debug() {
3010 let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3011 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
3012
3013 let debug_str = format!("{:?}", logger);
3014 assert!(debug_str.contains("ReplayAwareLogger"));
3015 assert!(debug_str.contains("SuppressAll"));
3016 }
3017
3018 #[test]
3019 fn test_durable_context_create_log_info_with_replay_method() {
3020 let state = create_test_state();
3021 let ctx = DurableContext::new(state);
3022
3023 let info = ctx.create_log_info_with_replay("op-123", true);
3024
3025 assert_eq!(info.operation_id, Some("op-123".to_string()));
3026 assert!(info.is_replay);
3027 }
3028
3029 #[test]
3030 fn test_durable_context_clone() {
3031 let state = create_test_state();
3032 let ctx = DurableContext::new(state);
3033
3034 ctx.next_operation_id();
3035 ctx.next_operation_id();
3036
3037 let cloned = ctx.clone();
3038
3039 assert_eq!(cloned.durable_execution_arn(), ctx.durable_execution_arn());
3040 assert_eq!(cloned.current_step_counter(), ctx.current_step_counter());
3041 }
3042
3043 #[test]
3044 fn test_durable_context_debug() {
3045 let state = create_test_state();
3046 let ctx = DurableContext::new(state);
3047
3048 let debug_str = format!("{:?}", ctx);
3049
3050 assert!(debug_str.contains("DurableContext"));
3051 assert!(debug_str.contains("durable_execution_arn"));
3052 }
3053
3054 #[test]
3055 fn test_durable_context_state_access() {
3056 let state = create_test_state();
3057 let ctx = DurableContext::new(state.clone());
3058
3059 assert!(Arc::ptr_eq(&ctx.state(), &state));
3061 }
3062
3063 #[test]
3064 fn test_durable_context_send_sync() {
3065 fn assert_send_sync<T: Send + Sync>() {}
3067 assert_send_sync::<DurableContext>();
3068 }
3069
3070 #[test]
3075 fn test_log_info_method() {
3076 use std::sync::atomic::{AtomicUsize, Ordering};
3077 use std::sync::Mutex;
3078
3079 let info_count = Arc::new(AtomicUsize::new(0));
3080 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3081
3082 let captured_info_clone = captured_info.clone();
3083 let inner = Arc::new(custom_logger(
3084 |_, _| {},
3085 {
3086 let count = info_count.clone();
3087 move |_, info: &LogInfo| {
3088 count.fetch_add(1, Ordering::SeqCst);
3089 *captured_info_clone.lock().unwrap() = Some(info.clone());
3090 }
3091 },
3092 |_, _| {},
3093 |_, _| {},
3094 ));
3095
3096 let state = create_test_state();
3097 let ctx = DurableContext::new(state).with_logger(inner);
3098
3099 ctx.log_info("Test message");
3100
3101 assert_eq!(info_count.load(Ordering::SeqCst), 1);
3102
3103 let captured = captured_info.lock().unwrap();
3104 let info = captured.as_ref().unwrap();
3105 assert_eq!(
3106 info.durable_execution_arn,
3107 Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
3108 );
3109 }
3110
3111 #[test]
3112 fn test_log_debug_method() {
3113 use std::sync::atomic::{AtomicUsize, Ordering};
3114
3115 let debug_count = Arc::new(AtomicUsize::new(0));
3116
3117 let inner = Arc::new(custom_logger(
3118 {
3119 let count = debug_count.clone();
3120 move |_, _| {
3121 count.fetch_add(1, Ordering::SeqCst);
3122 }
3123 },
3124 |_, _| {},
3125 |_, _| {},
3126 |_, _| {},
3127 ));
3128
3129 let state = create_test_state();
3130 let ctx = DurableContext::new(state).with_logger(inner);
3131
3132 ctx.log_debug("Debug message");
3133
3134 assert_eq!(debug_count.load(Ordering::SeqCst), 1);
3135 }
3136
3137 #[test]
3138 fn test_log_warn_method() {
3139 use std::sync::atomic::{AtomicUsize, Ordering};
3140
3141 let warn_count = Arc::new(AtomicUsize::new(0));
3142
3143 let inner = Arc::new(custom_logger(
3144 |_, _| {},
3145 |_, _| {},
3146 {
3147 let count = warn_count.clone();
3148 move |_, _| {
3149 count.fetch_add(1, Ordering::SeqCst);
3150 }
3151 },
3152 |_, _| {},
3153 ));
3154
3155 let state = create_test_state();
3156 let ctx = DurableContext::new(state).with_logger(inner);
3157
3158 ctx.log_warn("Warning message");
3159
3160 assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3161 }
3162
3163 #[test]
3164 fn test_log_error_method() {
3165 use std::sync::atomic::{AtomicUsize, Ordering};
3166
3167 let error_count = Arc::new(AtomicUsize::new(0));
3168
3169 let inner = Arc::new(custom_logger(|_, _| {}, |_, _| {}, |_, _| {}, {
3170 let count = error_count.clone();
3171 move |_, _| {
3172 count.fetch_add(1, Ordering::SeqCst);
3173 }
3174 }));
3175
3176 let state = create_test_state();
3177 let ctx = DurableContext::new(state).with_logger(inner);
3178
3179 ctx.log_error("Error message");
3180
3181 assert_eq!(error_count.load(Ordering::SeqCst), 1);
3182 }
3183
3184 #[test]
3185 fn test_log_info_with_extra_fields() {
3186 use std::sync::Mutex;
3187
3188 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3189
3190 let captured_info_clone = captured_info.clone();
3191 let inner = Arc::new(custom_logger(
3192 |_, _| {},
3193 move |_, info: &LogInfo| {
3194 *captured_info_clone.lock().unwrap() = Some(info.clone());
3195 },
3196 |_, _| {},
3197 |_, _| {},
3198 ));
3199
3200 let state = create_test_state();
3201 let ctx = DurableContext::new(state).with_logger(inner);
3202
3203 ctx.log_info_with("Test message", &[("order_id", "123"), ("amount", "99.99")]);
3204
3205 let captured = captured_info.lock().unwrap();
3206 let info = captured.as_ref().unwrap();
3207
3208 assert_eq!(info.extra.len(), 2);
3210 assert!(info
3211 .extra
3212 .contains(&("order_id".to_string(), "123".to_string())));
3213 assert!(info
3214 .extra
3215 .contains(&("amount".to_string(), "99.99".to_string())));
3216 }
3217
3218 #[test]
3219 fn test_log_debug_with_extra_fields() {
3220 use std::sync::Mutex;
3221
3222 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3223
3224 let captured_info_clone = captured_info.clone();
3225 let inner = Arc::new(custom_logger(
3226 move |_, info: &LogInfo| {
3227 *captured_info_clone.lock().unwrap() = Some(info.clone());
3228 },
3229 |_, _| {},
3230 |_, _| {},
3231 |_, _| {},
3232 ));
3233
3234 let state = create_test_state();
3235 let ctx = DurableContext::new(state).with_logger(inner);
3236
3237 ctx.log_debug_with("Debug message", &[("key", "value")]);
3238
3239 let captured = captured_info.lock().unwrap();
3240 let info = captured.as_ref().unwrap();
3241
3242 assert_eq!(info.extra.len(), 1);
3243 assert!(info
3244 .extra
3245 .contains(&("key".to_string(), "value".to_string())));
3246 }
3247
3248 #[test]
3249 fn test_log_warn_with_extra_fields() {
3250 use std::sync::Mutex;
3251
3252 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3253
3254 let captured_info_clone = captured_info.clone();
3255 let inner = Arc::new(custom_logger(
3256 |_, _| {},
3257 |_, _| {},
3258 move |_, info: &LogInfo| {
3259 *captured_info_clone.lock().unwrap() = Some(info.clone());
3260 },
3261 |_, _| {},
3262 ));
3263
3264 let state = create_test_state();
3265 let ctx = DurableContext::new(state).with_logger(inner);
3266
3267 ctx.log_warn_with("Warning message", &[("retry", "3")]);
3268
3269 let captured = captured_info.lock().unwrap();
3270 let info = captured.as_ref().unwrap();
3271
3272 assert_eq!(info.extra.len(), 1);
3273 assert!(info.extra.contains(&("retry".to_string(), "3".to_string())));
3274 }
3275
3276 #[test]
3277 fn test_log_error_with_extra_fields() {
3278 use std::sync::Mutex;
3279
3280 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3281
3282 let captured_info_clone = captured_info.clone();
3283 let inner = Arc::new(custom_logger(
3284 |_, _| {},
3285 |_, _| {},
3286 |_, _| {},
3287 move |_, info: &LogInfo| {
3288 *captured_info_clone.lock().unwrap() = Some(info.clone());
3289 },
3290 ));
3291
3292 let state = create_test_state();
3293 let ctx = DurableContext::new(state).with_logger(inner);
3294
3295 ctx.log_error_with(
3296 "Error message",
3297 &[("error_code", "E001"), ("details", "Something went wrong")],
3298 );
3299
3300 let captured = captured_info.lock().unwrap();
3301 let info = captured.as_ref().unwrap();
3302
3303 assert_eq!(info.extra.len(), 2);
3304 assert!(info
3305 .extra
3306 .contains(&("error_code".to_string(), "E001".to_string())));
3307 assert!(info
3308 .extra
3309 .contains(&("details".to_string(), "Something went wrong".to_string())));
3310 }
3311
3312 #[test]
3313 fn test_log_methods_include_parent_id_in_child_context() {
3314 use std::sync::Mutex;
3315
3316 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3317
3318 let captured_info_clone = captured_info.clone();
3319 let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3320 |_, _| {},
3321 move |_, info: &LogInfo| {
3322 *captured_info_clone.lock().unwrap() = Some(info.clone());
3323 },
3324 |_, _| {},
3325 |_, _| {},
3326 ));
3327
3328 let state = create_test_state();
3329 let ctx = DurableContext::new(state).with_logger(inner.clone());
3330
3331 let parent_op_id = ctx.next_operation_id();
3332 let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3333
3334 child_ctx.log_info("Child context message");
3335
3336 let captured = captured_info.lock().unwrap();
3337 let info = captured.as_ref().unwrap();
3338
3339 assert_eq!(info.parent_id, Some(parent_op_id));
3341 }
3342
3343 #[test]
3348 fn test_configure_logger_swaps_logger() {
3349 use std::sync::atomic::{AtomicUsize, Ordering};
3351
3352 let original_count = Arc::new(AtomicUsize::new(0));
3353 let new_count = Arc::new(AtomicUsize::new(0));
3354
3355 let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3356 |_, _| {},
3357 {
3358 let count = original_count.clone();
3359 move |_, _| {
3360 count.fetch_add(1, Ordering::SeqCst);
3361 }
3362 },
3363 |_, _| {},
3364 |_, _| {},
3365 ));
3366
3367 let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3368 |_, _| {},
3369 {
3370 let count = new_count.clone();
3371 move |_, _| {
3372 count.fetch_add(1, Ordering::SeqCst);
3373 }
3374 },
3375 |_, _| {},
3376 |_, _| {},
3377 ));
3378
3379 let state = create_test_state();
3380 let ctx = DurableContext::new(state).with_logger(original_logger);
3381
3382 ctx.log_info("before swap");
3384 assert_eq!(original_count.load(Ordering::SeqCst), 1);
3385 assert_eq!(new_count.load(Ordering::SeqCst), 0);
3386
3387 ctx.configure_logger(new_logger);
3389
3390 ctx.log_info("after swap");
3392 assert_eq!(original_count.load(Ordering::SeqCst), 1); assert_eq!(new_count.load(Ordering::SeqCst), 1);
3394 }
3395
3396 #[test]
3397 fn test_original_logger_used_when_configure_logger_not_called() {
3398 use std::sync::atomic::{AtomicUsize, Ordering};
3400
3401 let original_count = Arc::new(AtomicUsize::new(0));
3402
3403 let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3404 |_, _| {},
3405 {
3406 let count = original_count.clone();
3407 move |_, _| {
3408 count.fetch_add(1, Ordering::SeqCst);
3409 }
3410 },
3411 |_, _| {},
3412 |_, _| {},
3413 ));
3414
3415 let state = create_test_state();
3416 let ctx = DurableContext::new(state).with_logger(original_logger);
3417
3418 ctx.log_info("message 1");
3420 ctx.log_info("message 2");
3421 ctx.log_info("message 3");
3422
3423 assert_eq!(original_count.load(Ordering::SeqCst), 3);
3424 }
3425
3426 #[test]
3427 fn test_configure_logger_affects_child_contexts() {
3428 use std::sync::atomic::{AtomicUsize, Ordering};
3430
3431 let original_count = Arc::new(AtomicUsize::new(0));
3432 let new_count = Arc::new(AtomicUsize::new(0));
3433
3434 let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3435 |_, _| {},
3436 {
3437 let count = original_count.clone();
3438 move |_, _| {
3439 count.fetch_add(1, Ordering::SeqCst);
3440 }
3441 },
3442 |_, _| {},
3443 |_, _| {},
3444 ));
3445
3446 let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3447 |_, _| {},
3448 {
3449 let count = new_count.clone();
3450 move |_, _| {
3451 count.fetch_add(1, Ordering::SeqCst);
3452 }
3453 },
3454 |_, _| {},
3455 |_, _| {},
3456 ));
3457
3458 let state = create_test_state();
3459 let ctx = DurableContext::new(state).with_logger(original_logger);
3460 let parent_op_id = ctx.next_operation_id();
3461 let child_ctx = ctx.create_child_context(&parent_op_id);
3462
3463 child_ctx.log_info("child before swap");
3465 assert_eq!(original_count.load(Ordering::SeqCst), 1);
3466
3467 ctx.configure_logger(new_logger);
3469
3470 child_ctx.log_info("child after swap");
3472 assert_eq!(new_count.load(Ordering::SeqCst), 1);
3473 assert_eq!(original_count.load(Ordering::SeqCst), 1); }
3475}
3476
3477#[cfg(test)]
3478mod property_tests {
3479 use super::*;
3480 use proptest::prelude::*;
3481
3482 proptest! {
3488 #![proptest_config(ProptestConfig::with_cases(100))]
3489
3490 #[test]
3493 fn prop_operation_id_determinism(
3494 base_id in "[a-zA-Z0-9:/-]{1,100}",
3495 counter in 0u64..10000u64,
3496 ) {
3497 let id1 = generate_operation_id(&base_id, counter);
3499 let id2 = generate_operation_id(&base_id, counter);
3500
3501 prop_assert_eq!(&id1, &id2, "Same base_id and counter must produce identical IDs");
3503
3504 prop_assert_eq!(id1.len(), 32, "ID must be 32 hex characters");
3506 prop_assert!(id1.chars().all(|c| c.is_ascii_hexdigit()), "ID must be valid hex");
3507 }
3508
3509 #[test]
3512 fn prop_operation_id_generator_determinism(
3513 base_id in "[a-zA-Z0-9:/-]{1,100}",
3514 num_ids in 1usize..50usize,
3515 ) {
3516 let gen1 = OperationIdGenerator::new(&base_id);
3518 let gen2 = OperationIdGenerator::new(&base_id);
3519
3520 let ids1: Vec<String> = (0..num_ids).map(|_| gen1.next_id()).collect();
3522 let ids2: Vec<String> = (0..num_ids).map(|_| gen2.next_id()).collect();
3523
3524 prop_assert_eq!(&ids1, &ids2, "Same generator sequence must produce identical IDs");
3526
3527 let unique_count = {
3529 let mut set = std::collections::HashSet::new();
3530 for id in &ids1 {
3531 set.insert(id.clone());
3532 }
3533 set.len()
3534 };
3535 prop_assert_eq!(unique_count, num_ids, "All IDs in sequence must be unique");
3536 }
3537
3538 #[test]
3541 fn prop_operation_id_replay_determinism(
3542 base_id in "[a-zA-Z0-9:/-]{1,100}",
3543 operations in prop::collection::vec(0u32..3u32, 1..20),
3544 ) {
3545 let gen1 = OperationIdGenerator::new(&base_id);
3549 let gen2 = OperationIdGenerator::new(&base_id);
3550
3551 let mut ids1 = Vec::new();
3552 let mut ids2 = Vec::new();
3553
3554 for op_type in &operations {
3556 ids1.push(gen1.next_id());
3558
3559 if *op_type == 2 {
3561 let parent_id = ids1.last().unwrap().clone();
3562 let child_gen = gen1.create_child(parent_id);
3563 ids1.push(child_gen.next_id());
3564 }
3565 }
3566
3567 for op_type in &operations {
3569 ids2.push(gen2.next_id());
3570
3571 if *op_type == 2 {
3572 let parent_id = ids2.last().unwrap().clone();
3573 let child_gen = gen2.create_child(parent_id);
3574 ids2.push(child_gen.next_id());
3575 }
3576 }
3577
3578 prop_assert_eq!(&ids1, &ids2, "Replay must produce identical operation IDs");
3580 }
3581 }
3582
3583 mod concurrent_id_tests {
3588 use super::*;
3589 use std::sync::Arc;
3590 use std::thread;
3591
3592 proptest! {
3593 #![proptest_config(ProptestConfig::with_cases(100))]
3594
3595 #[test]
3598 fn prop_concurrent_id_uniqueness(
3599 base_id in "[a-zA-Z0-9:/-]{1,100}",
3600 num_threads in 2usize..10usize,
3601 ids_per_thread in 10usize..100usize,
3602 ) {
3603 let gen = Arc::new(OperationIdGenerator::new(&base_id));
3604 let mut handles = vec![];
3605
3606 for _ in 0..num_threads {
3608 let gen_clone = gen.clone();
3609 let count = ids_per_thread;
3610 handles.push(thread::spawn(move || {
3611 let mut ids = Vec::with_capacity(count);
3612 for _ in 0..count {
3613 ids.push(gen_clone.next_id());
3614 }
3615 ids
3616 }));
3617 }
3618
3619 let mut all_ids = Vec::new();
3621 for handle in handles {
3622 all_ids.extend(handle.join().unwrap());
3623 }
3624
3625 let total_expected = num_threads * ids_per_thread;
3626
3627 prop_assert_eq!(all_ids.len(), total_expected, "Should have generated {} IDs", total_expected);
3629
3630 let unique_count = {
3632 let mut set = std::collections::HashSet::new();
3633 for id in &all_ids {
3634 set.insert(id.clone());
3635 }
3636 set.len()
3637 };
3638
3639 prop_assert_eq!(
3640 unique_count,
3641 total_expected,
3642 "All {} IDs must be unique, but only {} were unique",
3643 total_expected,
3644 unique_count
3645 );
3646
3647 prop_assert_eq!(
3649 gen.current_counter() as usize,
3650 total_expected,
3651 "Counter should equal total IDs generated"
3652 );
3653 }
3654
3655 #[test]
3658 fn prop_concurrent_id_uniqueness_stress(
3659 base_id in "[a-zA-Z0-9:/-]{1,50}",
3660 ) {
3661 let num_threads = 20;
3663 let ids_per_thread = 500;
3664
3665 let gen = Arc::new(OperationIdGenerator::new(&base_id));
3666 let mut handles = vec![];
3667
3668 for _ in 0..num_threads {
3669 let gen_clone = gen.clone();
3670 handles.push(thread::spawn(move || {
3671 let mut ids = Vec::with_capacity(ids_per_thread);
3672 for _ in 0..ids_per_thread {
3673 ids.push(gen_clone.next_id());
3674 }
3675 ids
3676 }));
3677 }
3678
3679 let mut all_ids = Vec::new();
3680 for handle in handles {
3681 all_ids.extend(handle.join().unwrap());
3682 }
3683
3684 let total_expected = num_threads * ids_per_thread;
3685
3686 let unique_count = {
3688 let mut set = std::collections::HashSet::new();
3689 for id in &all_ids {
3690 set.insert(id.clone());
3691 }
3692 set.len()
3693 };
3694
3695 prop_assert_eq!(
3696 unique_count,
3697 total_expected,
3698 "All {} IDs must be unique under high concurrency",
3699 total_expected
3700 );
3701 }
3702 }
3703 }
3704
3705 mod logging_automatic_context_tests {
3711 use super::*;
3712 use std::sync::{Arc, Mutex};
3713
3714 proptest! {
3715 #![proptest_config(ProptestConfig::with_cases(100))]
3716
3717 #[test]
3720 fn prop_logging_automatic_context(
3721 message in "[a-zA-Z0-9 ]{1,100}",
3722 log_level in 0u8..4u8,
3723 ) {
3724 use crate::client::MockDurableServiceClient;
3725 use crate::lambda::InitialExecutionState;
3726
3727 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3728 let captured_info_clone = captured_info.clone();
3729
3730 let inner = Arc::new(custom_logger(
3732 {
3733 let captured = captured_info_clone.clone();
3734 move |_, info: &LogInfo| {
3735 *captured.lock().unwrap() = Some(info.clone());
3736 }
3737 },
3738 {
3739 let captured = captured_info_clone.clone();
3740 move |_, info: &LogInfo| {
3741 *captured.lock().unwrap() = Some(info.clone());
3742 }
3743 },
3744 {
3745 let captured = captured_info_clone.clone();
3746 move |_, info: &LogInfo| {
3747 *captured.lock().unwrap() = Some(info.clone());
3748 }
3749 },
3750 {
3751 let captured = captured_info_clone.clone();
3752 move |_, info: &LogInfo| {
3753 *captured.lock().unwrap() = Some(info.clone());
3754 }
3755 },
3756 ));
3757
3758 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3759 let state = Arc::new(ExecutionState::new(
3760 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3761 "token-123",
3762 InitialExecutionState::new(),
3763 client,
3764 ));
3765 let ctx = DurableContext::new(state).with_logger(inner);
3766
3767 match log_level {
3769 0 => ctx.log_debug(&message),
3770 1 => ctx.log_info(&message),
3771 2 => ctx.log_warn(&message),
3772 _ => ctx.log_error(&message),
3773 }
3774
3775 let captured = captured_info.lock().unwrap();
3777 let info = captured.as_ref().expect("LogInfo should be captured");
3778
3779 prop_assert!(
3781 info.durable_execution_arn.is_some(),
3782 "durable_execution_arn must be automatically included"
3783 );
3784 prop_assert_eq!(
3785 info.durable_execution_arn.as_ref().unwrap(),
3786 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3787 "durable_execution_arn must match the context's ARN"
3788 );
3789 }
3790
3791 #[test]
3794 fn prop_logging_automatic_context_child(
3795 message in "[a-zA-Z0-9 ]{1,100}",
3796 log_level in 0u8..4u8,
3797 ) {
3798 use crate::client::MockDurableServiceClient;
3799 use crate::lambda::InitialExecutionState;
3800
3801 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3802 let captured_info_clone = captured_info.clone();
3803
3804 let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3806 {
3807 let captured = captured_info_clone.clone();
3808 move |_, info: &LogInfo| {
3809 *captured.lock().unwrap() = Some(info.clone());
3810 }
3811 },
3812 {
3813 let captured = captured_info_clone.clone();
3814 move |_, info: &LogInfo| {
3815 *captured.lock().unwrap() = Some(info.clone());
3816 }
3817 },
3818 {
3819 let captured = captured_info_clone.clone();
3820 move |_, info: &LogInfo| {
3821 *captured.lock().unwrap() = Some(info.clone());
3822 }
3823 },
3824 {
3825 let captured = captured_info_clone.clone();
3826 move |_, info: &LogInfo| {
3827 *captured.lock().unwrap() = Some(info.clone());
3828 }
3829 },
3830 ));
3831
3832 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3833 let state = Arc::new(ExecutionState::new(
3834 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3835 "token-123",
3836 InitialExecutionState::new(),
3837 client,
3838 ));
3839 let ctx = DurableContext::new(state).with_logger(inner.clone());
3840
3841 let parent_op_id = ctx.next_operation_id();
3843 let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3844
3845 match log_level {
3847 0 => child_ctx.log_debug(&message),
3848 1 => child_ctx.log_info(&message),
3849 2 => child_ctx.log_warn(&message),
3850 _ => child_ctx.log_error(&message),
3851 }
3852
3853 let captured = captured_info.lock().unwrap();
3855 let info = captured.as_ref().expect("LogInfo should be captured");
3856
3857 prop_assert!(
3859 info.durable_execution_arn.is_some(),
3860 "durable_execution_arn must be automatically included in child context"
3861 );
3862
3863 prop_assert!(
3865 info.parent_id.is_some(),
3866 "parent_id must be automatically included in child context"
3867 );
3868 prop_assert_eq!(
3869 info.parent_id.as_ref().unwrap(),
3870 &parent_op_id,
3871 "parent_id must match the parent operation ID"
3872 );
3873 }
3874 }
3875 }
3876
3877 mod logging_extra_fields_tests {
3882 use super::*;
3883 use std::sync::{Arc, Mutex};
3884
3885 proptest! {
3886 #![proptest_config(ProptestConfig::with_cases(100))]
3887
3888 #[test]
3891 fn prop_logging_extra_fields(
3892 message in "[a-zA-Z0-9 ]{1,100}",
3893 log_level in 0u8..4u8,
3894 key1 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
3895 value1 in "[a-zA-Z0-9]{1,50}",
3896 key2 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
3897 value2 in "[a-zA-Z0-9]{1,50}",
3898 ) {
3899 use crate::client::MockDurableServiceClient;
3900 use crate::lambda::InitialExecutionState;
3901
3902 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3903 let captured_info_clone = captured_info.clone();
3904
3905 let inner = Arc::new(custom_logger(
3907 {
3908 let captured = captured_info_clone.clone();
3909 move |_, info: &LogInfo| {
3910 *captured.lock().unwrap() = Some(info.clone());
3911 }
3912 },
3913 {
3914 let captured = captured_info_clone.clone();
3915 move |_, info: &LogInfo| {
3916 *captured.lock().unwrap() = Some(info.clone());
3917 }
3918 },
3919 {
3920 let captured = captured_info_clone.clone();
3921 move |_, info: &LogInfo| {
3922 *captured.lock().unwrap() = Some(info.clone());
3923 }
3924 },
3925 {
3926 let captured = captured_info_clone.clone();
3927 move |_, info: &LogInfo| {
3928 *captured.lock().unwrap() = Some(info.clone());
3929 }
3930 },
3931 ));
3932
3933 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3934 let state = Arc::new(ExecutionState::new(
3935 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3936 "token-123",
3937 InitialExecutionState::new(),
3938 client,
3939 ));
3940 let ctx = DurableContext::new(state).with_logger(inner);
3941
3942 let fields: Vec<(&str, &str)> = vec![(&key1, &value1), (&key2, &value2)];
3944
3945 match log_level {
3947 0 => ctx.log_debug_with(&message, &fields),
3948 1 => ctx.log_info_with(&message, &fields),
3949 2 => ctx.log_warn_with(&message, &fields),
3950 _ => ctx.log_error_with(&message, &fields),
3951 }
3952
3953 let captured = captured_info.lock().unwrap();
3955 let info = captured.as_ref().expect("LogInfo should be captured");
3956
3957 prop_assert_eq!(
3959 info.extra.len(),
3960 2,
3961 "Extra fields must be included in the log output"
3962 );
3963
3964 prop_assert!(
3966 info.extra.contains(&(key1.clone(), value1.clone())),
3967 "First extra field must be present: {}={}", key1, value1
3968 );
3969 prop_assert!(
3970 info.extra.contains(&(key2.clone(), value2.clone())),
3971 "Second extra field must be present: {}={}", key2, value2
3972 );
3973 }
3974
3975 #[test]
3978 fn prop_logging_extra_fields_empty(
3979 message in "[a-zA-Z0-9 ]{1,100}",
3980 log_level in 0u8..4u8,
3981 ) {
3982 use crate::client::MockDurableServiceClient;
3983 use crate::lambda::InitialExecutionState;
3984
3985 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3986 let captured_info_clone = captured_info.clone();
3987
3988 let inner = Arc::new(custom_logger(
3990 {
3991 let captured = captured_info_clone.clone();
3992 move |_, info: &LogInfo| {
3993 *captured.lock().unwrap() = Some(info.clone());
3994 }
3995 },
3996 {
3997 let captured = captured_info_clone.clone();
3998 move |_, info: &LogInfo| {
3999 *captured.lock().unwrap() = Some(info.clone());
4000 }
4001 },
4002 {
4003 let captured = captured_info_clone.clone();
4004 move |_, info: &LogInfo| {
4005 *captured.lock().unwrap() = Some(info.clone());
4006 }
4007 },
4008 {
4009 let captured = captured_info_clone.clone();
4010 move |_, info: &LogInfo| {
4011 *captured.lock().unwrap() = Some(info.clone());
4012 }
4013 },
4014 ));
4015
4016 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4017 let state = Arc::new(ExecutionState::new(
4018 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4019 "token-123",
4020 InitialExecutionState::new(),
4021 client,
4022 ));
4023 let ctx = DurableContext::new(state).with_logger(inner);
4024
4025 let empty_fields: &[(&str, &str)] = &[];
4027 match log_level {
4028 0 => ctx.log_debug_with(&message, empty_fields),
4029 1 => ctx.log_info_with(&message, empty_fields),
4030 2 => ctx.log_warn_with(&message, empty_fields),
4031 _ => ctx.log_error_with(&message, empty_fields),
4032 }
4033
4034 let captured = captured_info.lock().unwrap();
4036 let info = captured.as_ref().expect("LogInfo should be captured");
4037
4038 prop_assert!(
4039 info.extra.is_empty(),
4040 "Extra fields should be empty when none are provided"
4041 );
4042
4043 prop_assert!(
4045 info.durable_execution_arn.is_some(),
4046 "durable_execution_arn must still be present even with empty extra fields"
4047 );
4048 }
4049 }
4050 }
4051}
4052
4053#[cfg(test)]
4054mod sealed_trait_tests {
4055 use super::*;
4056 use std::sync::atomic::{AtomicUsize, Ordering};
4057
4058 mod logger_tests {
4067 use super::*;
4068
4069 #[test]
4070 fn test_tracing_logger_implements_logger() {
4071 let logger: &dyn Logger = &TracingLogger;
4073 let info = LogInfo::default();
4074
4075 logger.debug("test debug", &info);
4077 logger.info("test info", &info);
4078 logger.warn("test warn", &info);
4079 logger.error("test error", &info);
4080 }
4081
4082 #[test]
4083 fn test_replay_aware_logger_implements_logger() {
4084 let inner = Arc::new(TracingLogger);
4086 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::AllowAll);
4087 let logger_ref: &dyn Logger = &logger;
4088 let info = LogInfo::default();
4089
4090 logger_ref.debug("test debug", &info);
4092 logger_ref.info("test info", &info);
4093 logger_ref.warn("test warn", &info);
4094 logger_ref.error("test error", &info);
4095 }
4096
4097 #[test]
4098 fn test_custom_logger_implements_logger() {
4099 let call_count = Arc::new(AtomicUsize::new(0));
4101 let count_clone = call_count.clone();
4102
4103 let logger = custom_logger(
4104 {
4105 let count = count_clone.clone();
4106 move |_msg, _info| {
4107 count.fetch_add(1, Ordering::SeqCst);
4108 }
4109 },
4110 {
4111 let count = count_clone.clone();
4112 move |_msg, _info| {
4113 count.fetch_add(1, Ordering::SeqCst);
4114 }
4115 },
4116 {
4117 let count = count_clone.clone();
4118 move |_msg, _info| {
4119 count.fetch_add(1, Ordering::SeqCst);
4120 }
4121 },
4122 {
4123 let count = count_clone.clone();
4124 move |_msg, _info| {
4125 count.fetch_add(1, Ordering::SeqCst);
4126 }
4127 },
4128 );
4129
4130 let logger_ref: &dyn Logger = &logger;
4131 let info = LogInfo::default();
4132
4133 logger_ref.debug("test", &info);
4134 logger_ref.info("test", &info);
4135 logger_ref.warn("test", &info);
4136 logger_ref.error("test", &info);
4137
4138 assert_eq!(call_count.load(Ordering::SeqCst), 4);
4139 }
4140
4141 #[test]
4142 fn test_simple_custom_logger() {
4143 let call_count = Arc::new(AtomicUsize::new(0));
4144 let count_clone = call_count.clone();
4145
4146 let logger = simple_custom_logger(move |_level, _msg, _info| {
4147 count_clone.fetch_add(1, Ordering::SeqCst);
4148 });
4149
4150 let info = LogInfo::default();
4151
4152 logger.debug("test", &info);
4153 logger.info("test", &info);
4154 logger.warn("test", &info);
4155 logger.error("test", &info);
4156
4157 assert_eq!(call_count.load(Ordering::SeqCst), 4);
4158 }
4159
4160 #[test]
4161 fn test_custom_logger_receives_correct_messages() {
4162 let messages = Arc::new(std::sync::Mutex::new(Vec::new()));
4163 let messages_clone = messages.clone();
4164
4165 let logger = simple_custom_logger(move |level, msg, _info| {
4166 messages_clone
4167 .lock()
4168 .unwrap()
4169 .push(format!("[{}] {}", level, msg));
4170 });
4171
4172 let info = LogInfo::default();
4173
4174 logger.debug("debug message", &info);
4175 logger.info("info message", &info);
4176 logger.warn("warn message", &info);
4177 logger.error("error message", &info);
4178
4179 let logged = messages.lock().unwrap();
4180 assert_eq!(logged.len(), 4);
4181 assert_eq!(logged[0], "[DEBUG] debug message");
4182 assert_eq!(logged[1], "[INFO] info message");
4183 assert_eq!(logged[2], "[WARN] warn message");
4184 assert_eq!(logged[3], "[ERROR] error message");
4185 }
4186
4187 #[test]
4188 fn test_custom_logger_receives_log_info() {
4189 let received_info = Arc::new(std::sync::Mutex::new(None));
4190 let info_clone = received_info.clone();
4191
4192 let logger = simple_custom_logger(move |_level, _msg, info| {
4193 *info_clone.lock().unwrap() = Some(info.clone());
4194 });
4195
4196 let info = LogInfo::new("arn:aws:test")
4197 .with_operation_id("op-123")
4198 .with_parent_id("parent-456")
4199 .with_replay(true);
4200
4201 logger.info("test", &info);
4202
4203 let received = received_info.lock().unwrap().clone().unwrap();
4204 assert_eq!(
4205 received.durable_execution_arn,
4206 Some("arn:aws:test".to_string())
4207 );
4208 assert_eq!(received.operation_id, Some("op-123".to_string()));
4209 assert_eq!(received.parent_id, Some("parent-456".to_string()));
4210 assert!(received.is_replay);
4211 }
4212
4213 #[test]
4214 fn test_replay_aware_logger_suppresses_during_replay() {
4215 let call_count = Arc::new(AtomicUsize::new(0));
4216 let count_clone = call_count.clone();
4217
4218 let inner_logger = Arc::new(custom_logger(
4219 {
4220 let count = count_clone.clone();
4221 move |_msg, _info| {
4222 count.fetch_add(1, Ordering::SeqCst);
4223 }
4224 },
4225 {
4226 let count = count_clone.clone();
4227 move |_msg, _info| {
4228 count.fetch_add(1, Ordering::SeqCst);
4229 }
4230 },
4231 {
4232 let count = count_clone.clone();
4233 move |_msg, _info| {
4234 count.fetch_add(1, Ordering::SeqCst);
4235 }
4236 },
4237 {
4238 let count = count_clone.clone();
4239 move |_msg, _info| {
4240 count.fetch_add(1, Ordering::SeqCst);
4241 }
4242 },
4243 ));
4244
4245 let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::SuppressAll);
4246
4247 let non_replay_info = LogInfo::default().with_replay(false);
4249 logger.debug("test", &non_replay_info);
4250 logger.info("test", &non_replay_info);
4251 logger.warn("test", &non_replay_info);
4252 logger.error("test", &non_replay_info);
4253 assert_eq!(call_count.load(Ordering::SeqCst), 4);
4254
4255 let replay_info = LogInfo::default().with_replay(true);
4257 logger.debug("test", &replay_info);
4258 logger.info("test", &replay_info);
4259 logger.warn("test", &replay_info);
4260 logger.error("test", &replay_info);
4261 assert_eq!(call_count.load(Ordering::SeqCst), 4); }
4263
4264 #[test]
4265 fn test_replay_aware_logger_errors_only_during_replay() {
4266 let call_count = Arc::new(AtomicUsize::new(0));
4267 let count_clone = call_count.clone();
4268
4269 let inner_logger = Arc::new(custom_logger(
4270 {
4271 let count = count_clone.clone();
4272 move |_msg, _info| {
4273 count.fetch_add(1, Ordering::SeqCst);
4274 }
4275 },
4276 {
4277 let count = count_clone.clone();
4278 move |_msg, _info| {
4279 count.fetch_add(1, Ordering::SeqCst);
4280 }
4281 },
4282 {
4283 let count = count_clone.clone();
4284 move |_msg, _info| {
4285 count.fetch_add(1, Ordering::SeqCst);
4286 }
4287 },
4288 {
4289 let count = count_clone.clone();
4290 move |_msg, _info| {
4291 count.fetch_add(1, Ordering::SeqCst);
4292 }
4293 },
4294 ));
4295
4296 let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::ErrorsOnly);
4297
4298 let replay_info = LogInfo::default().with_replay(true);
4299 logger.debug("test", &replay_info);
4300 logger.info("test", &replay_info);
4301 logger.warn("test", &replay_info);
4302 logger.error("test", &replay_info);
4303
4304 assert_eq!(call_count.load(Ordering::SeqCst), 1);
4306 }
4307 }
4308}