Skip to main content

hashtree_cli/storage/
maintenance.rs

1use anyhow::Result;
2use std::collections::HashSet;
3
4use super::{GcStats, HashtreeStore};
5
6#[cfg(feature = "s3")]
7use hashtree_core::from_hex;
8use hashtree_core::{sha256, to_hex};
9
/// Result of blob integrity verification.
///
/// Every checked entry is counted exactly once as either `valid` or
/// `corrupted`, so `valid + corrupted == total`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct VerifyResult {
    /// Number of entries that were checked.
    pub total: usize,
    /// Entries whose content hash matched the hash they are stored under.
    pub valid: usize,
    /// Entries that failed verification: hash mismatch, missing or unreadable
    /// data, or (for object storage) a key that does not encode a hash.
    pub corrupted: usize,
    /// Failed entries that were actually removed; stays 0 unless deletion
    /// was requested.
    pub deleted: usize,
}
18
19impl HashtreeStore {
20    /// Garbage collect unpinned content
21    pub fn gc(&self) -> Result<GcStats> {
22        let rtxn = self.env.read_txn()?;
23
24        // Get all pinned hashes as raw bytes
25        let pinned: HashSet<[u8; 32]> = self
26            .pins
27            .iter(&rtxn)?
28            .filter_map(|item| item.ok())
29            .filter_map(|(hash_bytes, _)| {
30                if hash_bytes.len() == 32 {
31                    let mut hash = [0u8; 32];
32                    hash.copy_from_slice(hash_bytes);
33                    Some(hash)
34                } else {
35                    None
36                }
37            })
38            .collect();
39
40        drop(rtxn);
41
42        // Get all stored hashes
43        let all_hashes = self
44            .router
45            .list()
46            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
47
48        // Delete unpinned hashes
49        let mut deleted = 0;
50        let mut freed_bytes = 0u64;
51
52        for hash in all_hashes {
53            if !pinned.contains(&hash) {
54                if let Ok(Some(data)) = self.router.get_sync(&hash) {
55                    freed_bytes += data.len() as u64;
56                    // Delete locally only - keep S3 as archive
57                    let _ = self.router.delete_local_only(&hash);
58                    deleted += 1;
59                }
60            }
61        }
62
63        Ok(GcStats {
64            deleted_dags: deleted,
65            freed_bytes,
66        })
67    }
68
69    /// Verify LMDB blob integrity - checks that stored data matches its key hash
70    /// Returns verification statistics and optionally deletes corrupted entries
71    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
72        let all_hashes = self
73            .router
74            .list()
75            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
76
77        let total = all_hashes.len();
78        let mut valid = 0;
79        let mut corrupted = 0;
80        let mut deleted = 0;
81        let mut corrupted_hashes = Vec::new();
82
83        for hash in &all_hashes {
84            let hash_hex = to_hex(hash);
85
86            match self.router.get_sync(hash) {
87                Ok(Some(data)) => {
88                    let actual_hash = sha256(&data);
89
90                    if actual_hash == *hash {
91                        valid += 1;
92                    } else {
93                        corrupted += 1;
94                        let actual_hex = to_hex(&actual_hash);
95                        println!(
96                            "  CORRUPTED: key={} actual={} size={}",
97                            &hash_hex[..16],
98                            &actual_hex[..16],
99                            data.len()
100                        );
101                        corrupted_hashes.push(*hash);
102                    }
103                }
104                Ok(None) => {
105                    corrupted += 1;
106                    println!("  MISSING: key={}", &hash_hex[..16]);
107                    corrupted_hashes.push(*hash);
108                }
109                Err(e) => {
110                    corrupted += 1;
111                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
112                    corrupted_hashes.push(*hash);
113                }
114            }
115        }
116
117        if delete {
118            for hash in &corrupted_hashes {
119                match self.router.delete_sync(hash) {
120                    Ok(true) => deleted += 1,
121                    Ok(false) => {}
122                    Err(e) => {
123                        let hash_hex = to_hex(hash);
124                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
125                    }
126                }
127            }
128        }
129
130        Ok(VerifyResult {
131            total,
132            valid,
133            corrupted,
134            deleted,
135        })
136    }
137
138    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
139    /// Returns verification statistics and optionally deletes corrupted entries
140    #[cfg(feature = "s3")]
141    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
142        use aws_sdk_s3::Client as S3Client;
143
144        let config = crate::config::Config::load()?;
145        let s3_config = config
146            .storage
147            .s3
148            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;
149
150        let aws_config = aws_config::from_env()
151            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
152            .load()
153            .await;
154
155        let s3_client = S3Client::from_conf(
156            aws_sdk_s3::config::Builder::from(&aws_config)
157                .endpoint_url(&s3_config.endpoint)
158                .force_path_style(true)
159                .build(),
160        );
161
162        let bucket = &s3_config.bucket;
163        let prefix = s3_config.prefix.as_deref().unwrap_or("");
164
165        let mut total = 0;
166        let mut valid = 0;
167        let mut corrupted = 0;
168        let mut deleted = 0;
169        let mut corrupted_keys = Vec::new();
170
171        let mut continuation_token: Option<String> = None;
172
173        loop {
174            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);
175
176            if let Some(ref token) = continuation_token {
177                list_req = list_req.continuation_token(token);
178            }
179
180            let list_resp = list_req
181                .send()
182                .await
183                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;
184
185            for object in list_resp.contents() {
186                let key = object.key().unwrap_or("");
187
188                if !key.ends_with(".bin") {
189                    continue;
190                }
191
192                total += 1;
193
194                let filename = key.strip_prefix(prefix).unwrap_or(key);
195                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);
196
197                if expected_hash_hex.len() != 64 {
198                    corrupted += 1;
199                    println!("  INVALID KEY: {}", key);
200                    corrupted_keys.push(key.to_string());
201                    continue;
202                }
203
204                let expected_hash = match from_hex(expected_hash_hex) {
205                    Ok(h) => h,
206                    Err(_) => {
207                        corrupted += 1;
208                        println!("  INVALID HEX: {}", key);
209                        corrupted_keys.push(key.to_string());
210                        continue;
211                    }
212                };
213
214                match s3_client.get_object().bucket(bucket).key(key).send().await {
215                    Ok(resp) => match resp.body.collect().await {
216                        Ok(bytes) => {
217                            let data = bytes.into_bytes();
218                            let actual_hash = sha256(&data);
219
220                            if actual_hash == expected_hash {
221                                valid += 1;
222                            } else {
223                                corrupted += 1;
224                                let actual_hex = to_hex(&actual_hash);
225                                println!(
226                                    "  CORRUPTED: key={} actual={} size={}",
227                                    &expected_hash_hex[..16],
228                                    &actual_hex[..16],
229                                    data.len()
230                                );
231                                corrupted_keys.push(key.to_string());
232                            }
233                        }
234                        Err(e) => {
235                            corrupted += 1;
236                            println!("  READ ERROR: {} - {}", key, e);
237                            corrupted_keys.push(key.to_string());
238                        }
239                    },
240                    Err(e) => {
241                        corrupted += 1;
242                        println!("  FETCH ERROR: {} - {}", key, e);
243                        corrupted_keys.push(key.to_string());
244                    }
245                }
246
247                if total % 100 == 0 {
248                    println!(
249                        "  Progress: {} objects checked, {} corrupted so far",
250                        total, corrupted
251                    );
252                }
253            }
254
255            if list_resp.is_truncated() == Some(true) {
256                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
257            } else {
258                break;
259            }
260        }
261
262        if delete {
263            for key in &corrupted_keys {
264                match s3_client
265                    .delete_object()
266                    .bucket(bucket)
267                    .key(key)
268                    .send()
269                    .await
270                {
271                    Ok(_) => deleted += 1,
272                    Err(e) => {
273                        println!("  Failed to delete {}: {}", key, e);
274                    }
275                }
276            }
277        }
278
279        Ok(VerifyResult {
280            total,
281            valid,
282            corrupted,
283            deleted,
284        })
285    }
286
287    /// Fallback for non-S3 builds
288    #[cfg(not(feature = "s3"))]
289    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
290        Err(anyhow::anyhow!("S3 feature not enabled"))
291    }
292}