Skip to main content

hashtree_cli/storage/
maintenance.rs

1use anyhow::Result;
2use heed::{CompactionOption, EnvOpenOptions};
3use std::collections::HashSet;
4use std::path::{Path, PathBuf};
5#[cfg(feature = "s3")]
6use std::sync::Arc;
7#[cfg(feature = "s3")]
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9
10use super::{GcStats, HashtreeStore};
11
12#[cfg(feature = "s3")]
13use futures::{stream::FuturesUnordered, StreamExt};
14#[cfg(feature = "s3")]
15use hashtree_core::from_hex;
16use hashtree_core::{sha256, to_hex};
17use serde::{Deserialize, Serialize};
18
/// Summary counters produced by a blob integrity verification pass.
///
/// Returned by both the local (LMDB) and the R2/S3 verification routines.
/// Derives `PartialEq`/`Eq` so results can be compared directly in tests and assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyResult {
    /// Number of blobs/objects that were examined.
    pub total: usize,
    /// Entries whose content hashed to the value encoded in their key.
    pub valid: usize,
    /// Entries counted as bad: missing, unreadable, malformed key, or hash mismatch.
    pub corrupted: usize,
    /// Bad entries that were successfully deleted (stays 0 unless deletion was requested).
    pub deleted: usize,
}
27
/// Outcome of compacting a single LMDB environment directory.
///
/// Derives `PartialEq`/`Eq` so results can be compared directly in tests and assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactResult {
    /// Directory of the LMDB environment this result describes.
    pub env_dir: PathBuf,
    /// NOTE(review): presumably the on-disk size before compaction, in bytes — the code
    /// that fills this in is outside this view; confirm.
    pub before_bytes: u64,
    /// NOTE(review): presumably the on-disk size after compaction, in bytes — confirm.
    pub after_bytes: u64,
}
34
/// Options controlling an R2/S3 → local import run.
///
/// Derives `PartialEq`/`Eq` so option sets can be compared directly in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct R2ImportOptions {
    /// Maximum number of object imports kept in flight; values below 1 are treated as 1.
    pub concurrency: usize,
    /// Only compare against the local store; never download object bodies.
    pub check_only: bool,
    /// Continue after the last key recorded in the state file when the previous run
    /// did not complete (ignored when `start_after` is given).
    pub resume: bool,
    /// Load all local hashes into a sorted in-memory index and look candidates up there.
    /// Has no effect when `stream_merge` is set.
    pub fast_list: bool,
    /// Compare each listing page against the local store as one sorted batch.
    pub stream_merge: bool,
    /// Explicit object keys or 64-character hex hashes to import. When this (together
    /// with `keys_file`) is non-empty the bucket is not listed at all.
    pub keys: Vec<String>,
    /// File with one key/hash per line; blank lines and lines starting with `#` are ignored.
    pub keys_file: Option<PathBuf>,
    /// Start listing after this key; takes precedence over the saved resume position.
    pub start_after: Option<String>,
    /// Extra prefix appended to the configured bucket prefix to narrow the listing.
    pub scan_prefix: Option<String>,
    /// Where progress is persisted. In listing mode this defaults to
    /// `r2-import-state.json` under the store's base path.
    pub state_file: Option<PathBuf>,
    /// Stop after this many candidate objects have been listed in this run.
    pub max_objects: Option<usize>,
    /// Print a progress line once at least this many objects were listed since the
    /// last one; values below 1 are treated as 1.
    pub progress_every: usize,
    /// Delay inserted before handling each listed candidate, in milliseconds (0 = none).
    pub scan_delay_ms: u64,
}
51
52#[derive(Debug, Clone, Default, Serialize, Deserialize)]
53pub struct R2ImportResult {
54    pub listed: usize,
55    pub skipped: usize,
56    pub missing: usize,
57    pub imported: usize,
58    pub corrupted: usize,
59    pub failed: usize,
60    pub bytes_imported: u64,
61    pub last_key: Option<String>,
62    pub completed: bool,
63}
64
/// On-disk representation of import progress (serialized as JSON).
#[cfg(feature = "s3")]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct R2ImportState {
    /// Import counters; flattened so their fields sit at the top level of the JSON object.
    #[serde(flatten)]
    result: R2ImportResult,
    /// Unix timestamp (seconds) of when the state was written.
    updated_at_unix: u64,
}
72
/// A bucket object that is a candidate for import.
#[cfg(feature = "s3")]
#[derive(Debug, Clone)]
struct R2ObjectCandidate {
    /// Full object key in the bucket.
    key: String,
    /// Content hash parsed from the key; the downloaded body must hash to this value.
    hash: hashtree_core::types::Hash,
}
79
/// Result of processing one import candidate.
///
/// The flags are not mutually exclusive: each one that is set increments the
/// matching counter in `R2ImportResult` (e.g. a failed download sets both
/// `missing` and `failed`).
#[cfg(feature = "s3")]
#[derive(Debug, Clone, Default)]
struct R2ObjectImportOutcome {
    /// The blob was already present locally.
    skipped: bool,
    /// The blob was absent locally when checked.
    missing: bool,
    /// The blob was downloaded, verified and newly stored.
    imported: bool,
    /// The downloaded body did not hash to the expected value.
    corrupted: bool,
    /// A local-store or download error prevented processing.
    failed: bool,
    /// Size of the newly stored blob in bytes (0 unless `imported`).
    bytes_imported: u64,
    /// Optional diagnostic line to print for this object.
    message: Option<String>,
}
91
/// Maximum number of download attempts per object before giving up.
#[cfg(feature = "s3")]
const R2_IMPORT_OBJECT_READ_ATTEMPTS: usize = 4;
/// Delay before the first retry, in milliseconds; doubled for every further retry.
#[cfg(feature = "s3")]
const R2_IMPORT_OBJECT_RETRY_BASE_DELAY_MS: u64 = 250;

