hashtree_cli/storage/
maintenance.rs1use anyhow::Result;
2use heed::{CompactionOption, EnvOpenOptions};
3use std::collections::HashSet;
4use std::path::{Path, PathBuf};
5
6use super::{GcStats, HashtreeStore};
7
8#[cfg(feature = "s3")]
9use hashtree_core::from_hex;
10use hashtree_core::{sha256, to_hex};
11
/// Outcome of an integrity check over stored blobs (local LMDB or remote R2).
///
/// For every check in this module, each examined entry is counted exactly once
/// as either `valid` or `corrupted`, so `total == valid + corrupted`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VerifyResult {
    /// Number of entries examined.
    pub total: usize,
    /// Entries whose content hash matched the hash they are stored under.
    pub valid: usize,
    /// Entries that were missing, unreadable, mis-keyed, or hash-mismatched.
    pub corrupted: usize,
    /// Corrupted entries that were actually removed (only when deletion was requested).
    pub deleted: usize,
}
20
/// Size of one LMDB environment's `data.mdb` before and after compaction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactResult {
    /// Directory of the compacted environment.
    pub env_dir: PathBuf,
    /// Size in bytes of `data.mdb` before compaction.
    pub before_bytes: u64,
    /// Size in bytes of the compacted copy that replaced `data.mdb`.
    pub after_bytes: u64,
}
27
/// Named-database limit requested when opening an environment for compaction.
const COMPACT_MAX_DBS: u32 = 64;
/// Reader-slot limit requested when opening an environment for compaction.
const COMPACT_MAX_READERS: u32 = 2_048;
/// Smallest map size (10 MiB) used when opening an environment for compaction.
const COMPACT_OPEN_MAP_SIZE_BYTES: usize = 10 << 20;
/// Granularity the map size is rounded up to (assumes 4 KiB pages).
const COMPACT_PAGE_SIZE_BYTES: u64 = 4 * 1024;
32
33impl HashtreeStore {
34 pub fn gc(&self) -> Result<GcStats> {
36 let rtxn = self.env.read_txn()?;
37
38 let pinned: HashSet<[u8; 32]> = self
40 .pins
41 .iter(&rtxn)?
42 .filter_map(|item| item.ok())
43 .filter_map(|(hash_bytes, _)| {
44 if hash_bytes.len() == 32 {
45 let mut hash = [0u8; 32];
46 hash.copy_from_slice(hash_bytes);
47 Some(hash)
48 } else {
49 None
50 }
51 })
52 .collect();
53
54 drop(rtxn);
55
56 let all_hashes = self
58 .router
59 .list()
60 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
61
62 let mut deleted = 0;
64 let mut freed_bytes = 0u64;
65
66 for hash in all_hashes {
67 if !pinned.contains(&hash) {
68 if let Ok(Some(data)) = self.router.get_sync(&hash) {
69 freed_bytes += data.len() as u64;
70 let _ = self.router.delete_local_only(&hash);
72 deleted += 1;
73 }
74 }
75 }
76
77 Ok(GcStats {
78 deleted_dags: deleted,
79 freed_bytes,
80 })
81 }
82
83 pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
86 let all_hashes = self
87 .router
88 .list()
89 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
90
91 let total = all_hashes.len();
92 let mut valid = 0;
93 let mut corrupted = 0;
94 let mut deleted = 0;
95 let mut corrupted_hashes = Vec::new();
96
97 for hash in &all_hashes {
98 let hash_hex = to_hex(hash);
99
100 match self.router.get_sync(hash) {
101 Ok(Some(data)) => {
102 let actual_hash = sha256(&data);
103
104 if actual_hash == *hash {
105 valid += 1;
106 } else {
107 corrupted += 1;
108 let actual_hex = to_hex(&actual_hash);
109 println!(
110 " CORRUPTED: key={} actual={} size={}",
111 &hash_hex[..16],
112 &actual_hex[..16],
113 data.len()
114 );
115 corrupted_hashes.push(*hash);
116 }
117 }
118 Ok(None) => {
119 corrupted += 1;
120 println!(" MISSING: key={}", &hash_hex[..16]);
121 corrupted_hashes.push(*hash);
122 }
123 Err(e) => {
124 corrupted += 1;
125 println!(" ERROR: key={} err={}", &hash_hex[..16], e);
126 corrupted_hashes.push(*hash);
127 }
128 }
129 }
130
131 if delete {
132 for hash in &corrupted_hashes {
133 match self.router.delete_sync(hash) {
134 Ok(true) => deleted += 1,
135 Ok(false) => {}
136 Err(e) => {
137 let hash_hex = to_hex(hash);
138 println!(" Failed to delete {}: {}", &hash_hex[..16], e);
139 }
140 }
141 }
142 }
143
144 Ok(VerifyResult {
145 total,
146 valid,
147 corrupted,
148 deleted,
149 })
150 }
151
152 #[cfg(feature = "s3")]
155 pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
156 use aws_sdk_s3::Client as S3Client;
157
158 let config = crate::config::Config::load()?;
159 let s3_config = config
160 .storage
161 .s3
162 .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;
163
164 let aws_config = aws_config::from_env()
165 .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
166 .load()
167 .await;
168
169 let s3_client = S3Client::from_conf(
170 aws_sdk_s3::config::Builder::from(&aws_config)
171 .endpoint_url(&s3_config.endpoint)
172 .force_path_style(true)
173 .build(),
174 );
175
176 let bucket = &s3_config.bucket;
177 let prefix = s3_config.prefix.as_deref().unwrap_or("");
178
179 let mut total = 0;
180 let mut valid = 0;
181 let mut corrupted = 0;
182 let mut deleted = 0;
183 let mut corrupted_keys = Vec::new();
184
185 let mut continuation_token: Option<String> = None;
186
187 loop {
188 let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);
189
190 if let Some(ref token) = continuation_token {
191 list_req = list_req.continuation_token(token);
192 }
193
194 let list_resp = list_req
195 .send()
196 .await
197 .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;
198
199 for object in list_resp.contents() {
200 let key = object.key().unwrap_or("");
201
202 if !key.ends_with(".bin") {
203 continue;
204 }
205
206 total += 1;
207
208 let filename = key.strip_prefix(prefix).unwrap_or(key);
209 let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);
210
211 if expected_hash_hex.len() != 64 {
212 corrupted += 1;
213 println!(" INVALID KEY: {}", key);
214 corrupted_keys.push(key.to_string());
215 continue;
216 }
217
218 let expected_hash = match from_hex(expected_hash_hex) {
219 Ok(h) => h,
220 Err(_) => {
221 corrupted += 1;
222 println!(" INVALID HEX: {}", key);
223 corrupted_keys.push(key.to_string());
224 continue;
225 }
226 };
227
228 match s3_client.get_object().bucket(bucket).key(key).send().await {
229 Ok(resp) => match resp.body.collect().await {
230 Ok(bytes) => {
231 let data = bytes.into_bytes();
232 let actual_hash = sha256(&data);
233
234 if actual_hash == expected_hash {
235 valid += 1;
236 } else {
237 corrupted += 1;
238 let actual_hex = to_hex(&actual_hash);
239 println!(
240 " CORRUPTED: key={} actual={} size={}",
241 &expected_hash_hex[..16],
242 &actual_hex[..16],
243 data.len()
244 );
245 corrupted_keys.push(key.to_string());
246 }
247 }
248 Err(e) => {
249 corrupted += 1;
250 println!(" READ ERROR: {} - {}", key, e);
251 corrupted_keys.push(key.to_string());
252 }
253 },
254 Err(e) => {
255 corrupted += 1;
256 println!(" FETCH ERROR: {} - {}", key, e);
257 corrupted_keys.push(key.to_string());
258 }
259 }
260
261 if total % 100 == 0 {
262 println!(
263 " Progress: {} objects checked, {} corrupted so far",
264 total, corrupted
265 );
266 }
267 }
268
269 if list_resp.is_truncated() == Some(true) {
270 continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
271 } else {
272 break;
273 }
274 }
275
276 if delete {
277 for key in &corrupted_keys {
278 match s3_client
279 .delete_object()
280 .bucket(bucket)
281 .key(key)
282 .send()
283 .await
284 {
285 Ok(_) => deleted += 1,
286 Err(e) => {
287 println!(" Failed to delete {}: {}", key, e);
288 }
289 }
290 }
291 }
292
293 Ok(VerifyResult {
294 total,
295 valid,
296 corrupted,
297 deleted,
298 })
299 }
300
301 #[cfg(not(feature = "s3"))]
303 pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
304 Err(anyhow::anyhow!("S3 feature not enabled"))
305 }
306
307 pub fn compact_lmdb_environments(
308 &self,
309 env_dirs: &[PathBuf],
310 keep_backup: bool,
311 ) -> Result<Vec<CompactResult>> {
312 compact_lmdb_environments_under(self.base_path(), env_dirs, keep_backup)
313 }
314}
315
316pub fn compact_lmdb_environments_under(
317 base_path: &Path,
318 env_dirs: &[PathBuf],
319 keep_backup: bool,
320) -> Result<Vec<CompactResult>> {
321 let targets = if env_dirs.is_empty() {
322 discover_lmdb_environment_dirs(base_path)?
323 } else {
324 env_dirs
325 .iter()
326 .map(|path| {
327 if path.is_absolute() {
328 path.clone()
329 } else {
330 base_path.join(path)
331 }
332 })
333 .collect()
334 };
335
336 let mut results = Vec::new();
337 for env_dir in targets {
338 results.push(compact_lmdb_environment_dir(&env_dir, keep_backup)?);
339 }
340 Ok(results)
341}
342
343fn discover_lmdb_environment_dirs(root: &Path) -> Result<Vec<PathBuf>> {
344 let mut dirs = Vec::new();
345 collect_lmdb_environment_dirs(root, &mut dirs)?;
346 dirs.sort();
347 Ok(dirs)
348}
349
350fn collect_lmdb_environment_dirs(root: &Path, dirs: &mut Vec<PathBuf>) -> Result<()> {
351 if root.join("data.mdb").exists() {
352 dirs.push(root.to_path_buf());
353 }
354
355 for entry in std::fs::read_dir(root)? {
356 let entry = entry?;
357 let path = entry.path();
358 if path.is_dir() {
359 collect_lmdb_environment_dirs(&path, dirs)?;
360 }
361 }
362
363 Ok(())
364}
365
366fn compact_lmdb_environment_dir(env_dir: &Path, keep_backup: bool) -> Result<CompactResult> {
367 let data_path = env_dir.join("data.mdb");
368 if !data_path.exists() {
369 anyhow::bail!("No data.mdb found in {}", env_dir.display());
370 }
371
372 let before_bytes = std::fs::metadata(&data_path)?.len();
373 let compact_path = env_dir.join("data.mdb.compact");
374 let backup_path = env_dir.join("data.mdb.bak");
375
376 if compact_path.exists() {
377 std::fs::remove_file(&compact_path)?;
378 }
379 if !keep_backup && backup_path.exists() {
380 std::fs::remove_file(&backup_path)?;
381 }
382
383 let open_map_size = existing_lmdb_map_size_bytes(&data_path)?;
384
385 {
386 let env = unsafe {
387 EnvOpenOptions::new()
388 .map_size(open_map_size)
389 .max_dbs(COMPACT_MAX_DBS)
390 .max_readers(COMPACT_MAX_READERS)
391 .open(env_dir)
392 }?;
393 env.force_sync()?;
394 env.copy_to_file(&compact_path, CompactionOption::Enabled)?;
395 }
396
397 let after_bytes = std::fs::metadata(&compact_path)?.len();
398
399 if backup_path.exists() {
400 std::fs::remove_file(&backup_path)?;
401 }
402
403 std::fs::rename(&data_path, &backup_path)?;
404 if let Err(error) = std::fs::rename(&compact_path, &data_path) {
405 let _ = std::fs::rename(&backup_path, &data_path);
406 return Err(error.into());
407 }
408
409 if !keep_backup {
410 std::fs::remove_file(&backup_path)?;
411 }
412
413 Ok(CompactResult {
414 env_dir: env_dir.to_path_buf(),
415 before_bytes,
416 after_bytes,
417 })
418}
419
420fn existing_lmdb_map_size_bytes(data_path: &Path) -> Result<usize> {
421 let file_bytes = std::fs::metadata(data_path)?.len();
422 let aligned_bytes = if file_bytes == 0 {
423 COMPACT_OPEN_MAP_SIZE_BYTES as u64
424 } else {
425 let remainder = file_bytes % COMPACT_PAGE_SIZE_BYTES;
426 if remainder == 0 {
427 file_bytes
428 } else {
429 file_bytes.saturating_add(COMPACT_PAGE_SIZE_BYTES - remainder)
430 }
431 };
432
433 Ok(usize::try_from(aligned_bytes)
434 .unwrap_or(usize::MAX)
435 .max(COMPACT_OPEN_MAP_SIZE_BYTES))
436}