tycho_util/metrics/
fs_usage.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use tycho_util::metrics::spawn_metrics_loop;
7use tycho_util::sync::CancellationFlag;
8use walkdir::WalkDir;
9
/// Name of the gauge that reports used bytes.
const BYTES_METRIC: &str = "tycho_fs_used_bytes";
/// Name of the gauge that reports the number of counted files and directories.
const FILES_METRIC: &str = "tycho_fs_used_files";
/// Name of the gauge that reports used blocks.
const BLOCKS_METRIC: &str = "tycho_fs_used_blocks";
/// Value of the `path` label under which the aggregate over all paths is exported.
const TOTAL_LABEL: &str = "__total__";
14
15#[derive(Debug, Clone)]
16pub struct Stats {
17    pub entries: Vec<StatsEntry>,
18}
19
20#[derive(Debug, Clone)]
21pub struct StatsEntry {
22    pub path: PathBuf,
23    pub usage: Usage,
24}
25
/// Plain counters describing how much space a directory tree occupies.
///
/// All fields are zero in the [`Default`] value. The type is `Copy` and
/// comparable, so two measurements can be checked for equality directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Usage {
    /// Sum of the lengths, in bytes, of the regular files that were counted.
    pub bytes: u64,
    /// Number of counted items; the walker counts regular files and directories.
    pub files: u64,
    /// Sum of the block counts of the counted files and directories.
    pub blocks: u64,
}
32
33impl Stats {
34    pub fn total(&self) -> Usage {
35        total_counts(&self.entries)
36    }
37}
38
39pub struct FsUsageBuilder {
40    paths: Vec<PathBuf>,
41}
42
43impl FsUsageBuilder {
44    pub fn new() -> Self {
45        Self { paths: Vec::new() }
46    }
47
48    pub fn add_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
49        self.paths.push(path.into());
50        self
51    }
52
53    pub fn build(self) -> FsUsageMonitor {
54        let entries = self.paths.into_iter().map(Entry::new).collect::<Vec<_>>();
55
56        FsUsageMonitor {
57            state: Arc::new(FsUsageState {
58                entries: Mutex::new(entries),
59            }),
60            stop: CancellationFlag::new(),
61            export_handle: None,
62        }
63    }
64}
65
66impl Default for FsUsageBuilder {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72pub struct FsUsageMonitor {
73    state: Arc<FsUsageState>,
74    stop: CancellationFlag,
75    export_handle: Option<tokio::task::AbortHandle>,
76}
77
78impl FsUsageMonitor {
79    pub fn add_path<P: Into<PathBuf>>(&self, path: P) -> bool {
80        let path = path.into();
81        let mut entries = self.state.entries.lock().unwrap();
82
83        if entries.iter().any(|e| e.path == path) {
84            return false;
85        }
86
87        entries.push(Entry::new(path));
88        true
89    }
90
91    pub fn iter_sizes(&self) -> impl Iterator<Item = StatsEntry> {
92        self.snapshot().entries.into_iter()
93    }
94
95    pub fn walk(&self) -> Stats {
96        walk_state(self.state.as_ref(), &self.stop)
97    }
98
99    pub fn snapshot(&self) -> Stats {
100        let entries = self.state.entries.lock().unwrap();
101        Stats {
102            entries: entries
103                .iter()
104                .map(|e| StatsEntry {
105                    path: e.path.clone(),
106                    usage: e.usage,
107                })
108                .collect(),
109        }
110    }
111
112    /// Starts the metrics loop that periodically collects and exports filesystem usage metrics.
113    /// Can only be called once; subsequent calls will return an error.
114    pub fn spawn_metrics_loop(&mut self, interval: Duration) -> Result<(), MetricsLoopStartError> {
115        if self.stop.check() {
116            return Err(MetricsLoopStartError::ShuttingDown);
117        }
118
119        if self.export_handle.is_some() {
120            return Err(MetricsLoopStartError::AlreadyRunning);
121        }
122
123        let stop = self.stop.clone();
124
125        let handle = spawn_metrics_loop(&self.state, interval, move |state| {
126            let stop = stop.clone();
127            async move {
128                let stats = tokio::task::spawn_blocking(move || walk_state(state.as_ref(), &stop))
129                    .await
130                    .expect("spawn blocking failed");
131
132                export_metrics(&stats);
133            }
134        });
135
136        self.export_handle = Some(handle);
137
138        Ok(())
139    }
140
141    fn shutdown(&mut self) {
142        self.stop.cancel();
143
144        if let Some(handle) = self.export_handle.take() {
145            handle.abort();
146        }
147    }
148}
149
150impl Drop for FsUsageMonitor {
151    fn drop(&mut self) {
152        self.shutdown();
153    }
154}
155
156#[derive(Debug)]
157struct FsUsageState {
158    entries: Mutex<Vec<Entry>>,
159}
160
161#[derive(Debug, Clone)]
162struct Entry {
163    path: PathBuf,
164    usage: Usage,
165}
166
167impl Entry {
168    fn new(path: PathBuf) -> Self {
169        Self {
170            path,
171            usage: Usage::default(),
172        }
173    }
174}
175
176fn walk_state(state: &FsUsageState, stop: &CancellationFlag) -> Stats {
177    let paths: Vec<_> = state.entries.lock().unwrap().clone();
178
179    if paths.is_empty() {
180        return Stats { entries: vec![] };
181    }
182
183    let mut results = Vec::with_capacity(paths.len());
184    for e in paths {
185        if stop.check() {
186            break;
187        }
188
189        let usage = collect_path_usage(&e.path, stop);
190        results.push((e.path, usage));
191    }
192
193    let mut entries = state.entries.lock().unwrap();
194
195    let stats_out = entries
196        .iter_mut()
197        .zip(results)
198        .map(|(entry, (path, usage))| {
199            entry.usage = usage;
200            StatsEntry { path, usage }
201        })
202        .collect();
203
204    Stats { entries: stats_out }
205}
206
207fn collect_path_usage(path: &Path, stop: &CancellationFlag) -> Usage {
208    let mut usage = Usage::default();
209
210    let walker = WalkDir::new(path)
211        .follow_links(false)
212        .follow_root_links(true);
213
214    for item in walker {
215        if stop.check() {
216            break;
217        }
218
219        let entry = match item {
220            Ok(e) => e,
221            Err(e) => {
222                tracing::warn!("fs usage: walk e: {e:?}");
223                continue;
224            }
225        };
226
227        if entry.file_type().is_symlink() {
228            continue;
229        }
230
231        let metadata = match entry.metadata() {
232            Ok(m) => m,
233            Err(e) => {
234                tracing::warn!(
235                    path = %entry.path().display(),
236                    "fs usage: failed to read metadata: {e:?}",
237                );
238                continue;
239            }
240        };
241
242        if metadata.is_dir() {
243            usage.files = usage.files.saturating_add(1);
244            usage.blocks = usage.blocks.saturating_add(blocks_from_metadata(&metadata));
245        } else if metadata.is_file() {
246            usage.files = usage.files.saturating_add(1);
247            usage.bytes = usage.bytes.saturating_add(metadata.len());
248            usage.blocks = usage.blocks.saturating_add(blocks_from_metadata(&metadata));
249        }
250    }
251
252    usage
253}
254
255fn total_counts(entries: &[StatsEntry]) -> Usage {
256    let mut sorted = entries.iter().collect::<Vec<_>>();
257    sorted.sort_by(|a, b| a.path.cmp(&b.path));
258
259    let mut total_bytes: u64 = 0;
260    let mut total_files: u64 = 0;
261    let mut total_blocks: u64 = 0;
262
263    let mut last_parent: Option<&Path> = None;
264
265    for entry in sorted {
266        if let Some(parent) = last_parent
267            && entry.path.starts_with(parent)
268        {
269            continue;
270        }
271
272        total_bytes = total_bytes.saturating_add(entry.usage.bytes);
273        total_files = total_files.saturating_add(entry.usage.files);
274        total_blocks = total_blocks.saturating_add(entry.usage.blocks);
275
276        last_parent = Some(entry.path.as_path());
277    }
278
279    Usage {
280        bytes: total_bytes,
281        files: total_files,
282        blocks: total_blocks,
283    }
284}
285
286fn export_metrics(stats: &Stats) {
287    for entry in &stats.entries {
288        let path = entry.path.to_string_lossy().into_owned();
289
290        for (metric, value) in [
291            (BYTES_METRIC, entry.usage.bytes),
292            (FILES_METRIC, entry.usage.files),
293            (BLOCKS_METRIC, entry.usage.blocks),
294        ] {
295            metrics::gauge!(metric, "path" => path.clone()).set(value as f64);
296        }
297    }
298
299    let Usage {
300        bytes,
301        files,
302        blocks,
303    } = stats.total();
304    for (metric, value) in [
305        (BYTES_METRIC, bytes),
306        (FILES_METRIC, files),
307        (BLOCKS_METRIC, blocks),
308    ] {
309        metrics::gauge!(metric, "path" => TOTAL_LABEL).set(value as f64);
310    }
311}
312
/// Returns the number of 512-byte blocks to account for `metadata`.
///
/// On Unix this is the block count reported by the filesystem. Elsewhere no
/// such figure is read, so the count is estimated as the file length divided
/// by 512, rounded up (zero for an empty file).
fn blocks_from_metadata(metadata: &fs::Metadata) -> u64 {
    #[cfg(unix)]
    {
        use std::os::unix::fs::MetadataExt;
        metadata.blocks()
    }
    #[cfg(not(unix))]
    {
        // Size of the unit used for the estimate, matching the Unix block unit.
        const BLOCK_SIZE: u64 = 512;
        // `div_ceil` rounds up without the intermediate addition, so it is
        // exact for every length and needs no special case for zero.
        metadata.len().div_ceil(BLOCK_SIZE)
    }
}
329
330#[derive(thiserror::Error, Debug)]
331pub enum MetricsLoopStartError {
332    #[error("metrics loop is already running")]
333    AlreadyRunning,
334    #[error("fs monitor is shutting down")]
335    ShuttingDown,
336}
337
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::TempDir;

    use super::*;

    /// A walk counts directories and regular files, but only file bytes.
    ///
    /// Nothing here awaits, so this is a plain synchronous test.
    #[test]
    fn walk_counts_files_and_dirs() {
        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();

        let file_a = root.join("a.txt");
        fs::write(&file_a, b"abcd").unwrap();

        let nested_dir = root.join("nested");
        fs::create_dir(&nested_dir).unwrap();
        fs::write(nested_dir.join("b.bin"), b"123456").unwrap();

        let monitor = FsUsageBuilder::new().add_path(root).build();

        let stats = monitor.walk();
        let entry = stats.entries.first().unwrap();

        // 4 + 6 bytes of file content.
        assert_eq!(entry.usage.bytes, 10);
        // root, a.txt, nested, nested/b.bin
        assert_eq!(entry.usage.files, 4);
        assert!(entry.usage.blocks.saturating_mul(512) >= entry.usage.bytes);
    }

    /// A symlink root and a missing root both produce zeroed usage, while the
    /// real file is counted normally.
    #[cfg(unix)]
    #[test]
    fn walk_skips_symlinks_and_missing_paths() {
        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();
        let file_a = root.join("a");
        fs::write(&file_a, b"1").unwrap();

        let link = root.join("link");
        std::os::unix::fs::symlink(&file_a, &link).unwrap();

        let missing = root.join("missing");

        let monitor = FsUsageBuilder::new()
            .add_path(&link)
            .add_path(&missing)
            .add_path(&file_a)
            .build();

        let stats = monitor.walk();
        // Index the results by file name so the assertions do not depend on order.
        let totals = stats
            .entries
            .iter()
            .map(|entry| {
                (
                    entry
                        .path
                        .file_name()
                        .unwrap()
                        .to_string_lossy()
                        .into_owned(),
                    (entry.usage.bytes, entry.usage.files, entry.usage.blocks),
                )
            })
            .collect::<HashMap<_, _>>();

        assert_eq!(totals.get("link"), Some(&(0, 0, 0)));
        assert_eq!(totals.get("missing"), Some(&(0, 0, 0)));
        let &(bytes, files, blocks) = totals.get("a").unwrap();
        assert_eq!(bytes, 1);
        assert_eq!(files, 1);
        assert!(blocks >= 1);
    }

    /// `/var/log` lies inside `/var`, so only `/var` and `/opt` are summed.
    #[test]
    fn total_counts_skips_children() {
        let entries = vec![
            StatsEntry {
                path: PathBuf::from("/var/log"),
                usage: Usage {
                    bytes: 3,
                    files: 1,
                    blocks: 2,
                },
            },
            StatsEntry {
                path: PathBuf::from("/var"),
                usage: Usage {
                    bytes: 10,
                    files: 4,
                    blocks: 7,
                },
            },
            StatsEntry {
                path: PathBuf::from("/opt"),
                usage: Usage {
                    bytes: 5,
                    files: 2,
                    blocks: 3,
                },
            },
        ];

        let Usage {
            bytes,
            files,
            blocks,
        } = total_counts(&entries);

        assert_eq!(bytes, 15);
        assert_eq!(files, 6);
        assert_eq!(blocks, 10);
    }
}