1use std::fs;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use tycho_util::metrics::spawn_metrics_loop;
7use tycho_util::sync::CancellationFlag;
8use walkdir::WalkDir;
9
/// Gauge name under which `export_metrics` publishes `Usage::bytes`.
const BYTES_METRIC: &str = "tycho_fs_used_bytes";
/// Gauge name under which `export_metrics` publishes `Usage::files`.
const FILES_METRIC: &str = "tycho_fs_used_files";
/// Gauge name under which `export_metrics` publishes `Usage::blocks`.
const BLOCKS_METRIC: &str = "tycho_fs_used_blocks";
/// Value of the `path` label on the gauges that carry the aggregate returned
/// by `Stats::total` instead of a single tracked path.
const TOTAL_LABEL: &str = "__total__";
14
/// Usage figures for a set of tracked paths, as returned by
/// `FsUsageMonitor::walk` and `FsUsageMonitor::snapshot`.
#[derive(Debug, Clone)]
pub struct Stats {
    /// One entry per reported path. A walk that is cancelled part-way returns
    /// only the paths it got to, so this may be shorter than the tracked list.
    pub entries: Vec<StatsEntry>,
}
19
/// Usage figures for a single tracked path.
#[derive(Debug, Clone)]
pub struct StatsEntry {
    /// The path exactly as it was registered (it is not canonicalized here).
    pub path: PathBuf,
    /// Counters accumulated for everything found under `path`.
    pub usage: Usage,
}
25
/// Accumulated filesystem usage counters.
///
/// The default value has every counter at zero, which is also what a freshly
/// registered path reports before its first walk. The type is a plain value:
/// it is `Copy` and can be compared with `==`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Usage {
    /// Sum of the lengths of the regular files found (directories add nothing).
    pub bytes: u64,
    /// Number of entries counted; directories are counted as well as regular
    /// files.
    pub files: u64,
    /// Sum of the per-entry block counts produced by `blocks_from_metadata`
    /// (512-byte units on Unix per `MetadataExt::blocks`; derived from the
    /// length on other platforms).
    pub blocks: u64,
}
32
33impl Stats {
34 pub fn total(&self) -> Usage {
35 total_counts(&self.entries)
36 }
37}
38
/// Collects the initial set of paths for an `FsUsageMonitor`.
///
/// Paths are stored as given, in insertion order; no de-duplication happens
/// at this stage.
#[derive(Debug, Clone)]
pub struct FsUsageBuilder {
    /// Paths that the built monitor starts out tracking.
    paths: Vec<PathBuf>,
}
42
43impl FsUsageBuilder {
44 pub fn new() -> Self {
45 Self { paths: Vec::new() }
46 }
47
48 pub fn add_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
49 self.paths.push(path.into());
50 self
51 }
52
53 pub fn build(self) -> FsUsageMonitor {
54 let entries = self.paths.into_iter().map(Entry::new).collect::<Vec<_>>();
55
56 FsUsageMonitor {
57 state: Arc::new(FsUsageState {
58 entries: Mutex::new(entries),
59 }),
60 stop: CancellationFlag::new(),
61 export_handle: None,
62 }
63 }
64}
65
66impl Default for FsUsageBuilder {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
/// Tracks disk usage for a set of paths and can export it as gauges.
///
/// Dropping the monitor raises the cancellation flag and aborts the export
/// task if one was started.
pub struct FsUsageMonitor {
    /// Tracked paths together with their last stored usage; the same `Arc` is
    /// handed to the metrics loop.
    state: Arc<FsUsageState>,
    /// Flag checked by walks so they can stop early once shutdown has begun.
    stop: CancellationFlag,
    /// Abort handle of the export task; `None` until `spawn_metrics_loop`
    /// succeeds and again after shutdown.
    export_handle: Option<tokio::task::AbortHandle>,
}
77
78impl FsUsageMonitor {
79 pub fn add_path<P: Into<PathBuf>>(&self, path: P) -> bool {
80 let path = path.into();
81 let mut entries = self.state.entries.lock().unwrap();
82
83 if entries.iter().any(|e| e.path == path) {
84 return false;
85 }
86
87 entries.push(Entry::new(path));
88 true
89 }
90
91 pub fn iter_sizes(&self) -> impl Iterator<Item = StatsEntry> {
92 self.snapshot().entries.into_iter()
93 }
94
95 pub fn walk(&self) -> Stats {
96 walk_state(self.state.as_ref(), &self.stop)
97 }
98
99 pub fn snapshot(&self) -> Stats {
100 let entries = self.state.entries.lock().unwrap();
101 Stats {
102 entries: entries
103 .iter()
104 .map(|e| StatsEntry {
105 path: e.path.clone(),
106 usage: e.usage,
107 })
108 .collect(),
109 }
110 }
111
112 pub fn spawn_metrics_loop(&mut self, interval: Duration) -> Result<(), MetricsLoopStartError> {
115 if self.stop.check() {
116 return Err(MetricsLoopStartError::ShuttingDown);
117 }
118
119 if self.export_handle.is_some() {
120 return Err(MetricsLoopStartError::AlreadyRunning);
121 }
122
123 let stop = self.stop.clone();
124
125 let handle = spawn_metrics_loop(&self.state, interval, move |state| {
126 let stop = stop.clone();
127 async move {
128 let stats = tokio::task::spawn_blocking(move || walk_state(state.as_ref(), &stop))
129 .await
130 .expect("spawn blocking failed");
131
132 export_metrics(&stats);
133 }
134 });
135
136 self.export_handle = Some(handle);
137
138 Ok(())
139 }
140
141 fn shutdown(&mut self) {
142 self.stop.cancel();
143
144 if let Some(handle) = self.export_handle.take() {
145 handle.abort();
146 }
147 }
148}
149
150impl Drop for FsUsageMonitor {
151 fn drop(&mut self) {
152 self.shutdown();
153 }
154}
155
/// State shared between the monitor and its background export task.
#[derive(Debug)]
struct FsUsageState {
    /// Tracked paths with their last stored usage. In this file new entries
    /// are only ever appended (see `FsUsageMonitor::add_path`).
    entries: Mutex<Vec<Entry>>,
}
160
/// A tracked path together with the usage stored for it.
#[derive(Debug, Clone)]
struct Entry {
    /// The path as it was registered.
    path: PathBuf,
    /// Figures written by the most recent walk; all zero before the first one.
    usage: Usage,
}
166
167impl Entry {
168 fn new(path: PathBuf) -> Self {
169 Self {
170 path,
171 usage: Usage::default(),
172 }
173 }
174}
175
176fn walk_state(state: &FsUsageState, stop: &CancellationFlag) -> Stats {
177 let paths: Vec<_> = state.entries.lock().unwrap().clone();
178
179 if paths.is_empty() {
180 return Stats { entries: vec![] };
181 }
182
183 let mut results = Vec::with_capacity(paths.len());
184 for e in paths {
185 if stop.check() {
186 break;
187 }
188
189 let usage = collect_path_usage(&e.path, stop);
190 results.push((e.path, usage));
191 }
192
193 let mut entries = state.entries.lock().unwrap();
194
195 let stats_out = entries
196 .iter_mut()
197 .zip(results)
198 .map(|(entry, (path, usage))| {
199 entry.usage = usage;
200 StatsEntry { path, usage }
201 })
202 .collect();
203
204 Stats { entries: stats_out }
205}
206
207fn collect_path_usage(path: &Path, stop: &CancellationFlag) -> Usage {
208 let mut usage = Usage::default();
209
210 let walker = WalkDir::new(path)
211 .follow_links(false)
212 .follow_root_links(true);
213
214 for item in walker {
215 if stop.check() {
216 break;
217 }
218
219 let entry = match item {
220 Ok(e) => e,
221 Err(e) => {
222 tracing::warn!("fs usage: walk e: {e:?}");
223 continue;
224 }
225 };
226
227 if entry.file_type().is_symlink() {
228 continue;
229 }
230
231 let metadata = match entry.metadata() {
232 Ok(m) => m,
233 Err(e) => {
234 tracing::warn!(
235 path = %entry.path().display(),
236 "fs usage: failed to read metadata: {e:?}",
237 );
238 continue;
239 }
240 };
241
242 if metadata.is_dir() {
243 usage.files = usage.files.saturating_add(1);
244 usage.blocks = usage.blocks.saturating_add(blocks_from_metadata(&metadata));
245 } else if metadata.is_file() {
246 usage.files = usage.files.saturating_add(1);
247 usage.bytes = usage.bytes.saturating_add(metadata.len());
248 usage.blocks = usage.blocks.saturating_add(blocks_from_metadata(&metadata));
249 }
250 }
251
252 usage
253}
254
255fn total_counts(entries: &[StatsEntry]) -> Usage {
256 let mut sorted = entries.iter().collect::<Vec<_>>();
257 sorted.sort_by(|a, b| a.path.cmp(&b.path));
258
259 let mut total_bytes: u64 = 0;
260 let mut total_files: u64 = 0;
261 let mut total_blocks: u64 = 0;
262
263 let mut last_parent: Option<&Path> = None;
264
265 for entry in sorted {
266 if let Some(parent) = last_parent
267 && entry.path.starts_with(parent)
268 {
269 continue;
270 }
271
272 total_bytes = total_bytes.saturating_add(entry.usage.bytes);
273 total_files = total_files.saturating_add(entry.usage.files);
274 total_blocks = total_blocks.saturating_add(entry.usage.blocks);
275
276 last_parent = Some(entry.path.as_path());
277 }
278
279 Usage {
280 bytes: total_bytes,
281 files: total_files,
282 blocks: total_blocks,
283 }
284}
285
286fn export_metrics(stats: &Stats) {
287 for entry in &stats.entries {
288 let path = entry.path.to_string_lossy().into_owned();
289
290 for (metric, value) in [
291 (BYTES_METRIC, entry.usage.bytes),
292 (FILES_METRIC, entry.usage.files),
293 (BLOCKS_METRIC, entry.usage.blocks),
294 ] {
295 metrics::gauge!(metric, "path" => path.clone()).set(value as f64);
296 }
297 }
298
299 let Usage {
300 bytes,
301 files,
302 blocks,
303 } = stats.total();
304 for (metric, value) in [
305 (BYTES_METRIC, bytes),
306 (FILES_METRIC, files),
307 (BLOCKS_METRIC, blocks),
308 ] {
309 metrics::gauge!(metric, "path" => TOTAL_LABEL).set(value as f64);
310 }
311}
312
/// Returns the number of blocks attributed to the file described by
/// `metadata`.
///
/// On Unix this is the value reported by `MetadataExt::blocks`. On other
/// platforms it is the file length divided by 512, rounded up (zero for an
/// empty file).
fn blocks_from_metadata(metadata: &fs::Metadata) -> u64 {
    #[cfg(unix)]
    {
        std::os::unix::fs::MetadataExt::blocks(metadata)
    }
    #[cfg(not(unix))]
    {
        match metadata.len() {
            0 => 0,
            len => len.saturating_add(511) / 512,
        }
    }
}
329
/// Reasons why `FsUsageMonitor::spawn_metrics_loop` refuses to start.
#[derive(thiserror::Error, Debug)]
pub enum MetricsLoopStartError {
    /// An export task handle is already stored on the monitor.
    #[error("metrics loop is already running")]
    AlreadyRunning,
    /// The monitor's cancellation flag is already set.
    #[error("fs monitor is shutting down")]
    ShuttingDown,
}
337
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::TempDir;

    use super::*;

    /// Builds a `StatsEntry` for `path` with the given counters.
    fn stats_entry(path: &str, bytes: u64, files: u64, blocks: u64) -> StatsEntry {
        StatsEntry {
            path: PathBuf::from(path),
            usage: Usage {
                bytes,
                files,
                blocks,
            },
        }
    }

    /// A walk counts the root, a nested directory and both files, but only
    /// file contents contribute to the byte total.
    #[tokio::test]
    async fn walk_counts_files_and_dirs() {
        let sandbox = TempDir::new().unwrap();
        let base = sandbox.path();

        fs::write(base.join("a.txt"), b"abcd").unwrap();

        let sub_dir = base.join("nested");
        fs::create_dir(&sub_dir).unwrap();
        fs::write(sub_dir.join("b.bin"), b"123456").unwrap();

        let monitor = FsUsageBuilder::new().add_path(base).build();

        let stats = monitor.walk();
        let Usage {
            bytes,
            files,
            blocks,
        } = stats.entries.first().unwrap().usage;

        assert_eq!(bytes, 10);
        assert_eq!(files, 4);
        assert!(blocks.saturating_mul(512) >= bytes);
    }

    /// A symlinked root and a missing root both report zero usage, while a
    /// regular file registered directly is counted.
    #[cfg(unix)]
    #[tokio::test]
    async fn walk_skips_symlinks_and_missing_paths() {
        let sandbox = TempDir::new().unwrap();
        let base = sandbox.path();

        let real_file = base.join("a");
        fs::write(&real_file, b"1").unwrap();

        let symlink = base.join("link");
        std::os::unix::fs::symlink(&real_file, &symlink).unwrap();

        let absent = base.join("missing");

        let monitor = FsUsageBuilder::new()
            .add_path(&symlink)
            .add_path(&absent)
            .add_path(&real_file)
            .build();

        let stats = monitor.walk();

        // Index the results by the last path component for easy lookup.
        let mut by_name = HashMap::new();
        for entry in &stats.entries {
            let name = entry
                .path
                .file_name()
                .unwrap()
                .to_string_lossy()
                .into_owned();
            let Usage {
                bytes,
                files,
                blocks,
            } = entry.usage;
            by_name.insert(name, (bytes, files, blocks));
        }

        assert_eq!(by_name.get("link"), Some(&(0, 0, 0)));
        assert_eq!(by_name.get("missing"), Some(&(0, 0, 0)));

        let (bytes, files, blocks) = by_name["a"];
        assert_eq!(bytes, 1);
        assert_eq!(files, 1);
        assert!(blocks >= 1);
    }

    /// `/var/log` lies under `/var`, so only `/var` and `/opt` are summed.
    #[test]
    fn total_counts_skips_children() {
        let entries = [
            stats_entry("/var/log", 3, 1, 2),
            stats_entry("/var", 10, 4, 7),
            stats_entry("/opt", 5, 2, 3),
        ];

        let total = total_counts(&entries);

        assert_eq!(total.bytes, 15);
        assert_eq!(total.files, 6);
        assert_eq!(total.blocks, 10);
    }
}