use std::sync::Arc;
use dashmap::DashMap;
use crate::partition::Partition;
/// Two-level registry of partition handles: topic name first, then
/// partition index.
///
/// Both levels are `DashMap`s, so every method takes `&self`; handles are
/// stored and handed out as `Arc<Partition>` clones.
#[derive(Debug, Default)]
pub(crate) struct PartitionRegistry {
// topic name -> (partition index -> shared partition handle)
inner: DashMap<String, DashMap<i32, Arc<Partition>>>,
}
impl PartitionRegistry {
    /// Creates an empty registry.
    #[must_use]
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Returns a cloned handle to the partition stored under
    /// `(topic, partition)`, or `None` when the topic or the partition
    /// index is not present.
    #[must_use]
    pub(crate) fn get(&self, topic: &str, partition: i32) -> Option<Arc<Partition>> {
        let by_index = self.inner.get(topic)?;
        let handle = by_index.get(&partition)?;
        Some(Arc::clone(handle.value()))
    }

    /// Returns `true` when a partition is stored under `(topic, partition)`.
    #[must_use]
    pub(crate) fn contains(&self, topic: &str, partition: i32) -> bool {
        self.inner
            .get(topic)
            .is_some_and(|by_index| by_index.contains_key(&partition))
    }

    /// Stores `part` under `(topic, partition)`, creating the topic's inner
    /// map if needed.
    ///
    /// Returns the handle that was previously stored in that slot, if any.
    pub(crate) fn insert(
        &self,
        topic: String,
        partition: i32,
        part: Arc<Partition>,
    ) -> Option<Arc<Partition>> {
        self.inner.entry(topic).or_default().insert(partition, part)
    }

    /// Removes and returns the handle stored under `(topic, partition)`.
    ///
    /// Afterwards the topic's inner map is dropped from the registry if it
    /// is empty. That cleanup is a separate step from the removal itself and
    /// only fires when the inner map is empty at that moment.
    pub(crate) fn remove(&self, topic: &str, partition: i32) -> Option<Arc<Partition>> {
        // The guard returned by `get` is a temporary of this statement, so it
        // is released before `remove_if` touches the outer map below.
        let removed = self
            .inner
            .get(topic)
            .and_then(|by_index| by_index.remove(&partition).map(|(_, handle)| handle));
        self.inner.remove_if(topic, |_, by_index| by_index.is_empty());
        removed
    }

    /// Ensures a partition exists under `(topic, partition)`.
    ///
    /// If the slot is already occupied, `build` is not called and `Ok(())`
    /// is returned. Otherwise `build` is invoked and its result is stored.
    ///
    /// `build` runs while this method still holds its entry guards for the
    /// topic and the partition slot; NOTE(review): per the dashmap docs,
    /// calling back into the same map while holding a reference into it may
    /// deadlock, so `build` should not touch this registry.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `build`. In that case nothing is stored
    /// for the slot, and the topic's inner map is dropped again if it is
    /// empty, matching the cleanup done by [`Self::remove`] so that failed
    /// builds do not accumulate empty topic entries.
    pub(crate) fn materialize_if_vacant<E>(
        &self,
        topic: &str,
        partition: i32,
        build: impl FnOnce() -> Result<Arc<Partition>, E>,
    ) -> Result<(), E> {
        use dashmap::mapref::entry::Entry;

        let by_index = self.inner.entry(topic.to_string()).or_default();
        let outcome = match by_index.entry(partition) {
            Entry::Occupied(_) => Ok(()),
            Entry::Vacant(slot) => build().map(|part| {
                slot.insert(part);
            }),
        };
        // Release the topic guard before going back to the outer map.
        drop(by_index);
        if outcome.is_err() {
            // `or_default()` above may have created the inner map solely for
            // this attempt; do not leave it behind when nothing was stored.
            self.inner.remove_if(topic, |_, by_index| by_index.is_empty());
        }
        outcome
    }

    /// Returns the partition indexes currently stored for `topic`, in the
    /// inner map's iteration order. Empty when the topic is not present.
    #[must_use]
    pub(crate) fn partitions_of(&self, topic: &str) -> Vec<i32> {
        self.inner
            .get(topic)
            .map(|by_index| by_index.iter().map(|slot| *slot.key()).collect())
            .unwrap_or_default()
    }

