1use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use blake2::{Blake2b512, Digest};
10
11use crate::error::DurableResult;
12use crate::handlers::StepContext;
13use crate::sealed::Sealed;
14use crate::state::ExecutionState;
15use crate::traits::DurableValue;
16use crate::types::OperationId;
17
/// Identifies one operation: its id, an optional parent operation id, and an
/// optional name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OperationIdentifier {
    /// Id of the operation itself.
    pub operation_id: String,
    /// Id of the parent operation, if any.
    pub parent_id: Option<String>,
    /// Optional name; when present it is shown by the `Display` impl.
    pub name: Option<String>,
}
32
33impl OperationIdentifier {
34 pub fn new(
36 operation_id: impl Into<String>,
37 parent_id: Option<String>,
38 name: Option<String>,
39 ) -> Self {
40 Self {
41 operation_id: operation_id.into(),
42 parent_id,
43 name,
44 }
45 }
46
47 pub fn root(operation_id: impl Into<String>) -> Self {
49 Self {
50 operation_id: operation_id.into(),
51 parent_id: None,
52 name: None,
53 }
54 }
55
56 pub fn with_parent(operation_id: impl Into<String>, parent_id: impl Into<String>) -> Self {
58 Self {
59 operation_id: operation_id.into(),
60 parent_id: Some(parent_id.into()),
61 name: None,
62 }
63 }
64
65 pub fn with_name(mut self, name: impl Into<String>) -> Self {
67 self.name = Some(name.into());
68 self
69 }
70
71 #[inline]
73 pub fn operation_id_typed(&self) -> OperationId {
74 OperationId::from(self.operation_id.clone())
75 }
76
77 #[inline]
79 pub fn parent_id_typed(&self) -> Option<OperationId> {
80 self.parent_id
81 .as_ref()
82 .map(|id| OperationId::from(id.clone()))
83 }
84
85 pub fn apply_to(
90 &self,
91 mut update: crate::operation::OperationUpdate,
92 ) -> crate::operation::OperationUpdate {
93 if let Some(ref parent_id) = self.parent_id {
94 update = update.with_parent_id(parent_id);
95 }
96 if let Some(ref name) = self.name {
97 update = update.with_name(name);
98 }
99 update
100 }
101}
102
103impl std::fmt::Display for OperationIdentifier {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 if let Some(ref name) = self.name {
106 write!(f, "{}({})", name, self.operation_id)
107 } else {
108 write!(f, "{}", self.operation_id)
109 }
110 }
111}
112
/// Produces operation ids by hashing a base id together with an atomically
/// incremented counter (see `generate_operation_id`).
#[derive(Debug)]
pub struct OperationIdGenerator {
    /// Base string mixed into every generated id.
    base_id: String,
    /// Counter value that the next call to `next_id` will consume.
    step_counter: AtomicU64,
}
127
128impl OperationIdGenerator {
129 pub fn new(base_id: impl Into<String>) -> Self {
135 Self {
136 base_id: base_id.into(),
137 step_counter: AtomicU64::new(0),
138 }
139 }
140
141 pub fn with_counter(base_id: impl Into<String>, initial_counter: u64) -> Self {
148 Self {
149 base_id: base_id.into(),
150 step_counter: AtomicU64::new(initial_counter),
151 }
152 }
153
154 pub fn next_id(&self) -> String {
172 let counter = self.step_counter.fetch_add(1, Ordering::Relaxed);
176 generate_operation_id(&self.base_id, counter)
177 }
178
179 pub fn id_for_counter(&self, counter: u64) -> String {
192 generate_operation_id(&self.base_id, counter)
193 }
194
195 pub fn current_counter(&self) -> u64 {
205 self.step_counter.load(Ordering::Relaxed)
209 }
210
211 pub fn base_id(&self) -> &str {
213 &self.base_id
214 }
215
216 pub fn create_child(&self, parent_operation_id: impl Into<String>) -> Self {
225 Self::new(parent_operation_id)
226 }
227}
228
229impl Clone for OperationIdGenerator {
230 fn clone(&self) -> Self {
231 Self {
232 base_id: self.base_id.clone(),
233 step_counter: AtomicU64::new(self.step_counter.load(Ordering::Relaxed)),
237 }
238 }
239}
240
241pub fn generate_operation_id(base_id: &str, counter: u64) -> String {
252 let mut hasher = Blake2b512::new();
253 hasher.update(base_id.as_bytes());
254 hasher.update(counter.to_le_bytes());
255 let result = hasher.finalize();
256
257 hex::encode(&result[..16])
259}
260
/// Logging interface used by the durable context: one method per level, each
/// receiving the message and the contextual [`LogInfo`].
///
/// Implementors must also implement `Sealed`, and must be `Send + Sync`.
// NOTE(review): `private_bounds` is allowed because of the `Sealed` supertrait;
// presumably `Sealed` is crate-private so that only this crate can implement
// `Logger` — confirm against `crate::sealed`.
#[allow(private_bounds)]
pub trait Logger: Sealed + Send + Sync {
    /// Logs `message` at debug level.
    fn debug(&self, message: &str, info: &LogInfo);
    /// Logs `message` at info level.
    fn info(&self, message: &str, info: &LogInfo);
    /// Logs `message` at warn level.
    fn warn(&self, message: &str, info: &LogInfo);
    /// Logs `message` at error level.
    fn error(&self, message: &str, info: &LogInfo);
}
283
/// Contextual data attached to every log call.
#[derive(Debug, Clone, Default)]
pub struct LogInfo {
    /// ARN of the durable execution, when known.
    pub durable_execution_arn: Option<String>,
    /// Id of the operation the log line belongs to, when known.
    pub operation_id: Option<String>,
    /// Id of the parent operation, when known.
    pub parent_id: Option<String>,
    /// Replay flag; `ReplayAwareLogger` uses it to decide whether to suppress.
    pub is_replay: bool,
    /// Additional key/value pairs, in insertion order.
    pub extra: Vec<(String, String)>,
}

impl LogInfo {
    /// Creates a `LogInfo` carrying only the execution ARN; every other field
    /// holds its default value.
    pub fn new(durable_execution_arn: impl Into<String>) -> Self {
        Self {
            durable_execution_arn: Some(durable_execution_arn.into()),
            operation_id: None,
            parent_id: None,
            is_replay: false,
            extra: Vec::new(),
        }
    }

    /// Returns `self` with the operation id set.
    pub fn with_operation_id(self, operation_id: impl Into<String>) -> Self {
        Self {
            operation_id: Some(operation_id.into()),
            ..self
        }
    }

    /// Returns `self` with the parent id set.
    pub fn with_parent_id(self, parent_id: impl Into<String>) -> Self {
        Self {
            parent_id: Some(parent_id.into()),
            ..self
        }
    }

    /// Returns `self` with the replay flag set to `is_replay`.
    pub fn with_replay(self, is_replay: bool) -> Self {
        Self { is_replay, ..self }
    }

    /// Returns `self` with one more key/value pair appended to `extra`.
    pub fn with_extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let entry = (key.into(), value.into());
        self.extra.push(entry);
        self
    }
}
347
/// Creates an info-level `tracing` span named `durable_operation` for an
/// operation.
///
/// The span records `operation_type`, `operation_id` and
/// `durable_execution_arn` with `Display` formatting, and `parent_id` and
/// `name` with `Debug` formatting. `status` is declared empty here so a value
/// can be recorded on the span later.
pub fn create_operation_span(
    operation_type: &str,
    op_id: &OperationIdentifier,
    durable_execution_arn: &str,
) -> tracing::Span {
    tracing::info_span!(
        "durable_operation",
        operation_type = %operation_type,
        operation_id = %op_id.operation_id,
        parent_id = ?op_id.parent_id,
        name = ?op_id.name,
        durable_execution_arn = %durable_execution_arn,
        // Placeholder field; filled in after the span is created.
        status = tracing::field::Empty,
    )
}
386
/// [`Logger`] implementation that forwards every call to the `tracing` macros.
#[derive(Debug, Clone, Default)]
pub struct TracingLogger;

// Required by the `Sealed` supertrait bound on `Logger`.
impl Sealed for TracingLogger {}
399
400impl TracingLogger {
401 fn format_extra_fields(extra: &[(String, String)]) -> String {
406 if extra.is_empty() {
407 String::new()
408 } else {
409 extra
410 .iter()
411 .map(|(k, v)| format!("{}={}", k, v))
412 .collect::<Vec<_>>()
413 .join(", ")
414 }
415 }
416}
417
/// Emits each call as a `tracing` event at the matching level.
///
/// Every event carries the `LogInfo` fields as structured fields: the optional
/// ids use `Debug` formatting, and `extra` is flattened into a single
/// `key=value, ...` string by `format_extra_fields`.
impl Logger for TracingLogger {
    /// Emits `message` as a `tracing::debug!` event.
    fn debug(&self, message: &str, info: &LogInfo) {
        let extra_fields = Self::format_extra_fields(&info.extra);
        tracing::debug!(
            durable_execution_arn = ?info.durable_execution_arn,
            operation_id = ?info.operation_id,
            parent_id = ?info.parent_id,
            is_replay = info.is_replay,
            extra = %extra_fields,
            "{}",
            message
        );
    }

    /// Emits `message` as a `tracing::info!` event.
    fn info(&self, message: &str, info: &LogInfo) {
        let extra_fields = Self::format_extra_fields(&info.extra);
        tracing::info!(
            durable_execution_arn = ?info.durable_execution_arn,
            operation_id = ?info.operation_id,
            parent_id = ?info.parent_id,
            is_replay = info.is_replay,
            extra = %extra_fields,
            "{}",
            message
        );
    }

    /// Emits `message` as a `tracing::warn!` event.
    fn warn(&self, message: &str, info: &LogInfo) {
        let extra_fields = Self::format_extra_fields(&info.extra);
        tracing::warn!(
            durable_execution_arn = ?info.durable_execution_arn,
            operation_id = ?info.operation_id,
            parent_id = ?info.parent_id,
            is_replay = info.is_replay,
            extra = %extra_fields,
            "{}",
            message
        );
    }

    /// Emits `message` as a `tracing::error!` event.
    fn error(&self, message: &str, info: &LogInfo) {
        let extra_fields = Self::format_extra_fields(&info.extra);
        tracing::error!(
            durable_execution_arn = ?info.durable_execution_arn,
            operation_id = ?info.operation_id,
            parent_id = ?info.parent_id,
            is_replay = info.is_replay,
            extra = %extra_fields,
            "{}",
            message
        );
    }
}
471
/// Controls which log calls `ReplayAwareLogger` forwards when the `LogInfo`
/// is marked as replay. Calls that are not marked as replay are always
/// forwarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplayLoggingConfig {
    /// Suppress every log call during replay (the default).
    #[default]
    SuppressAll,

    /// Forward every log call during replay.
    AllowAll,

    /// During replay, forward only error-level calls.
    ErrorsOnly,

    /// During replay, forward only warn- and error-level calls.
    WarningsAndErrors,
}
504
/// [`Logger`] wrapper that filters calls marked as replay according to a
/// [`ReplayLoggingConfig`] before forwarding them to an inner logger.
pub struct ReplayAwareLogger {
    /// Logger that receives every call that is not suppressed.
    inner: Arc<dyn Logger>,
    /// Policy applied to calls whose `LogInfo::is_replay` is true.
    config: ReplayLoggingConfig,
}

