detritus_server/
janitor.rs1use std::{
2 collections::HashSet,
3 path::{Path, PathBuf},
4 time::{Duration, Instant, SystemTime},
5};
6
7use serde::Deserialize;
8use tokio::{fs, task::JoinHandle};
9
10use crate::{
11 metrics::{JanitorMetricStats, Metrics},
12 storage::StoragePaths,
13};
14
/// Retention settings consumed by the janitor.
#[derive(Clone, Copy, Debug)]
pub struct RetentionConfig {
    /// Age, in days, after which `.ndjson` files under `logs/` are deleted.
    /// A value of `0` makes every log file eligible for deletion.
    pub logs_ttl_days: u64,
    /// Age, in days, after which `.json` index files under
    /// `crashes/by-source/` are deleted. A value of `0` makes every index
    /// eligible for deletion.
    pub crashes_ttl_days: u64,
    /// Period of the timer that drives the background janitor loop.
    pub janitor_interval: Duration,
}
25
26impl Default for RetentionConfig {
27 fn default() -> Self {
28 Self {
29 logs_ttl_days: 14,
30 crashes_ttl_days: 90,
31 janitor_interval: Duration::from_secs(60 * 60),
32 }
33 }
34}
35
/// Counters accumulated over a single janitor cycle.
#[derive(Default, Debug)]
pub struct JanitorStats {
    /// Number of expired `.ndjson` log files removed.
    pub logs_deleted: u64,
    /// Number of expired `.json` crash index files removed.
    pub indexes_deleted: u64,
    /// Number of unreferenced `.bin` crash blobs removed.
    pub blobs_deleted: u64,
    /// Sum of the sizes of all removed files, as measured right before each
    /// removal (a file whose metadata could not be read counts as 0 bytes).
    pub bytes_freed: u64,
}
48
49pub(crate) fn spawn_janitor(
50 storage: StoragePaths,
51 config: RetentionConfig,
52 metrics: Metrics,
53) -> JoinHandle<()> {
54 tokio::spawn(async move {
55 let mut interval = tokio::time::interval(config.janitor_interval);
56 loop {
57 interval.tick().await;
58 let started = Instant::now();
59 match run_janitor_cycle(&storage, config).await {
60 Ok(stats) => {
61 metrics.observe_janitor("ok", started.elapsed(), &stats.as_metrics());
62 tracing::info!(?stats, "retention janitor cycle complete");
63 }
64 Err(error) => {
65 metrics.observe_janitor(
66 "error",
67 started.elapsed(),
68 &JanitorMetricStats::default(),
69 );
70 tracing::error!(%error, "retention janitor cycle failed");
71 }
72 }
73 }
74 })
75}
76
77pub async fn run_janitor_cycle(
79 storage: &StoragePaths,
80 config: RetentionConfig,
81) -> Result<JanitorStats, JanitorError> {
82 let mut stats = JanitorStats::default();
83 let blob_snapshot = snapshot_blobs(&storage.data_dir().join("crashes").join("by-hash")).await?;
84 prune_logs(storage, config.logs_ttl_days, &mut stats).await?;
85 let referenced =
86 prune_indexes_and_collect_references(storage, config.crashes_ttl_days, &mut stats).await?;
87 sweep_blobs(&blob_snapshot, &referenced, &mut stats).await?;
88 Ok(stats)
89}
90
91async fn prune_logs(
92 storage: &StoragePaths,
93 ttl_days: u64,
94 stats: &mut JanitorStats,
95) -> Result<(), JanitorError> {
96 let root = storage.data_dir().join("logs");
97 let files = collect_files(&root).await?;
98 for file in files {
99 if file.extension().and_then(|ext| ext.to_str()) != Some("ndjson") {
100 continue;
101 }
102 if is_older_than(&file, ttl_days).await? {
103 let len = file_len(&file).await;
104 fs::remove_file(&file).await?;
105 stats.logs_deleted += 1;
106 stats.bytes_freed += len;
107 }
108 }
109 Ok(())
110}
111
112async fn prune_indexes_and_collect_references(
113 storage: &StoragePaths,
114 ttl_days: u64,
115 stats: &mut JanitorStats,
116) -> Result<HashSet<String>, JanitorError> {
117 let root = storage.data_dir().join("crashes").join("by-source");
118 let files = collect_files(&root).await?;
119 let mut referenced = HashSet::new();
120 for file in files {
121 if file.extension().and_then(|ext| ext.to_str()) != Some("json") {
122 continue;
123 }
124 if is_older_than(&file, ttl_days).await? {
125 let len = file_len(&file).await;
126 fs::remove_file(&file).await?;
127 stats.indexes_deleted += 1;
128 stats.bytes_freed += len;
129 continue;
130 }
131 let bytes = fs::read(&file).await?;
132 if let Ok(index) = serde_json::from_slice::<CrashIndexRef>(&bytes) {
133 referenced.insert(index.dump.sha256);
134 }
135 }
136 Ok(referenced)
137}
138
139async fn sweep_blobs(
140 blob_snapshot: &[PathBuf],
141 referenced: &HashSet<String>,
142 stats: &mut JanitorStats,
143) -> Result<(), JanitorError> {
144 for blob in blob_snapshot {
145 let Some(file_name) = blob.file_name().and_then(|name| name.to_str()) else {
146 continue;
147 };
148 let Some(hash) = file_name.strip_suffix(".bin") else {
149 continue;
150 };
151 if referenced.contains(hash) {
152 continue;
153 }
154 let len = file_len(blob).await;
155 match fs::remove_file(blob).await {
156 Ok(()) => {
157 stats.blobs_deleted += 1;
158 stats.bytes_freed += len;
159 }
160 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
161 Err(error) => return Err(error.into()),
162 }
163 }
164 Ok(())
165}
166
167async fn snapshot_blobs(root: &Path) -> Result<Vec<PathBuf>, JanitorError> {
168 collect_files(root).await.map(|files| {
169 files
170 .into_iter()
171 .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("bin"))
172 .collect()
173 })
174}
175
176async fn collect_files(root: &Path) -> Result<Vec<PathBuf>, JanitorError> {
177 if !fs::try_exists(root).await? {
178 return Ok(Vec::new());
179 }
180 let mut dirs = vec![root.to_path_buf()];
181 let mut files = Vec::new();
182 while let Some(dir) = dirs.pop() {
183 let mut entries = fs::read_dir(&dir).await?;
184 while let Some(entry) = entries.next_entry().await? {
185 let ty = entry.file_type().await?;
186 if ty.is_dir() {
187 dirs.push(entry.path());
188 } else if ty.is_file() {
189 files.push(entry.path());
190 }
191 }
192 }
193 Ok(files)
194}
195
196async fn is_older_than(path: &Path, ttl_days: u64) -> Result<bool, JanitorError> {
197 let modified = fs::metadata(path).await?.modified()?;
198 if ttl_days == 0 {
199 return Ok(true);
200 }
201 let cutoff = SystemTime::now() - Duration::from_secs(ttl_days * 24 * 60 * 60);
202 Ok(modified <= cutoff)
203}
204
205async fn file_len(path: &Path) -> u64 {
206 fs::metadata(path)
207 .await
208 .map_or(0, |metadata| metadata.len())
209}
210
211impl JanitorStats {
212 fn as_metrics(&self) -> JanitorMetricStats {
213 JanitorMetricStats {
214 logs_deleted: self.logs_deleted,
215 indexes_deleted: self.indexes_deleted,
216 blobs_deleted: self.blobs_deleted,
217 bytes_freed: self.bytes_freed,
218 }
219 }
220}
221
222#[derive(Debug, Deserialize)]
223struct CrashIndexRef {
224 dump: CrashDumpRef,
225}
226
227#[derive(Debug, Deserialize)]
228struct CrashDumpRef {
229 sha256: String,
230}
231
232#[doc(hidden)]
233#[derive(Debug, thiserror::Error)]
235pub enum JanitorError {
236 #[error("janitor I/O error: {0}")]
238 Io(#[from] std::io::Error),
239}