use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
/// Disk usage measured for a single node's data directory.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDiskUsage {
/// Identifier of the node, as supplied by the caller of `partition_states`.
pub node_id: u32,
/// The node's data directory, as supplied by the caller.
pub data_dir: PathBuf,
/// On-disk footprint of `data_dir` in bytes; when built by `partition_states`
/// this is `dir_size(&data_dir)` (allocated bytes of regular files beneath it).
pub size_bytes: u64,
}
/// Opaque identifier of the filesystem/partition a data directory lives on.
///
/// Built by `partition_key`: directories whose keys compare equal were judged
/// to share a filesystem. The inner text (e.g. `dev:<n>` on Unix) is an
/// implementation detail and is only exposed through `Display`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PartitionKey(String);

impl std::fmt::Display for PartitionKey {
    /// Writes the raw key text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` honours the caller's width/alignment/precision flags
        // (`{:>12}`, `{:<20}`, `{:.5}`), which `write!(f, "{}", ..)` would
        // silently discard — important for column-aligned log/table output.
        f.pad(&self.0)
    }
}

impl PartitionKey {
    /// Builds a key from arbitrary text; test-only so production keys always
    /// come from `partition_key`.
    #[cfg(test)]
    pub(crate) fn for_test(label: &str) -> Self {
        PartitionKey(label.to_string())
    }
}
/// One partition (filesystem) together with the nodes whose data lives on it.
#[derive(Debug, Clone)]
pub struct PartitionState {
    /// Key shared by every node in `nodes`.
    pub partition: PartitionKey,
    /// Free bytes reported for the partition (obtained through
    /// `available_space` when built by `partition_states`).
    pub available_bytes: u64,
    /// Nodes whose data directories reside on this partition.
    pub nodes: Vec<NodeDiskUsage>,
}

impl PartitionState {
    /// Picks the node to evict from this partition: the one with the smallest
    /// `size_bytes`. Among equally sized nodes, the highest `node_id` wins.
    ///
    /// Returns `None` when the partition has no nodes.
    pub fn eviction_candidate(&self) -> Option<&NodeDiskUsage> {
        // `Reverse` flips the id ordering so that, for equal sizes, the larger
        // id sorts first and is therefore the minimum.
        self.nodes
            .iter()
            .min_by_key(|node| (node.size_bytes, std::cmp::Reverse(node.node_id)))
    }
}
/// Total on-disk footprint, in bytes, of the regular files beneath `path`.
///
/// The tree is walked iteratively with an explicit stack. Each regular file
/// contributes its *allocated* size as computed by `allocated_size`, so on Unix
/// sparse regions are not counted.
///
/// * Symbolic links found beneath `path` are never followed (neither to files
///   nor to directories), so data outside the tree is not attributed to it.
/// * Directories themselves and non-regular entries contribute nothing.
/// * Entries that cannot be listed or stat'ed are skipped silently. If `path`
///   is missing, unreadable, or not a directory, the result is `0`.
/// * Every hard link to the same file is counted separately.
/// * The running total saturates at `u64::MAX` instead of overflowing.
pub fn dir_size(path: &Path) -> u64 {
    let mut pending = vec![path.to_path_buf()];
    let mut bytes: u64 = 0;
    while let Some(current) = pending.pop() {
        // Best-effort: an unreadable or vanished directory adds nothing.
        if let Ok(listing) = std::fs::read_dir(&current) {
            // Pair each readable entry with metadata queried *without*
            // following symlinks, so a link reports itself, not its target.
            let children = listing.flatten().filter_map(|child| {
                let child_path = child.path();
                std::fs::symlink_metadata(&child_path)
                    .ok()
                    .map(|info| (child_path, info))
            });
            for (child_path, info) in children {
                let kind = info.file_type();
                if kind.is_symlink() {
                    continue;
                }
                if kind.is_dir() {
                    pending.push(child_path);
                } else if kind.is_file() {
                    bytes = bytes.saturating_add(allocated_size(&info));
                }
            }
        }
    }
    bytes
}
/// Bytes actually allocated on disk for the file described by `meta`.
///
/// On Unix this is the block count times 512, because `MetadataExt::blocks`
/// is expressed in 512-byte units. Sparse regions therefore cost nothing and
/// partially used blocks round up. The product saturates instead of
/// overflowing.
#[cfg(unix)]
fn allocated_size(meta: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;
    meta.blocks().saturating_mul(512)
}

