// liminal/durability/channel/storage.rs
use std::fmt;
2use std::sync::Arc;
3
4use super::MessageEnvelope;
5use crate::durability::{DurabilityConfig, DurabilityError, DurabilityMode, DurableStore};
6
7#[derive(Clone)]
9pub struct PartitionKey {
10 function: Arc<dyn Fn(&MessageEnvelope) -> u64 + Send + Sync + 'static>,
11}
12
13impl PartitionKey {
14 #[must_use]
16 pub fn new<F>(function: F) -> Self
17 where
18 F: Fn(&MessageEnvelope) -> u64 + Send + Sync + 'static,
19 {
20 Self {
21 function: Arc::new(function),
22 }
23 }
24
25 fn apply(&self, envelope: &MessageEnvelope) -> u64 {
26 (self.function)(envelope)
27 }
28}
29
30impl fmt::Debug for PartitionKey {
31 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
32 formatter.write_str("PartitionKey(<closure>)")
33 }
34}
35
36#[derive(Clone)]
38pub struct DurableChannel {
39 channel_id: String,
40 partition_count: usize,
41 partition_key: Option<PartitionKey>,
42 next_sequences: Vec<u64>,
43 store: Arc<dyn DurableStore>,
44}
45
46impl DurableChannel {
47 pub fn new(
53 channel_id: impl Into<String>,
54 partition_count: usize,
55 store: Arc<dyn DurableStore>,
56 ) -> Result<Self, DurabilityError> {
57 Self::from_parts(channel_id.into(), partition_count, store, None, None)
58 }
59
60 pub fn with_partition_key<F>(
66 channel_id: impl Into<String>,
67 partition_count: usize,
68 store: Arc<dyn DurableStore>,
69 partition_key: F,
70 ) -> Result<Self, DurabilityError>
71 where
72 F: Fn(&MessageEnvelope) -> u64 + Send + Sync + 'static,
73 {
74 Self::from_parts(
75 channel_id.into(),
76 partition_count,
77 store,
78 Some(PartitionKey::new(partition_key)),
79 None,
80 )
81 }
82
83 pub fn from_recovered_sequences(
90 channel_id: impl Into<String>,
91 partition_count: usize,
92 store: Arc<dyn DurableStore>,
93 next_sequences: Vec<u64>,
94 ) -> Result<Self, DurabilityError> {
95 Self::from_parts(
96 channel_id.into(),
97 partition_count,
98 store,
99 None,
100 Some(next_sequences),
101 )
102 }
103
104 pub fn from_config(
110 channel_id: impl Into<String>,
111 config: DurabilityConfig,
112 store: Arc<dyn DurableStore>,
113 ) -> Result<Self, DurabilityError> {
114 if config.mode() == DurabilityMode::Ephemeral {
115 return Err(DurabilityError::ConfigError(
116 "durable channel requires a durable durability mode".to_owned(),
117 ));
118 }
119 Self::new(channel_id, config.partition_count(), store)
120 }
121
122 #[must_use]
124 pub fn channel_id(&self) -> &str {
125 &self.channel_id
126 }
127
128 #[must_use]
130 pub const fn partition_count(&self) -> usize {
131 self.partition_count
132 }
133
134 #[must_use]
136 pub fn next_expected_sequence(&self, partition_index: usize) -> Option<u64> {
137 self.next_sequences.get(partition_index).copied()
138 }
139
140 #[must_use]
142 pub fn next_sequences(&self) -> &[u64] {
143 &self.next_sequences
144 }
145
146 #[must_use]
148 pub fn partition_for(&self, envelope: &MessageEnvelope) -> usize {
149 route_partition(self.partition_count, self.partition_key.as_ref(), envelope)
150 }
151
152 #[must_use]
154 pub fn stream_key_for(&self, partition_index: usize) -> String {
155 format!("{}:{partition_index}", self.channel_id)
156 }
157
158 pub async fn publish(&mut self, envelope: &MessageEnvelope) -> Result<u64, DurabilityError> {
166 let payload = envelope.serialize()?;
167 let partition_index = self.partition_for(envelope);
168 let expected_seq = self.sequence_for_append(partition_index)?;
169 let stream_key = self.stream_key_for(partition_index);
170 let assigned_seq = self
171 .store
172 .append(&stream_key, payload, expected_seq)
173 .await?;
174 let next_seq = assigned_seq.checked_add(1).ok_or_else(|| {
175 DurabilityError::ConfigError("sequence number overflow after append".to_owned())
176 })?;
177 self.set_next_sequence(partition_index, next_seq)?;
178 Ok(assigned_seq)
179 }
180
181 pub async fn flush_store(&self) -> Result<(), DurabilityError> {
187 self.store.flush().await
188 }
189
190 fn from_parts(
191 channel_id: String,
192 partition_count: usize,
193 store: Arc<dyn DurableStore>,
194 partition_key: Option<PartitionKey>,
195 next_sequences: Option<Vec<u64>>,
196 ) -> Result<Self, DurabilityError> {
197 validate_partition_count(partition_count)?;
198 let next_sequences = next_sequences.unwrap_or_else(|| vec![0; partition_count]);
199 if next_sequences.len() != partition_count {
200 return Err(DurabilityError::ConfigError(
201 "recovered sequence count must match partition_count".to_owned(),
202 ));
203 }
204 Ok(Self {
205 channel_id,
206 partition_count,
207 partition_key,
208 next_sequences,
209 store,
210 })
211 }
212
213 fn sequence_for_append(&self, partition_index: usize) -> Result<u64, DurabilityError> {
214 self.next_sequences
215 .get(partition_index)
216 .copied()
217 .ok_or_else(|| {
218 DurabilityError::ConfigError("partition sequence state missing".to_owned())
219 })
220 }
221
222 fn set_next_sequence(
223 &mut self,
224 partition_index: usize,
225 next_sequence: u64,
226 ) -> Result<(), DurabilityError> {
227 let Some(sequence) = self.next_sequences.get_mut(partition_index) else {
228 return Err(DurabilityError::ConfigError(
229 "partition sequence state missing".to_owned(),
230 ));
231 };
232 *sequence = next_sequence;
233 Ok(())
234 }
235}
236
237impl fmt::Debug for DurableChannel {
238 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
239 formatter
240 .debug_struct("DurableChannel")
241 .field("channel_id", &self.channel_id)
242 .field("partition_count", &self.partition_count)
243 .field("partition_key_configured", &self.partition_key.is_some())
244 .field("next_sequences", &self.next_sequences)
245 .field("store", &self.store)
246 .finish()
247 }
248}
249
250#[derive(Clone)]
252pub struct EphemeralChannel {
253 channel_id: String,
254 partition_count: usize,
255 partition_key: Option<PartitionKey>,
256}
257
258impl EphemeralChannel {
259 pub fn new(
265 channel_id: impl Into<String>,
266 partition_count: usize,
267 ) -> Result<Self, DurabilityError> {
268 Self::from_parts(channel_id.into(), partition_count, None)
269 }
270
271 pub fn with_partition_key<F>(
277 channel_id: impl Into<String>,
278 partition_count: usize,
279 partition_key: F,
280 ) -> Result<Self, DurabilityError>
281 where
282 F: Fn(&MessageEnvelope) -> u64 + Send + Sync + 'static,
283 {
284 Self::from_parts(
285 channel_id.into(),
286 partition_count,
287 Some(PartitionKey::new(partition_key)),
288 )
289 }
290
291 pub fn from_config(
297 channel_id: impl Into<String>,
298 config: DurabilityConfig,
299 ) -> Result<Self, DurabilityError> {
300 if config.mode() != DurabilityMode::Ephemeral {
301 return Err(DurabilityError::ConfigError(
302 "ephemeral channel requires Ephemeral durability mode".to_owned(),
303 ));
304 }
305 Self::new(channel_id, config.partition_count())
306 }
307
308 #[must_use]
310 pub fn channel_id(&self) -> &str {
311 &self.channel_id
312 }
313
314 #[must_use]
316 pub const fn partition_count(&self) -> usize {
317 self.partition_count
318 }
319
320 #[must_use]
322 pub fn partition_for(&self, envelope: &MessageEnvelope) -> usize {
323 route_partition(self.partition_count, self.partition_key.as_ref(), envelope)
324 }
325
326 fn from_parts(
327 channel_id: String,
328 partition_count: usize,
329 partition_key: Option<PartitionKey>,
330 ) -> Result<Self, DurabilityError> {
331 validate_partition_count(partition_count)?;
332 Ok(Self {
333 channel_id,
334 partition_count,
335 partition_key,
336 })
337 }
338}
339
340impl fmt::Debug for EphemeralChannel {
341 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
342 formatter
343 .debug_struct("EphemeralChannel")
344 .field("channel_id", &self.channel_id)
345 .field("partition_count", &self.partition_count)
346 .field("partition_key_configured", &self.partition_key.is_some())
347 .finish()
348 }
349}
350
351fn route_partition(
352 partition_count: usize,
353 partition_key: Option<&PartitionKey>,
354 envelope: &MessageEnvelope,
355) -> usize {
356 if partition_count == 1 {
357 return 0;
358 }
359
360 let Some(partition_key) = partition_key else {
361 return 0;
362 };
363
364 let Ok(partition_count_u64) = u64::try_from(partition_count) else {
365 return 0;
366 };
367 let routed = partition_key.apply(envelope) % partition_count_u64;
368 usize::try_from(routed).unwrap_or_else(|_| partition_count.saturating_sub(1))
369}
370
371fn validate_partition_count(partition_count: usize) -> Result<(), DurabilityError> {
372 if partition_count == 0 {
373 return Err(DurabilityError::ConfigError(
374 "partition_count must be at least 1".to_owned(),
375 ));
376 }
377 Ok(())
378}