mqdb-core 0.5.0

Core types, storage, schema, and protocol for MQDB
Documentation
// Copyright 2025-2026 LabOverWire. All rights reserved.
// SPDX-License-Identifier: AGPL-3.0-only

use super::{Epoch, NUM_PARTITIONS, NodeId, PartitionId};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionRole {
    None,
    Primary,
    Replica,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PartitionAssignment {
    pub primary: Option<NodeId>,
    pub replicas: Vec<NodeId>,
    pub epoch: Epoch,
}

impl Default for PartitionAssignment {
    fn default() -> Self {
        Self {
            primary: None,
            replicas: Vec::new(),
            epoch: Epoch::ZERO,
        }
    }
}

impl PartitionAssignment {
    #[allow(clippy::must_use_candidate)]
    pub fn new(primary: NodeId, replicas: Vec<NodeId>, epoch: Epoch) -> Self {
        Self {
            primary: Some(primary),
            replicas,
            epoch,
        }
    }

    #[must_use]
    pub fn role_for(&self, node: NodeId) -> PartitionRole {
        if self.primary == Some(node) {
            PartitionRole::Primary
        } else if self.replicas.contains(&node) {
            PartitionRole::Replica
        } else {
            PartitionRole::None
        }
    }

    #[must_use]
    pub fn all_nodes(&self) -> Vec<NodeId> {
        let mut nodes = self.replicas.clone();
        if let Some(primary) = self.primary {
            nodes.insert(0, primary);
        }
        nodes
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct PartitionMap {
    version: u64,
    assignments: [PartitionAssignment; NUM_PARTITIONS as usize],
}

impl Default for PartitionMap {
    fn default() -> Self {
        Self::new()
    }
}

impl PartitionMap {
    #[allow(clippy::must_use_candidate)]
    pub fn new() -> Self {
        Self {
            version: 0,
            assignments: std::array::from_fn(|_| PartitionAssignment::default()),
        }
    }

    #[must_use]
    pub fn version(&self) -> u64 {
        self.version
    }

    #[must_use]
    pub fn get(&self, partition: PartitionId) -> &PartitionAssignment {
        &self.assignments[partition.get() as usize]
    }

    pub fn set(&mut self, partition: PartitionId, assignment: PartitionAssignment) {
        self.assignments[partition.get() as usize] = assignment;
        self.version += 1;
    }

    #[must_use]
    pub fn primary(&self, partition: PartitionId) -> Option<NodeId> {
        self.get(partition).primary
    }

    #[must_use]
    pub fn replicas(&self, partition: PartitionId) -> &[NodeId] {
        &self.get(partition).replicas
    }

    #[must_use]
    pub fn epoch(&self, partition: PartitionId) -> Epoch {
        self.get(partition).epoch
    }

    #[must_use]
    pub fn role_for(&self, partition: PartitionId, node: NodeId) -> PartitionRole {
        self.get(partition).role_for(node)
    }

    #[must_use]
    pub fn partitions_for_node(&self, node: NodeId) -> Vec<(PartitionId, PartitionRole)> {
        PartitionId::all()
            .filter_map(|p| {
                let role = self.role_for(p, node);
                if role == PartitionRole::None {
                    None
                } else {
                    Some((p, role))
                }
            })
            .collect()
    }

    #[must_use]
    pub fn primary_count(&self, node: NodeId) -> usize {
        PartitionId::all()
            .filter(|p| self.primary(*p) == Some(node))
            .count()
    }

    #[must_use]
    pub fn replica_count(&self, node: NodeId) -> usize {
        PartitionId::all()
            .filter(|p| self.replicas(*p).contains(&node))
            .count()
    }

    #[must_use]
    pub fn has_any_assignment(&self, node: NodeId) -> bool {
        PartitionId::all().any(|p| self.role_for(p, node) != PartitionRole::None)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn node(id: u16) -> NodeId {
        NodeId::validated(id).unwrap()
    }

    fn partition(id: u16) -> PartitionId {
        PartitionId::new(id).unwrap()
    }

    #[test]
    fn partition_map_default_empty() {
        let map = PartitionMap::new();
        assert_eq!(map.version(), 0);
        assert!(map.primary(partition(0)).is_none());
    }

    #[test]
    fn partition_assignment() {
        let mut map = PartitionMap::new();

        let assignment = PartitionAssignment::new(node(1), vec![node(2), node(3)], Epoch::new(1));

        map.set(partition(0), assignment);

        assert_eq!(map.primary(partition(0)), Some(node(1)));
        assert_eq!(map.replicas(partition(0)), &[node(2), node(3)]);
        assert_eq!(map.epoch(partition(0)), Epoch::new(1));
        assert_eq!(map.version(), 1);
    }

    #[test]
    fn role_for_node() {
        let mut map = PartitionMap::new();

        let assignment = PartitionAssignment::new(node(1), vec![node(2), node(3)], Epoch::new(1));
        map.set(partition(0), assignment);

        assert_eq!(map.role_for(partition(0), node(1)), PartitionRole::Primary);
        assert_eq!(map.role_for(partition(0), node(2)), PartitionRole::Replica);
        assert_eq!(map.role_for(partition(0), node(3)), PartitionRole::Replica);
        assert_eq!(map.role_for(partition(0), node(4)), PartitionRole::None);
    }

    #[test]
    fn partitions_for_node() {
        let mut map = PartitionMap::new();

        map.set(
            partition(0),
            PartitionAssignment::new(node(1), vec![node(2)], Epoch::new(1)),
        );
        map.set(
            partition(1),
            PartitionAssignment::new(node(2), vec![node(1)], Epoch::new(1)),
        );
        map.set(
            partition(2),
            PartitionAssignment::new(node(1), vec![node(3)], Epoch::new(1)),
        );

        let node1_partitions = map.partitions_for_node(node(1));
        assert_eq!(node1_partitions.len(), 3);
        assert_eq!(map.primary_count(node(1)), 2);
        assert_eq!(map.replica_count(node(1)), 1);
    }
}