1use std::collections::VecDeque;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22
23use parking_lot::Mutex;
24use serde::{Deserialize, Serialize};
25use thiserror::Error;
26use tracing::{debug, warn};
27
28use crate::error::StreamError;
29use crate::state::distributed_state::StateBackend;
30use crate::state::exactly_once::{
31 DeduplicationConfig, ExactlyOnceProcessor as InnerProcessor, MessageId,
32};
33
/// Errors returned by the exactly-once producer/coordinator API in this module.
#[derive(Debug, Error)]
pub enum ExactlyOnceError {
    /// An underlying operation failed; the payload is that failure's display text.
    ///
    /// Produced when converting from [`StreamError`] and when a backend write
    /// fails while committing a transaction.
    #[error("processing error: {0}")]
    Processing(String),
    /// The transaction has already been committed.
    ///
    /// NOTE(review): nothing in the code visible in this file constructs this
    /// variant — confirm which caller is expected to raise it.
    #[error("transaction already committed")]
    AlreadyCommitted,
    /// The transaction has already been finalised.
    ///
    /// NOTE(review): nothing in the code visible in this file constructs this
    /// variant — confirm which caller is expected to raise it.
    #[error("transaction already finalised")]
    AlreadyFinalised,
    /// The call was not valid; the payload describes why (for example an
    /// unknown transaction id).
    #[error("invalid call: {0}")]
    Invalid(String),
}
52
53impl From<StreamError> for ExactlyOnceError {
54 fn from(err: StreamError) -> Self {
55 ExactlyOnceError::Processing(err.to_string())
56 }
57}
58
/// Result alias whose error type is fixed to [`ExactlyOnceError`].
pub type ExactlyOnceResult<T> = std::result::Result<T, ExactlyOnceError>;
61
/// Settings used to construct an [`IdempotentProducer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdempotentProducerConfig {
    /// Producer identifier copied into every stamp the producer hands out.
    pub producer_id: String,
    /// Partition number copied into every stamp the producer hands out.
    pub partition: u32,
    /// Value the producer's sequence counter starts at.
    pub initial_sequence: u64,
}
76
/// Identity of one produced message: producer id, partition and sequence number.
///
/// Convertible to a [`MessageId`] through [`ProducerStamp::message_id`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProducerStamp {
    /// Identifier of the producer that issued the stamp.
    pub producer_id: String,
    /// Partition the stamp was issued for.
    pub partition: u32,
    /// Sequence number carried by the stamp.
    pub sequence: u64,
}
87
88impl ProducerStamp {
89 pub fn message_id(&self) -> MessageId {
91 MessageId::new(&self.producer_id, self.partition, self.sequence)
92 }
93}
94
/// Hands out [`ProducerStamp`]s from an atomic sequence counter and remembers
/// the most recently handed-out stamps in a bounded window.
pub struct IdempotentProducer {
    /// Producer id, partition and initial sequence supplied at construction.
    config: IdempotentProducerConfig,
    /// Sequence number the next call to `issue` will use.
    next_seq: AtomicU64,
    /// Recently handed-out stamps, oldest at the front; guarded by a mutex so
    /// the producer can be used through a shared reference.
    replay_window: Mutex<VecDeque<ProducerStamp>>,
    /// Maximum number of stamps kept in `replay_window` before the oldest is
    /// evicted.
    replay_capacity: usize,
}
104
105impl IdempotentProducer {
106 pub fn new(config: IdempotentProducerConfig) -> Self {
108 let initial = config.initial_sequence;
109 Self {
110 config,
111 next_seq: AtomicU64::new(initial),
112 replay_window: Mutex::new(VecDeque::with_capacity(1024)),
113 replay_capacity: 1024,
114 }
115 }
116
117 pub fn producer_id(&self) -> &str {
119 &self.config.producer_id
120 }
121
122 pub fn partition(&self) -> u32 {
124 self.config.partition
125 }
126
127 pub fn current_sequence(&self) -> u64 {
130 self.next_seq.load(Ordering::Relaxed)
131 }
132
133 pub fn issue(&self) -> ProducerStamp {
136 let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
137 let stamp = ProducerStamp {
138 producer_id: self.config.producer_id.clone(),
139 partition: self.config.partition,
140 sequence: seq,
141 };
142 self.remember(stamp.clone());
143 stamp
144 }
145
146 pub fn reissue(&self, sequence: u64) -> ProducerStamp {
149 loop {
151 let cur = self.next_seq.load(Ordering::Relaxed);
152 if cur > sequence {
153 break;
154 }
155 if self
156 .next_seq
157 .compare_exchange(cur, sequence + 1, Ordering::Relaxed, Ordering::Relaxed)
158 .is_ok()
159 {
160 break;
161 }
162 }
163 let stamp = ProducerStamp {
164 producer_id: self.config.producer_id.clone(),
165 partition: self.config.partition,
166 sequence,
167 };
168 self.remember(stamp.clone());
169 stamp
170 }
171
172 pub fn was_issued(&self, sequence: u64) -> bool {
175 self.replay_window
176 .lock()
177 .iter()
178 .any(|s| s.sequence == sequence)
179 }
180
181 fn remember(&self, stamp: ProducerStamp) {
182 let mut win = self.replay_window.lock();
183 if win.len() >= self.replay_capacity {
184 win.pop_front();
185 }
186 win.push_back(stamp);
187 }
188}
189
/// Atomic counters maintained by [`EndToEndExactlyOnceCoordinator`].
///
/// All counters start at zero (via `Default`).
#[derive(Debug, Default)]
pub struct ExactlyOnceCoordinatorStats {
    /// Number of `begin_transaction` calls.
    pub messages_received: AtomicU64,
    /// Number of `begin_transaction` calls that were rejected as duplicates.
    pub duplicates_filtered: AtomicU64,
    /// Number of transactions opened by `begin_transaction`.
    pub transactions_opened: AtomicU64,
    /// Number of transactions whose commit completed successfully.
    pub transactions_committed: AtomicU64,
    /// Number of transactions ended through `abort_transaction`.
    pub transactions_aborted: AtomicU64,
}
201
202impl ExactlyOnceCoordinatorStats {
203 pub fn snapshot(&self) -> ExactlyOnceStatsSnapshot {
205 ExactlyOnceStatsSnapshot {
206 messages_received: self.messages_received.load(Ordering::Relaxed),
207 duplicates_filtered: self.duplicates_filtered.load(Ordering::Relaxed),
208 transactions_opened: self.transactions_opened.load(Ordering::Relaxed),
209 transactions_committed: self.transactions_committed.load(Ordering::Relaxed),
210 transactions_aborted: self.transactions_aborted.load(Ordering::Relaxed),
211 }
212 }
213}
214
/// Plain-value copy of the coordinator counters, as produced by
/// [`ExactlyOnceCoordinatorStats::snapshot`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ExactlyOnceStatsSnapshot {
    /// Value of the `messages_received` counter when the snapshot was taken.
    pub messages_received: u64,
    /// Value of the `duplicates_filtered` counter when the snapshot was taken.
    pub duplicates_filtered: u64,
    /// Value of the `transactions_opened` counter when the snapshot was taken.
    pub transactions_opened: u64,
    /// Value of the `transactions_committed` counter when the snapshot was taken.
    pub transactions_committed: u64,
    /// Value of the `transactions_aborted` counter when the snapshot was taken.
    pub transactions_aborted: u64,
}
224
/// Settings for [`EndToEndExactlyOnceCoordinator`].
#[derive(Debug, Clone, Default)]
pub struct ExactlyOnceCoordinatorConfig {
    /// Deduplication settings handed to the inner processor when the
    /// coordinator is constructed.
    pub dedup: DeduplicationConfig,
}
233
/// State buffered for a transaction that has been opened but not yet
/// committed or aborted.
#[derive(Debug)]
struct PendingTxn {
    /// Stamp of the message that opened the transaction.
    stamp: ProducerStamp,
    /// Buffered `(key, value)` writes in the order they were added; they are
    /// written to the backend only on commit.
    changes: Vec<(Vec<u8>, Vec<u8>)>,
}
242
/// Coordinates duplicate filtering and buffered state writes for stamped
/// messages: a message opens a transaction, state changes are buffered against
/// it, and they reach the backend only when the transaction is committed.
pub struct EndToEndExactlyOnceCoordinator {
    /// Configuration supplied at construction.
    config: ExactlyOnceCoordinatorConfig,
    /// Inner processor consulted to decide whether a message id is a duplicate.
    inner: Arc<Mutex<InnerProcessor>>,
    /// State backend that committed changes are written to.
    backend: Arc<dyn StateBackend>,
    /// Open transactions keyed by transaction id.
    pending: Mutex<std::collections::HashMap<String, PendingTxn>>,
    /// Shared counters describing coordinator activity.
    stats: Arc<ExactlyOnceCoordinatorStats>,
    /// Counter used to mint transaction ids; starts at 1.
    next_txn: AtomicU64,
}
252
253impl EndToEndExactlyOnceCoordinator {
254 pub fn new(config: ExactlyOnceCoordinatorConfig, backend: Arc<dyn StateBackend>) -> Self {
256 let inner = InnerProcessor::new(config.dedup.clone(), backend.clone());
257 Self {
258 config,
259 inner: Arc::new(Mutex::new(inner)),
260 backend,
261 pending: Mutex::new(std::collections::HashMap::new()),
262 stats: Arc::new(ExactlyOnceCoordinatorStats::default()),
263 next_txn: AtomicU64::new(1),
264 }
265 }
266
267 pub fn stats(&self) -> &Arc<ExactlyOnceCoordinatorStats> {
269 &self.stats
270 }
271
272 pub fn pending_transactions(&self) -> usize {
274 self.pending.lock().len()
275 }
276
277 pub fn begin_transaction(&self, stamp: ProducerStamp) -> ExactlyOnceResult<Option<String>> {
285 self.stats.messages_received.fetch_add(1, Ordering::Relaxed);
286 let id = stamp.message_id();
287 let mut inner = self.inner.lock();
288 let dup_check = inner
292 .process(id.clone(), |_txn| Ok::<bool, StreamError>(true))
293 .map_err(ExactlyOnceError::from)?;
294 match dup_check {
295 None => {
296 self.stats
297 .duplicates_filtered
298 .fetch_add(1, Ordering::Relaxed);
299 Ok(None)
300 }
301 Some(_) => {
302 let txn_id = format!("txn-{}", self.next_txn.fetch_add(1, Ordering::Relaxed));
310 self.pending.lock().insert(
311 txn_id.clone(),
312 PendingTxn {
313 stamp,
314 changes: Vec::new(),
315 },
316 );
317 self.stats
318 .transactions_opened
319 .fetch_add(1, Ordering::Relaxed);
320 Ok(Some(txn_id))
321 }
322 }
323 }
324
325 pub fn add_state_change(
327 &self,
328 txn_id: &str,
329 key: Vec<u8>,
330 value: Vec<u8>,
331 ) -> ExactlyOnceResult<()> {
332 let mut pending = self.pending.lock();
333 let txn = pending
334 .get_mut(txn_id)
335 .ok_or_else(|| ExactlyOnceError::Invalid(format!("unknown txn {txn_id}")))?;
336 txn.changes.push((key, value));
337 Ok(())
338 }
339
340 pub fn commit_transaction(&self, txn_id: &str) -> ExactlyOnceResult<()> {
342 let txn = self
343 .pending
344 .lock()
345 .remove(txn_id)
346 .ok_or_else(|| ExactlyOnceError::Invalid(format!("unknown txn {txn_id}")))?;
347 for (k, v) in &txn.changes {
348 self.backend
349 .put(k, v)
350 .map_err(|e| ExactlyOnceError::Processing(e.to_string()))?;
351 }
352 self.stats
353 .transactions_committed
354 .fetch_add(1, Ordering::Relaxed);
355 debug!(stamp = ?txn.stamp, "exactly-once: txn committed");
356 Ok(())
357 }
358
359 pub fn abort_transaction(&self, txn_id: &str) -> ExactlyOnceResult<()> {
368 let txn = self
369 .pending
370 .lock()
371 .remove(txn_id)
372 .ok_or_else(|| ExactlyOnceError::Invalid(format!("unknown txn {txn_id}")))?;
373 warn!(stamp = ?txn.stamp, "exactly-once: txn aborted");
374 self.stats
375 .transactions_aborted
376 .fetch_add(1, Ordering::Relaxed);
377 Ok(())
378 }
379
380 pub fn maintenance(&self) -> usize {
382 self.inner.lock().maintenance()
383 }
384
385 pub fn config(&self) -> &ExactlyOnceCoordinatorConfig {
387 &self.config
388 }
389}
390
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::distributed_state::InMemoryStateBackend;

    /// Fresh in-memory backend behind the trait object the coordinator takes.
    fn make_backend() -> Arc<dyn StateBackend> {
        Arc::new(InMemoryStateBackend::new())
    }

    /// Producer "p1" on partition 0 whose counter starts at 0.
    fn producer_from_zero() -> IdempotentProducer {
        IdempotentProducer::new(IdempotentProducerConfig {
            producer_id: "p1".into(),
            partition: 0,
            initial_sequence: 0,
        })
    }

    /// Stamp for producer "p" with the given partition and sequence.
    fn stamp_of_p(partition: u32, sequence: u64) -> ProducerStamp {
        ProducerStamp {
            producer_id: "p".into(),
            partition,
            sequence,
        }
    }

    /// Coordinator with default configuration on top of `backend`.
    fn coordinator_over(backend: Arc<dyn StateBackend>) -> EndToEndExactlyOnceCoordinator {
        EndToEndExactlyOnceCoordinator::new(ExactlyOnceCoordinatorConfig::default(), backend)
    }

    #[test]
    fn idempotent_producer_issues_monotonic_stamps() {
        let producer = producer_from_zero();
        let first = producer.issue();
        let second = producer.issue();
        let third = producer.issue();
        let sequences = [first.sequence, second.sequence, third.sequence];
        assert_eq!(sequences, [0, 1, 2]);
        assert!(producer.was_issued(0));
        assert!(producer.was_issued(2));
    }

    #[test]
    fn idempotent_producer_reissue_advances_sequence() {
        let producer = producer_from_zero();
        let reissued = producer.reissue(7);
        assert_eq!(reissued.sequence, 7);
        let following = producer.issue();
        assert_eq!(following.sequence, 8);
    }

    #[test]
    fn coordinator_filters_duplicate_messages() {
        let coord = coordinator_over(make_backend());
        let stamp = stamp_of_p(0, 0);
        let first_attempt = coord.begin_transaction(stamp.clone()).expect("ok");
        assert!(first_attempt.is_some());
        let second_attempt = coord.begin_transaction(stamp.clone()).expect("ok");
        assert!(second_attempt.is_none());
        let snapshot = coord.stats().snapshot();
        assert_eq!(snapshot.duplicates_filtered, 1);
    }

    #[test]
    fn coordinator_commits_state_changes_atomically() {
        let backend = make_backend();
        let coord = coordinator_over(backend.clone());
        let txn = coord
            .begin_transaction(stamp_of_p(0, 0))
            .expect("ok")
            .expect("fresh");
        coord
            .add_state_change(&txn, b"k1".to_vec(), b"v1".to_vec())
            .expect("ok");
        coord
            .add_state_change(&txn, b"k2".to_vec(), b"v2".to_vec())
            .expect("ok");
        // Buffered changes must not be visible before commit.
        assert!(backend.get(b"k1").expect("ok").is_none());
        coord.commit_transaction(&txn).expect("commit");
        assert_eq!(backend.get(b"k1").expect("ok"), Some(b"v1".to_vec()));
        assert_eq!(backend.get(b"k2").expect("ok"), Some(b"v2".to_vec()));
        let snapshot = coord.stats().snapshot();
        assert_eq!(snapshot.transactions_committed, 1);
        assert_eq!(coord.pending_transactions(), 0);
    }

    #[test]
    fn coordinator_aborts_drop_changes() {
        let backend = make_backend();
        let coord = coordinator_over(backend.clone());
        let txn = coord
            .begin_transaction(stamp_of_p(0, 0))
            .expect("ok")
            .expect("fresh");
        coord
            .add_state_change(&txn, b"x".to_vec(), b"y".to_vec())
            .expect("ok");
        coord.abort_transaction(&txn).expect("abort");
        assert!(backend.get(b"x").expect("ok").is_none());
        let snapshot = coord.stats().snapshot();
        assert_eq!(snapshot.transactions_aborted, 1);
    }

    #[test]
    fn coordinator_unknown_txn_id_errors() {
        let coord = coordinator_over(make_backend());
        let outcome = coord.add_state_change("bad", vec![], vec![]);
        let err = outcome.expect_err("should fail");
        assert!(matches!(err, ExactlyOnceError::Invalid(_)));
    }

    #[test]
    fn producer_stamp_round_trip_to_message_id() {
        let id = stamp_of_p(1, 4).message_id();
        assert_eq!(id.producer_id, "p");
        assert_eq!(id.partition, 1);
        assert_eq!(id.sequence, 4);
    }
}