// Settings for LMDB compaction.
// NOTE(review): the code using these is outside this view; the names suggest the
// limits/sizes used when opening an environment for compaction — confirm at the use site.
const COMPACT_MAX_DBS: u32 = 64;
const COMPACT_MAX_READERS: u32 = 2048;
const COMPACT_OPEN_MAP_SIZE_BYTES: usize = 10 * 1024 * 1024;
const COMPACT_PAGE_SIZE_BYTES: u64 = 4096;
101
102#[cfg(feature = "s3")]
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
109
/// Extract the content hash encoded in an object key of the form `{prefix}{64-hex}.bin`.
///
/// The prefix is stripped when present (keys without it are parsed as-is). Returns
/// `None` when the key does not end in `.bin`, the remaining stem contains a `/`,
/// is not exactly 64 bytes long, or is not valid hex.
#[cfg(feature = "s3")]
fn r2_import_key_hash(prefix: &str, key: &str) -> Option<hashtree_core::types::Hash> {
    let relative = match key.strip_prefix(prefix) {
        Some(rest) => rest,
        None => key,
    };

    relative
        .strip_suffix(".bin")
        .filter(|stem| !stem.contains('/') && stem.len() == 64)
        .and_then(|stem| from_hex(stem).ok())
}
122
/// Turn a user-supplied key or hash into an import candidate.
///
/// Accepted forms (after trimming whitespace):
/// - a bare 64-character hex hash, expanded to `{prefix}{hash}.bin`;
/// - a file name without `/` that lacks the configured prefix, which gets the prefix added;
/// - anything else is taken as a full object key.
///
/// Returns `None` for empty input or when no hash can be parsed from the resulting key.
#[cfg(feature = "s3")]
fn r2_import_key_candidate(prefix: &str, input: &str) -> Option<R2ObjectCandidate> {
    let trimmed = input.trim();
    if trimmed.is_empty() {
        return None;
    }

    let is_bare_hash = trimmed.len() == 64 && trimmed.bytes().all(|byte| byte.is_ascii_hexdigit());
    let is_unprefixed_name =
        !prefix.is_empty() && !trimmed.starts_with(prefix) && !trimmed.contains('/');

    let key = if is_bare_hash {
        format!("{prefix}{trimmed}.bin")
    } else if is_unprefixed_name {
        format!("{prefix}{trimmed}")
    } else {
        trimmed.to_owned()
    };

    r2_import_key_hash(prefix, &key).map(|hash| R2ObjectCandidate { key, hash })
}
141
/// Report, for every candidate, whether its hash is already in the local store.
///
/// The local store is queried once with the hashes in sorted order; the answers are
/// then mapped back so that element `i` of the returned vector belongs to
/// `candidates[i]`.
#[cfg(feature = "s3")]
fn existing_r2_candidates(
    local: &super::LocalStore,
    candidates: &[R2ObjectCandidate],
) -> Result<Vec<bool>> {
    // Candidate positions ordered by (hash, original position).
    let mut order: Vec<usize> = (0..candidates.len()).collect();
    order.sort_unstable_by(|&left, &right| {
        candidates[left]
            .hash
            .cmp(&candidates[right].hash)
            .then(left.cmp(&right))
    });

    let sorted_hashes: Vec<hashtree_core::types::Hash> =
        order.iter().map(|&position| candidates[position].hash).collect();
    let presence = local
        .existing_hashes_in_sorted_candidates(&sorted_hashes)
        .map_err(|err| anyhow::anyhow!("Failed to compare local hashes: {err}"))?;

    // Scatter the sorted answers back to the callers' ordering.
    let mut existing = vec![false; candidates.len()];
    for (position, found) in order.into_iter().zip(presence) {
        existing[position] = found;
    }
    Ok(existing)
}
166
167#[cfg(feature = "s3")]
168fn read_r2_import_keys_file(path: &Path) -> Result<Vec<String>> {
169    let raw = std::fs::read_to_string(path)?;
170    Ok(raw
171        .lines()
172        .map(str::trim)
173        .filter(|line| !line.is_empty() && !line.starts_with('#'))
174        .map(ToOwned::to_owned)
175        .collect())
176}
177
/// Load previously saved import progress, if any.
///
/// Returns `None` when the state file does not exist, cannot be read, or does not
/// parse. A missing file is the normal first-run case and is silent; any other
/// failure prints a warning, because it means a resume would silently start over.
#[cfg(feature = "s3")]
fn read_r2_import_state(path: &Path) -> Option<R2ImportState> {
    let raw = match std::fs::read_to_string(path) {
        Ok(raw) => raw,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return None,
        Err(err) => {
            println!(
                "  Warning: could not read R2 import state {}: {err}",
                path.display()
            );
            return None;
        }
    };

    match serde_json::from_str(&raw) {
        Ok(state) => Some(state),
        Err(err) => {
            println!(
                "  Warning: ignoring unparseable R2 import state {}: {err}",
                path.display()
            );
            None
        }
    }
}
183
/// Download the full body of `key` from `bucket`, retrying on failure.
///
/// Up to `R2_IMPORT_OBJECT_READ_ATTEMPTS` attempts are made. Both a failed request
/// and a failed body read count as a failed attempt. Between attempts the task
/// sleeps for `R2_IMPORT_OBJECT_RETRY_BASE_DELAY_MS`, doubled after every failure;
/// there is no sleep after the final attempt.
///
/// On success returns the body bytes; otherwise a message describing the last
/// failure and the number of attempts.
#[cfg(feature = "s3")]
async fn fetch_r2_object_body_with_retries(
    client: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
) -> Result<Vec<u8>, String> {
    let mut last_failure: Option<String> = None;
    let mut attempt = 0usize;

    while attempt < R2_IMPORT_OBJECT_READ_ATTEMPTS {
        attempt += 1;

        let response = client.get_object().bucket(bucket).key(key).send().await;
        let failure = match response {
            Err(err) => format!("fetch failed for {key}: {err}"),
            Ok(object) => match object.body.collect().await {
                Ok(aggregated) => return Ok(aggregated.into_bytes().to_vec()),
                Err(err) => format!("read failed for {key}: {err}"),
            },
        };
        last_failure = Some(failure);

        // Exponential backoff, skipped once no attempts remain.
        if attempt < R2_IMPORT_OBJECT_READ_ATTEMPTS {
            let backoff_ms = R2_IMPORT_OBJECT_RETRY_BASE_DELAY_MS << (attempt - 1);
            tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
        }
    }

    let reason =
        last_failure.unwrap_or_else(|| format!("fetch failed for {key}: unknown error"));
    Err(format!(
        "{} after {} attempt(s)",
        reason, R2_IMPORT_OBJECT_READ_ATTEMPTS
    ))
}
222
/// Persist import progress to `path` as pretty-printed JSON.
///
/// Missing parent directories are created. The data is first written to a sibling
/// `<name>.tmp` file and then renamed over `path`, so an interruption in the middle
/// of a write cannot leave a truncated state file behind (which would make a later
/// resume start from scratch).
///
/// # Errors
/// Fails if the directory cannot be created, serialization fails, or the file
/// cannot be written or renamed.
#[cfg(feature = "s3")]
fn write_r2_import_state(path: &Path, result: &R2ImportResult) -> Result<()> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let state = R2ImportState {
        result: result.clone(),
        updated_at_unix: unix_timestamp_now(),
    };
    let encoded = serde_json::to_vec_pretty(&state)?;

    // Without a final path component there is no sibling name to stage under;
    // fall back to writing in place.
    let Some(file_name) = path.file_name() else {
        std::fs::write(path, encoded)?;
        return Ok(());
    };

    let mut staging_name = file_name.to_os_string();
    staging_name.push(".tmp");
    let staging_path = path.with_file_name(staging_name);

    std::fs::write(&staging_path, encoded)?;
    if let Err(err) = std::fs::rename(&staging_path, path) {
        // Best effort: do not leave the staging file around on failure.
        let _ = std::fs::remove_file(&staging_path);
        return Err(err.into());
    }
    Ok(())
}
235
/// Process one import candidate: check the local store, and unless running in
/// check-only mode download the object, verify its hash and store it locally.
///
/// * `check_only` – stop after establishing that the blob is missing locally.
/// * `prechecked_missing` – the caller already determined the blob is absent, so the
///   local existence check is skipped.
///
/// Never returns an error; every failure is reported through the outcome's flags
/// and `message`.
#[cfg(feature = "s3")]
async fn import_r2_object_to_local(
    client: Arc<aws_sdk_s3::Client>,
    bucket: Arc<String>,
    local: Arc<super::LocalStore>,
    candidate: R2ObjectCandidate,
    check_only: bool,
    prechecked_missing: bool,
) -> R2ObjectImportOutcome {
    if !prechecked_missing {
        let present = match local.exists(&candidate.hash) {
            Ok(present) => present,
            Err(err) => {
                return R2ObjectImportOutcome {
                    failed: true,
                    message: Some(format!("local exists failed for {}: {err}", candidate.key)),
                    ..Default::default()
                };
            }
        };
        if present {
            return R2ObjectImportOutcome {
                skipped: true,
                ..Default::default()
            };
        }
    }

    // From here on the blob is known to be absent locally, so every outcome
    // carries `missing: true`.
    let missing_outcome = || R2ObjectImportOutcome {
        missing: true,
        ..Default::default()
    };

    if check_only {
        return missing_outcome();
    }

    let fetched =
        fetch_r2_object_body_with_retries(client.as_ref(), bucket.as_str(), &candidate.key).await;
    let bytes = match fetched {
        Ok(bytes) => bytes,
        Err(reason) => {
            return R2ObjectImportOutcome {
                failed: true,
                message: Some(reason),
                ..missing_outcome()
            };
        }
    };

    let payload = bytes.as_slice();
    let digest = sha256(payload);
    if digest != candidate.hash {
        return R2ObjectImportOutcome {
            corrupted: true,
            message: Some(format!(
                "hash mismatch for {}: actual {}",
                candidate.key,
                to_hex(&digest)
            )),
            ..missing_outcome()
        };
    }

    match local.put_sync(candidate.hash, payload) {
        // `false` means the store did not insert anything, which is counted as skipped.
        Ok(inserted) => R2ObjectImportOutcome {
            imported: inserted,
            skipped: !inserted,
            bytes_imported: if inserted { payload.len() as u64 } else { 0 },
            ..missing_outcome()
        },
        Err(err) => R2ObjectImportOutcome {
            failed: true,
            message: Some(format!("local put failed for {}: {err}", candidate.key)),
            ..missing_outcome()
        },
    }
}
316
/// Wait for the next in-flight import to finish and fold its outcome into `result`.
///
/// Does nothing when `pending` is empty. Each flag set on the outcome bumps the
/// matching counter; the byte count is added only for imported objects, and the
/// outcome's message (if any) is printed.
#[cfg(feature = "s3")]
async fn settle_one_r2_import(
    pending: &mut FuturesUnordered<impl std::future::Future<Output = R2ObjectImportOutcome>>,
    result: &mut R2ImportResult,
) {
    let Some(outcome) = pending.next().await else {
        return;
    };

    result.skipped += usize::from(outcome.skipped);
    result.missing += usize::from(outcome.missing);
    result.corrupted += usize::from(outcome.corrupted);
    result.failed += usize::from(outcome.failed);

    if outcome.imported {
        result.imported += 1;
        result.bytes_imported = result.bytes_imported.saturating_add(outcome.bytes_imported);
    }

    if let Some(message) = outcome.message {
        println!("  {message}");
    }
}
344
345impl HashtreeStore {
346    /// Garbage collect unpinned content
347    pub fn gc(&self) -> Result<GcStats> {
348        let rtxn = self.env.read_txn()?;
349
350        // Get all pinned hashes as raw bytes
351        let pinned: HashSet<[u8; 32]> = self
352            .pins
353            .iter(&rtxn)?
354            .filter_map(|item| item.ok())
355            .filter_map(|(hash_bytes, _)| {
356                if hash_bytes.len() == 32 {
357                    let mut hash = [0u8; 32];
358                    hash.copy_from_slice(hash_bytes);
359                    Some(hash)
360                } else {
361                    None
362                }
363            })
364            .collect();
365
366        drop(rtxn);
367
368        // Get all stored hashes
369        let all_hashes = self
370            .router
371            .list()
372            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
373
374        // Delete unpinned hashes
375        let mut deleted = 0;
376        let mut freed_bytes = 0u64;
377
378        for hash in all_hashes {
379            if !pinned.contains(&hash) {
380                if let Ok(Some(data)) = self.router.get_sync(&hash) {
381                    freed_bytes += data.len() as u64;
382                    // Delete locally only - keep S3 as archive
383                    let _ = self.router.delete_local_only(&hash);
384                    deleted += 1;
385                }
386            }
387        }
388
389        Ok(GcStats {
390            deleted_dags: deleted,
391            freed_bytes,
392        })
393    }
394
395    /// Verify LMDB blob integrity - checks that stored data matches its key hash
396    /// Returns verification statistics and optionally deletes corrupted entries
397    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
398        let all_hashes = self
399            .router
400            .list()
401            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
402
403        let total = all_hashes.len();
404        let mut valid = 0;
405        let mut corrupted = 0;
406        let mut deleted = 0;
407        let mut corrupted_hashes = Vec::new();
408
409        for hash in &all_hashes {
410            let hash_hex = to_hex(hash);
411
412            match self.router.get_sync(hash) {
413                Ok(Some(data)) => {
414                    let actual_hash = sha256(&data);
415
416                    if actual_hash == *hash {
417                        valid += 1;
418                    } else {
419                        corrupted += 1;
420                        let actual_hex = to_hex(&actual_hash);
421                        println!(
422                            "  CORRUPTED: key={} actual={} size={}",
423                            &hash_hex[..16],
424                            &actual_hex[..16],
425                            data.len()
426                        );
427                        corrupted_hashes.push(*hash);
428                    }
429                }
430                Ok(None) => {
431                    corrupted += 1;
432                    println!("  MISSING: key={}", &hash_hex[..16]);
433                    corrupted_hashes.push(*hash);
434                }
435                Err(e) => {
436                    corrupted += 1;
437                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
438                    corrupted_hashes.push(*hash);
439                }
440            }
441        }
442
443        if delete {
444            for hash in &corrupted_hashes {
445                match self.router.delete_sync(hash) {
446                    Ok(true) => deleted += 1,
447                    Ok(false) => {}
448                    Err(e) => {
449                        let hash_hex = to_hex(hash);
450                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
451                    }
452                }
453            }
454        }
455
456        Ok(VerifyResult {
457            total,
458            valid,
459            corrupted,
460            deleted,
461        })
462    }
463
464    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
465    /// Returns verification statistics and optionally deletes corrupted entries
466    #[cfg(feature = "s3")]
467    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
468        use aws_sdk_s3::Client as S3Client;
469
470        let config = crate::config::Config::load()?;
471        let s3_config = config
472            .storage
473            .s3
474            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;
475
476        let aws_config = aws_config::from_env()
477            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
478            .load()
479            .await;
480
481        let s3_client = S3Client::from_conf(
482            aws_sdk_s3::config::Builder::from(&aws_config)
483                .endpoint_url(&s3_config.endpoint)
484                .force_path_style(true)
485                .build(),
486        );
487
488        let bucket = &s3_config.bucket;
489        let prefix = s3_config.prefix.as_deref().unwrap_or("");
490
491        let mut total = 0;
492        let mut valid = 0;
493        let mut corrupted = 0;
494        let mut deleted = 0;
495        let mut corrupted_keys = Vec::new();
496
497        let mut continuation_token: Option<String> = None;
498
499        loop {
500            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);
501
502            if let Some(ref token) = continuation_token {
503                list_req = list_req.continuation_token(token);
504            }
505
506            let list_resp = list_req
507                .send()
508                .await
509                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;
510
511            for object in list_resp.contents() {
512                let key = object.key().unwrap_or("");
513
514                if !key.ends_with(".bin") {
515                    continue;
516                }
517
518                total += 1;
519
520                let filename = key.strip_prefix(prefix).unwrap_or(key);
521                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);
522
523                if expected_hash_hex.len() != 64 {
524                    corrupted += 1;
525                    println!("  INVALID KEY: {}", key);
526                    corrupted_keys.push(key.to_string());
527                    continue;
528                }
529
530                let expected_hash = match from_hex(expected_hash_hex) {
531                    Ok(h) => h,
532                    Err(_) => {
533                        corrupted += 1;
534                        println!("  INVALID HEX: {}", key);
535                        corrupted_keys.push(key.to_string());
536                        continue;
537                    }
538                };
539
540                match s3_client.get_object().bucket(bucket).key(key).send().await {
541                    Ok(resp) => match resp.body.collect().await {
542                        Ok(bytes) => {
543                            let data = bytes.into_bytes();
544                            let actual_hash = sha256(&data);
545
546                            if actual_hash == expected_hash {
547                                valid += 1;
548                            } else {
549                                corrupted += 1;
550                                let actual_hex = to_hex(&actual_hash);
551                                println!(
552                                    "  CORRUPTED: key={} actual={} size={}",
553                                    &expected_hash_hex[..16],
554                                    &actual_hex[..16],
555                                    data.len()
556                                );
557                                corrupted_keys.push(key.to_string());
558                            }
559                        }
560                        Err(e) => {
561                            corrupted += 1;
562                            println!("  READ ERROR: {} - {}", key, e);
563                            corrupted_keys.push(key.to_string());
564                        }
565                    },
566                    Err(e) => {
567                        corrupted += 1;
568                        println!("  FETCH ERROR: {} - {}", key, e);
569                        corrupted_keys.push(key.to_string());
570                    }
571                }
572
573                if total % 100 == 0 {
574                    println!(
575                        "  Progress: {} objects checked, {} corrupted so far",
576                        total, corrupted
577                    );
578                }
579            }
580
581            if list_resp.is_truncated() == Some(true) {
582                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
583            } else {
584                break;
585            }
586        }
587
588        if delete {
589            for key in &corrupted_keys {
590                match s3_client
591                    .delete_object()
592                    .bucket(bucket)
593                    .key(key)
594                    .send()
595                    .await
596                {
597                    Ok(_) => deleted += 1,
598                    Err(e) => {
599                        println!("  Failed to delete {}: {}", key, e);
600                    }
601                }
602            }
603        }
604
605        Ok(VerifyResult {
606            total,
607            valid,
608            corrupted,
609            deleted,
610        })
611    }
612
613    /// Import missing R2/S3 blobs into local storage without writing back to S3.
614    ///
615    /// This mirrors rclone's shape: list the source, compare each source object
616    /// against the destination by cheap metadata (here the content-addressed key),
617    /// and only transfer missing objects. `--check-only` runs the same comparison
618    /// without downloading object bodies.
619    #[cfg(feature = "s3")]
620    pub async fn import_r2_to_local(&self, options: R2ImportOptions) -> Result<R2ImportResult> {
621        use aws_sdk_s3::Client as S3Client;
622
623        let config = crate::config::Config::load()?;
624        let s3_config = config
625            .storage
626            .s3
627            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;
628
629        let aws_config = aws_config::from_env()
630            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
631            .load()
632            .await;
633
634        let s3_client = S3Client::from_conf(
635            aws_sdk_s3::config::Builder::from(&aws_config)
636                .endpoint_url(&s3_config.endpoint)
637                .force_path_style(true)
638                .build(),
639        );
640
641        let bucket = Arc::new(s3_config.bucket);
642        let prefix = s3_config.prefix.unwrap_or_default();
643        let list_prefix = options
644            .scan_prefix
645            .as_ref()
646            .map(|scan_prefix| format!("{prefix}{scan_prefix}"))
647            .unwrap_or_else(|| prefix.clone());
648        let mut explicit_keys = options.keys.clone();
649        if let Some(keys_file) = options.keys_file.as_ref() {
650            explicit_keys.extend(read_r2_import_keys_file(keys_file)?);
651        }
652
653        let local = self.router.local_store();
654        let client = Arc::new(s3_client);
655        let concurrency = options.concurrency.max(1);
656        let mut pending = FuturesUnordered::new();
657
658        if !explicit_keys.is_empty() {
659            let mut result = R2ImportResult {
660                completed: false,
661                ..Default::default()
662            };
663
664            println!(
665                "R2 import {} targeted: bucket={}, prefix={}, requested_keys={}, state_file={}",
666                if options.check_only { "check" } else { "sync" },
667                bucket.as_str(),
668                prefix,
669                explicit_keys.len(),
670                options
671                    .state_file
672                    .as_ref()
673                    .map(|path| path.display().to_string())
674                    .unwrap_or_else(|| "<none>".to_string()),
675            );
676            for key in explicit_keys {
677                let Some(candidate) = r2_import_key_candidate(&prefix, &key) else {
678                    result.failed += 1;
679                    println!("  invalid R2 blob key/hash: {key}");
680                    continue;
681                };
682
683                result.last_key = Some(candidate.key.clone());
684                result.listed += 1;
685                pending.push(import_r2_object_to_local(
686                    client.clone(),
687                    bucket.clone(),
688                    local.clone(),
689                    candidate,
690                    options.check_only,
691                    false,
692                ));
693
694                while pending.len() >= concurrency {
695                    settle_one_r2_import(&mut pending, &mut result).await;
696                }
697            }
698
699            while !pending.is_empty() {
700                settle_one_r2_import(&mut pending, &mut result).await;
701            }
702
703            result.completed = true;
704            if let Some(state_file) = options.state_file.as_ref() {
705                write_r2_import_state(state_file, &result)?;
706            }
707            return Ok(result);
708        }
709
710        let state_file = options
711            .state_file
712            .unwrap_or_else(|| self.base_path().join("r2-import-state.json"));
713        let saved_state = read_r2_import_state(&state_file);
714        let saved_incomplete = saved_state
715            .as_ref()
716            .is_some_and(|state| !state.result.completed && state.result.last_key.is_some());
717        let start_after = options.start_after.clone().or_else(|| {
718            if options.resume && saved_incomplete {
719                saved_state
720                    .as_ref()
721                    .and_then(|state| state.result.last_key.clone())
722            } else {
723                None
724            }
725        });
726        let mut result = if options.resume && options.start_after.is_none() && saved_incomplete {
727            saved_state.map(|state| state.result).unwrap_or_default()
728        } else {
729            R2ImportResult::default()
730        };
731        result.completed = false;
732
733        println!(
734            "R2 import {}: bucket={}, prefix={}, list_prefix={}, start_after={}, state_file={}",
735            if options.check_only { "check" } else { "sync" },
736            bucket.as_str(),
737            prefix,
738            list_prefix,
739            start_after.as_deref().unwrap_or("<beginning>"),
740            state_file.display(),
741        );
742
743        if options.stream_merge && options.fast_list {
744            println!("  Stream merge enabled; skipping in-memory --fast-list index");
745        }
746
747        let local_hashes = if options.fast_list && !options.stream_merge {
748            println!("  Loading local hash index...");
749            let mut local_hashes = self
750                .router
751                .list()
752                .map_err(|err| anyhow::anyhow!("Failed to list local blobs: {err}"))?;
753            local_hashes.sort_unstable();
754            println!("  Local hash index loaded: {} blobs", local_hashes.len());
755            Some(local_hashes)
756        } else {
757            None
758        };
759
760        let progress_every = options.progress_every.max(1);
761        let mut continuation_token: Option<String> = None;
762        let mut listed_since_progress = 0usize;
763        let mut listed_this_run = 0usize;
764        let mut first_page = true;
765        let mut hit_max_objects = false;
766
767        loop {
768            let mut list_req = client
769                .list_objects_v2()
770                .bucket(bucket.as_str())
771                .prefix(&list_prefix);
772
773            if let Some(ref token) = continuation_token {
774                list_req = list_req.continuation_token(token);
775            } else if first_page {
776                if let Some(ref start_after) = start_after {
777                    list_req = list_req.start_after(start_after);
778                }
779            }
780            first_page = false;
781
782            let list_resp = list_req
783                .send()
784                .await
785                .map_err(|err| anyhow::anyhow!("Failed to list S3 objects: {err}"))?;
786
787            let mut page_candidates = Vec::new();
788            let mut page_last_key = None;
789            for object in list_resp.contents() {
790                if options
791                    .max_objects
792                    .is_some_and(|max_objects| listed_this_run >= max_objects)
793                {
794                    hit_max_objects = true;
795                    break;
796                }
797
798                let key = object.key().unwrap_or("").to_string();
799                page_last_key = Some(key.clone());
800                if !key.ends_with(".bin") {
801                    continue;
802                }
803
804                let Some(hash) = r2_import_key_hash(&prefix, &key) else {
805                    continue;
806                };
807
808                result.listed += 1;
809                listed_this_run += 1;
810                listed_since_progress += 1;
811                page_candidates.push(R2ObjectCandidate { key, hash });
812            }
813
814            let page_existing = if options.stream_merge && !page_candidates.is_empty() {
815                Some(existing_r2_candidates(local.as_ref(), &page_candidates)?)
816            } else {
817                None
818            };
819
820            for (candidate_index, candidate) in page_candidates.into_iter().enumerate() {
821                let already_exists = page_existing
822                    .as_ref()
823                    .is_some_and(|existing| existing[candidate_index]);
824
825                if options.scan_delay_ms > 0 {
826                    tokio::time::sleep(Duration::from_millis(options.scan_delay_ms)).await;
827                }
828
829                if already_exists {
830                    result.skipped += 1;
831                    continue;
832                }
833
834                if let Some(local_hashes) = &local_hashes {
835                    if local_hashes.binary_search(&candidate.hash).is_ok() {
836                        result.skipped += 1;
837                        continue;
838                    }
839                }
840
841                pending.push(import_r2_object_to_local(
842                    client.clone(),
843                    bucket.clone(),
844                    local.clone(),
845                    candidate,
846                    options.check_only,
847                    page_existing.is_some(),
848                ));
849
850                while pending.len() >= concurrency {
851                    settle_one_r2_import(&mut pending, &mut result).await;
852                }
853            }
854
855            while !pending.is_empty() {
856                settle_one_r2_import(&mut pending, &mut result).await;
857            }
858            if let Some(last_key) = page_last_key {
859                result.last_key = Some(last_key);
860            }
861            if listed_since_progress >= progress_every {
862                listed_since_progress = 0;
863                println!(
864                    "  Progress: {} listed, {} imported, {} skipped, {} missing, {} corrupted, {} failed, {:.2} GB imported",
865                    result.listed,
866                    result.imported,
867                    result.skipped,
868                    result.missing,
869                    result.corrupted,
870                    result.failed,
871                    result.bytes_imported as f64 / 1024.0 / 1024.0 / 1024.0,
872                );
873            }
874            write_r2_import_state(&state_file, &result)?;
875
876            if hit_max_objects {
877                break;
878            }
879            if list_resp.is_truncated() == Some(true) {
880                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
881            } else {
882                result.completed = true;
883                break;
884            }
885        }
886
887        write_r2_import_state(&state_file, &result)?;
888        Ok(result)
889    }
890
891    /// Fallback for non-S3 builds
892    #[cfg(not(feature = "s3"))]
893    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
894        Err(anyhow::anyhow!("S3 feature not enabled"))
895    }
896
897    pub fn compact_lmdb_environments(
898        &self,
899        env_dirs: &[PathBuf],
900        keep_backup: bool,
901    ) -> Result<Vec<CompactResult>> {
902        compact_lmdb_environments_under(self.base_path(), env_dirs, keep_backup)
903    }
904}
905
906pub fn compact_lmdb_environments_under(
907    base_path: &Path,
908    env_dirs: &[PathBuf],
909    keep_backup: bool,
910) -> Result<Vec<CompactResult>> {
911    let targets = if env_dirs.is_empty() {
912        discover_lmdb_environment_dirs(base_path)?
913    } else {
914        env_dirs
915            .iter()
916            .map(|path| {
917                if path.is_absolute() {
918                    path.clone()
919                } else {
920                    base_path.join(path)
921                }
922            })
923            .collect()
924    };
925
926    let mut results = Vec::new();
927    for env_dir in targets {
928        results.push(compact_lmdb_environment_dir(&env_dir, keep_backup)?);
929    }
930    Ok(results)
931}
932
933fn discover_lmdb_environment_dirs(root: &Path) -> Result<Vec<PathBuf>> {
934    let mut dirs = Vec::new();
935    collect_lmdb_environment_dirs(root, &mut dirs)?;
936    dirs.sort();
937    Ok(dirs)
938}
939
940fn collect_lmdb_environment_dirs(root: &Path, dirs: &mut Vec<PathBuf>) -> Result<()> {
941    if root.join("data.mdb").exists() {
942        dirs.push(root.to_path_buf());
943    }
944
945    for entry in std::fs::read_dir(root)? {
946        let entry = entry?;
947        let path = entry.path();
948        if path.is_dir() {
949            collect_lmdb_environment_dirs(&path, dirs)?;
950        }
951    }
952
953    Ok(())
954}
955
956fn compact_lmdb_environment_dir(env_dir: &Path, keep_backup: bool) -> Result<CompactResult> {
957    let data_path = env_dir.join("data.mdb");
958    if !data_path.exists() {
959        anyhow::bail!("No data.mdb found in {}", env_dir.display());
960    }
961
962    let before_bytes = std::fs::metadata(&data_path)?.len();
963    let compact_path = env_dir.join("data.mdb.compact");
964    let backup_path = env_dir.join("data.mdb.bak");
965
966    if compact_path.exists() {
967        std::fs::remove_file(&compact_path)?;
968    }
969    if !keep_backup && backup_path.exists() {
970        std::fs::remove_file(&backup_path)?;
971    }
972
973    let open_map_size = existing_lmdb_map_size_bytes(&data_path)?;
974
975    {
976        let env = unsafe {
977            EnvOpenOptions::new()
978                .map_size(open_map_size)
979                .max_dbs(COMPACT_MAX_DBS)
980                .max_readers(COMPACT_MAX_READERS)
981                .open(env_dir)
982        }?;
983        env.force_sync()?;
984        env.copy_to_file(&compact_path, CompactionOption::Enabled)?;
985    }
986
987    let after_bytes = std::fs::metadata(&compact_path)?.len();
988
989    if backup_path.exists() {
990        std::fs::remove_file(&backup_path)?;
991    }
992
993    std::fs::rename(&data_path, &backup_path)?;
994    if let Err(error) = std::fs::rename(&compact_path, &data_path) {
995        let _ = std::fs::rename(&backup_path, &data_path);
996        return Err(error.into());
997    }
998
999    if !keep_backup {
1000        std::fs::remove_file(&backup_path)?;
1001    }
1002
1003    Ok(CompactResult {
1004        env_dir: env_dir.to_path_buf(),
1005        before_bytes,
1006        after_bytes,
1007    })
1008}
1009
1010fn existing_lmdb_map_size_bytes(data_path: &Path) -> Result<usize> {
1011    let file_bytes = std::fs::metadata(data_path)?.len();
1012    let aligned_bytes = if file_bytes == 0 {
1013        COMPACT_OPEN_MAP_SIZE_BYTES as u64
1014    } else {
1015        let remainder = file_bytes % COMPACT_PAGE_SIZE_BYTES;
1016        if remainder == 0 {
1017            file_bytes
1018        } else {
1019            file_bytes.saturating_add(COMPACT_PAGE_SIZE_BYTES - remainder)
1020        }
1021    };
1022
1023    Ok(usize::try_from(aligned_bytes)
1024        .unwrap_or(usize::MAX)
1025        .max(COMPACT_OPEN_MAP_SIZE_BYTES))
1026}
1027
#[cfg(all(test, feature = "s3"))]
mod tests {
    use super::{r2_import_key_candidate, r2_import_key_hash};

