Skip to main content

liminal/durability/channel/
storage.rs

1use std::fmt;
2use std::sync::Arc;
3
4use super::MessageEnvelope;
5use crate::durability::{DurabilityConfig, DurabilityError, DurabilityMode, DurableStore};
6
7/// Caller-provided function used to route envelopes to partitions.
8#[derive(Clone)]
9pub struct PartitionKey {
10    function: Arc<dyn Fn(&MessageEnvelope) -> u64 + Send + Sync + 'static>,
11}
12
13impl PartitionKey {
14    /// Wraps a partition key closure.
15    #[must_use]
16    pub fn new<F>(function: F) -> Self
17    where
18        F: Fn(&MessageEnvelope) -> u64 + Send + Sync + 'static,
19    {
20        Self {
21            function: Arc::new(function),
22        }
23    }
24
25    fn apply(&self, envelope: &MessageEnvelope) -> u64 {
26        (self.function)(envelope)
27    }
28}
29
30impl fmt::Debug for PartitionKey {
31    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32        formatter.write_str("PartitionKey(<closure>)")
33    }
34}
35
36/// Haematite-backed durable channel state.
37#[derive(Clone)]
38pub struct DurableChannel {
39    channel_id: String,
40    partition_count: usize,
41    partition_key: Option<PartitionKey>,
42    next_sequences: Vec<u64>,
43    store: Arc<dyn DurableStore>,
44}
45
46impl DurableChannel {
47    /// Creates a new durable channel with all partition sequences initialized to zero.
48    ///
49    /// # Errors
50    ///
51    /// Returns [`DurabilityError::ConfigError`] when `partition_count` is zero.
52    pub fn new(
53        channel_id: impl Into<String>,
54        partition_count: usize,
55        store: Arc<dyn DurableStore>,
56    ) -> Result<Self, DurabilityError> {
57        Self::from_parts(channel_id.into(), partition_count, store, None, None)
58    }
59
60    /// Creates a durable channel with a caller-provided partition key function.
61    ///
62    /// # Errors
63    ///
64    /// Returns [`DurabilityError::ConfigError`] when `partition_count` is zero.
65    pub fn with_partition_key<F>(
66        channel_id: impl Into<String>,
67        partition_count: usize,
68        store: Arc<dyn DurableStore>,
69        partition_key: F,
70    ) -> Result<Self, DurabilityError>
71    where
72        F: Fn(&MessageEnvelope) -> u64 + Send + Sync + 'static,
73    {
74        Self::from_parts(
75            channel_id.into(),
76            partition_count,
77            store,
78            Some(PartitionKey::new(partition_key)),
79            None,
80        )
81    }
82
83    /// Creates a durable channel from explicit recovered next sequence counters.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`DurabilityError::ConfigError`] when `partition_count` is zero
88    /// or the recovered sequence vector length differs from `partition_count`.
89    pub fn from_recovered_sequences(
90        channel_id: impl Into<String>,
91        partition_count: usize,
92        store: Arc<dyn DurableStore>,
93        next_sequences: Vec<u64>,
94    ) -> Result<Self, DurabilityError> {
95        Self::from_parts(
96            channel_id.into(),
97            partition_count,
98            store,
99            None,
100            Some(next_sequences),
101        )
102    }
103
104    /// Creates a durable channel from validated durability configuration.
105    ///
106    /// # Errors
107    ///
108    /// Returns [`DurabilityError::ConfigError`] when `config` is ephemeral.
109    pub fn from_config(
110        channel_id: impl Into<String>,
111        config: DurabilityConfig,
112        store: Arc<dyn DurableStore>,
113    ) -> Result<Self, DurabilityError> {
114        if config.mode() == DurabilityMode::Ephemeral {
115            return Err(DurabilityError::ConfigError(
116                "durable channel requires a durable durability mode".to_owned(),
117            ));
118        }
119        Self::new(channel_id, config.partition_count(), store)
120    }
121
122    /// Returns the durable channel identifier.
123    #[must_use]
124    pub fn channel_id(&self) -> &str {
125        &self.channel_id
126    }
127
128    /// Returns the number of partitions owned by this channel.
129    #[must_use]
130    pub const fn partition_count(&self) -> usize {
131        self.partition_count
132    }
133
134    /// Returns the next expected sequence for a partition, if it exists.
135    #[must_use]
136    pub fn next_expected_sequence(&self, partition_index: usize) -> Option<u64> {
137        self.next_sequences.get(partition_index).copied()
138    }
139
140    /// Returns all next expected sequence counters.
141    #[must_use]
142    pub fn next_sequences(&self) -> &[u64] {
143        &self.next_sequences
144    }
145
146    /// Computes the partition index for an envelope without touching storage.
147    #[must_use]
148    pub fn partition_for(&self, envelope: &MessageEnvelope) -> usize {
149        route_partition(self.partition_count, self.partition_key.as_ref(), envelope)
150    }
151
152    /// Formats the haematite stream key for a channel partition.
153    #[must_use]
154    pub fn stream_key_for(&self, partition_index: usize) -> String {
155        format!("{}:{partition_index}", self.channel_id)
156    }
157
158    /// Persists an envelope before acknowledging the publish.
159    ///
160    /// # Errors
161    ///
162    /// Returns envelope serialization errors and propagates any
163    /// [`DurabilityError`] returned by [`DurableStore::append`], including
164    /// [`DurabilityError::SequenceConflict`].
165    pub async fn publish(&mut self, envelope: &MessageEnvelope) -> Result<u64, DurabilityError> {
166        let payload = envelope.serialize()?;
167        let partition_index = self.partition_for(envelope);
168        let expected_seq = self.sequence_for_append(partition_index)?;
169        let stream_key = self.stream_key_for(partition_index);
170        let assigned_seq = self
171            .store
172            .append(&stream_key, payload, expected_seq)
173            .await?;
174        let next_seq = assigned_seq.checked_add(1).ok_or_else(|| {
175            DurabilityError::ConfigError("sequence number overflow after append".to_owned())
176        })?;
177        self.set_next_sequence(partition_index, next_seq)?;
178        Ok(assigned_seq)
179    }
180
181    /// Flushes the backing store so every appended message is durably persisted.
182    ///
183    /// # Errors
184    ///
185    /// Propagates any [`DurabilityError`] returned by [`DurableStore::flush`].
186    pub async fn flush_store(&self) -> Result<(), DurabilityError> {
187        self.store.flush().await
188    }
189
190    fn from_parts(
191        channel_id: String,
192        partition_count: usize,
193        store: Arc<dyn DurableStore>,
194        partition_key: Option<PartitionKey>,
195        next_sequences: Option<Vec<u64>>,
196    ) -> Result<Self, DurabilityError> {
197        validate_partition_count(partition_count)?;
198        let next_sequences = next_sequences.unwrap_or_else(|| vec![0; partition_count]);
199        if next_sequences.len() != partition_count {
200            return Err(DurabilityError::ConfigError(
201                "recovered sequence count must match partition_count".to_owned(),
202            ));
203        }
204        Ok(Self {
205            channel_id,
206            partition_count,
207            partition_key,
208            next_sequences,
209            store,
210        })
211    }
212
213    fn sequence_for_append(&self, partition_index: usize) -> Result<u64, DurabilityError> {
214        self.next_sequences
215            .get(partition_index)
216            .copied()
217            .ok_or_else(|| {
218                DurabilityError::ConfigError("partition sequence state missing".to_owned())
219            })
220    }
221
222    fn set_next_sequence(
223        &mut self,
224        partition_index: usize,
225        next_sequence: u64,
226    ) -> Result<(), DurabilityError> {
227        let Some(sequence) = self.next_sequences.get_mut(partition_index) else {
228            return Err(DurabilityError::ConfigError(
229                "partition sequence state missing".to_owned(),
230            ));
231        };
232        *sequence = next_sequence;
233        Ok(())
234    }
235}
236
237impl fmt::Debug for DurableChannel {
238    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
239        formatter
240            .debug_struct("DurableChannel")
241            .field("channel_id", &self.channel_id)
242            .field("partition_count", &self.partition_count)
243            .field("partition_key_configured", &self.partition_key.is_some())
244            .field("next_sequences", &self.next_sequences)
245            .field("store", &self.store)
246            .finish()
247    }
248}
249
250/// Ephemeral channel state with no durable store reference.
251#[derive(Clone)]
252pub struct EphemeralChannel {
253    channel_id: String,
254    partition_count: usize,
255    partition_key: Option<PartitionKey>,
256}
257
258impl EphemeralChannel {
259    /// Creates an ephemeral channel without requiring any durable store.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`DurabilityError::ConfigError`] when `partition_count` is zero.
264    pub fn new(
265        channel_id: impl Into<String>,
266        partition_count: usize,
267    ) -> Result<Self, DurabilityError> {
268        Self::from_parts(channel_id.into(), partition_count, None)
269    }
270
271    /// Creates an ephemeral channel with a caller-provided partition key function.
272    ///
273    /// # Errors
274    ///
275    /// Returns [`DurabilityError::ConfigError`] when `partition_count` is zero.
276    pub fn with_partition_key<F>(
277        channel_id: impl Into<String>,
278        partition_count: usize,
279        partition_key: F,
280    ) -> Result<Self, DurabilityError>
281    where
282        F: Fn(&MessageEnvelope) -> u64 + Send + Sync + 'static,
283    {
284        Self::from_parts(
285            channel_id.into(),
286            partition_count,
287            Some(PartitionKey::new(partition_key)),
288        )
289    }
290
291    /// Creates an ephemeral channel from validated durability configuration.
292    ///
293    /// # Errors
294    ///
295    /// Returns [`DurabilityError::ConfigError`] when `config` is not ephemeral.
296    pub fn from_config(
297        channel_id: impl Into<String>,
298        config: DurabilityConfig,
299    ) -> Result<Self, DurabilityError> {
300        if config.mode() != DurabilityMode::Ephemeral {
301            return Err(DurabilityError::ConfigError(
302                "ephemeral channel requires Ephemeral durability mode".to_owned(),
303            ));
304        }
305        Self::new(channel_id, config.partition_count())
306    }
307
308    /// Returns the channel identifier.
309    #[must_use]
310    pub fn channel_id(&self) -> &str {
311        &self.channel_id
312    }
313
314    /// Returns the number of configured partitions.
315    #[must_use]
316    pub const fn partition_count(&self) -> usize {
317        self.partition_count
318    }
319
320    /// Computes the partition index for an envelope without touching storage.
321    #[must_use]
322    pub fn partition_for(&self, envelope: &MessageEnvelope) -> usize {
323        route_partition(self.partition_count, self.partition_key.as_ref(), envelope)
324    }
325
326    fn from_parts(
327        channel_id: String,
328        partition_count: usize,
329        partition_key: Option<PartitionKey>,
330    ) -> Result<Self, DurabilityError> {
331        validate_partition_count(partition_count)?;
332        Ok(Self {
333            channel_id,
334            partition_count,
335            partition_key,
336        })
337    }
338}
339
340impl fmt::Debug for EphemeralChannel {
341    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
342        formatter
343            .debug_struct("EphemeralChannel")
344            .field("channel_id", &self.channel_id)
345            .field("partition_count", &self.partition_count)
346            .field("partition_key_configured", &self.partition_key.is_some())
347            .finish()
348    }
349}
350
351fn route_partition(
352    partition_count: usize,
353    partition_key: Option<&PartitionKey>,
354    envelope: &MessageEnvelope,
355) -> usize {
356    if partition_count == 1 {
357        return 0;
358    }
359
360    let Some(partition_key) = partition_key else {
361        return 0;
362    };
363
364    let Ok(partition_count_u64) = u64::try_from(partition_count) else {
365        return 0;
366    };
367    let routed = partition_key.apply(envelope) % partition_count_u64;
368    usize::try_from(routed).unwrap_or_else(|_| partition_count.saturating_sub(1))
369}
370
371fn validate_partition_count(partition_count: usize) -> Result<(), DurabilityError> {
372    if partition_count == 0 {
373        return Err(DurabilityError::ConfigError(
374            "partition_count must be at least 1".to_owned(),
375        ));
376    }
377    Ok(())
378}