    /// Returns cloned handles for every partition of every topic.
    #[must_use]
    pub(crate) fn arcs(&self) -> Vec<Arc<Partition>> {
        self.inner
            .iter()
            // Each topic's handles are collected eagerly because the inner
            // iterator borrows from `by_topic`, which lives only inside this
            // closure.
            .flat_map(|by_topic| {
                by_topic
                    .value()
                    .iter()
                    .map(|slot| Arc::clone(&slot))
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Returns the total number of partitions stored across all topics.
    #[must_use]
    pub(crate) fn len(&self) -> usize {
        self.inner
            .iter()
            .map(|by_topic| by_topic.value().len())
            .sum()
    }
}
#[cfg(test)]
mod tests {
    use assert2::assert;
    use std::path::Path;
    use std::sync::Arc;
    use crabka_log::{Log, LogConfig};
    use tempfile::tempdir;
    use super::PartitionRegistry;
    use crate::partition::Partition;

    /// Opens a log in the partition directory derived from `log_dir`,
    /// `topic` and `partition` (creating the directory first) and passes it
    /// to `spawn_partition` to obtain a partition handle for the tests.
    fn fixture_partition(log_dir: &Path, topic: &str, partition: i32) -> Arc<Partition> {
        let dir = crate::log_dir::partition_dir(log_dir, topic, partition);
        std::fs::create_dir_all(&dir).unwrap();
        let log = Log::open(&dir, LogConfig::default()).unwrap();

        let owned_topic = topic.to_string();
        let root = log_dir.to_path_buf();
        let dir_status = crate::log_dir_status::LogDirRegistry::default();
        crate::broker::spawn_partition(owned_topic, partition, root, log, dir_status)
    }

    /// Walks one slot through insert, lookup, replace and remove.
    #[tokio::test]
    async fn insert_get_contains_remove() {
        let tmp = tempdir().unwrap();
        let registry = PartitionRegistry::new();

        // A fresh registry holds nothing.
        assert!(registry.arcs().is_empty());
        assert!(registry.arcs().len() == 0);
        assert!(registry.get("t", 0).is_none());
        assert!(!registry.contains("t", 0));

        // The first insert into a vacant slot reports no previous value.
        let first = fixture_partition(tmp.path(), "t", 0);
        let displaced = registry.insert("t".to_string(), 0, Arc::clone(&first));
        assert!(displaced.is_none());
        assert!(registry.contains("t", 0));
        assert!(!registry.arcs().is_empty());
        assert!(registry.arcs().len() == 1);
        let fetched = registry.get("t", 0).expect("present");
        assert!(Arc::ptr_eq(&fetched, &first));

        // Inserting again into the same slot hands back the earlier handle.
        let second = fixture_partition(tmp.path(), "t", 0);
        let displaced = registry
            .insert("t".to_string(), 0, Arc::clone(&second))
            .expect("prev");
        assert!(Arc::ptr_eq(&displaced, &first));
        assert!(registry.arcs().len() == 1);

        // Removal yields the current handle exactly once.
        let taken = registry.remove("t", 0).expect("removed");
        assert!(Arc::ptr_eq(&taken, &second));
        assert!(registry.remove("t", 0).is_none());
        assert!(!registry.contains("t", 0));
        assert!(registry.arcs().is_empty());
    }

    /// The builder runs for a vacant slot and is skipped for an occupied one.
    #[tokio::test]
    async fn materialize_if_vacant_builds_once() {
        let tmp = tempdir().unwrap();
        let registry = PartitionRegistry::new();
        let handle = fixture_partition(tmp.path(), "t", 1);

        registry
            .materialize_if_vacant::<String>("t", 1, || Ok(Arc::clone(&handle)))
            .expect("build ok");
        assert!(registry.contains("t", 1));

        registry
            .materialize_if_vacant::<String>("t", 1, || {
                panic!("build must not be called when slot is occupied");
            })
            .expect("occupied ok");

        let fetched = registry.get("t", 1).expect("present");
        assert!(Arc::ptr_eq(&fetched, &handle));
    }

    /// A failing builder surfaces its error and stores nothing.
    #[tokio::test]
    async fn materialize_if_vacant_propagates_error() {
        let registry = PartitionRegistry::new();
        let outcome =
            registry.materialize_if_vacant::<String>("t", 2, || Err("boom".to_string()));
        assert!(outcome == Err("boom".to_string()));
        assert!(!registry.contains("t", 2));
    }

    /// `arcs` returns one handle per stored partition across all topics.
    #[tokio::test]
    async fn arcs_snapshots_all_partitions() {
        let tmp = tempdir().unwrap();
        let registry = PartitionRegistry::new();
        for (topic, index) in [("a", 0), ("a", 1), ("b", 0)] {
            registry.insert(
                topic.to_string(),
                index,
                fixture_partition(tmp.path(), topic, index),
            );
        }
        let snapshot = registry.arcs();
        assert!(snapshot.len() == 3);
    }
}