Skip to main content

detritus_server/
janitor.rs

1use std::{
2    collections::HashSet,
3    path::{Path, PathBuf},
4    time::{Duration, Instant, SystemTime},
5};
6
7use serde::Deserialize;
8use tokio::{fs, task::JoinHandle};
9
10use crate::{
11    metrics::{JanitorMetricStats, Metrics},
12    storage::StoragePaths,
13};
14
/// Retention settings consumed by the background janitor.
///
/// Covers how long NDJSON logs and crash indexes are kept and how often the
/// janitor wakes up. Crash blobs have no TTL of their own: they are removed
/// once no surviving crash index references them.
#[derive(Clone, Copy, Debug)]
pub struct RetentionConfig {
    /// Age, in days, after which an NDJSON log file is deleted.
    ///
    /// `0` makes every log file eligible for deletion.
    pub logs_ttl_days: u64,
    /// Age, in days, after which a crash index file is deleted.
    ///
    /// `0` makes every crash index eligible for deletion.
    pub crashes_ttl_days: u64,
    /// Time between the start of consecutive background janitor cycles.
    pub janitor_interval: Duration,
}
25
26impl Default for RetentionConfig {
27    fn default() -> Self {
28        Self {
29            logs_ttl_days: 14,
30            crashes_ttl_days: 90,
31            janitor_interval: Duration::from_secs(60 * 60),
32        }
33    }
34}
35
/// Counters describing what a single janitor cycle removed.
///
/// All counters start at zero (`Default`) and are only ever incremented while
/// a cycle runs. `Clone`, `PartialEq` and `Eq` are derived so callers and
/// tests can duplicate a result and compare it against an expected value.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct JanitorStats {
    /// Number of NDJSON log files removed.
    pub logs_deleted: u64,
    /// Number of crash index JSON files removed.
    pub indexes_deleted: u64,
    /// Number of unreferenced crash blobs removed.
    pub blobs_deleted: u64,
    /// Sum of the sizes, in bytes, of every removed file.
    pub bytes_freed: u64,
}
48
49pub(crate) fn spawn_janitor(
50    storage: StoragePaths,
51    config: RetentionConfig,
52    metrics: Metrics,
53) -> JoinHandle<()> {
54    tokio::spawn(async move {
55        let mut interval = tokio::time::interval(config.janitor_interval);
56        loop {
57            interval.tick().await;
58            let started = Instant::now();
59            match run_janitor_cycle(&storage, config).await {
60                Ok(stats) => {
61                    metrics.observe_janitor("ok", started.elapsed(), &stats.as_metrics());
62                    tracing::info!(?stats, "retention janitor cycle complete");
63                }
64                Err(error) => {
65                    metrics.observe_janitor(
66                        "error",
67                        started.elapsed(),
68                        &JanitorMetricStats::default(),
69                    );
70                    tracing::error!(%error, "retention janitor cycle failed");
71                }
72            }
73        }
74    })
75}
76
77/// Runs one retention pass over the storage tree.
78pub async fn run_janitor_cycle(
79    storage: &StoragePaths,
80    config: RetentionConfig,
81) -> Result<JanitorStats, JanitorError> {
82    let mut stats = JanitorStats::default();
83    let blob_snapshot = snapshot_blobs(&storage.data_dir().join("crashes").join("by-hash")).await?;
84    prune_logs(storage, config.logs_ttl_days, &mut stats).await?;
85    let referenced =
86        prune_indexes_and_collect_references(storage, config.crashes_ttl_days, &mut stats).await?;
87    sweep_blobs(&blob_snapshot, &referenced, &mut stats).await?;
88    Ok(stats)
89}
90
91async fn prune_logs(
92    storage: &StoragePaths,
93    ttl_days: u64,
94    stats: &mut JanitorStats,
95) -> Result<(), JanitorError> {
96    let root = storage.data_dir().join("logs");
97    let files = collect_files(&root).await?;
98    for file in files {
99        if file.extension().and_then(|ext| ext.to_str()) != Some("ndjson") {
100            continue;
101        }
102        if is_older_than(&file, ttl_days).await? {
103            let len = file_len(&file).await;
104            fs::remove_file(&file).await?;
105            stats.logs_deleted += 1;
106            stats.bytes_freed += len;
107        }
108    }
109    Ok(())
110}
111
112async fn prune_indexes_and_collect_references(
113    storage: &StoragePaths,
114    ttl_days: u64,
115    stats: &mut JanitorStats,
116) -> Result<HashSet<String>, JanitorError> {
117    let root = storage.data_dir().join("crashes").join("by-source");
118    let files = collect_files(&root).await?;
119    let mut referenced = HashSet::new();
120    for file in files {
121        if file.extension().and_then(|ext| ext.to_str()) != Some("json") {
122            continue;
123        }
124        if is_older_than(&file, ttl_days).await? {
125            let len = file_len(&file).await;
126            fs::remove_file(&file).await?;
127            stats.indexes_deleted += 1;
128            stats.bytes_freed += len;
129            continue;
130        }
131        let bytes = fs::read(&file).await?;
132        if let Ok(index) = serde_json::from_slice::<CrashIndexRef>(&bytes) {
133            referenced.insert(index.dump.sha256);
134        }
135    }
136    Ok(referenced)
137}
138
139async fn sweep_blobs(
140    blob_snapshot: &[PathBuf],
141    referenced: &HashSet<String>,
142    stats: &mut JanitorStats,
143) -> Result<(), JanitorError> {
144    for blob in blob_snapshot {
145        let Some(file_name) = blob.file_name().and_then(|name| name.to_str()) else {
146            continue;
147        };
148        let Some(hash) = file_name.strip_suffix(".bin") else {
149            continue;
150        };
151        if referenced.contains(hash) {
152            continue;
153        }
154        let len = file_len(blob).await;
155        match fs::remove_file(blob).await {
156            Ok(()) => {
157                stats.blobs_deleted += 1;
158                stats.bytes_freed += len;
159            }
160            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
161            Err(error) => return Err(error.into()),
162        }
163    }
164    Ok(())
165}
166
167async fn snapshot_blobs(root: &Path) -> Result<Vec<PathBuf>, JanitorError> {
168    collect_files(root).await.map(|files| {
169        files
170            .into_iter()
171            .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("bin"))
172            .collect()
173    })
174}
175
176async fn collect_files(root: &Path) -> Result<Vec<PathBuf>, JanitorError> {
177    if !fs::try_exists(root).await? {
178        return Ok(Vec::new());
179    }
180    let mut dirs = vec![root.to_path_buf()];
181    let mut files = Vec::new();
182    while let Some(dir) = dirs.pop() {
183        let mut entries = fs::read_dir(&dir).await?;
184        while let Some(entry) = entries.next_entry().await? {
185            let ty = entry.file_type().await?;
186            if ty.is_dir() {
187                dirs.push(entry.path());
188            } else if ty.is_file() {
189                files.push(entry.path());
190            }
191        }
192    }
193    Ok(files)
194}
195
196async fn is_older_than(path: &Path, ttl_days: u64) -> Result<bool, JanitorError> {
197    let modified = fs::metadata(path).await?.modified()?;
198    if ttl_days == 0 {
199        return Ok(true);
200    }
201    let cutoff = SystemTime::now() - Duration::from_secs(ttl_days * 24 * 60 * 60);
202    Ok(modified <= cutoff)
203}
204
205async fn file_len(path: &Path) -> u64 {
206    fs::metadata(path)
207        .await
208        .map_or(0, |metadata| metadata.len())
209}
210
211impl JanitorStats {
212    fn as_metrics(&self) -> JanitorMetricStats {
213        JanitorMetricStats {
214            logs_deleted: self.logs_deleted,
215            indexes_deleted: self.indexes_deleted,
216            blobs_deleted: self.blobs_deleted,
217            bytes_freed: self.bytes_freed,
218        }
219    }
220}
221
222#[derive(Debug, Deserialize)]
223struct CrashIndexRef {
224    dump: CrashDumpRef,
225}
226
227#[derive(Debug, Deserialize)]
228struct CrashDumpRef {
229    sha256: String,
230}
231
232#[doc(hidden)]
233/// Hidden error type returned by the retention test hook.
234#[derive(Debug, thiserror::Error)]
235pub enum JanitorError {
236    /// Filesystem operation failed during retention cleanup.
237    #[error("janitor I/O error: {0}")]
238    Io(#[from] std::io::Error),
239}