ant_core/node/daemon/
disk.rs1use std::collections::BTreeMap;
15use std::path::{Path, PathBuf};
16
/// Disk usage measured for a single node's data directory.
///
/// Values of this type are produced by [`partition_states`], which fills
/// `size_bytes` with the result of [`dir_size`] on `data_dir`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDiskUsage {
    /// Numeric identifier of the node.
    pub node_id: u32,
    /// Directory holding this node's data.
    pub data_dir: PathBuf,
    /// Size of `data_dir` in bytes.
    pub size_bytes: u64,
}
25
/// Opaque label identifying a partition.
///
/// The wrapped string is private; values are compared, ordered and hashed by
/// that string, which lets the key be used in ordered and hashed maps.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PartitionKey(String);

impl std::fmt::Display for PartitionKey {
    /// Writes the wrapped label verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl PartitionKey {
    /// Test-only constructor that builds a key from an arbitrary label.
    #[cfg(test)]
    pub(crate) fn for_test(label: &str) -> Self {
        Self(label.to_owned())
    }
}
46
47#[derive(Debug, Clone)]
50pub struct PartitionState {
51 pub partition: PartitionKey,
52 pub available_bytes: u64,
54 pub nodes: Vec<NodeDiskUsage>,
56}
57
58impl PartitionState {
59 pub fn eviction_candidate(&self) -> Option<&NodeDiskUsage> {
66 self.nodes.iter().min_by(|a, b| {
67 a.size_bytes
68 .cmp(&b.size_bytes)
69 .then_with(|| b.node_id.cmp(&a.node_id))
70 })
71 }
72}
73
74pub fn dir_size(path: &Path) -> u64 {
88 let mut total = 0u64;
89 let mut stack = vec![path.to_path_buf()];
90 while let Some(dir) = stack.pop() {
91 let entries = match std::fs::read_dir(&dir) {
92 Ok(entries) => entries,
93 Err(_) => continue,
94 };
95 for entry in entries.flatten() {
96 let meta = match std::fs::symlink_metadata(entry.path()) {
98 Ok(meta) => meta,
99 Err(_) => continue,
100 };
101 let file_type = meta.file_type();
102 if file_type.is_symlink() {
103 continue;
105 } else if file_type.is_dir() {
106 stack.push(entry.path());
107 } else if file_type.is_file() {
108 total = total.saturating_add(allocated_size(&meta));
109 }
110 }
111 }
112 total
113}
114
/// Returns the on-disk footprint of the file described by `meta`, in bytes.
///
/// On Unix this is the allocated block count multiplied by 512 (saturating),
/// so sparse files are reported by what they actually occupy. On other
/// platforms it falls back to the logical file length.
fn allocated_size(meta: &std::fs::Metadata) -> u64 {
    #[cfg(unix)]
    let bytes = std::os::unix::fs::MetadataExt::blocks(meta).saturating_mul(512);

    #[cfg(not(unix))]
    let bytes = meta.len();

    bytes
}
131
132pub fn available_space(path: &Path) -> Option<u64> {
136 let existing = nearest_existing_ancestor(path);
137 fs2::available_space(&existing).ok()
138}
139
140pub fn partition_states<I>(nodes: I) -> Vec<PartitionState>
147where
148 I: IntoIterator<Item = (u32, PathBuf)>,
149{
150 let mut grouped: BTreeMap<PartitionKey, Vec<NodeDiskUsage>> = BTreeMap::new();
151 for (node_id, data_dir) in nodes {
152 let key = partition_key(&data_dir);
153 let size_bytes = dir_size(&data_dir);
154 grouped.entry(key).or_default().push(NodeDiskUsage {
155 node_id,
156 data_dir,
157 size_bytes,
158 });
159 }
160
161 grouped
162 .into_iter()
163 .filter_map(|(partition, nodes)| {
164 let probe = nodes.first()?.data_dir.clone();
167 let available_bytes = available_space(&probe)?;
168 Some(PartitionState {
169 partition,
170 available_bytes,
171 nodes,
172 })
173 })
174 .collect()
175}
176
177fn partition_key(path: &Path) -> PartitionKey {
179 let existing = nearest_existing_ancestor(path);
180
181 #[cfg(unix)]
182 {
183 use std::os::unix::fs::MetadataExt;
184 if let Ok(meta) = std::fs::metadata(&existing) {
185 return PartitionKey(format!("dev:{}", meta.dev()));
186 }
187 }
188
189 let canon = std::fs::canonicalize(&existing).unwrap_or(existing);
191 let key = canon
192 .components()
193 .next()
194 .map(|c| c.as_os_str().to_string_lossy().into_owned())
195 .unwrap_or_else(|| canon.to_string_lossy().into_owned());
196 PartitionKey(key)
197}
198
/// Returns the closest ancestor of `path` (including `path` itself) that
/// exists on disk.
///
/// For a relative path whose every component is missing, the walk ends at the
/// empty path, which stands for the current directory; in that case `.` is
/// returned if it exists. If nothing along the chain exists, or `path` is
/// empty, `path` is returned unchanged so callers still get a value to probe
/// (and fail on) themselves.
fn nearest_existing_ancestor(path: &Path) -> PathBuf {
    // An empty input names nothing; hand it back untouched.
    if path.as_os_str().is_empty() {
        return path.to_path_buf();
    }

    for ancestor in path.ancestors() {
        // `Path::new("missing").parent()` is `""`, and `""` never "exists";
        // interpret it as the current directory so relative paths resolve.
        let candidate = if ancestor.as_os_str().is_empty() {
            Path::new(".")
        } else {
            ancestor
        };
        if candidate.exists() {
            return candidate.to_path_buf();
        }
    }

    path.to_path_buf()
}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Fabricates a usage record for `node_id` with a synthetic data directory.
    fn usage(node_id: u32, size_bytes: u64) -> NodeDiskUsage {
        let data_dir = PathBuf::from(format!("/data/node-{node_id}"));
        NodeDiskUsage {
            node_id,
            data_dir,
            size_bytes,
        }
    }

    /// Wraps `nodes` in a partition with a placeholder key and zero free space.
    fn partition_with(nodes: Vec<NodeDiskUsage>) -> PartitionState {
        PartitionState {
            partition: PartitionKey(String::from("test")),
            available_bytes: 0,
            nodes,
        }
    }

    /// Node id nominated by `eviction_candidate` for the given `(id, size)` pairs.
    fn candidate_id(sizes: &[(u32, u64)]) -> Option<u32> {
        let nodes = sizes.iter().map(|&(id, bytes)| usage(id, bytes)).collect();
        partition_with(nodes)
            .eviction_candidate()
            .map(|node| node.node_id)
    }

    #[test]
    fn candidate_picks_smallest_data_dir() {
        assert_eq!(candidate_id(&[(1, 900), (2, 100), (3, 500)]), Some(2));
    }

    #[test]
    fn candidate_tie_break_prefers_newest_id() {
        // Nodes 1 and 4 are the same size; the higher id must win.
        assert_eq!(candidate_id(&[(1, 100), (4, 100), (2, 800)]), Some(4));
    }

    #[test]
    fn candidate_none_when_empty() {
        assert_eq!(candidate_id(&[]), None);
    }

    #[test]
    fn dir_size_sums_nested_files() {
        let root = tempfile::tempdir().unwrap();
        let nested = root.path().join("sub");
        fs::create_dir(&nested).unwrap();
        fs::write(root.path().join("a.bin"), vec![1u8; 1000]).unwrap();
        fs::write(nested.join("b.bin"), vec![1u8; 2500]).unwrap();

        // Lower bound only: the reported size may be rounded up to whole blocks.
        let measured = dir_size(root.path());
        assert!(measured >= 3500);
    }

    #[test]
    fn dir_size_zero_for_missing_path() {
        let missing = Path::new("/nonexistent/path/xyz");
        assert_eq!(dir_size(missing), 0);
    }

    #[cfg(unix)]
    #[test]
    fn dir_size_does_not_follow_symlinks() {
        let root = tempfile::tempdir().unwrap();

        // A small real file inside the measured directory...
        let node_dir = root.path().join("node");
        fs::create_dir(&node_dir).unwrap();
        fs::write(node_dir.join("real.bin"), vec![1u8; 4000]).unwrap();

        // ...and a large file outside it, reachable only through a symlink.
        let external = root.path().join("external_big.bin");
        fs::write(&external, vec![1u8; 5_000_000]).unwrap();
        std::os::unix::fs::symlink(&external, node_dir.join("link")).unwrap();

        let size = dir_size(&node_dir);
        assert!(size > 0);
        assert!(size < 1_000_000, "symlink target was followed: {size}");
    }

    #[cfg(unix)]
    #[test]
    fn dir_size_uses_allocated_not_logical_size() {
        use std::io::Write;

        let root = tempfile::tempdir().unwrap();
        {
            // Logical length of 20 MiB with nothing written; this presumably
            // relies on the filesystem leaving the extended range unallocated.
            let sparse = fs::File::create(root.path().join("sparse.mdb")).unwrap();
            sparse.set_len(20 * 1024 * 1024).unwrap();
        }
        {
            let mut real = fs::File::create(root.path().join("real.bin")).unwrap();
            real.write_all(&[1u8; 2000]).unwrap();
        }

        let size = dir_size(root.path());
        assert!(
            size < 1024 * 1024,
            "expected allocated size well under 1 MiB, got {size}"
        );
    }

    #[test]
    fn partition_states_groups_colocated_nodes() {
        let root = tempfile::tempdir().unwrap();
        let d1 = root.path().join("node-1");
        let d2 = root.path().join("node-2");
        for dir in [&d1, &d2] {
            fs::create_dir_all(dir).unwrap();
        }
        fs::write(d1.join("data"), vec![1u8; 200_000]).unwrap();
        fs::write(d2.join("data"), vec![1u8; 1000]).unwrap();

        let states = partition_states(vec![(1, d1), (2, d2)]);
        assert_eq!(states.len(), 1, "co-located nodes share one partition");

        let state = &states[0];
        assert_eq!(state.nodes.len(), 2);
        assert!(state.available_bytes > 0);
        // Node 2 holds far less data, so it is the eviction candidate.
        assert_eq!(state.eviction_candidate().unwrap().node_id, 2);
    }
}