use std::sync::atomic::{AtomicI16, AtomicI64, Ordering};
use dashmap::DashMap;
/// Producer id that a freshly constructed manager hands out first.
const PID_BASE: i64 = 1_000;
/// Hands out producer ids and tracks an epoch counter for each one.
///
/// All state sits behind an atomic counter and a `DashMap`, so the methods
/// visible in this file operate through `&self`.
#[derive(Debug)]
pub struct ProducerIdManager {
/// Next producer id to hand out; `allocate` advances it by one per call.
next_pid: AtomicI64,
/// Current epoch of each allocated producer id, keyed by that id.
epochs: DashMap<i64, AtomicI16>,
}
impl Default for ProducerIdManager {
fn default() -> Self {
Self::new()
}
}
impl ProducerIdManager {
    /// Creates an empty manager whose first allocated producer id is `PID_BASE`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            next_pid: AtomicI64::new(PID_BASE),
            epochs: DashMap::new(),
        }
    }

    /// Allocates a fresh producer id and registers it with epoch 0.
    ///
    /// Returns `(pid, epoch)`. Ids come from an atomic counter that is
    /// advanced by one per call, and the epoch of a new id is always 0.
    pub fn allocate(&self) -> (i64, i16) {
        // Relaxed is enough here: only the uniqueness of the value returned by
        // the read-modify-write matters, not its ordering against other memory.
        let pid = self.next_pid.fetch_add(1, Ordering::Relaxed);
        self.epochs.insert(pid, AtomicI16::new(0));
        (pid, 0)
    }

    /// Increments the epoch stored for `pid` and returns the new epoch.
    ///
    /// Returns `None` when `pid` has no entry in the epoch table.
    ///
    /// The epoch is an `i16` that wraps around: bumping an epoch of
    /// `i16::MAX` stores and returns `i16::MIN`. Callers that need to treat
    /// exhaustion specially must check for that themselves.
    // NOTE(review): presumably kept for callers that do not exist yet — confirm
    // whether this `allow` is still required.
    #[allow(dead_code)]
    pub fn bump_epoch(&self, pid: i64) -> Option<i16> {
        self.epochs.get(&pid).map(|entry| {
            // `fetch_add` wraps on overflow and yields the previous value, so the
            // new value must be derived with wrapping arithmetic as well. A plain
            // `+ 1` would panic in debug builds after the stored epoch had
            // already wrapped, leaving the two out of step.
            entry
                .value()
                .fetch_add(1, Ordering::Relaxed)
                .wrapping_add(1)
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Consecutive allocations yield `PID_BASE`, `PID_BASE + 1`, ... at epoch 0.
    #[test]
    fn allocate_returns_monotonic_pids_starting_at_base() {
        let manager = ProducerIdManager::new();
        for offset in 0..3 {
            assert!(manager.allocate() == (PID_BASE + offset, 0));
        }
    }

    /// Each bump returns the next epoch; an id with no entry yields `None`.
    #[test]
    fn bump_epoch_increments() {
        let manager = ProducerIdManager::new();
        let (pid, _epoch) = manager.allocate();
        for expected in 1..=2 {
            assert!(manager.bump_epoch(pid) == Some(expected));
        }
        assert!(manager.bump_epoch(9999) == None);
    }
}