1use crate::error::StreamError;
12use crate::state::distributed_state::StateBackend;
13use std::collections::{HashMap, VecDeque};
14use std::time::{Duration, Instant};
15use uuid::Uuid;
16
/// Identity of a single message, used as the key for duplicate detection.
///
/// Two ids compare equal only when producer, partition and sequence all
/// match. The type is hashable so it can key a `HashMap`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MessageId {
    /// Identifier of the producer that emitted the message.
    pub producer_id: String,
    /// Partition component of the id.
    pub partition: u32,
    /// Sequence component of the id.
    pub sequence: u64,
}
29
30impl MessageId {
31 pub fn new(producer: &str, partition: u32, seq: u64) -> Self {
33 Self {
34 producer_id: producer.to_string(),
35 partition,
36 sequence: seq,
37 }
38 }
39
40 pub fn serialize(&self) -> String {
44 format!("{}/{}/{}", self.producer_id, self.partition, self.sequence)
45 }
46
47 pub fn parse(s: &str) -> Result<Self, StreamError> {
51 let parts: Vec<&str> = s.splitn(3, '/').collect();
52 if parts.len() != 3 {
53 return Err(StreamError::InvalidInput(format!(
54 "MessageId must be '<producer>/<partition>/<seq>', got: {s}"
55 )));
56 }
57
58 let partition = parts[1].parse::<u32>().map_err(|e| {
59 StreamError::InvalidInput(format!("invalid partition in MessageId: {e}"))
60 })?;
61
62 let sequence = parts[2].parse::<u64>().map_err(|e| {
63 StreamError::InvalidInput(format!("invalid sequence in MessageId: {e}"))
64 })?;
65
66 Ok(Self {
67 producer_id: parts[0].to_string(),
68 partition,
69 sequence,
70 })
71 }
72}
73
/// Tuning knobs for a `DeduplicationLog`.
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// How long a recorded message id keeps counting as a duplicate.
    pub window_duration: Duration,
    /// Capacity threshold: once this many ids are tracked, the oldest ones
    /// are dropped to make room for new ones.
    pub max_tracked: usize,
}

