1use std::fmt;
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use super::{DurabilityError, DurableStore, ProcessingReceipt, StoredEntry};
6use codec::DedupRecord;
7pub use sweep::{DedupSweepReport, DedupSweeper};
8
9mod codec;
10mod sweep;
11#[cfg(test)]
12mod tests;
13
/// Maximum number of entries requested from the store per `read_from` call while paging
/// through a dedup stream; a page shorter than this ends the read loop.
const READ_BATCH_SIZE: usize = 1_024;
15
/// Hashes an idempotency key into a fixed-width, 16-character lowercase hexadecimal digest.
///
/// The digest is the 64-bit FNV-1a hash of the key's UTF-8 bytes: starting from the offset
/// basis, each byte is XOR-ed into the state, which is then multiplied (wrapping) by the prime.
/// The result is zero-padded so every key maps to a string of the same length.
#[must_use]
pub fn key_hash(idempotency_key: &str) -> String {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = idempotency_key
        .bytes()
        .fold(OFFSET_BASIS, |state, byte| {
            (state ^ u64::from(byte)).wrapping_mul(PRIME)
        });
    format!("{digest:016x}")
}
26
/// An active dedup record for a single idempotency key.
///
/// An entry without a receipt is reported as in flight by this module; an entry with a receipt
/// is reported as completed.
#[derive(Clone, PartialEq, Eq)]
pub struct DedupEntry {
    /// Idempotency key this entry was written for.
    idempotency_key: String,
    /// Raw receipt bytes, or `None` while no receipt has been recorded for the key.
    receipt: Option<Vec<u8>>,
    /// Timestamp, in milliseconds, supplied when the entry was constructed.
    timestamp_millis: u64,
}
34
35impl DedupEntry {
36 #[must_use]
38 pub fn new(
39 idempotency_key: impl Into<String>,
40 receipt: Option<Vec<u8>>,
41 timestamp_millis: u64,
42 ) -> Self {
43 Self {
44 idempotency_key: idempotency_key.into(),
45 receipt,
46 timestamp_millis,
47 }
48 }
49
50 #[must_use]
52 pub fn idempotency_key(&self) -> &str {
53 &self.idempotency_key
54 }
55
56 #[must_use]
58 pub fn receipt(&self) -> Option<&[u8]> {
59 self.receipt.as_deref()
60 }
61
62 #[must_use]
64 pub const fn timestamp_millis(&self) -> u64 {
65 self.timestamp_millis
66 }
67
68 pub fn serialize(&self) -> Result<Vec<u8>, DurabilityError> {
74 DedupRecord::Active(self.clone()).serialize()
75 }
76
77 pub fn deserialize(bytes: &[u8]) -> Result<Self, DurabilityError> {
83 match DedupRecord::deserialize(bytes)? {
84 DedupRecord::Active(entry) => Ok(entry),
85 DedupRecord::Tombstone { .. } => Err(DurabilityError::EnvelopeError(
86 "dedup tombstone is not an active entry".to_owned(),
87 )),
88 }
89 }
90}
91
92impl fmt::Debug for DedupEntry {
93 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
94 formatter
95 .debug_struct("DedupEntry")
96 .field("idempotency_key", &self.idempotency_key)
97 .field("receipt_bytes", &self.receipt.as_ref().map(Vec::len))
98 .field("timestamp_millis", &self.timestamp_millis)
99 .finish()
100 }
101}
102
/// Outcome reported for an idempotency key by the dedup cache.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DedupDecision {
    /// The call appended a new in-flight entry for the key.
    Claimed,
    /// The key's current entry carries a receipt, which is returned here.
    Completed(ProcessingReceipt),
    /// The key's current entry exists but carries no receipt yet.
    InFlight,
}
113
/// Dedup cache that keeps one stream per idempotency key in a shared durable store.
#[derive(Clone)]
pub struct DedupCache {
    /// Store that all reads and appends go through; shared between clones of the cache.
    store: Arc<dyn DurableStore>,
    /// Prefix placed before the key hash when building stream keys.
    namespace: String,
}
120
121impl DedupCache {
122 #[must_use]
124 pub fn new(store: Arc<dyn DurableStore>, namespace: impl Into<String>) -> Self {
125 Self {
126 store,
127 namespace: namespace.into(),
128 }
129 }
130
131 #[must_use]
133 pub fn namespace(&self) -> &str {
134 &self.namespace
135 }
136
137 #[must_use]
139 pub fn stream_key_for(&self, idempotency_key: &str) -> String {
140 format!("{}:{}", self.namespace, key_hash(idempotency_key))
141 }
142
143 pub async fn lookup(
150 &self,
151 idempotency_key: &str,
152 ) -> Result<Option<DedupDecision>, DurabilityError> {
153 let stream_key = self.stream_key_for(idempotency_key);
154 let snapshot = self.load_snapshot(&stream_key, idempotency_key).await?;
155 Ok(snapshot.current.as_ref().map(decision_for_entry))
156 }
157
158 pub async fn claim_or_get(
167 &self,
168 idempotency_key: &str,
169 timestamp_millis: u64,
170 ) -> Result<DedupDecision, DurabilityError> {
171 let stream_key = self.stream_key_for(idempotency_key);
172 let snapshot = self.load_snapshot(&stream_key, idempotency_key).await?;
173 if let Some(entry) = snapshot.current.as_ref() {
174 return Ok(decision_for_entry(entry));
175 }
176
177 let entry = DedupEntry::new(idempotency_key, None, timestamp_millis);
178 match self
179 .store
180 .append(&stream_key, entry.serialize()?, snapshot.next_seq)
181 .await
182 {
183 Ok(_) => Ok(DedupDecision::Claimed),
184 Err(DurabilityError::SequenceConflict { expected, actual }) => {
185 self.decision_after_conflict(&stream_key, idempotency_key, expected, actual)
186 .await
187 }
188 Err(error) => Err(error),
189 }
190 }
191
192 pub async fn complete_receipt(
200 &self,
201 idempotency_key: &str,
202 receipt: ProcessingReceipt,
203 ) -> Result<(), DurabilityError> {
204 self.complete_receipt_at(idempotency_key, receipt, current_epoch_millis()?)
205 .await
206 }
207
208 async fn complete_receipt_at(
219 &self,
220 idempotency_key: &str,
221 receipt: ProcessingReceipt,
222 timestamp_millis: u64,
223 ) -> Result<(), DurabilityError> {
224 let stream_key = self.stream_key_for(idempotency_key);
225 let snapshot = self.load_snapshot(&stream_key, idempotency_key).await?;
226 let Some(entry) = snapshot.current.as_ref() else {
227 return Err(DurabilityError::DedupCollision {
228 key: idempotency_key.to_owned(),
229 });
230 };
231
232 let receipt_bytes = receipt.into_bytes();
233 if let Some(existing_receipt) = entry.receipt() {
234 if existing_receipt == receipt_bytes.as_slice() {
235 return Ok(());
236 }
237 return Err(DurabilityError::DedupCollision {
238 key: idempotency_key.to_owned(),
239 });
240 }
241
242 let completed = DedupEntry::new(
243 entry.idempotency_key().to_owned(),
244 Some(receipt_bytes.clone()),
245 timestamp_millis,
246 );
247 match self
248 .store
249 .append(&stream_key, completed.serialize()?, snapshot.next_seq)
250 .await
251 {
252 Ok(_) => Ok(()),
253 Err(DurabilityError::SequenceConflict { expected, actual }) => {
254 self.confirm_matching_receipt(
255 &stream_key,
256 idempotency_key,
257 &receipt_bytes,
258 expected,
259 actual,
260 )
261 .await
262 }
263 Err(error) => Err(error),
264 }
265 }
266
267 pub async fn release_claim(&self, idempotency_key: &str) -> Result<(), DurabilityError> {
286 self.release_claim_at(idempotency_key, current_epoch_millis()?)
287 .await
288 }
289
290 async fn release_claim_at(
291 &self,
292 idempotency_key: &str,
293 timestamp_millis: u64,
294 ) -> Result<(), DurabilityError> {
295 let stream_key = self.stream_key_for(idempotency_key);
296 let snapshot = self.load_snapshot(&stream_key, idempotency_key).await?;
297 let Some(entry) = snapshot.current.as_ref() else {
300 return Ok(());
301 };
302 if entry.receipt().is_some() {
303 return Ok(());
304 }
305
306 let tombstone = DedupRecord::tombstone(idempotency_key.to_owned(), timestamp_millis);
307 match self
308 .store
309 .append(&stream_key, tombstone.serialize()?, snapshot.next_seq)
310 .await
311 {
312 Ok(_) => Ok(()),
313 Err(DurabilityError::SequenceConflict { expected, actual }) => {
314 self.confirm_release_after_conflict(&stream_key, idempotency_key, expected, actual)
315 .await
316 }
317 Err(error) => Err(error),
318 }
319 }
320
321 async fn confirm_release_after_conflict(
322 &self,
323 stream_key: &str,
324 idempotency_key: &str,
325 expected: u64,
326 actual: u64,
327 ) -> Result<(), DurabilityError> {
328 let latest = self.latest_record(stream_key, idempotency_key).await?;
335 match latest {
336 Some(DedupRecord::Tombstone { .. }) => Ok(()),
337 Some(DedupRecord::Active(entry)) if entry.receipt().is_some() => Ok(()),
338 _ => Err(DurabilityError::SequenceConflict { expected, actual }),
339 }
340 }
341
342 async fn latest_record(
343 &self,
344 stream_key: &str,
345 idempotency_key: &str,
346 ) -> Result<Option<DedupRecord>, DurabilityError> {
347 let entries = self.read_stream(stream_key).await?;
348 let mut latest = None;
349 for stored in entries {
350 let record = DedupRecord::deserialize(&stored.payload)?;
351 if record.idempotency_key() != idempotency_key {
352 return Err(DurabilityError::DedupCollision {
353 key: idempotency_key.to_owned(),
354 });
355 }
356 latest = Some(record);
357 }
358 Ok(latest)
359 }
360
361 fn scan_prefix(&self) -> String {
362 format!("{}:", self.namespace)
363 }
364
365 async fn decision_after_conflict(
366 &self,
367 stream_key: &str,
368 idempotency_key: &str,
369 expected: u64,
370 actual: u64,
371 ) -> Result<DedupDecision, DurabilityError> {
372 let snapshot = self.load_snapshot(stream_key, idempotency_key).await?;
373 snapshot.current.as_ref().map_or(
374 Err(DurabilityError::SequenceConflict { expected, actual }),
375 |entry| Ok(decision_for_entry(entry)),
376 )
377 }
378
379 async fn confirm_matching_receipt(
380 &self,
381 stream_key: &str,
382 idempotency_key: &str,
383 receipt_bytes: &[u8],
384 expected: u64,
385 actual: u64,
386 ) -> Result<(), DurabilityError> {
387 let snapshot = self.load_snapshot(stream_key, idempotency_key).await?;
388 if snapshot
389 .current
390 .as_ref()
391 .and_then(DedupEntry::receipt)
392 .is_some_and(|bytes| bytes == receipt_bytes)
393 {
394 Ok(())
395 } else {
396 Err(DurabilityError::SequenceConflict { expected, actual })
397 }
398 }
399
400 async fn load_snapshot(
401 &self,
402 stream_key: &str,
403 idempotency_key: &str,
404 ) -> Result<StreamSnapshot, DurabilityError> {
405 let entries = self.read_stream(stream_key).await?;
406 let next_seq = len_to_u64(entries.len())?;
407 let mut current = None;
408 for stored in entries {
409 let record = DedupRecord::deserialize(&stored.payload)?;
410 if record.idempotency_key() != idempotency_key {
411 return Err(DurabilityError::DedupCollision {
412 key: idempotency_key.to_owned(),
413 });
414 }
415 current = Some(record);
416 }
417 Ok(StreamSnapshot {
418 current: current.and_then(DedupRecord::into_active),
419 next_seq,
420 })
421 }
422
423 async fn read_stream(&self, stream_key: &str) -> Result<Vec<StoredEntry>, DurabilityError> {
424 let mut entries = Vec::new();
425 let mut offset = 0;
426 loop {
427 let batch = self
428 .store
429 .read_from(stream_key, offset, READ_BATCH_SIZE)
430 .await?;
431 let batch_len = batch.len();
432 if batch_len == 0 {
433 break;
434 }
435 entries.extend(batch);
436 offset = offset.checked_add(len_to_u64(batch_len)?).ok_or_else(|| {
437 DurabilityError::ConfigError("dedup read offset overflow".to_owned())
438 })?;
439 if batch_len < READ_BATCH_SIZE {
440 break;
441 }
442 }
443 Ok(entries)
444 }
445}
446
447impl fmt::Debug for DedupCache {
448 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
449 formatter
450 .debug_struct("DedupCache")
451 .field("namespace", &self.namespace)
452 .field("store", &self.store)
453 .finish()
454 }
455}
456
/// Point-in-time summary of one dedup stream, produced by reading it in full.
struct StreamSnapshot {
    /// Active entry derived from the stream's last record, or `None` when there is none.
    current: Option<DedupEntry>,
    /// Number of records read from the stream; passed to the store as the sequence for the
    /// next append.
    next_seq: u64,
}
461
462fn decision_for_entry(entry: &DedupEntry) -> DedupDecision {
463 entry.receipt().map_or(DedupDecision::InFlight, |bytes| {
464 DedupDecision::Completed(ProcessingReceipt::new(bytes.to_vec()))
465 })
466}
467
468fn len_to_u64(len: usize) -> Result<u64, DurabilityError> {
469 u64::try_from(len).map_err(|error| {
470 DurabilityError::ConfigError(format!("dedup entry count cannot fit u64: {error}"))
471 })
472}
473
474fn current_epoch_millis() -> Result<u64, DurabilityError> {
475 let duration = SystemTime::now()
476 .duration_since(UNIX_EPOCH)
477 .map_err(|error| {
478 DurabilityError::ConfigError(format!("system clock is before Unix epoch: {error}"))
479 })?;
480 u64::try_from(duration.as_millis()).map_err(|error| {
481 DurabilityError::ConfigError(format!(
482 "current epoch millis cannot fit u64 for dedup receipt: {error}"
483 ))
484 })
485}