// hashtree_cli/storage/maintenance.rs
use anyhow::Result;
2use std::collections::HashSet;
3
4use super::{GcStats, HashtreeStore};
5
6#[cfg(feature = "s3")]
7use hashtree_core::from_hex;
8use hashtree_core::{sha256, to_hex};
9
#[derive(Debug, Clone)]
/// Summary of one integrity-verification pass over stored blobs
/// (local LMDB-backed storage or remote S3/R2).
pub struct VerifyResult {
    // Number of objects examined during the pass.
    pub total: usize,
    // Objects whose content re-hashed to exactly the key they are stored under.
    pub valid: usize,
    // Objects that were corrupted, missing, or unreadable (fetch/read errors
    // are counted here as well).
    pub corrupted: usize,
    // Corrupted objects actually removed; non-zero only when deletion was
    // requested and the delete succeeded.
    pub deleted: usize,
}
18
19impl HashtreeStore {
20 pub fn gc(&self) -> Result<GcStats> {
22 let rtxn = self.env.read_txn()?;
23
24 let pinned: HashSet<[u8; 32]> = self
26 .pins
27 .iter(&rtxn)?
28 .filter_map(|item| item.ok())
29 .filter_map(|(hash_bytes, _)| {
30 if hash_bytes.len() == 32 {
31 let mut hash = [0u8; 32];
32 hash.copy_from_slice(hash_bytes);
33 Some(hash)
34 } else {
35 None
36 }
37 })
38 .collect();
39
40 drop(rtxn);
41
42 let all_hashes = self
44 .router
45 .list()
46 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
47
48 let mut deleted = 0;
50 let mut freed_bytes = 0u64;
51
52 for hash in all_hashes {
53 if !pinned.contains(&hash) {
54 if let Ok(Some(data)) = self.router.get_sync(&hash) {
55 freed_bytes += data.len() as u64;
56 let _ = self.router.delete_local_only(&hash);
58 deleted += 1;
59 }
60 }
61 }
62
63 Ok(GcStats {
64 deleted_dags: deleted,
65 freed_bytes,
66 })
67 }
68
69 pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
72 let all_hashes = self
73 .router
74 .list()
75 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
76
77 let total = all_hashes.len();
78 let mut valid = 0;
79 let mut corrupted = 0;
80 let mut deleted = 0;
81 let mut corrupted_hashes = Vec::new();
82
83 for hash in &all_hashes {
84 let hash_hex = to_hex(hash);
85
86 match self.router.get_sync(hash) {
87 Ok(Some(data)) => {
88 let actual_hash = sha256(&data);
89
90 if actual_hash == *hash {
91 valid += 1;
92 } else {
93 corrupted += 1;
94 let actual_hex = to_hex(&actual_hash);
95 println!(
96 " CORRUPTED: key={} actual={} size={}",
97 &hash_hex[..16],
98 &actual_hex[..16],
99 data.len()
100 );
101 corrupted_hashes.push(*hash);
102 }
103 }
104 Ok(None) => {
105 corrupted += 1;
106 println!(" MISSING: key={}", &hash_hex[..16]);
107 corrupted_hashes.push(*hash);
108 }
109 Err(e) => {
110 corrupted += 1;
111 println!(" ERROR: key={} err={}", &hash_hex[..16], e);
112 corrupted_hashes.push(*hash);
113 }
114 }
115 }
116
117 if delete {
118 for hash in &corrupted_hashes {
119 match self.router.delete_sync(hash) {
120 Ok(true) => deleted += 1,
121 Ok(false) => {}
122 Err(e) => {
123 let hash_hex = to_hex(hash);
124 println!(" Failed to delete {}: {}", &hash_hex[..16], e);
125 }
126 }
127 }
128 }
129
130 Ok(VerifyResult {
131 total,
132 valid,
133 corrupted,
134 deleted,
135 })
136 }
137
    /// Verify the integrity of every object in the configured S3/R2 bucket by
    /// downloading each one and re-hashing its contents.
    ///
    /// Objects are expected to live at `<prefix><64-hex-sha256>.bin`; a key
    /// that does not decode to a 64-character hex hash, or whose body does not
    /// hash back to the key, is reported (and deleted when `delete` is true).
    ///
    /// # Errors
    /// Fails if the CLI config cannot be loaded, S3 is not configured, or a
    /// list request fails. Per-object fetch/read errors are counted as
    /// corrupted rather than aborting the scan.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        let config = crate::config::Config::load()?;
        let s3_config = config
            .storage
            .s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // Endpoint override + path-style addressing: required for
        // R2-compatible (non-AWS) object stores.
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build(),
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // ListObjectsV2 pagination cursor; None on the first request.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects are verified; skip anything else under the prefix.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Recover the expected hash from the key: strip the prefix and
                // the ".bin" suffix, leaving the hex digest.
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and re-hash; any failure path counts the object as
                // corrupted so it can be cleaned up by the delete pass.
                match s3_client.get_object().bucket(bucket).key(key).send().await {
                    Ok(resp) => match resp.body.collect().await {
                        Ok(bytes) => {
                            let data = bytes.into_bytes();
                            let actual_hash = sha256(&data);

                            if actual_hash == expected_hash {
                                valid += 1;
                            } else {
                                corrupted += 1;
                                let actual_hex = to_hex(&actual_hash);
                                println!(
                                    "  CORRUPTED: key={} actual={} size={}",
                                    &expected_hash_hex[..16],
                                    &actual_hex[..16],
                                    data.len()
                                );
                                corrupted_keys.push(key.to_string());
                            }
                        }
                        Err(e) => {
                            corrupted += 1;
                            println!("  READ ERROR: {} - {}", key, e);
                            corrupted_keys.push(key.to_string());
                        }
                    },
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Periodic progress line for large buckets.
                if total % 100 == 0 {
                    println!(
                        "  Progress: {} objects checked, {} corrupted so far",
                        total, corrupted
                    );
                }
            }

            // Follow the pagination token until the listing is exhausted.
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        if delete {
            for key in &corrupted_keys {
                match s3_client
                    .delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
286
287 #[cfg(not(feature = "s3"))]
289 pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
290 Err(anyhow::anyhow!("S3 feature not enabled"))
291 }
292}