impl Default for DeduplicationConfig {
    /// Five-minute window, at most 100 000 tracked ids.
    fn default() -> Self {
        let window_duration = Duration::from_secs(5 * 60);
        let max_tracked = 100_000;

        Self {
            window_duration,
            max_tracked,
        }
    }
}
93
94pub struct DeduplicationLog {
99 config: DeduplicationConfig,
100 processed: HashMap<MessageId, Instant>,
102 eviction_queue: VecDeque<(MessageId, Instant)>,
104}
105
106impl DeduplicationLog {
107 pub fn new(config: DeduplicationConfig) -> Self {
109 Self {
110 processed: HashMap::new(),
111 eviction_queue: VecDeque::new(),
112 config,
113 }
114 }
115
116 pub fn is_duplicate(&self, id: &MessageId) -> bool {
118 match self.processed.get(id) {
119 None => false,
120 Some(&processed_at) => {
121 processed_at.elapsed() < self.config.window_duration
123 }
124 }
125 }
126
127 pub fn mark_processed(&mut self, id: MessageId) {
131 let now = Instant::now();
132
133 while self.processed.len() >= self.config.max_tracked {
135 if let Some((oldest_id, _)) = self.eviction_queue.pop_front() {
136 self.processed.remove(&oldest_id);
137 } else {
138 break;
139 }
140 }
141
142 self.eviction_queue.push_back((id.clone(), now));
143 self.processed.insert(id, now);
144 }
145
146 pub fn evict_expired(&mut self) -> usize {
150 let deadline = self.config.window_duration;
151 let mut evicted = 0usize;
152
153 while let Some((id, ts)) = self.eviction_queue.front() {
154 if ts.elapsed() >= deadline {
155 let id = id.clone();
156 self.eviction_queue.pop_front();
157 self.processed.remove(&id);
158 evicted += 1;
159 } else {
160 break;
161 }
162 }
163
164 evicted
165 }
166
167 pub fn size(&self) -> usize {
169 self.processed.len()
170 }
171}
172
173pub struct ExactlyOnceTransaction {
181 pub transaction_id: String,
183 pub messages: Vec<MessageId>,
185 pub state_changes: Vec<(Vec<u8>, Vec<u8>)>,
187 pub started_at: Instant,
188 pub committed: bool,
189}
190
191impl Default for ExactlyOnceTransaction {
192 fn default() -> Self {
193 Self::new()
194 }
195}
196
197impl ExactlyOnceTransaction {
198 pub fn new() -> Self {
200 Self {
201 transaction_id: Uuid::new_v4().to_string(),
202 messages: Vec::new(),
203 state_changes: Vec::new(),
204 started_at: Instant::now(),
205 committed: false,
206 }
207 }
208
209 pub fn add_message(&mut self, id: MessageId) {
211 self.messages.push(id);
212 }
213
214 pub fn add_state_change(&mut self, key: Vec<u8>, value: Vec<u8>) {
216 self.state_changes.push((key, value));
217 }
218
219 pub fn commit(
228 mut self,
229 dedup_log: &mut DeduplicationLog,
230 backend: &dyn StateBackend,
231 ) -> Result<(), StreamError> {
232 if self.committed {
233 return Err(StreamError::InvalidOperation(format!(
234 "transaction {} already committed",
235 self.transaction_id
236 )));
237 }
238
239 for (key, value) in &self.state_changes {
241 backend.put(key, value)?;
242 }
243
244 for id in self.messages.drain(..) {
246 dedup_log.mark_processed(id);
247 }
248
249 self.committed = true;
250 Ok(())
251 }
252}
253
254pub struct ExactlyOnceProcessor {
259 dedup_log: DeduplicationLog,
260 backend: std::sync::Arc<dyn StateBackend>,
261 duplicates_filtered: u64,
263 messages_processed: u64,
265}
266
267impl ExactlyOnceProcessor {
268 pub fn new(config: DeduplicationConfig, backend: std::sync::Arc<dyn StateBackend>) -> Self {
270 Self {
271 dedup_log: DeduplicationLog::new(config),
272 backend,
273 duplicates_filtered: 0,
274 messages_processed: 0,
275 }
276 }
277
278 pub fn process<R, F>(&mut self, id: MessageId, processor: F) -> Result<Option<R>, StreamError>
287 where
288 F: FnOnce(&mut ExactlyOnceTransaction) -> Result<R, StreamError>,
289 {
290 if self.dedup_log.is_duplicate(&id) {
292 self.duplicates_filtered += 1;
293 return Ok(None);
294 }
295
296 let mut txn = ExactlyOnceTransaction::new();
297 txn.add_message(id);
298
299 let result = processor(&mut txn)?;
300 txn.commit(&mut self.dedup_log, self.backend.as_ref())?;
301 self.messages_processed += 1;
302
303 Ok(Some(result))
304 }
305
306 pub fn maintenance(&mut self) -> usize {
308 self.dedup_log.evict_expired()
309 }
310
311 pub fn stats(&self) -> ExactlyOnceStats {
313 ExactlyOnceStats {
314 dedup_window_size: self.dedup_log.size(),
315 duplicates_filtered: self.duplicates_filtered,
316 messages_processed: self.messages_processed,
317 }
318 }
319}
320
/// Point-in-time counters reported by `ExactlyOnceProcessor::stats`.
#[derive(Debug, Clone)]
pub struct ExactlyOnceStats {
    /// Number of message ids currently held in the deduplication log.
    pub dedup_window_size: usize,
    /// Messages skipped because they had already been processed.
    pub duplicates_filtered: u64,
    /// Messages processed and committed successfully.
    pub messages_processed: u64,
}
328
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::distributed_state::InMemoryStateBackend;
    use std::sync::Arc;

    /// Serialising and parsing an id yields the original value.
    #[test]
    fn test_message_id_round_trip() {
        let id = MessageId::new("producer-1", 3, 42);
        let serialized = id.serialize();
        assert_eq!(serialized, "producer-1/3/42");

        let parsed = MessageId::parse(&serialized).unwrap();
        assert_eq!(parsed, id);
    }

    /// Too few fields or non-numeric partition/sequence are rejected.
    #[test]
    fn test_message_id_parse_error() {
        assert!(MessageId::parse("bad").is_err());
        assert!(MessageId::parse("a/b").is_err());
        assert!(MessageId::parse("a/notnum/1").is_err());
        assert!(MessageId::parse("a/1/notnum").is_err());
    }

    /// An id becomes a duplicate once it has been marked as processed.
    #[test]
    fn test_dedup_log_basic() {
        let config = DeduplicationConfig {
            window_duration: Duration::from_secs(60),
            max_tracked: 1000,
        };
        let mut log = DeduplicationLog::new(config);

        let id = MessageId::new("p", 0, 1);
        assert!(!log.is_duplicate(&id));

        log.mark_processed(id.clone());
        assert!(log.is_duplicate(&id));
        assert_eq!(log.size(), 1);
    }

    /// At capacity the oldest ids are dropped and the newest ones are kept.
    #[test]
    fn test_dedup_log_capacity_eviction() {
        let config = DeduplicationConfig {
            window_duration: Duration::from_secs(60),
            max_tracked: 3,
        };
        let mut log = DeduplicationLog::new(config);

        for i in 0..5u64 {
            log.mark_processed(MessageId::new("p", 0, i));
        }

        assert_eq!(log.size(), 3);
        // The two oldest ids made room for the two newest.
        for i in 0..2u64 {
            assert!(!log.is_duplicate(&MessageId::new("p", 0, i)));
        }
        for i in 2..5u64 {
            assert!(log.is_duplicate(&MessageId::new("p", 0, i)));
        }
    }

    /// After the window elapses an id is no longer a duplicate and can be
    /// evicted.
    #[test]
    fn test_dedup_log_expiry() {
        let config = DeduplicationConfig {
            window_duration: Duration::from_millis(50),
            max_tracked: 1000,
        };
        let mut log = DeduplicationLog::new(config);

        let id = MessageId::new("p", 0, 99);
        log.mark_processed(id.clone());
        assert!(log.is_duplicate(&id));

        std::thread::sleep(Duration::from_millis(60));

        // Expired entries stop counting even before they are evicted.
        assert!(!log.is_duplicate(&id));

        let evicted = log.evict_expired();
        assert_eq!(evicted, 1);
        assert_eq!(log.size(), 0);
    }

    /// Committing writes the queued state and records the message id.
    #[test]
    fn test_transaction_commit_applies_state() {
        let backend = InMemoryStateBackend::new();
        let mut dedup = DeduplicationLog::new(DeduplicationConfig::default());

        let mut txn = ExactlyOnceTransaction::new();
        txn.add_message(MessageId::new("p", 0, 1));
        txn.add_state_change(
            b"counter".to_vec(),
            b"\x01\x00\x00\x00\x00\x00\x00\x00".to_vec(),
        );

        txn.commit(&mut dedup, &backend).unwrap();

        assert_eq!(
            backend.get(b"counter").unwrap().as_deref(),
            Some(b"\x01\x00\x00\x00\x00\x00\x00\x00".as_ref())
        );
        assert!(dedup.is_duplicate(&MessageId::new("p", 0, 1)));
    }

    /// A transaction that is already flagged as committed is rejected and
    /// leaves both the backend and the deduplication log untouched.
    #[test]
    fn test_transaction_double_commit_fails() {
        let backend = InMemoryStateBackend::new();
        let mut dedup = DeduplicationLog::new(DeduplicationConfig::default());

        // `commit` consumes the transaction, so calling it twice on the same
        // value does not compile; a first commit simply succeeds.
        let txn = ExactlyOnceTransaction::new();
        txn.commit(&mut dedup, &backend).unwrap();

        // The runtime guard is reachable through the public `committed` flag.
        let mut replayed = ExactlyOnceTransaction::new();
        replayed.add_message(MessageId::new("p", 0, 7));
        replayed.add_state_change(b"k".to_vec(), b"v".to_vec());
        replayed.committed = true;

        assert!(replayed.commit(&mut dedup, &backend).is_err());
        assert!(backend.get(b"k").unwrap().is_none());
        assert!(!dedup.is_duplicate(&MessageId::new("p", 0, 7)));
    }

    /// The handler runs once per id; a repeat is filtered and counted.
    #[test]
    fn test_processor_exactly_once() {
        let backend = Arc::new(InMemoryStateBackend::new());
        let mut processor = ExactlyOnceProcessor::new(DeduplicationConfig::default(), backend);

        let id = MessageId::new("prod", 0, 1);
        let mut call_count = 0u32;

        let result = processor
            .process(id.clone(), |_txn| {
                call_count += 1;
                Ok(42u32)
            })
            .unwrap();
        assert_eq!(result, Some(42u32));
        assert_eq!(call_count, 1);

        let result = processor
            .process(id, |_txn| {
                call_count += 1;
                Ok(99u32)
            })
            .unwrap();
        assert_eq!(result, None);
        assert_eq!(call_count, 1);

        let stats = processor.stats();
        assert_eq!(stats.messages_processed, 1);
        assert_eq!(stats.duplicates_filtered, 1);
    }
}