    const HASH: &str = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";

    /// Builds the `.bin` object key for `HASH` placed directly under `prefix`.
    fn blob_key(prefix: &str) -> String {
        format!("{prefix}{HASH}.bin")
    }

    #[test]
    fn r2_import_key_hash_accepts_only_root_blob_keys() {
        // Blob keys sitting directly under the scan prefix are accepted.
        assert!(r2_import_key_hash("", &blob_key("")).is_some());
        assert!(r2_import_key_hash("legacy/", &blob_key("legacy/")).is_some());

        // Nested keys, other extensions and non-hash names are all rejected.
        let rejected = [
            blob_key("hot/"),
            blob_key("site-bytes/pubkey/tree/root/"),
            String::from("roots/pubkey/tree.json"),
            format!("{HASH}.png"),
            String::from("not-a-hash.bin"),
        ];
        for key in &rejected {
            assert!(r2_import_key_hash("", key).is_none());
        }
    }

    #[test]
    fn r2_import_key_candidate_accepts_hash_or_canonical_key() {
        let canonical = blob_key("");

        let from_bare_hash = r2_import_key_candidate("", HASH).expect("bare hash");
        assert_eq!(from_bare_hash.key, canonical);

        let from_full_key = r2_import_key_candidate("", &canonical).expect("hash key");
        assert_eq!(from_full_key.key, canonical);

        assert!(r2_import_key_candidate("", &blob_key("hot/")).is_none());
    }

    #[test]
    fn r2_import_key_candidate_applies_configured_prefix() {
        let expected = blob_key("legacy/");

        let from_bare_hash = r2_import_key_candidate("legacy/", HASH).expect("prefixed bare hash");
        assert_eq!(from_bare_hash.key, expected);

        let from_unprefixed_key =
            r2_import_key_candidate("legacy/", &blob_key("")).expect("prefixed key");
        assert_eq!(from_unprefixed_key.key, expected);

        let from_prefixed_key = r2_import_key_candidate("legacy/", &expected).expect("key");
        assert_eq!(from_prefixed_key.key, expected);
    }
}