Skip to main content

hashtree_cli/storage/
maintenance.rs

1use anyhow::Result;
2use heed::{CompactionOption, EnvOpenOptions};
3use std::collections::HashSet;
4use std::path::{Path, PathBuf};
5
6use super::{GcStats, HashtreeStore};
7
8#[cfg(feature = "s3")]
9use hashtree_core::from_hex;
10use hashtree_core::{sha256, to_hex};
11
/// Result of blob integrity verification.
///
/// Produced by the LMDB and R2/S3 verification passes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VerifyResult {
    /// Number of blobs/objects examined.
    pub total: usize,
    /// Number whose content hash matched the expected hash.
    pub valid: usize,
    /// Number that were mismatched, missing, badly named or unreadable.
    pub corrupted: usize,
    /// Number actually deleted (only non-zero when deletion was requested).
    pub deleted: usize,
}
20
/// Outcome of compacting one LMDB environment directory.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactResult {
    /// Environment directory that was compacted (the one containing `data.mdb`).
    pub env_dir: PathBuf,
    /// Size of `data.mdb` before compaction, in bytes.
    pub before_bytes: u64,
    /// Size of the compacted `data.mdb`, in bytes.
    pub after_bytes: u64,
}
27
/// Maximum number of named databases allowed when opening an environment for compaction.
const COMPACT_MAX_DBS: u32 = 64;
/// Maximum reader slots requested when opening an environment for compaction.
const COMPACT_MAX_READERS: u32 = 2048;
/// Minimum map size used to open an environment for compaction (10 MiB).
const COMPACT_OPEN_MAP_SIZE_BYTES: usize = 10 * 1024 * 1024;
/// Alignment applied to the map size derived from `data.mdb`'s length.
///
/// heed requires the map size to be a multiple of the OS page size. 64 KiB is a
/// multiple of every common page size (4 KiB, 16 KiB on Apple Silicon, 64 KiB on
/// some aarch64/ppc64le Linux kernels). A 4 KiB alignment could therefore make
/// opening fail on large-page hosts. Over-sizing the map only reserves address
/// space.
const COMPACT_PAGE_SIZE_BYTES: u64 = 64 * 1024;
32
33impl HashtreeStore {
34    /// Garbage collect unpinned content
35    pub fn gc(&self) -> Result<GcStats> {
36        let rtxn = self.env.read_txn()?;
37
38        // Get all pinned hashes as raw bytes
39        let pinned: HashSet<[u8; 32]> = self
40            .pins
41            .iter(&rtxn)?
42            .filter_map(|item| item.ok())
43            .filter_map(|(hash_bytes, _)| {
44                if hash_bytes.len() == 32 {
45                    let mut hash = [0u8; 32];
46                    hash.copy_from_slice(hash_bytes);
47                    Some(hash)
48                } else {
49                    None
50                }
51            })
52            .collect();
53
54        drop(rtxn);
55
56        // Get all stored hashes
57        let all_hashes = self
58            .router
59            .list()
60            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
61
62        // Delete unpinned hashes
63        let mut deleted = 0;
64        let mut freed_bytes = 0u64;
65
66        for hash in all_hashes {
67            if !pinned.contains(&hash) {
68                if let Ok(Some(data)) = self.router.get_sync(&hash) {
69                    freed_bytes += data.len() as u64;
70                    // Delete locally only - keep S3 as archive
71                    let _ = self.router.delete_local_only(&hash);
72                    deleted += 1;
73                }
74            }
75        }
76
77        Ok(GcStats {
78            deleted_dags: deleted,
79            freed_bytes,
80        })
81    }
82
83    /// Verify LMDB blob integrity - checks that stored data matches its key hash
84    /// Returns verification statistics and optionally deletes corrupted entries
85    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
86        let all_hashes = self
87            .router
88            .list()
89            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
90
91        let total = all_hashes.len();
92        let mut valid = 0;
93        let mut corrupted = 0;
94        let mut deleted = 0;
95        let mut corrupted_hashes = Vec::new();
96
97        for hash in &all_hashes {
98            let hash_hex = to_hex(hash);
99
100            match self.router.get_sync(hash) {
101                Ok(Some(data)) => {
102                    let actual_hash = sha256(&data);
103
104                    if actual_hash == *hash {
105                        valid += 1;
106                    } else {
107                        corrupted += 1;
108                        let actual_hex = to_hex(&actual_hash);
109                        println!(
110                            "  CORRUPTED: key={} actual={} size={}",
111                            &hash_hex[..16],
112                            &actual_hex[..16],
113                            data.len()
114                        );
115                        corrupted_hashes.push(*hash);
116                    }
117                }
118                Ok(None) => {
119                    corrupted += 1;
120                    println!("  MISSING: key={}", &hash_hex[..16]);
121                    corrupted_hashes.push(*hash);
122                }
123                Err(e) => {
124                    corrupted += 1;
125                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
126                    corrupted_hashes.push(*hash);
127                }
128            }
129        }
130
131        if delete {
132            for hash in &corrupted_hashes {
133                match self.router.delete_sync(hash) {
134                    Ok(true) => deleted += 1,
135                    Ok(false) => {}
136                    Err(e) => {
137                        let hash_hex = to_hex(hash);
138                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
139                    }
140                }
141            }
142        }
143
144        Ok(VerifyResult {
145            total,
146            valid,
147            corrupted,
148            deleted,
149        })
150    }
151
152    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
153    /// Returns verification statistics and optionally deletes corrupted entries
154    #[cfg(feature = "s3")]
155    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
156        use aws_sdk_s3::Client as S3Client;
157
158        let config = crate::config::Config::load()?;
159        let s3_config = config
160            .storage
161            .s3
162            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;
163
164        let aws_config = aws_config::from_env()
165            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
166            .load()
167            .await;
168
169        let s3_client = S3Client::from_conf(
170            aws_sdk_s3::config::Builder::from(&aws_config)
171                .endpoint_url(&s3_config.endpoint)
172                .force_path_style(true)
173                .build(),
174        );
175
176        let bucket = &s3_config.bucket;
177        let prefix = s3_config.prefix.as_deref().unwrap_or("");
178
179        let mut total = 0;
180        let mut valid = 0;
181        let mut corrupted = 0;
182        let mut deleted = 0;
183        let mut corrupted_keys = Vec::new();
184
185        let mut continuation_token: Option<String> = None;
186
187        loop {
188            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);
189
190            if let Some(ref token) = continuation_token {
191                list_req = list_req.continuation_token(token);
192            }
193
194            let list_resp = list_req
195                .send()
196                .await
197                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;
198
199            for object in list_resp.contents() {
200                let key = object.key().unwrap_or("");
201
202                if !key.ends_with(".bin") {
203                    continue;
204                }
205
206                total += 1;
207
208                let filename = key.strip_prefix(prefix).unwrap_or(key);
209                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);
210
211                if expected_hash_hex.len() != 64 {
212                    corrupted += 1;
213                    println!("  INVALID KEY: {}", key);
214                    corrupted_keys.push(key.to_string());
215                    continue;
216                }
217
218                let expected_hash = match from_hex(expected_hash_hex) {
219                    Ok(h) => h,
220                    Err(_) => {
221                        corrupted += 1;
222                        println!("  INVALID HEX: {}", key);
223                        corrupted_keys.push(key.to_string());
224                        continue;
225                    }
226                };
227
228                match s3_client.get_object().bucket(bucket).key(key).send().await {
229                    Ok(resp) => match resp.body.collect().await {
230                        Ok(bytes) => {
231                            let data = bytes.into_bytes();
232                            let actual_hash = sha256(&data);
233
234                            if actual_hash == expected_hash {
235                                valid += 1;
236                            } else {
237                                corrupted += 1;
238                                let actual_hex = to_hex(&actual_hash);
239                                println!(
240                                    "  CORRUPTED: key={} actual={} size={}",
241                                    &expected_hash_hex[..16],
242                                    &actual_hex[..16],
243                                    data.len()
244                                );
245                                corrupted_keys.push(key.to_string());
246                            }
247                        }
248                        Err(e) => {
249                            corrupted += 1;
250                            println!("  READ ERROR: {} - {}", key, e);
251                            corrupted_keys.push(key.to_string());
252                        }
253                    },
254                    Err(e) => {
255                        corrupted += 1;
256                        println!("  FETCH ERROR: {} - {}", key, e);
257                        corrupted_keys.push(key.to_string());
258                    }
259                }
260
261                if total % 100 == 0 {
262                    println!(
263                        "  Progress: {} objects checked, {} corrupted so far",
264                        total, corrupted
265                    );
266                }
267            }
268
269            if list_resp.is_truncated() == Some(true) {
270                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
271            } else {
272                break;
273            }
274        }
275
276        if delete {
277            for key in &corrupted_keys {
278                match s3_client
279                    .delete_object()
280                    .bucket(bucket)
281                    .key(key)
282                    .send()
283                    .await
284                {
285                    Ok(_) => deleted += 1,
286                    Err(e) => {
287                        println!("  Failed to delete {}: {}", key, e);
288                    }
289                }
290            }
291        }
292
293        Ok(VerifyResult {
294            total,
295            valid,
296            corrupted,
297            deleted,
298        })
299    }
300
301    /// Fallback for non-S3 builds
302    #[cfg(not(feature = "s3"))]
303    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
304        Err(anyhow::anyhow!("S3 feature not enabled"))
305    }
306
307    pub fn compact_lmdb_environments(
308        &self,
309        env_dirs: &[PathBuf],
310        keep_backup: bool,
311    ) -> Result<Vec<CompactResult>> {
312        compact_lmdb_environments_under(self.base_path(), env_dirs, keep_backup)
313    }
314}
315
316pub fn compact_lmdb_environments_under(
317    base_path: &Path,
318    env_dirs: &[PathBuf],
319    keep_backup: bool,
320) -> Result<Vec<CompactResult>> {
321    let targets = if env_dirs.is_empty() {
322        discover_lmdb_environment_dirs(base_path)?
323    } else {
324        env_dirs
325            .iter()
326            .map(|path| {
327                if path.is_absolute() {
328                    path.clone()
329                } else {
330                    base_path.join(path)
331                }
332            })
333            .collect()
334    };
335
336    let mut results = Vec::new();
337    for env_dir in targets {
338        results.push(compact_lmdb_environment_dir(&env_dir, keep_backup)?);
339    }
340    Ok(results)
341}
342
343fn discover_lmdb_environment_dirs(root: &Path) -> Result<Vec<PathBuf>> {
344    let mut dirs = Vec::new();
345    collect_lmdb_environment_dirs(root, &mut dirs)?;
346    dirs.sort();
347    Ok(dirs)
348}
349
/// Depth-first walk of `root`, appending every directory that contains a
/// `data.mdb` file to `dirs`, each parent before its subdirectories.
///
/// `Path::is_dir` follows symlinks, so symlinked directories are descended
/// into. Any error reading a directory entry aborts the walk.
fn collect_lmdb_environment_dirs(root: &Path, dirs: &mut Vec<PathBuf>) -> Result<()> {
    let holds_data_file = root.join("data.mdb").exists();
    if holds_data_file {
        dirs.push(root.to_path_buf());
    }

    for child in std::fs::read_dir(root)? {
        let child_path = child?.path();
        if !child_path.is_dir() {
            continue;
        }
        collect_lmdb_environment_dirs(&child_path, dirs)?;
    }

    Ok(())
}
365
366fn compact_lmdb_environment_dir(env_dir: &Path, keep_backup: bool) -> Result<CompactResult> {
367    let data_path = env_dir.join("data.mdb");
368    if !data_path.exists() {
369        anyhow::bail!("No data.mdb found in {}", env_dir.display());
370    }
371
372    let before_bytes = std::fs::metadata(&data_path)?.len();
373    let compact_path = env_dir.join("data.mdb.compact");
374    let backup_path = env_dir.join("data.mdb.bak");
375
376    if compact_path.exists() {
377        std::fs::remove_file(&compact_path)?;
378    }
379    if !keep_backup && backup_path.exists() {
380        std::fs::remove_file(&backup_path)?;
381    }
382
383    let open_map_size = existing_lmdb_map_size_bytes(&data_path)?;
384
385    {
386        let env = unsafe {
387            EnvOpenOptions::new()
388                .map_size(open_map_size)
389                .max_dbs(COMPACT_MAX_DBS)
390                .max_readers(COMPACT_MAX_READERS)
391                .open(env_dir)
392        }?;
393        env.force_sync()?;
394        env.copy_to_file(&compact_path, CompactionOption::Enabled)?;
395    }
396
397    let after_bytes = std::fs::metadata(&compact_path)?.len();
398
399    if backup_path.exists() {
400        std::fs::remove_file(&backup_path)?;
401    }
402
403    std::fs::rename(&data_path, &backup_path)?;
404    if let Err(error) = std::fs::rename(&compact_path, &data_path) {
405        let _ = std::fs::rename(&backup_path, &data_path);
406        return Err(error.into());
407    }
408
409    if !keep_backup {
410        std::fs::remove_file(&backup_path)?;
411    }
412
413    Ok(CompactResult {
414        env_dir: env_dir.to_path_buf(),
415        before_bytes,
416        after_bytes,
417    })
418}
419
420fn existing_lmdb_map_size_bytes(data_path: &Path) -> Result<usize> {
421    let file_bytes = std::fs::metadata(data_path)?.len();
422    let aligned_bytes = if file_bytes == 0 {
423        COMPACT_OPEN_MAP_SIZE_BYTES as u64
424    } else {
425        let remainder = file_bytes % COMPACT_PAGE_SIZE_BYTES;
426        if remainder == 0 {
427            file_bytes
428        } else {
429            file_bytes.saturating_add(COMPACT_PAGE_SIZE_BYTES - remainder)
430        }
431    };
432
433    Ok(usize::try_from(aligned_bytes)
434        .unwrap_or(usize::MAX)
435        .max(COMPACT_OPEN_MAP_SIZE_BYTES))
436}