Skip to main content

liminal/durability/
config.rs

1use std::time::Duration;
2
3use super::DurabilityError;
4
5/// Per-channel durability strategy.
6#[derive(Clone, Copy, Debug, PartialEq, Eq)]
7pub enum DurabilityMode {
8    /// No persistence or durability bookkeeping.
9    Ephemeral,
10    /// Persist messages for replay and crash recovery.
11    Durable,
12    /// Persist messages and apply the lightweight dedup contract.
13    DurableDedup,
14    /// Persist resumable processing state for durable conversations.
15    DurableConversation,
16}
17
18/// Policy controlling when consumer progress is checkpointed.
19#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub enum CheckpointPolicy {
21    /// Checkpoint after every processed message.
22    PerMessage,
23    /// Checkpoint after processing batches of the configured size.
24    PerBatch(usize),
25    /// Checkpoint only when the caller explicitly flushes progress.
26    ExplicitFlush,
27}
28
29/// Explicit durability configuration supplied per channel.
30#[derive(Clone, Copy, Debug, PartialEq, Eq)]
31pub struct DurabilityConfig {
32    /// Channel durability strategy.
33    mode: DurabilityMode,
34    /// Number of independent durable partitions.
35    partition_count: usize,
36    /// Time-to-live for dedup receipts.
37    dedup_ttl: Duration,
38    /// Consumer checkpoint policy.
39    checkpoint_policy: CheckpointPolicy,
40}
41
42impl DurabilityConfig {
43    /// Creates validated durability configuration from caller-supplied fields.
44    ///
45    /// # Errors
46    ///
47    /// Returns [`DurabilityError::ConfigError`] when `partition_count` is zero,
48    /// when `mode` is [`DurabilityMode::DurableDedup`] and `dedup_ttl` is zero,
49    /// or when `checkpoint_policy` is [`CheckpointPolicy::PerBatch`] with a zero batch size.
50    pub fn new(
51        mode: DurabilityMode,
52        partition_count: usize,
53        dedup_ttl: Duration,
54        checkpoint_policy: CheckpointPolicy,
55    ) -> Result<Self, DurabilityError> {
56        if partition_count == 0 {
57            return Err(DurabilityError::ConfigError(
58                "partition_count must be at least 1".to_owned(),
59            ));
60        }
61
62        if mode == DurabilityMode::DurableDedup && dedup_ttl == Duration::ZERO {
63            return Err(DurabilityError::ConfigError(
64                "dedup_ttl must be greater than zero for DurableDedup mode".to_owned(),
65            ));
66        }
67
68        if checkpoint_policy == CheckpointPolicy::PerBatch(0) {
69            return Err(DurabilityError::ConfigError(
70                "checkpoint batch size must be at least 1".to_owned(),
71            ));
72        }
73
74        Ok(Self {
75            mode,
76            partition_count,
77            dedup_ttl,
78            checkpoint_policy,
79        })
80    }
81
82    /// Returns the configured channel durability strategy.
83    #[must_use]
84    pub const fn mode(&self) -> DurabilityMode {
85        self.mode
86    }
87
88    /// Returns the number of independent durable partitions.
89    #[must_use]
90    pub const fn partition_count(&self) -> usize {
91        self.partition_count
92    }
93
94    /// Returns the time-to-live for dedup receipts.
95    #[must_use]
96    pub const fn dedup_ttl(&self) -> Duration {
97        self.dedup_ttl
98    }
99
100    /// Returns the consumer checkpoint policy.
101    #[must_use]
102    pub const fn checkpoint_policy(&self) -> CheckpointPolicy {
103        self.checkpoint_policy
104    }
105}