// Required by the `Sealed` supertrait bound on `Logger`.
impl Sealed for ReplayAwareLogger {}
538
539impl ReplayAwareLogger {
540 pub fn new(inner: Arc<dyn Logger>, config: ReplayLoggingConfig) -> Self {
559 Self { inner, config }
560 }
561
562 pub fn suppress_replay(inner: Arc<dyn Logger>) -> Self {
570 Self::new(inner, ReplayLoggingConfig::SuppressAll)
571 }
572
573 pub fn allow_all(inner: Arc<dyn Logger>) -> Self {
579 Self::new(inner, ReplayLoggingConfig::AllowAll)
580 }
581
582 pub fn config(&self) -> ReplayLoggingConfig {
584 self.config
585 }
586
587 pub fn inner(&self) -> &Arc<dyn Logger> {
589 &self.inner
590 }
591
592 fn should_suppress(&self, info: &LogInfo, level: LogLevel) -> bool {
594 if !info.is_replay {
595 return false;
596 }
597
598 match self.config {
599 ReplayLoggingConfig::SuppressAll => true,
600 ReplayLoggingConfig::AllowAll => false,
601 ReplayLoggingConfig::ErrorsOnly => level != LogLevel::Error,
602 ReplayLoggingConfig::WarningsAndErrors => {
603 level != LogLevel::Error && level != LogLevel::Warn
604 }
605 }
606 }
607}
608
/// Severity of a log call; used internally to select the `Logger` method and
/// to apply replay filtering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LogLevel {
    /// Routed to `Logger::debug`.
    Debug,
    /// Routed to `Logger::info`.
    Info,
    /// Routed to `Logger::warn`.
    Warn,
    /// Routed to `Logger::error`.
    Error,
}
617
618impl std::fmt::Debug for ReplayAwareLogger {
619 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
620 f.debug_struct("ReplayAwareLogger")
621 .field("config", &self.config)
622 .finish()
623 }
624}
625
626impl Logger for ReplayAwareLogger {
627 fn debug(&self, message: &str, info: &LogInfo) {
628 if !self.should_suppress(info, LogLevel::Debug) {
629 self.inner.debug(message, info);
630 }
631 }
632
633 fn info(&self, message: &str, info: &LogInfo) {
634 if !self.should_suppress(info, LogLevel::Info) {
635 self.inner.info(message, info);
636 }
637 }
638
639 fn warn(&self, message: &str, info: &LogInfo) {
640 if !self.should_suppress(info, LogLevel::Warn) {
641 self.inner.warn(message, info);
642 }
643 }
644
645 fn error(&self, message: &str, info: &LogInfo) {
646 if !self.should_suppress(info, LogLevel::Error) {
647 self.inner.error(message, info);
648 }
649 }
650}
651
/// [`Logger`] backed by four caller-supplied closures, one per level.
///
/// Each closure receives the message and the [`LogInfo`] of the call.
pub struct CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
    /// Invoked for debug-level calls.
    debug_fn: D,
    /// Invoked for info-level calls.
    info_fn: I,
    /// Invoked for warn-level calls.
    warn_fn: W,
    /// Invoked for error-level calls.
    error_fn: E,
}

// Required by the `Sealed` supertrait bound on `Logger`.
impl<D, I, W, E> Sealed for CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
}
694
695impl<D, I, W, E> Logger for CustomLogger<D, I, W, E>
696where
697 D: Fn(&str, &LogInfo) + Send + Sync,
698 I: Fn(&str, &LogInfo) + Send + Sync,
699 W: Fn(&str, &LogInfo) + Send + Sync,
700 E: Fn(&str, &LogInfo) + Send + Sync,
701{
702 fn debug(&self, message: &str, info: &LogInfo) {
703 (self.debug_fn)(message, info);
704 }
705
706 fn info(&self, message: &str, info: &LogInfo) {
707 (self.info_fn)(message, info);
708 }
709
710 fn warn(&self, message: &str, info: &LogInfo) {
711 (self.warn_fn)(message, info);
712 }
713
714 fn error(&self, message: &str, info: &LogInfo) {
715 (self.error_fn)(message, info);
716 }
717}
718
719impl<D, I, W, E> std::fmt::Debug for CustomLogger<D, I, W, E>
720where
721 D: Fn(&str, &LogInfo) + Send + Sync,
722 I: Fn(&str, &LogInfo) + Send + Sync,
723 W: Fn(&str, &LogInfo) + Send + Sync,
724 E: Fn(&str, &LogInfo) + Send + Sync,
725{
726 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727 f.debug_struct("CustomLogger").finish()
728 }
729}
730
/// Builds a [`CustomLogger`] from one closure per log level.
///
/// Each closure is called with the message and the [`LogInfo`] of the call at
/// its level (`debug_fn` for debug, `info_fn` for info, and so on).
pub fn custom_logger<D, I, W, E>(
    debug_fn: D,
    info_fn: I,
    warn_fn: W,
    error_fn: E,
) -> CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
    CustomLogger {
        debug_fn,
        info_fn,
        warn_fn,
        error_fn,
    }
}
780
781pub fn simple_custom_logger<F>(log_fn: F) -> impl Logger
806where
807 F: Fn(&str, &str, &LogInfo) + Send + Sync + Clone + 'static,
808{
809 let debug_fn = log_fn.clone();
810 let info_fn = log_fn.clone();
811 let warn_fn = log_fn.clone();
812 let error_fn = log_fn;
813
814 custom_logger(
815 move |msg, info| debug_fn("DEBUG", msg, info),
816 move |msg, info| info_fn("INFO", msg, info),
817 move |msg, info| warn_fn("WARN", msg, info),
818 move |msg, info| error_fn("ERROR", msg, info),
819 )
820}
821
/// Entry point for durable operations: it allocates operation ids, logs, and
/// delegates each operation to the handlers in `crate::handlers`.
pub struct DurableContext {
    /// Shared execution state passed to every handler.
    state: Arc<ExecutionState>,
    /// Lambda runtime context, present when built via `from_lambda_context`.
    lambda_context: Option<lambda_runtime::Context>,
    /// Id of the parent operation; `None` for a context built via `new` or
    /// `from_lambda_context`.
    parent_id: Option<String>,
    /// Generator for operation ids; shared with clones of this context, while
    /// child contexts get a fresh generator.
    id_generator: Arc<OperationIdGenerator>,
    /// Active logger; the lock is shared with clones and child contexts, so
    /// replacing the logger through any of them affects all of them.
    logger: Arc<RwLock<Arc<dyn Logger>>>,
}