/// Fallback for non-Unix targets, where no block count is available: the
/// file's logical length.
#[cfg(not(unix))]
fn allocated_size(meta: &std::fs::Metadata) -> u64 {
    meta.len()
}
/// Free space, in bytes, that `fs2::available_space` reports for the
/// filesystem holding `path`.
///
/// `path` does not have to exist yet: the query is made against its nearest
/// existing ancestor. Any error from the underlying query yields `None`.
pub fn available_space(path: &Path) -> Option<u64> {
    let probe = nearest_existing_ancestor(path);
    match fs2::available_space(probe) {
        Ok(bytes) => Some(bytes),
        Err(_) => None,
    }
}
/// Measures every `(node_id, data_dir)` pair and groups the results by the
/// partition each data directory lives on.
///
/// * Each node's footprint is `dir_size(&data_dir)`.
/// * Nodes are grouped by `partition_key(&data_dir)`. The returned states are
///   ordered by that key, and within a state nodes keep their input order.
/// * A partition's free space is probed through its members' directories.
///   Every member is on the same partition, so the first directory that
///   yields an answer is used.
/// * A partition for which no member directory yields a free-space figure is
///   omitted from the result, together with its nodes. This is best-effort:
///   without that figure no `PartitionState` can be built.
pub fn partition_states<I>(nodes: I) -> Vec<PartitionState>
where
    I: IntoIterator<Item = (u32, PathBuf)>,
{
    let mut grouped: BTreeMap<PartitionKey, Vec<NodeDiskUsage>> = BTreeMap::new();
    for (node_id, data_dir) in nodes {
        let key = partition_key(&data_dir);
        let size_bytes = dir_size(&data_dir);
        grouped.entry(key).or_default().push(NodeDiskUsage {
            node_id,
            data_dir,
            size_bytes,
        });
    }
    grouped
        .into_iter()
        .filter_map(|(partition, nodes)| {
            // Don't let one unprobeable directory hide every co-located node:
            // fall back to the other members before giving up on the partition.
            let available_bytes = nodes
                .iter()
                .find_map(|node| available_space(&node.data_dir))?;
            Some(PartitionState {
                partition,
                available_bytes,
                nodes,
            })
        })
        .collect()
}
/// Derives the key of the partition that `path` lives on, or would live on if
/// it were created, by inspecting its nearest existing ancestor.
///
/// On Unix the key is the ancestor's device number, `dev:<n>` (from
/// `std::fs::metadata`, which follows symlinks). If that lookup fails, and on
/// every other platform, the fallback is the first component of the
/// canonicalised ancestor. If canonicalisation fails, the uncanonicalised
/// ancestor is used instead. On Windows the first component is presumably the
/// drive/UNC prefix; that has not been verified here.
fn partition_key(path: &Path) -> PartitionKey {
    let anchor = nearest_existing_ancestor(path);

    #[cfg(unix)]
    {
        use std::os::unix::fs::MetadataExt;
        if let Some(device) = std::fs::metadata(&anchor).ok().map(|info| info.dev()) {
            return PartitionKey(format!("dev:{}", device));
        }
    }

    let resolved = match std::fs::canonicalize(&anchor) {
        Ok(absolute) => absolute,
        Err(_) => anchor,
    };
    let label = match resolved.components().next() {
        Some(first) => first.as_os_str().to_string_lossy().into_owned(),
        None => resolved.to_string_lossy().into_owned(),
    };
    PartitionKey(label)
}
/// Returns the deepest ancestor of `path` (including `path` itself) that
/// currently exists.
///
/// Filesystem queries such as device lookup or free-space probes need a real
/// path, but a node's data directory may not have been created yet. Walking up
/// to the nearest existing ancestor yields a path on the filesystem where the
/// directory would be created, barring a mount point created in between.
///
/// A relative path's final ancestor is the empty path, which never "exists".
/// It is treated as the current directory (`.`), so relative data directories
/// such as `data/node-1` still resolve before `data` is created. If nothing
/// exists at all, `path` is returned unchanged. Existence checks follow
/// symlinks, so a dangling link is skipped in favour of its parent.
fn nearest_existing_ancestor(path: &Path) -> PathBuf {
    for ancestor in path.ancestors() {
        let candidate = if ancestor.as_os_str().is_empty() {
            Path::new(".")
        } else {
            ancestor
        };
        if candidate.exists() {
            return candidate.to_path_buf();
        }
    }
    path.to_path_buf()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Usage record for `node_id` with a synthetic `/data/node-<id>` directory.
    fn node(node_id: u32, size_bytes: u64) -> NodeDiskUsage {
        let data_dir = PathBuf::from(format!("/data/node-{node_id}"));
        NodeDiskUsage {
            node_id,
            data_dir,
            size_bytes,
        }
    }

    /// Wraps `nodes` in a partition whose key and free space are irrelevant here.
    fn state_of(nodes: Vec<NodeDiskUsage>) -> PartitionState {
        let partition = PartitionKey::for_test("test");
        PartitionState {
            partition,
            available_bytes: 0,
            nodes,
        }
    }

    #[test]
    fn candidate_picks_smallest_data_dir() {
        let state = state_of(vec![node(1, 900), node(2, 100), node(3, 500)]);
        let picked = state.eviction_candidate().expect("partition has nodes");
        assert_eq!(picked.node_id, 2);
    }

    #[test]
    fn candidate_tie_break_prefers_newest_id() {
        // Nodes 1 and 4 tie on size; the higher id must win.
        let state = state_of(vec![node(1, 100), node(4, 100), node(2, 800)]);
        let picked = state.eviction_candidate().expect("partition has nodes");
        assert_eq!(picked.node_id, 4);
    }

    #[test]
    fn candidate_none_when_empty() {
        assert!(state_of(Vec::new()).eviction_candidate().is_none());
    }

    #[test]
    fn dir_size_sums_nested_files() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let nested = root.join("sub");
        std::fs::write(root.join("a.bin"), vec![1u8; 1000]).unwrap();
        std::fs::create_dir(&nested).unwrap();
        std::fs::write(nested.join("b.bin"), vec![1u8; 2500]).unwrap();
        // Allocated size is block-rounded on Unix, so only a lower bound holds.
        let total = dir_size(root);
        assert!(total >= 3500);
    }

    #[test]
    fn dir_size_zero_for_missing_path() {
        let missing = Path::new("/nonexistent/path/xyz");
        assert_eq!(dir_size(missing), 0);
    }

    #[cfg(unix)]
    #[test]
    fn dir_size_does_not_follow_symlinks() {
        let tmp = tempfile::tempdir().unwrap();
        let node_dir = tmp.path().join("node");
        let outside = tmp.path().join("external_big.bin");
        std::fs::create_dir(&node_dir).unwrap();
        std::fs::write(node_dir.join("real.bin"), vec![1u8; 4000]).unwrap();
        std::fs::write(&outside, vec![1u8; 5_000_000]).unwrap();
        std::os::unix::fs::symlink(&outside, node_dir.join("link")).unwrap();

        let size = dir_size(&node_dir);
        assert!(size > 0);
        assert!(size < 1_000_000, "symlink target was followed: {size}");
    }

    #[cfg(unix)]
    #[test]
    fn dir_size_uses_allocated_not_logical_size() {
        use std::io::Write;
        let tmp = tempfile::tempdir().unwrap();

        // 20 MiB of logical length with no data written: a sparse file.
        let sparse = std::fs::File::create(tmp.path().join("sparse.mdb")).unwrap();
        sparse.set_len(20 * 1024 * 1024).unwrap();
        drop(sparse);

        let mut dense = std::fs::File::create(tmp.path().join("real.bin")).unwrap();
        dense.write_all(&[1u8; 2000]).unwrap();
        drop(dense);

        let size = dir_size(tmp.path());
        assert!(
            size < 1024 * 1024,
            "expected allocated size well under 1 MiB, got {size}"
        );
    }

    #[test]
    fn partition_states_groups_colocated_nodes() {
        let tmp = tempfile::tempdir().unwrap();
        let dir_one = tmp.path().join("node-1");
        let dir_two = tmp.path().join("node-2");
        for (dir, len) in [(&dir_one, 200_000usize), (&dir_two, 1000usize)] {
            std::fs::create_dir_all(dir).unwrap();
            std::fs::write(dir.join("data"), vec![1u8; len]).unwrap();
        }

        let states = partition_states(vec![(1, dir_one), (2, dir_two)]);
        assert_eq!(states.len(), 1, "co-located nodes share one partition");
        let only = &states[0];
        assert_eq!(only.nodes.len(), 2);
        assert!(only.available_bytes > 0);
        assert_eq!(only.eviction_candidate().unwrap().node_id, 2);
    }
}