Skip to main content

liminal/durability/
dedup.rs

1use std::fmt;
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use super::{DurabilityError, DurableStore, ProcessingReceipt, StoredEntry};
6use codec::DedupRecord;
7pub use sweep::{DedupSweepReport, DedupSweeper};
8
9mod codec;
10mod sweep;
11#[cfg(test)]
12mod tests;
13
/// Maximum number of stored entries requested per `read_from` call while paging a stream.
const READ_BATCH_SIZE: usize = 1 << 10;
15
/// Hashes a producer idempotency key into a deterministic stream-key suffix.
///
/// Applies 64-bit FNV-1a to the key's UTF-8 bytes and renders the digest as exactly
/// 16 lowercase, zero-padded hex digits. The result does not depend on process state,
/// so the same key always maps to the same suffix.
#[must_use]
pub fn key_hash(idempotency_key: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = idempotency_key
        .bytes()
        .fold(FNV_OFFSET_BASIS, |state, byte| {
            // FNV-1a order: xor the byte in first, then multiply by the prime.
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    format!("{digest:016x}")
}
26
27/// Persisted dedup cache entry for one producer idempotency key.
28#[derive(Clone, PartialEq, Eq)]
29pub struct DedupEntry {
30    idempotency_key: String,
31    receipt: Option<Vec<u8>>,
32    timestamp_millis: u64,
33}
34
35impl DedupEntry {
36    /// Builds an entry containing the original key, optional receipt bytes, and epoch millis.
37    #[must_use]
38    pub fn new(
39        idempotency_key: impl Into<String>,
40        receipt: Option<Vec<u8>>,
41        timestamp_millis: u64,
42    ) -> Self {
43        Self {
44            idempotency_key: idempotency_key.into(),
45            receipt,
46            timestamp_millis,
47        }
48    }
49
50    /// Returns the original idempotency key stored with this entry.
51    #[must_use]
52    pub fn idempotency_key(&self) -> &str {
53        &self.idempotency_key
54    }
55
56    /// Returns the stored opaque receipt bytes, when processing has completed.
57    #[must_use]
58    pub fn receipt(&self) -> Option<&[u8]> {
59        self.receipt.as_deref()
60    }
61
62    /// Returns the entry timestamp in epoch milliseconds.
63    #[must_use]
64    pub const fn timestamp_millis(&self) -> u64 {
65        self.timestamp_millis
66    }
67
68    /// Serializes this active cache entry into deterministic storage bytes.
69    ///
70    /// # Errors
71    ///
72    /// Returns [`DurabilityError::EnvelopeError`] when a field length cannot be encoded.
73    pub fn serialize(&self) -> Result<Vec<u8>, DurabilityError> {
74        DedupRecord::Active(self.clone()).serialize()
75    }
76
77    /// Deserializes an active cache entry previously produced by [`Self::serialize`].
78    ///
79    /// # Errors
80    ///
81    /// Returns [`DurabilityError::EnvelopeError`] when bytes are malformed or contain a tombstone.
82    pub fn deserialize(bytes: &[u8]) -> Result<Self, DurabilityError> {
83        match DedupRecord::deserialize(bytes)? {
84            DedupRecord::Active(entry) => Ok(entry),
85            DedupRecord::Tombstone { .. } => Err(DurabilityError::EnvelopeError(
86                "dedup tombstone is not an active entry".to_owned(),
87            )),
88        }
89    }
90}
91
92impl fmt::Debug for DedupEntry {
93    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
94        formatter
95            .debug_struct("DedupEntry")
96            .field("idempotency_key", &self.idempotency_key)
97            .field("receipt_bytes", &self.receipt.as_ref().map(Vec::len))
98            .field("timestamp_millis", &self.timestamp_millis)
99            .finish()
100    }
101}
102
103/// Result of checking or claiming an idempotency key.
104#[derive(Clone, Debug, PartialEq, Eq)]
105pub enum DedupDecision {
106    /// The caller claimed the key and may deliver the message for processing.
107    Claimed,
108    /// The key was already completed; return this receipt without re-delivery.
109    Completed(ProcessingReceipt),
110    /// The key is already being processed and delivery must be deferred.
111    InFlight,
112}
113
114/// Haematite-backed idempotency-key cache for the lightweight dedup contract.
115#[derive(Clone)]
116pub struct DedupCache {
117    store: Arc<dyn DurableStore>,
118    namespace: String,
119}
120
121impl DedupCache {
122    /// Creates a dedup cache over the given durable store and namespace prefix.
123    #[must_use]
124    pub fn new(store: Arc<dyn DurableStore>, namespace: impl Into<String>) -> Self {
125        Self {
126            store,
127            namespace: namespace.into(),
128        }
129    }
130
131    /// Returns the configured dedup namespace.
132    #[must_use]
133    pub fn namespace(&self) -> &str {
134        &self.namespace
135    }
136
137    /// Formats the haematite stream key for an idempotency key.
138    #[must_use]
139    pub fn stream_key_for(&self, idempotency_key: &str) -> String {
140        format!("{}:{}", self.namespace, key_hash(idempotency_key))
141    }
142
143    /// Looks up an existing key without appending or refreshing cache state.
144    ///
145    /// # Errors
146    ///
147    /// Propagates store read errors and returns [`DurabilityError::DedupCollision`] when
148    /// the hashed stream contains a different original idempotency key.
149    pub async fn lookup(
150        &self,
151        idempotency_key: &str,
152    ) -> Result<Option<DedupDecision>, DurabilityError> {
153        let stream_key = self.stream_key_for(idempotency_key);
154        let snapshot = self.load_snapshot(&stream_key, idempotency_key).await?;
155        Ok(snapshot.current.as_ref().map(decision_for_entry))
156    }
157
158    /// Claims a new key or returns the existing completed/in-flight decision.
159    ///
160    /// Callers must only deliver the message when this returns [`DedupDecision::Claimed`].
161    ///
162    /// # Errors
163    ///
164    /// Propagates store errors and serialization errors. A concurrent first claim that wins
165    /// the append race is converted into the duplicate completed/in-flight decision.
166    pub async fn claim_or_get(
167        &self,
168        idempotency_key: &str,
169        timestamp_millis: u64,
170    ) -> Result<DedupDecision, DurabilityError> {
171        let stream_key = self.stream_key_for(idempotency_key);
172        let snapshot = self.load_snapshot(&stream_key, idempotency_key).await?;
173        if let Some(entry) = snapshot.current.as_ref() {
174            return Ok(decision_for_entry(entry));
175        }
176
177        let entry = DedupEntry::new(idempotency_key, None, timestamp_millis);
178        match self
179            .store
180            .append(&stream_key, entry.serialize()?, snapshot.next_seq)
181            .await
182        {
183            Ok(_) => Ok(DedupDecision::Claimed),
184            Err(DurabilityError::SequenceConflict { expected, actual }) => {
185                self.decision_after_conflict(&stream_key, idempotency_key, expected, actual)
186                    .await
187            }
188            Err(error) => Err(error),
189        }
190    }
191
192    /// Stores a processing receipt by appending a completed entry for an existing key.
193    ///
194    /// # Errors
195    ///
196    /// Returns [`DurabilityError::ConfigError`] when the current system time cannot be encoded.
197    /// Returns [`DurabilityError::DedupCollision`] when the key is missing or collides with
198    /// another original key, and propagates serialization/store append errors.
199    pub async fn complete_receipt(
200        &self,
201        idempotency_key: &str,
202        receipt: ProcessingReceipt,
203    ) -> Result<(), DurabilityError> {
204        self.complete_receipt_at(idempotency_key, receipt, current_epoch_millis()?)
205            .await
206    }
207
208    /// Stores a processing receipt with an explicit receipt timestamp.
209    ///
210    /// This is useful for deterministic tests and for callers that already have a trusted
211    /// processing-completion timestamp. The timestamp is the TTL anchor for the stored receipt.
212    ///
213    /// # Errors
214    ///
215    /// Returns [`DurabilityError::DedupCollision`] when the key is missing, collides with
216    /// another original key, or has already completed with different receipt bytes. Propagates
217    /// serialization/store append errors.
218    async fn complete_receipt_at(
219        &self,
220        idempotency_key: &str,
221        receipt: ProcessingReceipt,
222        timestamp_millis: u64,
223    ) -> Result<(), DurabilityError> {
224        let stream_key = self.stream_key_for(idempotency_key);
225        let snapshot = self.load_snapshot(&stream_key, idempotency_key).await?;
226        let Some(entry) = snapshot.current.as_ref() else {
227            return Err(DurabilityError::DedupCollision {
228                key: idempotency_key.to_owned(),
229            });
230        };
231
232        let receipt_bytes = receipt.into_bytes();
233        if let Some(existing_receipt) = entry.receipt() {
234            if existing_receipt == receipt_bytes.as_slice() {
235                return Ok(());
236            }
237            return Err(DurabilityError::DedupCollision {
238                key: idempotency_key.to_owned(),
239            });
240        }
241
242        let completed = DedupEntry::new(
243            entry.idempotency_key().to_owned(),
244            Some(receipt_bytes.clone()),
245            timestamp_millis,
246        );
247        match self
248            .store
249            .append(&stream_key, completed.serialize()?, snapshot.next_seq)
250            .await
251        {
252            Ok(_) => Ok(()),
253            Err(DurabilityError::SequenceConflict { expected, actual }) => {
254                self.confirm_matching_receipt(
255                    &stream_key,
256                    idempotency_key,
257                    &receipt_bytes,
258                    expected,
259                    actual,
260                )
261                .await
262            }
263            Err(error) => Err(error),
264        }
265    }
266
267    /// Releases an in-flight claim so the key becomes re-claimable.
268    ///
269    /// Callers use this on the publish failure path: a key was claimed
270    /// ([`DedupDecision::Claimed`]) but the downstream delivery failed before a
271    /// receipt could be recorded, leaving the key dangling [`DedupDecision::InFlight`]
272    /// forever and permanently suppressing every re-publish. Releasing appends a
273    /// tombstone so the next claim succeeds.
274    ///
275    /// This NEVER clobbers a stored receipt: an already-completed key (receipt
276    /// present) and an absent key are both no-ops, preserving the at-most-once
277    /// guarantee. Only a current in-flight entry (active, no receipt) is tombstoned.
278    ///
279    /// # Errors
280    ///
281    /// Propagates store read/append and serialization errors. A [`DurabilityError::SequenceConflict`]
282    /// on the tombstone append is re-checked: if the latest state is now completed
283    /// or already a tombstone the release is treated as successful, otherwise the
284    /// conflict is propagated.
285    pub async fn release_claim(&self, idempotency_key: &str) -> Result<(), DurabilityError> {
286        self.release_claim_at(idempotency_key, current_epoch_millis()?)
287            .await
288    }
289
290    async fn release_claim_at(
291        &self,
292        idempotency_key: &str,
293        timestamp_millis: u64,
294    ) -> Result<(), DurabilityError> {
295        let stream_key = self.stream_key_for(idempotency_key);
296        let snapshot = self.load_snapshot(&stream_key, idempotency_key).await?;
297        // No-op when there is nothing in flight: absent key, or a completed key
298        // whose receipt must never be clobbered (guards at-most-once).
299        let Some(entry) = snapshot.current.as_ref() else {
300            return Ok(());
301        };
302        if entry.receipt().is_some() {
303            return Ok(());
304        }
305
306        let tombstone = DedupRecord::tombstone(idempotency_key.to_owned(), timestamp_millis);
307        match self
308            .store
309            .append(&stream_key, tombstone.serialize()?, snapshot.next_seq)
310            .await
311        {
312            Ok(_) => Ok(()),
313            Err(DurabilityError::SequenceConflict { expected, actual }) => {
314                self.confirm_release_after_conflict(&stream_key, idempotency_key, expected, actual)
315                    .await
316            }
317            Err(error) => Err(error),
318        }
319    }
320
321    async fn confirm_release_after_conflict(
322        &self,
323        stream_key: &str,
324        idempotency_key: &str,
325        expected: u64,
326        actual: u64,
327    ) -> Result<(), DurabilityError> {
328        // A concurrent writer advanced the stream after our snapshot. Re-load and
329        // re-check the latest record directly (not via `into_active`, so a fresh
330        // tombstone is distinguishable): if it is now completed (receipt present)
331        // or already a tombstone, the in-flight claim is gone and the release goal
332        // is met. A still-active no-receipt entry means a legitimate re-claim won
333        // the race, so we must not clobber it -- propagate the conflict.
334        let latest = self.latest_record(stream_key, idempotency_key).await?;
335        match latest {
336            Some(DedupRecord::Tombstone { .. }) => Ok(()),
337            Some(DedupRecord::Active(entry)) if entry.receipt().is_some() => Ok(()),
338            _ => Err(DurabilityError::SequenceConflict { expected, actual }),
339        }
340    }
341
342    async fn latest_record(
343        &self,
344        stream_key: &str,
345        idempotency_key: &str,
346    ) -> Result<Option<DedupRecord>, DurabilityError> {
347        let entries = self.read_stream(stream_key).await?;
348        let mut latest = None;
349        for stored in entries {
350            let record = DedupRecord::deserialize(&stored.payload)?;
351            if record.idempotency_key() != idempotency_key {
352                return Err(DurabilityError::DedupCollision {
353                    key: idempotency_key.to_owned(),
354                });
355            }
356            latest = Some(record);
357        }
358        Ok(latest)
359    }
360
361    fn scan_prefix(&self) -> String {
362        format!("{}:", self.namespace)
363    }
364
365    async fn decision_after_conflict(
366        &self,
367        stream_key: &str,
368        idempotency_key: &str,
369        expected: u64,
370        actual: u64,
371    ) -> Result<DedupDecision, DurabilityError> {
372        let snapshot = self.load_snapshot(stream_key, idempotency_key).await?;
373        snapshot.current.as_ref().map_or(
374            Err(DurabilityError::SequenceConflict { expected, actual }),
375            |entry| Ok(decision_for_entry(entry)),
376        )
377    }
378
379    async fn confirm_matching_receipt(
380        &self,
381        stream_key: &str,
382        idempotency_key: &str,
383        receipt_bytes: &[u8],
384        expected: u64,
385        actual: u64,
386    ) -> Result<(), DurabilityError> {
387        let snapshot = self.load_snapshot(stream_key, idempotency_key).await?;
388        if snapshot
389            .current
390            .as_ref()
391            .and_then(DedupEntry::receipt)
392            .is_some_and(|bytes| bytes == receipt_bytes)
393        {
394            Ok(())
395        } else {
396            Err(DurabilityError::SequenceConflict { expected, actual })
397        }
398    }
399
400    async fn load_snapshot(
401        &self,
402        stream_key: &str,
403        idempotency_key: &str,
404    ) -> Result<StreamSnapshot, DurabilityError> {
405        let entries = self.read_stream(stream_key).await?;
406        let next_seq = len_to_u64(entries.len())?;
407        let mut current = None;
408        for stored in entries {
409            let record = DedupRecord::deserialize(&stored.payload)?;
410            if record.idempotency_key() != idempotency_key {
411                return Err(DurabilityError::DedupCollision {
412                    key: idempotency_key.to_owned(),
413                });
414            }
415            current = Some(record);
416        }
417        Ok(StreamSnapshot {
418            current: current.and_then(DedupRecord::into_active),
419            next_seq,
420        })
421    }
422
423    async fn read_stream(&self, stream_key: &str) -> Result<Vec<StoredEntry>, DurabilityError> {
424        let mut entries = Vec::new();
425        let mut offset = 0;
426        loop {
427            let batch = self
428                .store
429                .read_from(stream_key, offset, READ_BATCH_SIZE)
430                .await?;
431            let batch_len = batch.len();
432            if batch_len == 0 {
433                break;
434            }
435            entries.extend(batch);
436            offset = offset.checked_add(len_to_u64(batch_len)?).ok_or_else(|| {
437                DurabilityError::ConfigError("dedup read offset overflow".to_owned())
438            })?;
439            if batch_len < READ_BATCH_SIZE {
440                break;
441            }
442        }
443        Ok(entries)
444    }
445}
446
447impl fmt::Debug for DedupCache {
448    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
449        formatter
450            .debug_struct("DedupCache")
451            .field("namespace", &self.namespace)
452            .field("store", &self.store)
453            .finish()
454    }
455}
456
457struct StreamSnapshot {
458    current: Option<DedupEntry>,
459    next_seq: u64,
460}
461
462fn decision_for_entry(entry: &DedupEntry) -> DedupDecision {
463    entry.receipt().map_or(DedupDecision::InFlight, |bytes| {
464        DedupDecision::Completed(ProcessingReceipt::new(bytes.to_vec()))
465    })
466}
467
468fn len_to_u64(len: usize) -> Result<u64, DurabilityError> {
469    u64::try_from(len).map_err(|error| {
470        DurabilityError::ConfigError(format!("dedup entry count cannot fit u64: {error}"))
471    })
472}
473
474fn current_epoch_millis() -> Result<u64, DurabilityError> {
475    let duration = SystemTime::now()
476        .duration_since(UNIX_EPOCH)
477        .map_err(|error| {
478            DurabilityError::ConfigError(format!("system clock is before Unix epoch: {error}"))
479        })?;
480    u64::try_from(duration.as_millis()).map_err(|error| {
481        DurabilityError::ConfigError(format!(
482            "current epoch millis cannot fit u64 for dedup receipt: {error}"
483        ))
484    })
485}