// Compile-time check that `DurableContext` is `Send + Sync`.
static_assertions::assert_impl_all!(DurableContext: Send, Sync);
862
863impl DurableContext {
864 pub fn new(state: Arc<ExecutionState>) -> Self {
870 let base_id = state.durable_execution_arn().to_string();
871 Self {
872 state,
873 lambda_context: None,
874 parent_id: None,
875 id_generator: Arc::new(OperationIdGenerator::new(base_id)),
876 logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
877 }
878 }
879
880 pub fn from_lambda_context(
889 state: Arc<ExecutionState>,
890 lambda_context: lambda_runtime::Context,
891 ) -> Self {
892 let base_id = state.durable_execution_arn().to_string();
893 Self {
894 state,
895 lambda_context: Some(lambda_context),
896 parent_id: None,
897 id_generator: Arc::new(OperationIdGenerator::new(base_id)),
898 logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
899 }
900 }
901
902 pub fn create_child_context(&self, parent_operation_id: impl Into<String>) -> Self {
912 let parent_id = parent_operation_id.into();
913 Self {
914 state: self.state.clone(),
915 lambda_context: self.lambda_context.clone(),
916 parent_id: Some(parent_id.clone()),
917 id_generator: Arc::new(OperationIdGenerator::new(parent_id)),
918 logger: self.logger.clone(),
919 }
920 }
921
922 pub fn set_logger(&mut self, logger: Arc<dyn Logger>) {
928 *self.logger.write().unwrap() = logger;
929 }
930
931 pub fn with_logger(self, logger: Arc<dyn Logger>) -> Self {
937 *self.logger.write().unwrap() = logger;
938 self
939 }
940
941 pub fn configure_logger(&self, logger: Arc<dyn Logger>) {
950 *self.logger.write().unwrap() = logger;
951 }
952
    /// Returns the shared execution state.
    pub fn state(&self) -> &Arc<ExecutionState> {
        &self.state
    }

    /// Returns the durable execution ARN reported by the execution state.
    pub fn durable_execution_arn(&self) -> &str {
        self.state.durable_execution_arn()
    }

    /// Returns the parent operation id, or `None` when this context has none.
    pub fn parent_id(&self) -> Option<&str> {
        self.parent_id.as_deref()
    }

    /// Returns the Lambda runtime context, if one was supplied.
    pub fn lambda_context(&self) -> Option<&lambda_runtime::Context> {
        self.lambda_context.as_ref()
    }

    /// Returns a handle to the currently active logger.
    ///
    /// The `Arc` is cloned out of the lock, so the lock is not held after this
    /// returns. Panics if the logger lock is poisoned.
    pub fn logger(&self) -> Arc<dyn Logger> {
        self.logger.read().unwrap().clone()
    }
977
978 pub fn next_operation_id(&self) -> String {
983 self.id_generator.next_id()
984 }
985
986 pub fn next_operation_identifier(&self, name: Option<String>) -> OperationIdentifier {
992 OperationIdentifier::new(self.next_operation_id(), self.parent_id.clone(), name)
993 }
994
995 pub fn current_step_counter(&self) -> u64 {
997 self.id_generator.current_counter()
998 }
999
1000 pub fn create_log_info(&self) -> LogInfo {
1006 let mut info = LogInfo::new(self.durable_execution_arn());
1007 if let Some(ref parent_id) = self.parent_id {
1008 info = info.with_parent_id(parent_id);
1009 }
1010 info = info.with_replay(self.state.is_replay());
1012 info
1013 }
1014
    /// Same as `create_log_info`, with `operation_id` additionally set.
    pub fn create_log_info_with_operation(&self, operation_id: &str) -> LogInfo {
        self.create_log_info().with_operation_id(operation_id)
    }
1023
1024 pub fn create_log_info_with_replay(&self, operation_id: &str, is_replay: bool) -> LogInfo {
1035 let mut info = LogInfo::new(self.durable_execution_arn());
1036 if let Some(ref parent_id) = self.parent_id {
1037 info = info.with_parent_id(parent_id);
1038 }
1039 info.with_operation_id(operation_id).with_replay(is_replay)
1040 }
1041
    /// Logs `message` at info level with this context's log info.
    pub fn log_info(&self, message: &str) {
        self.log_with_level(LogLevel::Info, message, &[]);
    }

    /// Logs `message` at info level, attaching `fields` as extra key/value pairs.
    pub fn log_info_with(&self, message: &str, fields: &[(&str, &str)]) {
        self.log_with_level(LogLevel::Info, message, fields);
    }

    /// Logs `message` at debug level with this context's log info.
    pub fn log_debug(&self, message: &str) {
        self.log_with_level(LogLevel::Debug, message, &[]);
    }

    /// Logs `message` at debug level, attaching `fields` as extra key/value pairs.
    pub fn log_debug_with(&self, message: &str, fields: &[(&str, &str)]) {
        self.log_with_level(LogLevel::Debug, message, fields);
    }

    /// Logs `message` at warn level with this context's log info.
    pub fn log_warn(&self, message: &str) {
        self.log_with_level(LogLevel::Warn, message, &[]);
    }

    /// Logs `message` at warn level, attaching `fields` as extra key/value pairs.
    pub fn log_warn_with(&self, message: &str, fields: &[(&str, &str)]) {
        self.log_with_level(LogLevel::Warn, message, fields);
    }

    /// Logs `message` at error level with this context's log info.
    pub fn log_error(&self, message: &str) {
        self.log_with_level(LogLevel::Error, message, &[]);
    }

    /// Logs `message` at error level, attaching `fields` as extra key/value pairs.
    pub fn log_error_with(&self, message: &str, fields: &[(&str, &str)]) {
        self.log_with_level(LogLevel::Error, message, fields);
    }
1193
1194 fn log_with_level(&self, level: LogLevel, message: &str, extra: &[(&str, &str)]) {
1205 let mut log_info = self.create_log_info();
1206
1207 for (key, value) in extra {
1208 log_info = log_info.with_extra(*key, *value);
1209 }
1210
1211 let logger = self.logger.read().unwrap();
1212 match level {
1213 LogLevel::Debug => logger.debug(message, &log_info),
1214 LogLevel::Info => logger.info(message, &log_info),
1215 LogLevel::Warn => logger.warn(message, &log_info),
1216 LogLevel::Error => logger.error(message, &log_info),
1217 }
1218 }
1219
1220 pub fn get_original_input<T>(&self) -> DurableResult<T>
1253 where
1254 T: serde::de::DeserializeOwned,
1255 {
1256 let input_payload = self.state.get_original_input_raw().ok_or_else(|| {
1258 crate::error::DurableError::Validation {
1259 message: "No original input available. The EXECUTION operation may not exist or has no input payload.".to_string(),
1260 }
1261 })?;
1262
1263 serde_json::from_str(input_payload).map_err(|e| crate::error::DurableError::SerDes {
1265 message: format!("Failed to deserialize original input: {}", e),
1266 })
1267 }
1268
    /// Returns the original execution input exactly as stored by the state,
    /// or `None` when the state has none.
    pub fn get_original_input_raw(&self) -> Option<&str> {
        self.state.get_original_input_raw()
    }
1280
1281 pub async fn complete_execution_success<T>(&self, result: &T) -> DurableResult<()>
1316 where
1317 T: serde::Serialize,
1318 {
1319 let serialized =
1320 serde_json::to_string(result).map_err(|e| crate::error::DurableError::SerDes {
1321 message: format!("Failed to serialize execution result: {}", e),
1322 })?;
1323
1324 self.state
1325 .complete_execution_success(Some(serialized))
1326 .await
1327 }
1328
    /// Asks the state to complete the execution as failed with `error`,
    /// returning whatever the state's completion call returns.
    pub async fn complete_execution_failure(
        &self,
        error: crate::error::ErrorObject,
    ) -> DurableResult<()> {
        self.state.complete_execution_failure(error).await
    }
1362
1363 pub async fn complete_execution_if_large<T>(&self, result: &T) -> DurableResult<bool>
1398 where
1399 T: serde::Serialize,
1400 {
1401 if crate::lambda::DurableExecutionInvocationOutput::would_exceed_max_size(result) {
1402 self.complete_execution_success(result).await?;
1403 Ok(true)
1404 } else {
1405 Ok(false)
1406 }
1407 }
1408
1409 pub async fn step<T, F, Fut>(
1436 &self,
1437 func: F,
1438 config: Option<crate::config::StepConfig>,
1439 ) -> DurableResult<T>
1440 where
1441 T: DurableValue,
1442 F: FnOnce(StepContext) -> Fut + Send,
1443 Fut: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>
1444 + Send,
1445 {
1446 let op_id = self.next_operation_identifier(None);
1447 let config = config.unwrap_or_default();
1448
1449 let logger = self.logger.read().unwrap().clone();
1450 let result =
1451 crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1452
1453 if result.is_ok() {
1455 self.state.track_replay(&op_id.operation_id).await;
1456 }
1457
1458 result
1459 }
1460
1461 pub async fn step_named<T, F, Fut>(
1479 &self,
1480 name: &str,
1481 func: F,
1482 config: Option<crate::config::StepConfig>,
1483 ) -> DurableResult<T>
1484 where
1485 T: DurableValue,
1486 F: FnOnce(StepContext) -> Fut + Send,
1487 Fut: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>
1488 + Send,
1489 {
1490 let op_id = self.next_operation_identifier(Some(name.to_string()));
1491 let config = config.unwrap_or_default();
1492
1493 let logger = self.logger.read().unwrap().clone();
1494 let result =
1495 crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1496
1497 if result.is_ok() {
1499 self.state.track_replay(&op_id.operation_id).await;
1500 }
1501
1502 result
1503 }
1504
1505 pub async fn wait(
1530 &self,
1531 duration: crate::duration::Duration,
1532 name: Option<&str>,
1533 ) -> DurableResult<()> {
1534 let op_id = self.next_operation_identifier(name.map(|s| s.to_string()));
1535
1536 let logger = self.logger.read().unwrap().clone();
1537 let result = crate::handlers::wait_handler(duration, &self.state, &op_id, &logger).await;
1538
1539 if result.is_ok() {
1541 self.state.track_replay(&op_id.operation_id).await;
1542 }
1543
1544 result
1545 }
1546
1547 pub async fn cancel_wait(&self, operation_id: &str) -> DurableResult<()> {
1574 let logger = self.logger.read().unwrap().clone();
1575 crate::handlers::wait_cancel_handler(&self.state, operation_id, &logger).await
1576 }
1577
1578 pub async fn create_callback<T>(
1604 &self,
1605 config: Option<crate::config::CallbackConfig>,
1606 ) -> DurableResult<crate::handlers::Callback<T>>
1607 where
1608 T: serde::Serialize + serde::de::DeserializeOwned,
1609 {
1610 let op_id = self.next_operation_identifier(None);
1611 let config = config.unwrap_or_default();
1612
1613 let logger = self.logger.read().unwrap().clone();
1614 let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1615
1616 if result.is_ok() {
1618 self.state.track_replay(&op_id.operation_id).await;
1619 }
1620
1621 result
1622 }
1623
1624 pub async fn create_callback_named<T>(
1628 &self,
1629 name: &str,
1630 config: Option<crate::config::CallbackConfig>,
1631 ) -> DurableResult<crate::handlers::Callback<T>>
1632 where
1633 T: serde::Serialize + serde::de::DeserializeOwned,
1634 {
1635 let op_id = self.next_operation_identifier(Some(name.to_string()));
1636 let config = config.unwrap_or_default();
1637
1638 let logger = self.logger.read().unwrap().clone();
1639 let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1640
1641 if result.is_ok() {
1643 self.state.track_replay(&op_id.operation_id).await;
1644 }
1645
1646 result
1647 }
1648
1649 pub async fn invoke<P, R>(
1675 &self,
1676 function_name: &str,
1677 payload: P,
1678 config: Option<crate::config::InvokeConfig<P, R>>,
1679 ) -> DurableResult<R>
1680 where
1681 P: serde::Serialize + serde::de::DeserializeOwned + Send,
1682 R: serde::Serialize + serde::de::DeserializeOwned + Send,
1683 {
1684 let op_id = self.next_operation_identifier(Some(format!("invoke:{}", function_name)));
1685 let config = config.unwrap_or_default();
1686
1687 let logger = self.logger.read().unwrap().clone();
1688 let result = crate::handlers::invoke_handler(
1689 function_name,
1690 payload,
1691 &self.state,
1692 &op_id,
1693 &config,
1694 &logger,
1695 )
1696 .await;
1697
1698 if result.is_ok() {
1700 self.state.track_replay(&op_id.operation_id).await;
1701 }
1702
1703 result
1704 }
1705
1706 pub async fn map<T, U, F, Fut>(
1736 &self,
1737 items: Vec<T>,
1738 func: F,
1739 config: Option<crate::config::MapConfig>,
1740 ) -> DurableResult<crate::concurrency::BatchResult<U>>
1741 where
1742 T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + Clone + 'static,
1743 U: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1744 F: Fn(DurableContext, T, usize) -> Fut + Send + Sync + Clone + 'static,
1745 Fut: std::future::Future<Output = DurableResult<U>> + Send + 'static,
1746 {
1747 let op_id = self.next_operation_identifier(Some("map".to_string()));
1748 let config = config.unwrap_or_default();
1749
1750 let logger = self.logger.read().unwrap().clone();
1751 let result =
1752 crate::handlers::map_handler(items, func, &self.state, &op_id, self, &config, &logger)
1753 .await;
1754
1755 if result.is_ok() {
1757 self.state.track_replay(&op_id.operation_id).await;
1758 }
1759
1760 result
1761 }
1762
1763 pub async fn parallel<T, F, Fut>(
1790 &self,
1791 branches: Vec<F>,
1792 config: Option<crate::config::ParallelConfig>,
1793 ) -> DurableResult<crate::concurrency::BatchResult<T>>
1794 where
1795 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1796 F: FnOnce(DurableContext) -> Fut + Send + 'static,
1797 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
1798 {
1799 let op_id = self.next_operation_identifier(Some("parallel".to_string()));
1800 let config = config.unwrap_or_default();
1801
1802 let logger = self.logger.read().unwrap().clone();
1803 let result = crate::handlers::parallel_handler(
1804 branches,
1805 &self.state,
1806 &op_id,
1807 self,
1808 &config,
1809 &logger,
1810 )
1811 .await;
1812
1813 if result.is_ok() {
1815 self.state.track_replay(&op_id.operation_id).await;
1816 }
1817
1818 result
1819 }
1820
1821 pub async fn run_in_child_context<T, F, Fut>(
1845 &self,
1846 func: F,
1847 config: Option<crate::config::ChildConfig>,
1848 ) -> DurableResult<T>
1849 where
1850 T: serde::Serialize + serde::de::DeserializeOwned + Send,
1851 F: FnOnce(DurableContext) -> Fut + Send,
1852 Fut: std::future::Future<Output = DurableResult<T>> + Send,
1853 {
1854 let op_id = self.next_operation_identifier(Some("child_context".to_string()));
1855 let config = config.unwrap_or_default();
1856
1857 let logger = self.logger.read().unwrap().clone();
1858 let result =
1859 crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
1860
1861 if result.is_ok() {
1863 self.state.track_replay(&op_id.operation_id).await;
1864 }
1865
1866 result
1867 }
1868
1869 pub async fn run_in_child_context_named<T, F, Fut>(
1873 &self,
1874 name: &str,
1875 func: F,
1876 config: Option<crate::config::ChildConfig>,
1877 ) -> DurableResult<T>
1878 where
1879 T: serde::Serialize + serde::de::DeserializeOwned + Send,
1880 F: FnOnce(DurableContext) -> Fut + Send,
1881 Fut: std::future::Future<Output = DurableResult<T>> + Send,
1882 {
1883 let op_id = self.next_operation_identifier(Some(name.to_string()));
1884 let config = config.unwrap_or_default();
1885
1886 let logger = self.logger.read().unwrap().clone();
1887 let result =
1888 crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
1889
1890 if result.is_ok() {
1892 self.state.track_replay(&op_id.operation_id).await;
1893 }
1894
1895 result
1896 }
1897
1898 pub async fn wait_for_condition<T, S, F, Fut>(
1944 &self,
1945 check: F,
1946 config: WaitForConditionConfig<S>,
1947 ) -> DurableResult<T>
1948 where
1949 T: serde::Serialize + serde::de::DeserializeOwned + Send,
1950 S: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
1951 F: Fn(&S, &WaitForConditionContext) -> Fut + Send + Sync,
1952 Fut: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>
1953 + Send,
1954 {
1955 let op_id = self.next_operation_identifier(Some("wait_for_condition".to_string()));
1956
1957 let logger = self.logger.read().unwrap().clone();
1958 let result = crate::handlers::wait_for_condition_handler(
1961 check,
1962 config,
1963 &self.state,
1964 &op_id,
1965 &logger,
1966 )
1967 .await;
1968
1969 if result.is_ok() {
1971 self.state.track_replay(&op_id.operation_id).await;
1972 }
1973
1974 result
1975 }
1976
    /// Creates a callback, hands its id to `submitter`, and waits for the
    /// callback's result.
    ///
    /// Steps, in order:
    /// 1. Create a callback operation via `create_callback`.
    /// 2. Run a child-context operation named `wait_for_callback_submitter`
    ///    which first runs a step named `execute_submitter` and then calls
    ///    `submitter` with the callback id.
    /// 3. Record the child operation with `track_replay` and await
    ///    `callback.result()`.
    ///
    /// # Errors
    ///
    /// Propagates errors from callback creation, the child operation, and the
    /// callback result. A failure returned by `submitter` is converted into
    /// `DurableError::UserCode` with error type `SubmitterError`.
    pub async fn wait_for_callback<T, F, Fut>(
        &self,
        submitter: F,
        config: Option<crate::config::CallbackConfig>,
    ) -> DurableResult<T>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync,
        F: FnOnce(String) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
            + Send
            + 'static,
    {
        let callback: crate::handlers::Callback<T> = self.create_callback(config).await?;
        // Owned copy of the id so it can be moved into the child closure.
        let callback_id = callback.callback_id.clone();

        let op_id = self.next_operation_identifier(Some("wait_for_callback_submitter".to_string()));

        let child_config = crate::config::ChildConfig::default();

        let logger = self.logger.read().unwrap().clone();
        crate::handlers::child_handler(
            |child_ctx| {
                let callback_id = callback_id.clone();
                async move {
                    // NOTE(review): this step's body only returns `Ok(())`;
                    // `submitter` itself runs after the step, outside of it.
                    // Presumably the step only marks that submission was
                    // reached — confirm that `submitter` is not meant to run
                    // inside the step.
                    child_ctx
                        .step_named(
                            "execute_submitter",
                            move |_| async move {
                                Ok(())
                            },
                            None,
                        )
                        .await?;

                    // Translate the submitter's boxed error into a DurableError.
                    submitter(callback_id).await.map_err(|e| {
                        crate::error::DurableError::UserCode {
                            message: e.to_string(),
                            error_type: "SubmitterError".to_string(),
                            stack_trace: None,
                        }
                    })?;

                    Ok(())
                }
            },
            &self.state,
            &op_id,
            self,
            &child_config,
            &logger,
        )
        .await?;

        // Reached only when the child operation succeeded (`?` above).
        self.state.track_replay(&op_id.operation_id).await;

        callback.result().await
    }
2077
2078 pub async fn all<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<Vec<T>>
2106 where
2107 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2108 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2109 {
2110 let op_id = self.next_operation_identifier(Some("all".to_string()));
2111
2112 let logger = self.logger.read().unwrap().clone();
2113 let result = crate::handlers::all_handler(futures, &self.state, &op_id, &logger).await;
2114
2115 if result.is_ok() {
2117 self.state.track_replay(&op_id.operation_id).await;
2118 }
2119
2120 result
2121 }
2122
2123 pub async fn all_settled<T, Fut>(
2148 &self,
2149 futures: Vec<Fut>,
2150 ) -> DurableResult<crate::concurrency::BatchResult<T>>
2151 where
2152 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2153 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2154 {
2155 let op_id = self.next_operation_identifier(Some("all_settled".to_string()));
2156
2157 let logger = self.logger.read().unwrap().clone();
2158 let result =
2159 crate::handlers::all_settled_handler(futures, &self.state, &op_id, &logger).await;
2160
2161 if result.is_ok() {
2163 self.state.track_replay(&op_id.operation_id).await;
2164 }
2165
2166 result
2167 }
2168
2169 pub async fn race<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2192 where
2193 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2194 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2195 {
2196 let op_id = self.next_operation_identifier(Some("race".to_string()));
2197
2198 let logger = self.logger.read().unwrap().clone();
2199 let result = crate::handlers::race_handler(futures, &self.state, &op_id, &logger).await;
2200
2201 if result.is_ok() {
2203 self.state.track_replay(&op_id.operation_id).await;
2204 }
2205
2206 result
2207 }
2208
2209 pub async fn any<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2234 where
2235 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2236 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2237 {
2238 let op_id = self.next_operation_identifier(Some("any".to_string()));
2239
2240 let logger = self.logger.read().unwrap().clone();
2241 let result = crate::handlers::any_handler(futures, &self.state, &op_id, &logger).await;
2242
2243 if result.is_ok() {
2245 self.state.track_replay(&op_id.operation_id).await;
2246 }
2247
2248 result
2249 }
2250}
2251
/// Configuration for `DurableContext::wait_for_condition`.
// The boxed strategy closure type is what trips `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
pub struct WaitForConditionConfig<S> {
    /// State value the operation starts from.
    pub initial_state: S,
    /// Given a state and an attempt count, decides whether to continue (and
    /// with what delay) or to stop.
    pub wait_strategy: Box<dyn Fn(&S, usize) -> crate::config::WaitDecision + Send + Sync>,
    /// Optional timeout. NOTE(review): presumably an overall limit for the
    /// operation — confirm against `wait_for_condition_handler`.
    pub timeout: Option<crate::duration::Duration>,
    /// Optional custom serializer/deserializer.
    pub serdes: Option<std::sync::Arc<dyn crate::config::SerDesAny>>,
}
2267
2268impl<S> std::fmt::Debug for WaitForConditionConfig<S>
2269where
2270 S: std::fmt::Debug,
2271{
2272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2273 f.debug_struct("WaitForConditionConfig")
2274 .field("initial_state", &self.initial_state)
2275 .field("wait_strategy", &"<fn>")
2276 .field("timeout", &self.timeout)
2277 .field("serdes", &self.serdes.is_some())
2278 .finish()
2279 }
2280}
2281
2282impl<S> WaitForConditionConfig<S> {
2283 pub fn from_interval(
2306 initial_state: S,
2307 interval: crate::duration::Duration,
2308 max_attempts: Option<usize>,
2309 ) -> Self
2310 where
2311 S: Send + Sync + 'static,
2312 {
2313 let interval_secs = interval.to_seconds();
2314 let max = max_attempts.unwrap_or(usize::MAX);
2315
2316 Self {
2317 initial_state,
2318 wait_strategy: Box::new(move |_state: &S, attempts_made: usize| {
2319 if attempts_made >= max {
2325 return crate::config::WaitDecision::Done;
2326 }
2327 crate::config::WaitDecision::Continue {
2328 delay: crate::duration::Duration::from_seconds(interval_secs),
2329 }
2330 }),
2331 timeout: None,
2332 serdes: None,
2333 }
2334 }
2335}
2336
2337impl<S: Default + Send + Sync + 'static> Default for WaitForConditionConfig<S> {
2338 fn default() -> Self {
2339 Self::from_interval(
2340 S::default(),
2341 crate::duration::Duration::from_seconds(5),
2342 None,
2343 )
2344 }
2345}
2346
/// Attempt information for a wait-for-condition operation.
// NOTE(review): presumably handed to the user's condition check on each poll — confirm.
#[derive(Debug, Clone)]
pub struct WaitForConditionContext {
    /// Attempt counter. NOTE(review): whether this is zero- or one-based is not shown here.
    pub attempt: usize,
    /// Attempt limit, or `None` when no limit is set.
    pub max_attempts: Option<usize>,
}
2355
2356impl Clone for DurableContext {
2357 fn clone(&self) -> Self {
2358 Self {
2359 state: self.state.clone(),
2360 lambda_context: self.lambda_context.clone(),
2361 parent_id: self.parent_id.clone(),
2362 id_generator: self.id_generator.clone(),
2363 logger: self.logger.clone(),
2364 }
2365 }
2366}
2367
2368impl std::fmt::Debug for DurableContext {
2369 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2370 f.debug_struct("DurableContext")
2371 .field("durable_execution_arn", &self.durable_execution_arn())
2372 .field("parent_id", &self.parent_id)
2373 .field("step_counter", &self.current_step_counter())
2374 .finish_non_exhaustive()
2375 }
2376}
2377
/// Minimal lowercase hexadecimal encoder.
mod hex {
    /// Digit lookup table indexed by nibble value (0..=15).
    const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";

