crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! Allocates `(producer_id, producer_epoch)` pairs. Single-broker MVP:
//! the id space is a single monotonic counter. Transaction support
//! will revisit this once transactional ids enter the picture.

use std::sync::atomic::{AtomicI16, AtomicI64, Ordering};

use dashmap::DashMap;

/// Lowest pid handed out: the first allocation from a fresh
/// [`ProducerIdManager`] returns exactly this value.
///
/// Apache Kafka's own id range begins at `0`; this broker begins at `1000`,
/// which keeps every allocated pid well clear of the legacy non-idempotent
/// sentinel of `-1`.
// NOTE(review): the previous wording said this "mirrors" Kafka's `0` initial
// range, which does not match the value below. Confirm that 1000 is intended.
const PID_BASE: i64 = 1000;

/// Hands out producer ids and tracks the current epoch of each one.
///
/// All state is held in an atomic counter and a [`DashMap`] of atomics, which
/// is why every method can take `&self` rather than `&mut self`.
#[derive(Debug)]
pub struct ProducerIdManager {
    /// The pid the next `allocate` call will return; seeded with `PID_BASE`
    /// and advanced by one on every allocation.
    next_pid: AtomicI64,
    /// Current epoch for each allocated pid. Entries are inserted by
    /// `allocate` (at epoch `0`) and advanced by `bump_epoch`.
    epochs: DashMap<i64, AtomicI16>,
}

impl Default for ProducerIdManager {
    /// Same as [`ProducerIdManager::new`]: an empty manager whose first
    /// allocated pid is `PID_BASE`.
    fn default() -> Self {
        Self::new()
    }
}

impl ProducerIdManager {
    /// Creates an empty manager whose first allocated pid is `PID_BASE`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            next_pid: AtomicI64::new(PID_BASE),
            epochs: DashMap::new(),
        }
    }

    /// Allocates a fresh `(producer_id, producer_epoch)` pair.
    ///
    /// Pids are handed out one per call in increasing order, and every new
    /// pid starts at epoch `0`.
    pub fn allocate(&self) -> (i64, i16) {
        // `Relaxed` is enough here: the counter only has to yield a distinct
        // value per call, it does not order any other memory access.
        let pid = self.next_pid.fetch_add(1, Ordering::Relaxed);
        self.epochs.insert(pid, AtomicI16::new(0));
        (pid, 0)
    }

    /// Bumps the epoch of an existing pid and returns the new epoch.
    ///
    /// Intended for transactional producers re-initialising under the same
    /// `transactional_id` (an `InitProducerId` re-init).
    ///
    /// Returns `None` when `pid` is not tracked by this manager, or when its
    /// epoch is already `i16::MAX`. In the exhausted case the stored epoch is
    /// left untouched, so it never wraps around to a negative value; the
    /// caller is expected to move on to a freshly allocated pid instead.
    // Kept ahead of transactional support, which is the intended caller.
    #[allow(dead_code)]
    pub fn bump_epoch(&self, pid: i64) -> Option<i16> {
        let entry = self.epochs.get(&pid)?;
        entry
            .value()
            // `checked_add` makes the closure yield `None` at `i16::MAX`,
            // which aborts the update instead of wrapping the epoch.
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |epoch| epoch.checked_add(1))
            .ok()
            // `fetch_update` reports the previous value; the increment was
            // already proven not to overflow by the closure above.
            .map(|previous| previous + 1)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// A fresh manager hands out consecutive pids from `PID_BASE`, each at
    /// epoch 0.
    #[test]
    fn allocate_returns_monotonic_pids_starting_at_base() {
        let manager = ProducerIdManager::new();
        for offset in 0..3 {
            assert!(manager.allocate() == (PID_BASE + offset, 0));
        }
    }

    /// Each bump returns the next epoch for a known pid; a pid that was never
    /// allocated yields `None`.
    #[test]
    fn bump_epoch_increments() {
        let manager = ProducerIdManager::new();
        let (known_pid, _initial_epoch) = manager.allocate();
        for expected_epoch in 1..=2 {
            assert!(manager.bump_epoch(known_pid) == Some(expected_epoch));
        }
        assert!(manager.bump_epoch(9999) == None);
    }
}