    /// Encodes `bytes` as a lowercase hex string, two characters per byte
    /// (high nibble first). An empty slice yields an empty string.
    pub fn encode(bytes: &[u8]) -> String {
        let mut encoded = String::with_capacity(bytes.len() * 2);
        encoded.extend(
            bytes
                .iter()
                .flat_map(|&byte| [byte >> 4, byte & 0x0f])
                .map(|nibble| char::from(HEX_CHARS[usize::from(nibble)])),
        );
        encoded
    }
}
2391
2392#[cfg(test)]
2393mod tests {
2394 use super::*;
2395
2396 #[test]
2397 fn test_operation_identifier_new() {
2398 let id = OperationIdentifier::new(
2399 "op-123",
2400 Some("parent-456".to_string()),
2401 Some("my-step".to_string()),
2402 );
2403 assert_eq!(id.operation_id, "op-123");
2404 assert_eq!(id.parent_id, Some("parent-456".to_string()));
2405 assert_eq!(id.name, Some("my-step".to_string()));
2406 }
2407
2408 #[test]
2409 fn test_operation_identifier_root() {
2410 let id = OperationIdentifier::root("op-123");
2411 assert_eq!(id.operation_id, "op-123");
2412 assert!(id.parent_id.is_none());
2413 assert!(id.name.is_none());
2414 }
2415
2416 #[test]
2417 fn test_operation_identifier_with_parent() {
2418 let id = OperationIdentifier::with_parent("op-123", "parent-456");
2419 assert_eq!(id.operation_id, "op-123");
2420 assert_eq!(id.parent_id, Some("parent-456".to_string()));
2421 assert!(id.name.is_none());
2422 }
2423
2424 #[test]
2425 fn test_operation_identifier_with_name() {
2426 let id = OperationIdentifier::root("op-123").with_name("my-step");
2427 assert_eq!(id.name, Some("my-step".to_string()));
2428 }
2429
2430 #[test]
2431 fn test_operation_identifier_display() {
2432 let id = OperationIdentifier::root("op-123");
2433 assert_eq!(format!("{}", id), "op-123");
2434
2435 let id_with_name = OperationIdentifier::root("op-123").with_name("my-step");
2436 assert_eq!(format!("{}", id_with_name), "my-step(op-123)");
2437 }
2438
2439 #[test]
2440 fn test_generate_operation_id_deterministic() {
2441 let id1 = generate_operation_id("base-123", 0);
2442 let id2 = generate_operation_id("base-123", 0);
2443 assert_eq!(id1, id2);
2444 }
2445
2446 #[test]
2447 fn test_generate_operation_id_different_counters() {
2448 let id1 = generate_operation_id("base-123", 0);
2449 let id2 = generate_operation_id("base-123", 1);
2450 assert_ne!(id1, id2);
2451 }
2452
2453 #[test]
2454 fn test_generate_operation_id_different_bases() {
2455 let id1 = generate_operation_id("base-123", 0);
2456 let id2 = generate_operation_id("base-456", 0);
2457 assert_ne!(id1, id2);
2458 }
2459
2460 #[test]
2461 fn test_generate_operation_id_length() {
2462 let id = generate_operation_id("base-123", 0);
2463 assert_eq!(id.len(), 32); }
2465
2466 #[test]
2467 fn test_operation_id_generator_new() {
2468 let gen = OperationIdGenerator::new("base-123");
2469 assert_eq!(gen.base_id(), "base-123");
2470 assert_eq!(gen.current_counter(), 0);
2471 }
2472
2473 #[test]
2474 fn test_operation_id_generator_with_counter() {
2475 let gen = OperationIdGenerator::with_counter("base-123", 10);
2476 assert_eq!(gen.current_counter(), 10);
2477 }
2478
2479 #[test]
2480 fn test_operation_id_generator_next_id() {
2481 let gen = OperationIdGenerator::new("base-123");
2482
2483 let id1 = gen.next_id();
2484 assert_eq!(gen.current_counter(), 1);
2485
2486 let id2 = gen.next_id();
2487 assert_eq!(gen.current_counter(), 2);
2488
2489 assert_ne!(id1, id2);
2490 }
2491
2492 #[test]
2493 fn test_operation_id_generator_id_for_counter() {
2494 let gen = OperationIdGenerator::new("base-123");
2495
2496 let id_0 = gen.id_for_counter(0);
2497 let id_1 = gen.id_for_counter(1);
2498
2499 assert_eq!(gen.current_counter(), 0);
2501
2502 let next = gen.next_id();
2504 assert_eq!(next, id_0);
2505
2506 let next = gen.next_id();
2507 assert_eq!(next, id_1);
2508 }
2509
2510 #[test]
2511 fn test_operation_id_generator_create_child() {
2512 let gen = OperationIdGenerator::new("base-123");
2513 gen.next_id(); let child = gen.create_child("child-op-id");
2516 assert_eq!(child.base_id(), "child-op-id");
2517 assert_eq!(child.current_counter(), 0);
2518 }
2519
2520 #[test]
2521 fn test_operation_id_generator_clone() {
2522 let gen = OperationIdGenerator::new("base-123");
2523 gen.next_id();
2524 gen.next_id();
2525
2526 let cloned = gen.clone();
2527 assert_eq!(cloned.base_id(), gen.base_id());
2528 assert_eq!(cloned.current_counter(), gen.current_counter());
2529 }
2530
2531 #[test]
2532 fn test_operation_id_generator_thread_safety() {
2533 use std::thread;
2534
2535 let gen = Arc::new(OperationIdGenerator::new("base-123"));
2536 let mut handles = vec![];
2537
2538 for _ in 0..10 {
2539 let gen_clone = gen.clone();
2540 handles.push(thread::spawn(move || {
2541 let mut ids = vec![];
2542 for _ in 0..100 {
2543 ids.push(gen_clone.next_id());
2544 }
2545 ids
2546 }));
2547 }
2548
2549 let mut all_ids = vec![];
2550 for handle in handles {
2551 all_ids.extend(handle.join().unwrap());
2552 }
2553
2554 let unique_count = {
2556 let mut set = std::collections::HashSet::new();
2557 for id in &all_ids {
2558 set.insert(id.clone());
2559 }
2560 set.len()
2561 };
2562
2563 assert_eq!(unique_count, 1000);
2564 assert_eq!(gen.current_counter(), 1000);
2565 }
2566
2567 #[test]
2568 fn test_log_info_new() {
2569 let info = LogInfo::new("arn:test");
2570 assert_eq!(info.durable_execution_arn, Some("arn:test".to_string()));
2571 assert!(info.operation_id.is_none());
2572 assert!(info.parent_id.is_none());
2573 }
2574
2575 #[test]
2576 fn test_log_info_with_operation_id() {
2577 let info = LogInfo::new("arn:test").with_operation_id("op-123");
2578 assert_eq!(info.operation_id, Some("op-123".to_string()));
2579 }
2580
2581 #[test]
2582 fn test_log_info_with_parent_id() {
2583 let info = LogInfo::new("arn:test").with_parent_id("parent-456");
2584 assert_eq!(info.parent_id, Some("parent-456".to_string()));
2585 }
2586
2587 #[test]
2588 fn test_log_info_with_extra() {
2589 let info = LogInfo::new("arn:test")
2590 .with_extra("key1", "value1")
2591 .with_extra("key2", "value2");
2592 assert_eq!(info.extra.len(), 2);
2593 assert_eq!(info.extra[0], ("key1".to_string(), "value1".to_string()));
2594 }
2595
2596 #[test]
2597 fn test_hex_encode() {
2598 assert_eq!(hex::encode(&[0x00]), "00");
2599 assert_eq!(hex::encode(&[0xff]), "ff");
2600 assert_eq!(hex::encode(&[0x12, 0x34, 0xab, 0xcd]), "1234abcd");
2601 }
2602}
2603
2604#[cfg(test)]
2605mod durable_context_tests {
2606 use super::*;
2607 use crate::client::SharedDurableServiceClient;
2608 use crate::lambda::InitialExecutionState;
2609 use std::sync::Arc;
2610
2611 #[cfg(test)]
2612 fn create_mock_client() -> SharedDurableServiceClient {
2613 use crate::client::MockDurableServiceClient;
2614 Arc::new(MockDurableServiceClient::new())
2615 }
2616
2617 fn create_test_state() -> Arc<ExecutionState> {
2618 let client = create_mock_client();
2619 Arc::new(ExecutionState::new(
2620 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
2621 "token-123",
2622 InitialExecutionState::new(),
2623 client,
2624 ))
2625 }
2626
2627 #[test]
2628 fn test_durable_context_new() {
2629 let state = create_test_state();
2630 let ctx = DurableContext::new(state.clone());
2631
2632 assert_eq!(
2633 ctx.durable_execution_arn(),
2634 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123"
2635 );
2636 assert!(ctx.parent_id().is_none());
2637 assert!(ctx.lambda_context().is_none());
2638 assert_eq!(ctx.current_step_counter(), 0);
2639 }
2640
2641 #[test]
2642 fn test_durable_context_next_operation_id() {
2643 let state = create_test_state();
2644 let ctx = DurableContext::new(state);
2645
2646 let id1 = ctx.next_operation_id();
2647 let id2 = ctx.next_operation_id();
2648
2649 assert_ne!(id1, id2);
2650 assert_eq!(ctx.current_step_counter(), 2);
2651 }
2652
2653 #[test]
2654 fn test_durable_context_next_operation_identifier() {
2655 let state = create_test_state();
2656 let ctx = DurableContext::new(state);
2657
2658 let op_id = ctx.next_operation_identifier(Some("my-step".to_string()));
2659
2660 assert!(op_id.parent_id.is_none());
2661 assert_eq!(op_id.name, Some("my-step".to_string()));
2662 assert!(!op_id.operation_id.is_empty());
2663 }
2664
2665 #[test]
2666 fn test_durable_context_create_child_context() {
2667 let state = create_test_state();
2668 let ctx = DurableContext::new(state);
2669
2670 let parent_op_id = ctx.next_operation_id();
2672
2673 let child_ctx = ctx.create_child_context(&parent_op_id);
2675
2676 assert_eq!(child_ctx.parent_id(), Some(parent_op_id.as_str()));
2677 assert_eq!(child_ctx.current_step_counter(), 0);
2678 assert_eq!(
2679 child_ctx.durable_execution_arn(),
2680 ctx.durable_execution_arn()
2681 );
2682 }
2683
2684 #[test]
2685 fn test_durable_context_child_generates_different_ids() {
2686 let state = create_test_state();
2687 let ctx = DurableContext::new(state);
2688
2689 let parent_op_id = ctx.next_operation_id();
2690 let child_ctx = ctx.create_child_context(&parent_op_id);
2691
2692 let child_id = child_ctx.next_operation_id();
2694 let parent_id = ctx.next_operation_id();
2695
2696 assert_ne!(child_id, parent_id);
2697 }
2698
2699 #[test]
2700 fn test_durable_context_child_operation_identifier_has_parent() {
2701 let state = create_test_state();
2702 let ctx = DurableContext::new(state);
2703
2704 let parent_op_id = ctx.next_operation_id();
2705 let child_ctx = ctx.create_child_context(&parent_op_id);
2706
2707 let child_op_id = child_ctx.next_operation_identifier(None);
2708
2709 assert_eq!(child_op_id.parent_id, Some(parent_op_id));
2710 }
2711
2712 #[test]
2713 fn test_durable_context_set_logger() {
2714 let state = create_test_state();
2715 let mut ctx = DurableContext::new(state);
2716
2717 let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2719 ctx.set_logger(custom_logger);
2720
2721 let _ = ctx.logger();
2723 }
2724
2725 #[test]
2726 fn test_durable_context_with_logger() {
2727 let state = create_test_state();
2728 let ctx = DurableContext::new(state);
2729
2730 let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2731 let ctx_with_logger = ctx.with_logger(custom_logger);
2732
2733 let _ = ctx_with_logger.logger();
2735 }
2736
2737 #[test]
2738 fn test_durable_context_create_log_info() {
2739 let state = create_test_state();
2740 let ctx = DurableContext::new(state);
2741
2742 let info = ctx.create_log_info();
2743
2744 assert_eq!(
2745 info.durable_execution_arn,
2746 Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
2747 );
2748 assert!(info.parent_id.is_none());
2749 }
2750
2751 #[test]
2752 fn test_durable_context_create_log_info_with_parent() {
2753 let state = create_test_state();
2754 let ctx = DurableContext::new(state);
2755
2756 let parent_op_id = ctx.next_operation_id();
2757 let child_ctx = ctx.create_child_context(&parent_op_id);
2758
2759 let info = child_ctx.create_log_info();
2760
2761 assert_eq!(info.parent_id, Some(parent_op_id));
2762 }
2763
2764 #[test]
2765 fn test_durable_context_create_log_info_with_operation() {
2766 let state = create_test_state();
2767 let ctx = DurableContext::new(state);
2768
2769 let info = ctx.create_log_info_with_operation("op-123");
2770
2771 assert_eq!(info.operation_id, Some("op-123".to_string()));
2772 }
2773
2774 #[test]
2775 fn test_log_info_with_replay() {
2776 let info = LogInfo::new("arn:test")
2777 .with_operation_id("op-123")
2778 .with_replay(true);
2779
2780 assert!(info.is_replay);
2781 assert_eq!(info.operation_id, Some("op-123".to_string()));
2782 }
2783
2784 #[test]
2785 fn test_log_info_default_not_replay() {
2786 let info = LogInfo::default();
2787 assert!(!info.is_replay);
2788 }
2789
2790 #[test]
2791 fn test_replay_logging_config_default() {
2792 let config = ReplayLoggingConfig::default();
2793 assert_eq!(config, ReplayLoggingConfig::SuppressAll);
2794 }
2795
2796 #[test]
2797 fn test_replay_aware_logger_suppress_all() {
2798 use std::sync::atomic::{AtomicUsize, Ordering};
2799
2800 let debug_count = Arc::new(AtomicUsize::new(0));
2802 let info_count = Arc::new(AtomicUsize::new(0));
2803 let warn_count = Arc::new(AtomicUsize::new(0));
2804 let error_count = Arc::new(AtomicUsize::new(0));
2805
2806 let inner = Arc::new(custom_logger(
2807 {
2808 let count = debug_count.clone();
2809 move |_, _| {
2810 count.fetch_add(1, Ordering::SeqCst);
2811 }
2812 },
2813 {
2814 let count = info_count.clone();
2815 move |_, _| {
2816 count.fetch_add(1, Ordering::SeqCst);
2817 }
2818 },
2819 {
2820 let count = warn_count.clone();
2821 move |_, _| {
2822 count.fetch_add(1, Ordering::SeqCst);
2823 }
2824 },
2825 {
2826 let count = error_count.clone();
2827 move |_, _| {
2828 count.fetch_add(1, Ordering::SeqCst);
2829 }
2830 },
2831 ));
2832
2833 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
2834
2835 let non_replay_info = LogInfo::new("arn:test").with_replay(false);
2837 logger.debug("test", &non_replay_info);
2838 logger.info("test", &non_replay_info);
2839 logger.warn("test", &non_replay_info);
2840 logger.error("test", &non_replay_info);
2841
2842 assert_eq!(debug_count.load(Ordering::SeqCst), 1);
2843 assert_eq!(info_count.load(Ordering::SeqCst), 1);
2844 assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2845 assert_eq!(error_count.load(Ordering::SeqCst), 1);
2846
2847 let replay_info = LogInfo::new("arn:test").with_replay(true);
2849 logger.debug("test", &replay_info);
2850 logger.info("test", &replay_info);
2851 logger.warn("test", &replay_info);
2852 logger.error("test", &replay_info);
2853
2854 assert_eq!(debug_count.load(Ordering::SeqCst), 1);
2856 assert_eq!(info_count.load(Ordering::SeqCst), 1);
2857 assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2858 assert_eq!(error_count.load(Ordering::SeqCst), 1);
2859 }
2860
2861 #[test]
2862 fn test_replay_aware_logger_allow_all() {
2863 use std::sync::atomic::{AtomicUsize, Ordering};
2864
2865 let call_count = Arc::new(AtomicUsize::new(0));
2866
2867 let inner = Arc::new(custom_logger(
2868 {
2869 let count = call_count.clone();
2870 move |_, _| {
2871 count.fetch_add(1, Ordering::SeqCst);
2872 }
2873 },
2874 {
2875 let count = call_count.clone();
2876 move |_, _| {
2877 count.fetch_add(1, Ordering::SeqCst);
2878 }
2879 },
2880 {
2881 let count = call_count.clone();
2882 move |_, _| {
2883 count.fetch_add(1, Ordering::SeqCst);
2884 }
2885 },
2886 {
2887 let count = call_count.clone();
2888 move |_, _| {
2889 count.fetch_add(1, Ordering::SeqCst);
2890 }
2891 },
2892 ));
2893
2894 let logger = ReplayAwareLogger::allow_all(inner);
2895
2896 let replay_info = LogInfo::new("arn:test").with_replay(true);
2898 logger.debug("test", &replay_info);
2899 logger.info("test", &replay_info);
2900 logger.warn("test", &replay_info);
2901 logger.error("test", &replay_info);
2902
2903 assert_eq!(call_count.load(Ordering::SeqCst), 4);
2904 }
2905
2906 #[test]
2907 fn test_replay_aware_logger_errors_only() {
2908 use std::sync::atomic::{AtomicUsize, Ordering};
2909
2910 let debug_count = Arc::new(AtomicUsize::new(0));
2911 let info_count = Arc::new(AtomicUsize::new(0));
2912 let warn_count = Arc::new(AtomicUsize::new(0));
2913 let error_count = Arc::new(AtomicUsize::new(0));
2914
2915 let inner = Arc::new(custom_logger(
2916 {
2917 let count = debug_count.clone();
2918 move |_, _| {
2919 count.fetch_add(1, Ordering::SeqCst);
2920 }
2921 },
2922 {
2923 let count = info_count.clone();
2924 move |_, _| {
2925 count.fetch_add(1, Ordering::SeqCst);
2926 }
2927 },
2928 {
2929 let count = warn_count.clone();
2930 move |_, _| {
2931 count.fetch_add(1, Ordering::SeqCst);
2932 }
2933 },
2934 {
2935 let count = error_count.clone();
2936 move |_, _| {
2937 count.fetch_add(1, Ordering::SeqCst);
2938 }
2939 },
2940 ));
2941
2942 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::ErrorsOnly);
2943
2944 let replay_info = LogInfo::new("arn:test").with_replay(true);
2946 logger.debug("test", &replay_info);
2947 logger.info("test", &replay_info);
2948 logger.warn("test", &replay_info);
2949 logger.error("test", &replay_info);
2950
2951 assert_eq!(debug_count.load(Ordering::SeqCst), 0);
2952 assert_eq!(info_count.load(Ordering::SeqCst), 0);
2953 assert_eq!(warn_count.load(Ordering::SeqCst), 0);
2954 assert_eq!(error_count.load(Ordering::SeqCst), 1);
2955 }
2956
2957 #[test]
2958 fn test_replay_aware_logger_warnings_and_errors() {
2959 use std::sync::atomic::{AtomicUsize, Ordering};
2960
2961 let debug_count = Arc::new(AtomicUsize::new(0));
2962 let info_count = Arc::new(AtomicUsize::new(0));
2963 let warn_count = Arc::new(AtomicUsize::new(0));
2964 let error_count = Arc::new(AtomicUsize::new(0));
2965
2966 let inner = Arc::new(custom_logger(
2967 {
2968 let count = debug_count.clone();
2969 move |_, _| {
2970 count.fetch_add(1, Ordering::SeqCst);
2971 }
2972 },
2973 {
2974 let count = info_count.clone();
2975 move |_, _| {
2976 count.fetch_add(1, Ordering::SeqCst);
2977 }
2978 },
2979 {
2980 let count = warn_count.clone();
2981 move |_, _| {
2982 count.fetch_add(1, Ordering::SeqCst);
2983 }
2984 },
2985 {
2986 let count = error_count.clone();
2987 move |_, _| {
2988 count.fetch_add(1, Ordering::SeqCst);
2989 }
2990 },
2991 ));
2992
2993 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::WarningsAndErrors);
2994
2995 let replay_info = LogInfo::new("arn:test").with_replay(true);
2997 logger.debug("test", &replay_info);
2998 logger.info("test", &replay_info);
2999 logger.warn("test", &replay_info);
3000 logger.error("test", &replay_info);
3001
3002 assert_eq!(debug_count.load(Ordering::SeqCst), 0);
3003 assert_eq!(info_count.load(Ordering::SeqCst), 0);
3004 assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3005 assert_eq!(error_count.load(Ordering::SeqCst), 1);
3006 }
3007
3008 #[test]
3009 fn test_replay_aware_logger_suppress_replay_constructor() {
3010 let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3011 let logger = ReplayAwareLogger::suppress_replay(inner);
3012
3013 assert_eq!(logger.config(), ReplayLoggingConfig::SuppressAll);
3014 }
3015
3016 #[test]
3017 fn test_replay_aware_logger_debug() {
3018 let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3019 let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
3020
3021 let debug_str = format!("{:?}", logger);
3022 assert!(debug_str.contains("ReplayAwareLogger"));
3023 assert!(debug_str.contains("SuppressAll"));
3024 }
3025
3026 #[test]
3027 fn test_durable_context_create_log_info_with_replay_method() {
3028 let state = create_test_state();
3029 let ctx = DurableContext::new(state);
3030
3031 let info = ctx.create_log_info_with_replay("op-123", true);
3032
3033 assert_eq!(info.operation_id, Some("op-123".to_string()));
3034 assert!(info.is_replay);
3035 }
3036
3037 #[test]
3038 fn test_durable_context_clone() {
3039 let state = create_test_state();
3040 let ctx = DurableContext::new(state);
3041
3042 ctx.next_operation_id();
3043 ctx.next_operation_id();
3044
3045 let cloned = ctx.clone();
3046
3047 assert_eq!(cloned.durable_execution_arn(), ctx.durable_execution_arn());
3048 assert_eq!(cloned.current_step_counter(), ctx.current_step_counter());
3049 }
3050
3051 #[test]
3052 fn test_durable_context_debug() {
3053 let state = create_test_state();
3054 let ctx = DurableContext::new(state);
3055
3056 let debug_str = format!("{:?}", ctx);
3057
3058 assert!(debug_str.contains("DurableContext"));
3059 assert!(debug_str.contains("durable_execution_arn"));
3060 }
3061
3062 #[test]
3063 fn test_durable_context_state_access() {
3064 let state = create_test_state();
3065 let ctx = DurableContext::new(state.clone());
3066
3067 assert!(Arc::ptr_eq(ctx.state(), &state));
3069 }
3070
3071 #[test]
3072 fn test_durable_context_send_sync() {
3073 fn assert_send_sync<T: Send + Sync>() {}
3075 assert_send_sync::<DurableContext>();
3076 }
3077
3078 #[test]
3083 fn test_log_info_method() {
3084 use std::sync::atomic::{AtomicUsize, Ordering};
3085 use std::sync::Mutex;
3086
3087 let info_count = Arc::new(AtomicUsize::new(0));
3088 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3089
3090 let captured_info_clone = captured_info.clone();
3091 let inner = Arc::new(custom_logger(
3092 |_, _| {},
3093 {
3094 let count = info_count.clone();
3095 move |_, info: &LogInfo| {
3096 count.fetch_add(1, Ordering::SeqCst);
3097 *captured_info_clone.lock().unwrap() = Some(info.clone());
3098 }
3099 },
3100 |_, _| {},
3101 |_, _| {},
3102 ));
3103
3104 let state = create_test_state();
3105 let ctx = DurableContext::new(state).with_logger(inner);
3106
3107 ctx.log_info("Test message");
3108
3109 assert_eq!(info_count.load(Ordering::SeqCst), 1);
3110
3111 let captured = captured_info.lock().unwrap();
3112 let info = captured.as_ref().unwrap();
3113 assert_eq!(
3114 info.durable_execution_arn,
3115 Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
3116 );
3117 }
3118
3119 #[test]
3120 fn test_log_debug_method() {
3121 use std::sync::atomic::{AtomicUsize, Ordering};
3122
3123 let debug_count = Arc::new(AtomicUsize::new(0));
3124
3125 let inner = Arc::new(custom_logger(
3126 {
3127 let count = debug_count.clone();
3128 move |_, _| {
3129 count.fetch_add(1, Ordering::SeqCst);
3130 }
3131 },
3132 |_, _| {},
3133 |_, _| {},
3134 |_, _| {},
3135 ));
3136
3137 let state = create_test_state();
3138 let ctx = DurableContext::new(state).with_logger(inner);
3139
3140 ctx.log_debug("Debug message");
3141
3142 assert_eq!(debug_count.load(Ordering::SeqCst), 1);
3143 }
3144
3145 #[test]
3146 fn test_log_warn_method() {
3147 use std::sync::atomic::{AtomicUsize, Ordering};
3148
3149 let warn_count = Arc::new(AtomicUsize::new(0));
3150
3151 let inner = Arc::new(custom_logger(
3152 |_, _| {},
3153 |_, _| {},
3154 {
3155 let count = warn_count.clone();
3156 move |_, _| {
3157 count.fetch_add(1, Ordering::SeqCst);
3158 }
3159 },
3160 |_, _| {},
3161 ));
3162
3163 let state = create_test_state();
3164 let ctx = DurableContext::new(state).with_logger(inner);
3165
3166 ctx.log_warn("Warning message");
3167
3168 assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3169 }
3170
3171 #[test]
3172 fn test_log_error_method() {
3173 use std::sync::atomic::{AtomicUsize, Ordering};
3174
3175 let error_count = Arc::new(AtomicUsize::new(0));
3176
3177 let inner = Arc::new(custom_logger(|_, _| {}, |_, _| {}, |_, _| {}, {
3178 let count = error_count.clone();
3179 move |_, _| {
3180 count.fetch_add(1, Ordering::SeqCst);
3181 }
3182 }));
3183
3184 let state = create_test_state();
3185 let ctx = DurableContext::new(state).with_logger(inner);
3186
3187 ctx.log_error("Error message");
3188
3189 assert_eq!(error_count.load(Ordering::SeqCst), 1);
3190 }
3191
3192 #[test]
3193 fn test_log_info_with_extra_fields() {
3194 use std::sync::Mutex;
3195
3196 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3197
3198 let captured_info_clone = captured_info.clone();
3199 let inner = Arc::new(custom_logger(
3200 |_, _| {},
3201 move |_, info: &LogInfo| {
3202 *captured_info_clone.lock().unwrap() = Some(info.clone());
3203 },
3204 |_, _| {},
3205 |_, _| {},
3206 ));
3207
3208 let state = create_test_state();
3209 let ctx = DurableContext::new(state).with_logger(inner);
3210
3211 ctx.log_info_with("Test message", &[("order_id", "123"), ("amount", "99.99")]);
3212
3213 let captured = captured_info.lock().unwrap();
3214 let info = captured.as_ref().unwrap();
3215
3216 assert_eq!(info.extra.len(), 2);
3218 assert!(info
3219 .extra
3220 .contains(&("order_id".to_string(), "123".to_string())));
3221 assert!(info
3222 .extra
3223 .contains(&("amount".to_string(), "99.99".to_string())));
3224 }
3225
3226 #[test]
3227 fn test_log_debug_with_extra_fields() {
3228 use std::sync::Mutex;
3229
3230 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3231
3232 let captured_info_clone = captured_info.clone();
3233 let inner = Arc::new(custom_logger(
3234 move |_, info: &LogInfo| {
3235 *captured_info_clone.lock().unwrap() = Some(info.clone());
3236 },
3237 |_, _| {},
3238 |_, _| {},
3239 |_, _| {},
3240 ));
3241
3242 let state = create_test_state();
3243 let ctx = DurableContext::new(state).with_logger(inner);
3244
3245 ctx.log_debug_with("Debug message", &[("key", "value")]);
3246
3247 let captured = captured_info.lock().unwrap();
3248 let info = captured.as_ref().unwrap();
3249
3250 assert_eq!(info.extra.len(), 1);
3251 assert!(info
3252 .extra
3253 .contains(&("key".to_string(), "value".to_string())));
3254 }
3255
3256 #[test]
3257 fn test_log_warn_with_extra_fields() {
3258 use std::sync::Mutex;
3259
3260 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3261
3262 let captured_info_clone = captured_info.clone();
3263 let inner = Arc::new(custom_logger(
3264 |_, _| {},
3265 |_, _| {},
3266 move |_, info: &LogInfo| {
3267 *captured_info_clone.lock().unwrap() = Some(info.clone());
3268 },
3269 |_, _| {},
3270 ));
3271
3272 let state = create_test_state();
3273 let ctx = DurableContext::new(state).with_logger(inner);
3274
3275 ctx.log_warn_with("Warning message", &[("retry", "3")]);
3276
3277 let captured = captured_info.lock().unwrap();
3278 let info = captured.as_ref().unwrap();
3279
3280 assert_eq!(info.extra.len(), 1);
3281 assert!(info.extra.contains(&("retry".to_string(), "3".to_string())));
3282 }
3283
3284 #[test]
3285 fn test_log_error_with_extra_fields() {
3286 use std::sync::Mutex;
3287
3288 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3289
3290 let captured_info_clone = captured_info.clone();
3291 let inner = Arc::new(custom_logger(
3292 |_, _| {},
3293 |_, _| {},
3294 |_, _| {},
3295 move |_, info: &LogInfo| {
3296 *captured_info_clone.lock().unwrap() = Some(info.clone());
3297 },
3298 ));
3299
3300 let state = create_test_state();
3301 let ctx = DurableContext::new(state).with_logger(inner);
3302
3303 ctx.log_error_with(
3304 "Error message",
3305 &[("error_code", "E001"), ("details", "Something went wrong")],
3306 );
3307
3308 let captured = captured_info.lock().unwrap();
3309 let info = captured.as_ref().unwrap();
3310
3311 assert_eq!(info.extra.len(), 2);
3312 assert!(info
3313 .extra
3314 .contains(&("error_code".to_string(), "E001".to_string())));
3315 assert!(info
3316 .extra
3317 .contains(&("details".to_string(), "Something went wrong".to_string())));
3318 }
3319
3320 #[test]
3321 fn test_log_methods_include_parent_id_in_child_context() {
3322 use std::sync::Mutex;
3323
3324 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3325
3326 let captured_info_clone = captured_info.clone();
3327 let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3328 |_, _| {},
3329 move |_, info: &LogInfo| {
3330 *captured_info_clone.lock().unwrap() = Some(info.clone());
3331 },
3332 |_, _| {},
3333 |_, _| {},
3334 ));
3335
3336 let state = create_test_state();
3337 let ctx = DurableContext::new(state).with_logger(inner.clone());
3338
3339 let parent_op_id = ctx.next_operation_id();
3340 let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3341
3342 child_ctx.log_info("Child context message");
3343
3344 let captured = captured_info.lock().unwrap();
3345 let info = captured.as_ref().unwrap();
3346
3347 assert_eq!(info.parent_id, Some(parent_op_id));
3349 }
3350
3351 #[test]
3356 fn test_configure_logger_swaps_logger() {
3357 use std::sync::atomic::{AtomicUsize, Ordering};
3359
3360 let original_count = Arc::new(AtomicUsize::new(0));
3361 let new_count = Arc::new(AtomicUsize::new(0));
3362
3363 let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3364 |_, _| {},
3365 {
3366 let count = original_count.clone();
3367 move |_, _| {
3368 count.fetch_add(1, Ordering::SeqCst);
3369 }
3370 },
3371 |_, _| {},
3372 |_, _| {},
3373 ));
3374
3375 let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3376 |_, _| {},
3377 {
3378 let count = new_count.clone();
3379 move |_, _| {
3380 count.fetch_add(1, Ordering::SeqCst);
3381 }
3382 },
3383 |_, _| {},
3384 |_, _| {},
3385 ));
3386
3387 let state = create_test_state();
3388 let ctx = DurableContext::new(state).with_logger(original_logger);
3389
3390 ctx.log_info("before swap");
3392 assert_eq!(original_count.load(Ordering::SeqCst), 1);
3393 assert_eq!(new_count.load(Ordering::SeqCst), 0);
3394
3395 ctx.configure_logger(new_logger);
3397
3398 ctx.log_info("after swap");
3400 assert_eq!(original_count.load(Ordering::SeqCst), 1); assert_eq!(new_count.load(Ordering::SeqCst), 1);
3402 }
3403
3404 #[test]
3405 fn test_original_logger_used_when_configure_logger_not_called() {
3406 use std::sync::atomic::{AtomicUsize, Ordering};
3408
3409 let original_count = Arc::new(AtomicUsize::new(0));
3410
3411 let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3412 |_, _| {},
3413 {
3414 let count = original_count.clone();
3415 move |_, _| {
3416 count.fetch_add(1, Ordering::SeqCst);
3417 }
3418 },
3419 |_, _| {},
3420 |_, _| {},
3421 ));
3422
3423 let state = create_test_state();
3424 let ctx = DurableContext::new(state).with_logger(original_logger);
3425
3426 ctx.log_info("message 1");
3428 ctx.log_info("message 2");
3429 ctx.log_info("message 3");
3430
3431 assert_eq!(original_count.load(Ordering::SeqCst), 3);
3432 }
3433
#[test]
/// A logger swap performed on the parent via `configure_logger` must be
/// observed by a child context that was created before the swap.
fn test_configure_logger_affects_child_contexts() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // One counter per logger; in both loggers only the second callback counts.
    let hits_on_first = Arc::new(AtomicUsize::new(0));
    let hits_on_second = Arc::new(AtomicUsize::new(0));

    let first_sink = Arc::clone(&hits_on_first);
    let first_logger: Arc<dyn Logger> = Arc::new(custom_logger(
        |_, _| {},
        move |_, _| {
            first_sink.fetch_add(1, Ordering::SeqCst);
        },
        |_, _| {},
        |_, _| {},
    ));

    let second_sink = Arc::clone(&hits_on_second);
    let second_logger: Arc<dyn Logger> = Arc::new(custom_logger(
        |_, _| {},
        move |_, _| {
            second_sink.fetch_add(1, Ordering::SeqCst);
        },
        |_, _| {},
        |_, _| {},
    ));

    let parent = DurableContext::new(create_test_state()).with_logger(first_logger);
    let parent_op_id = parent.next_operation_id();
    let child = parent.create_child_context(&parent_op_id);

    // Before the swap the child logs through the first logger.
    child.log_info("child before swap");
    assert_eq!(hits_on_first.load(Ordering::SeqCst), 1);

    parent.configure_logger(second_logger);

    // After the swap the child must hit the second logger and leave the
    // first logger's counter untouched.
    child.log_info("child after swap");
    assert_eq!(hits_on_second.load(Ordering::SeqCst), 1);
    assert_eq!(hits_on_first.load(Ordering::SeqCst), 1);
}
3483}
3484
3485#[cfg(test)]
3486mod property_tests {
3487 use super::*;
3488 use proptest::prelude::*;
3489
3490 proptest! {
3496 #![proptest_config(ProptestConfig::with_cases(100))]
3497
3498 #[test]
3501 fn prop_operation_id_determinism(
3502 base_id in "[a-zA-Z0-9:/-]{1,100}",
3503 counter in 0u64..10000u64,
3504 ) {
3505 let id1 = generate_operation_id(&base_id, counter);
3507 let id2 = generate_operation_id(&base_id, counter);
3508
3509 prop_assert_eq!(&id1, &id2, "Same base_id and counter must produce identical IDs");
3511
3512 prop_assert_eq!(id1.len(), 32, "ID must be 32 hex characters");
3514 prop_assert!(id1.chars().all(|c| c.is_ascii_hexdigit()), "ID must be valid hex");
3515 }
3516
3517 #[test]
3520 fn prop_operation_id_generator_determinism(
3521 base_id in "[a-zA-Z0-9:/-]{1,100}",
3522 num_ids in 1usize..50usize,
3523 ) {
3524 let gen1 = OperationIdGenerator::new(&base_id);
3526 let gen2 = OperationIdGenerator::new(&base_id);
3527
3528 let ids1: Vec<String> = (0..num_ids).map(|_| gen1.next_id()).collect();
3530 let ids2: Vec<String> = (0..num_ids).map(|_| gen2.next_id()).collect();
3531
3532 prop_assert_eq!(&ids1, &ids2, "Same generator sequence must produce identical IDs");
3534
3535 let unique_count = {
3537 let mut set = std::collections::HashSet::new();
3538 for id in &ids1 {
3539 set.insert(id.clone());
3540 }
3541 set.len()
3542 };
3543 prop_assert_eq!(unique_count, num_ids, "All IDs in sequence must be unique");
3544 }
3545
3546 #[test]
3549 fn prop_operation_id_replay_determinism(
3550 base_id in "[a-zA-Z0-9:/-]{1,100}",
3551 operations in prop::collection::vec(0u32..3u32, 1..20),
3552 ) {
3553 let gen1 = OperationIdGenerator::new(&base_id);
3557 let gen2 = OperationIdGenerator::new(&base_id);
3558
3559 let mut ids1 = Vec::new();
3560 let mut ids2 = Vec::new();
3561
3562 for op_type in &operations {
3564 ids1.push(gen1.next_id());
3566
3567 if *op_type == 2 {
3569 let parent_id = ids1.last().unwrap().clone();
3570 let child_gen = gen1.create_child(parent_id);
3571 ids1.push(child_gen.next_id());
3572 }
3573 }
3574
3575 for op_type in &operations {
3577 ids2.push(gen2.next_id());
3578
3579 if *op_type == 2 {
3580 let parent_id = ids2.last().unwrap().clone();
3581 let child_gen = gen2.create_child(parent_id);
3582 ids2.push(child_gen.next_id());
3583 }
3584 }
3585
3586 prop_assert_eq!(&ids1, &ids2, "Replay must produce identical operation IDs");
3588 }
3589 }
3590
3591 mod concurrent_id_tests {
3596 use super::*;
3597 use std::sync::Arc;
3598 use std::thread;
3599
3600 proptest! {
3601 #![proptest_config(ProptestConfig::with_cases(100))]
3602
3603 #[test]
3606 fn prop_concurrent_id_uniqueness(
3607 base_id in "[a-zA-Z0-9:/-]{1,100}",
3608 num_threads in 2usize..10usize,
3609 ids_per_thread in 10usize..100usize,
3610 ) {
3611 let gen = Arc::new(OperationIdGenerator::new(&base_id));
3612 let mut handles = vec![];
3613
3614 for _ in 0..num_threads {
3616 let gen_clone = gen.clone();
3617 let count = ids_per_thread;
3618 handles.push(thread::spawn(move || {
3619 let mut ids = Vec::with_capacity(count);
3620 for _ in 0..count {
3621 ids.push(gen_clone.next_id());
3622 }
3623 ids
3624 }));
3625 }
3626
3627 let mut all_ids = Vec::new();
3629 for handle in handles {
3630 all_ids.extend(handle.join().unwrap());
3631 }
3632
3633 let total_expected = num_threads * ids_per_thread;
3634
3635 prop_assert_eq!(all_ids.len(), total_expected, "Should have generated {} IDs", total_expected);
3637
3638 let unique_count = {
3640 let mut set = std::collections::HashSet::new();
3641 for id in &all_ids {
3642 set.insert(id.clone());
3643 }
3644 set.len()
3645 };
3646
3647 prop_assert_eq!(
3648 unique_count,
3649 total_expected,
3650 "All {} IDs must be unique, but only {} were unique",
3651 total_expected,
3652 unique_count
3653 );
3654
3655 prop_assert_eq!(
3657 gen.current_counter() as usize,
3658 total_expected,
3659 "Counter should equal total IDs generated"
3660 );
3661 }
3662
3663 #[test]
3666 fn prop_concurrent_id_uniqueness_stress(
3667 base_id in "[a-zA-Z0-9:/-]{1,50}",
3668 ) {
3669 let num_threads = 20;
3671 let ids_per_thread = 500;
3672
3673 let gen = Arc::new(OperationIdGenerator::new(&base_id));
3674 let mut handles = vec![];
3675
3676 for _ in 0..num_threads {
3677 let gen_clone = gen.clone();
3678 handles.push(thread::spawn(move || {
3679 let mut ids = Vec::with_capacity(ids_per_thread);
3680 for _ in 0..ids_per_thread {
3681 ids.push(gen_clone.next_id());
3682 }
3683 ids
3684 }));
3685 }
3686
3687 let mut all_ids = Vec::new();
3688 for handle in handles {
3689 all_ids.extend(handle.join().unwrap());
3690 }
3691
3692 let total_expected = num_threads * ids_per_thread;
3693
3694 let unique_count = {
3696 let mut set = std::collections::HashSet::new();
3697 for id in &all_ids {
3698 set.insert(id.clone());
3699 }
3700 set.len()
3701 };
3702
3703 prop_assert_eq!(
3704 unique_count,
3705 total_expected,
3706 "All {} IDs must be unique under high concurrency",
3707 total_expected
3708 );
3709 }
3710 }
3711 }
3712
3713 mod logging_automatic_context_tests {
3719 use super::*;
3720 use std::sync::{Arc, Mutex};
3721
3722 proptest! {
3723 #![proptest_config(ProptestConfig::with_cases(100))]
3724
3725 #[test]
3728 fn prop_logging_automatic_context(
3729 message in "[a-zA-Z0-9 ]{1,100}",
3730 log_level in 0u8..4u8,
3731 ) {
3732 use crate::client::MockDurableServiceClient;
3733 use crate::lambda::InitialExecutionState;
3734
3735 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3736 let captured_info_clone = captured_info.clone();
3737
3738 let inner = Arc::new(custom_logger(
3740 {
3741 let captured = captured_info_clone.clone();
3742 move |_, info: &LogInfo| {
3743 *captured.lock().unwrap() = Some(info.clone());
3744 }
3745 },
3746 {
3747 let captured = captured_info_clone.clone();
3748 move |_, info: &LogInfo| {
3749 *captured.lock().unwrap() = Some(info.clone());
3750 }
3751 },
3752 {
3753 let captured = captured_info_clone.clone();
3754 move |_, info: &LogInfo| {
3755 *captured.lock().unwrap() = Some(info.clone());
3756 }
3757 },
3758 {
3759 let captured = captured_info_clone.clone();
3760 move |_, info: &LogInfo| {
3761 *captured.lock().unwrap() = Some(info.clone());
3762 }
3763 },
3764 ));
3765
3766 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3767 let state = Arc::new(ExecutionState::new(
3768 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3769 "token-123",
3770 InitialExecutionState::new(),
3771 client,
3772 ));
3773 let ctx = DurableContext::new(state).with_logger(inner);
3774
3775 match log_level {
3777 0 => ctx.log_debug(&message),
3778 1 => ctx.log_info(&message),
3779 2 => ctx.log_warn(&message),
3780 _ => ctx.log_error(&message),
3781 }
3782
3783 let captured = captured_info.lock().unwrap();
3785 let info = captured.as_ref().expect("LogInfo should be captured");
3786
3787 prop_assert!(
3789 info.durable_execution_arn.is_some(),
3790 "durable_execution_arn must be automatically included"
3791 );
3792 prop_assert_eq!(
3793 info.durable_execution_arn.as_ref().unwrap(),
3794 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3795 "durable_execution_arn must match the context's ARN"
3796 );
3797 }
3798
3799 #[test]
3802 fn prop_logging_automatic_context_child(
3803 message in "[a-zA-Z0-9 ]{1,100}",
3804 log_level in 0u8..4u8,
3805 ) {
3806 use crate::client::MockDurableServiceClient;
3807 use crate::lambda::InitialExecutionState;
3808
3809 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3810 let captured_info_clone = captured_info.clone();
3811
3812 let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3814 {
3815 let captured = captured_info_clone.clone();
3816 move |_, info: &LogInfo| {
3817 *captured.lock().unwrap() = Some(info.clone());
3818 }
3819 },
3820 {
3821 let captured = captured_info_clone.clone();
3822 move |_, info: &LogInfo| {
3823 *captured.lock().unwrap() = Some(info.clone());
3824 }
3825 },
3826 {
3827 let captured = captured_info_clone.clone();
3828 move |_, info: &LogInfo| {
3829 *captured.lock().unwrap() = Some(info.clone());
3830 }
3831 },
3832 {
3833 let captured = captured_info_clone.clone();
3834 move |_, info: &LogInfo| {
3835 *captured.lock().unwrap() = Some(info.clone());
3836 }
3837 },
3838 ));
3839
3840 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3841 let state = Arc::new(ExecutionState::new(
3842 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3843 "token-123",
3844 InitialExecutionState::new(),
3845 client,
3846 ));
3847 let ctx = DurableContext::new(state).with_logger(inner.clone());
3848
3849 let parent_op_id = ctx.next_operation_id();
3851 let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3852
3853 match log_level {
3855 0 => child_ctx.log_debug(&message),
3856 1 => child_ctx.log_info(&message),
3857 2 => child_ctx.log_warn(&message),
3858 _ => child_ctx.log_error(&message),
3859 }
3860
3861 let captured = captured_info.lock().unwrap();
3863 let info = captured.as_ref().expect("LogInfo should be captured");
3864
3865 prop_assert!(
3867 info.durable_execution_arn.is_some(),
3868 "durable_execution_arn must be automatically included in child context"
3869 );
3870
3871 prop_assert!(
3873 info.parent_id.is_some(),
3874 "parent_id must be automatically included in child context"
3875 );
3876 prop_assert_eq!(
3877 info.parent_id.as_ref().unwrap(),
3878 &parent_op_id,
3879 "parent_id must match the parent operation ID"
3880 );
3881 }
3882 }
3883 }
3884
3885 mod logging_extra_fields_tests {
3890 use super::*;
3891 use std::sync::{Arc, Mutex};
3892
3893 proptest! {
3894 #![proptest_config(ProptestConfig::with_cases(100))]
3895
3896 #[test]
3899 fn prop_logging_extra_fields(
3900 message in "[a-zA-Z0-9 ]{1,100}",
3901 log_level in 0u8..4u8,
3902 key1 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
3903 value1 in "[a-zA-Z0-9]{1,50}",
3904 key2 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
3905 value2 in "[a-zA-Z0-9]{1,50}",
3906 ) {
3907 use crate::client::MockDurableServiceClient;
3908 use crate::lambda::InitialExecutionState;
3909
3910 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3911 let captured_info_clone = captured_info.clone();
3912
3913 let inner = Arc::new(custom_logger(
3915 {
3916 let captured = captured_info_clone.clone();
3917 move |_, info: &LogInfo| {
3918 *captured.lock().unwrap() = Some(info.clone());
3919 }
3920 },
3921 {
3922 let captured = captured_info_clone.clone();
3923 move |_, info: &LogInfo| {
3924 *captured.lock().unwrap() = Some(info.clone());
3925 }
3926 },
3927 {
3928 let captured = captured_info_clone.clone();
3929 move |_, info: &LogInfo| {
3930 *captured.lock().unwrap() = Some(info.clone());
3931 }
3932 },
3933 {
3934 let captured = captured_info_clone.clone();
3935 move |_, info: &LogInfo| {
3936 *captured.lock().unwrap() = Some(info.clone());
3937 }
3938 },
3939 ));
3940
3941 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3942 let state = Arc::new(ExecutionState::new(
3943 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3944 "token-123",
3945 InitialExecutionState::new(),
3946 client,
3947 ));
3948 let ctx = DurableContext::new(state).with_logger(inner);
3949
3950 let fields: Vec<(&str, &str)> = vec![(&key1, &value1), (&key2, &value2)];
3952
3953 match log_level {
3955 0 => ctx.log_debug_with(&message, &fields),
3956 1 => ctx.log_info_with(&message, &fields),
3957 2 => ctx.log_warn_with(&message, &fields),
3958 _ => ctx.log_error_with(&message, &fields),
3959 }
3960
3961 let captured = captured_info.lock().unwrap();
3963 let info = captured.as_ref().expect("LogInfo should be captured");
3964
3965 prop_assert_eq!(
3967 info.extra.len(),
3968 2,
3969 "Extra fields must be included in the log output"
3970 );
3971
3972 prop_assert!(
3974 info.extra.contains(&(key1.clone(), value1.clone())),
3975 "First extra field must be present: {}={}", key1, value1
3976 );
3977 prop_assert!(
3978 info.extra.contains(&(key2.clone(), value2.clone())),
3979 "Second extra field must be present: {}={}", key2, value2
3980 );
3981 }
3982
3983 #[test]
3986 fn prop_logging_extra_fields_empty(
3987 message in "[a-zA-Z0-9 ]{1,100}",
3988 log_level in 0u8..4u8,
3989 ) {
3990 use crate::client::MockDurableServiceClient;
3991 use crate::lambda::InitialExecutionState;
3992
3993 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3994 let captured_info_clone = captured_info.clone();
3995
3996 let inner = Arc::new(custom_logger(
3998 {
3999 let captured = captured_info_clone.clone();
4000 move |_, info: &LogInfo| {
4001 *captured.lock().unwrap() = Some(info.clone());
4002 }
4003 },
4004 {
4005 let captured = captured_info_clone.clone();
4006 move |_, info: &LogInfo| {
4007 *captured.lock().unwrap() = Some(info.clone());
4008 }
4009 },
4010 {
4011 let captured = captured_info_clone.clone();
4012 move |_, info: &LogInfo| {
4013 *captured.lock().unwrap() = Some(info.clone());
4014 }
4015 },
4016 {
4017 let captured = captured_info_clone.clone();
4018 move |_, info: &LogInfo| {
4019 *captured.lock().unwrap() = Some(info.clone());
4020 }
4021 },
4022 ));
4023
4024 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4025 let state = Arc::new(ExecutionState::new(
4026 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4027 "token-123",
4028 InitialExecutionState::new(),
4029 client,
4030 ));
4031 let ctx = DurableContext::new(state).with_logger(inner);
4032
4033 let empty_fields: &[(&str, &str)] = &[];
4035 match log_level {
4036 0 => ctx.log_debug_with(&message, empty_fields),
4037 1 => ctx.log_info_with(&message, empty_fields),
4038 2 => ctx.log_warn_with(&message, empty_fields),
4039 _ => ctx.log_error_with(&message, empty_fields),
4040 }
4041
4042 let captured = captured_info.lock().unwrap();
4044 let info = captured.as_ref().expect("LogInfo should be captured");
4045
4046 prop_assert!(
4047 info.extra.is_empty(),
4048 "Extra fields should be empty when none are provided"
4049 );
4050
4051 prop_assert!(
4053 info.durable_execution_arn.is_some(),
4054 "durable_execution_arn must still be present even with empty extra fields"
4055 );
4056 }
4057 }
4058 }
4059}
4060
4061#[cfg(test)]
4062mod sealed_trait_tests {
4063 use super::*;
4064 use std::sync::atomic::{AtomicUsize, Ordering};
4065
4066 mod logger_tests {
4075 use super::*;
4076
#[test]
/// `TracingLogger` must be usable as a `&dyn Logger`, and all four level
/// methods must be callable through the trait object.
fn test_tracing_logger_implements_logger() {
    let info = LogInfo::default();
    // The coercion to a trait object is the compile-time part of the check.
    let as_trait_object: &dyn Logger = &TracingLogger;

    as_trait_object.debug("test debug", &info);
    as_trait_object.info("test info", &info);
    as_trait_object.warn("test warn", &info);
    as_trait_object.error("test error", &info);
}
4089
#[test]
/// `ReplayAwareLogger` must be usable as a `&dyn Logger`, and all four level
/// methods must be callable through the trait object.
fn test_replay_aware_logger_implements_logger() {
    let wrapped = Arc::new(TracingLogger);
    let replay_aware = ReplayAwareLogger::new(wrapped, ReplayLoggingConfig::AllowAll);
    let info = LogInfo::default();

    // The coercion to a trait object is the compile-time part of the check.
    let as_trait_object: &dyn Logger = &replay_aware;

    as_trait_object.debug("test debug", &info);
    as_trait_object.info("test info", &info);
    as_trait_object.warn("test warn", &info);
    as_trait_object.error("test error", &info);
}
4104
#[test]
/// A logger built by `custom_logger` must be usable as a `&dyn Logger`, with
/// each of the four level methods invoking one of the supplied callbacks.
fn test_custom_logger_implements_logger() {
    // Shared by all four callbacks, so the total counts calls across levels.
    let total_calls = Arc::new(AtomicUsize::new(0));

    let logger = custom_logger(
        {
            let hits = Arc::clone(&total_calls);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&total_calls);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&total_calls);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&total_calls);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
    );

    let info = LogInfo::default();
    let as_trait_object: &dyn Logger = &logger;

    as_trait_object.debug("test", &info);
    as_trait_object.info("test", &info);
    as_trait_object.warn("test", &info);
    as_trait_object.error("test", &info);

    // One callback invocation per level method.
    assert_eq!(total_calls.load(Ordering::SeqCst), 4);
}
4148
#[test]
/// A logger built by `simple_custom_logger` must call its single callback
/// once for each of the four level methods.
fn test_simple_custom_logger() {
    let total_calls = Arc::new(AtomicUsize::new(0));
    let hits = Arc::clone(&total_calls);

    let logger = simple_custom_logger(move |_level, _msg, _info| {
        hits.fetch_add(1, Ordering::SeqCst);
    });

    let info = LogInfo::default();

    logger.debug("test", &info);
    logger.info("test", &info);
    logger.warn("test", &info);
    logger.error("test", &info);

    // Four level calls, four callback invocations.
    assert_eq!(total_calls.load(Ordering::SeqCst), 4);
}
4167
#[test]
/// The callback given to `simple_custom_logger` must receive the level and
/// the message of each call, in call order.
fn test_custom_logger_receives_correct_messages() {
    // Every call is recorded as "[<level>] <message>".
    let recorded = Arc::new(std::sync::Mutex::new(Vec::new()));
    let sink = Arc::clone(&recorded);

    let logger = simple_custom_logger(move |level, msg, _info| {
        let line = format!("[{}] {}", level, msg);
        sink.lock().unwrap().push(line);
    });

    let info = LogInfo::default();

    logger.debug("debug message", &info);
    logger.info("info message", &info);
    logger.warn("warn message", &info);
    logger.error("error message", &info);

    let expected = [
        "[DEBUG] debug message",
        "[INFO] info message",
        "[WARN] warn message",
        "[ERROR] error message",
    ];

    let logged = recorded.lock().unwrap();
    // The length check comes first so the zip below cannot hide a missing entry.
    assert_eq!(logged.len(), 4);
    for (actual, wanted) in logged.iter().zip(expected) {
        assert_eq!(actual.as_str(), wanted);
    }
}
4194
#[test]
/// The `LogInfo` passed to a level method must reach the callback of a
/// `simple_custom_logger` with its fields intact.
fn test_custom_logger_receives_log_info() {
    let slot = Arc::new(std::sync::Mutex::new(None));
    let writer = Arc::clone(&slot);

    let logger = simple_custom_logger(move |_level, _msg, info| {
        *writer.lock().unwrap() = Some(info.clone());
    });

    let sent = LogInfo::new("arn:aws:test")
        .with_operation_id("op-123")
        .with_parent_id("parent-456")
        .with_replay(true);

    logger.info("test", &sent);

    // Clone the captured value out so the lock is released before asserting.
    let received = slot.lock().unwrap().clone().unwrap();

    assert_eq!(received.durable_execution_arn.as_deref(), Some("arn:aws:test"));
    assert_eq!(received.operation_id.as_deref(), Some("op-123"));
    assert_eq!(received.parent_id.as_deref(), Some("parent-456"));
    assert!(received.is_replay);
}
4220
#[test]
/// With `ReplayLoggingConfig::SuppressAll`, calls whose `LogInfo` is not
/// marked as replay reach the inner logger; calls marked as replay do not.
fn test_replay_aware_logger_suppresses_during_replay() {
    // Shared by all four inner callbacks: counts every forwarded call.
    let forwarded = Arc::new(AtomicUsize::new(0));

    let inner = Arc::new(custom_logger(
        {
            let hits = Arc::clone(&forwarded);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&forwarded);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&forwarded);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&forwarded);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
    ));

    let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);

    // Outside replay: all four levels are forwarded.
    let live = LogInfo::default().with_replay(false);
    logger.debug("test", &live);
    logger.info("test", &live);
    logger.warn("test", &live);
    logger.error("test", &live);
    assert_eq!(forwarded.load(Ordering::SeqCst), 4);

    // During replay: nothing is forwarded, so the count stays at 4.
    let replaying = LogInfo::default().with_replay(true);
    logger.debug("test", &replaying);
    logger.info("test", &replaying);
    logger.warn("test", &replaying);
    logger.error("test", &replaying);
    assert_eq!(forwarded.load(Ordering::SeqCst), 4);
}
4271
#[test]
/// With `ReplayLoggingConfig::ErrorsOnly`, exactly one of the four level
/// calls made during replay is forwarded to the inner logger.
fn test_replay_aware_logger_errors_only_during_replay() {
    // Shared by all four inner callbacks, so this test checks only how many
    // calls were forwarded, not which level they came from.
    let forwarded = Arc::new(AtomicUsize::new(0));

    let inner = Arc::new(custom_logger(
        {
            let hits = Arc::clone(&forwarded);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&forwarded);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&forwarded);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&forwarded);
            move |_msg, _info| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
    ));

    let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::ErrorsOnly);

    let replaying = LogInfo::default().with_replay(true);
    logger.debug("test", &replaying);
    logger.info("test", &replaying);
    logger.warn("test", &replaying);
    logger.error("test", &replaying);

    // Only one of the four replay-time calls may get through.
    assert_eq!(forwarded.load(Ordering::SeqCst), 1);
}
4315 }
4316}