1use anyhow::Result;
2use heed::{CompactionOption, EnvOpenOptions};
3use std::collections::HashSet;
4use std::path::{Path, PathBuf};
5#[cfg(feature = "s3")]
6use std::sync::Arc;
7#[cfg(feature = "s3")]
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9
10use super::{GcStats, HashtreeStore};
11
12#[cfg(feature = "s3")]
13use futures::{stream::FuturesUnordered, StreamExt};
14#[cfg(feature = "s3")]
15use hashtree_core::from_hex;
16use hashtree_core::{sha256, to_hex};
17use serde::{Deserialize, Serialize};
18
/// Summary of an integrity check over stored blobs (local LMDB or R2).
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose content hash matched the hash they are stored under.
    pub valid: usize,
    /// Objects counted as bad: hash mismatch, missing/unreadable content, or
    /// (for R2) a key that is not a valid hash name.
    pub corrupted: usize,
    /// Corrupted objects that were successfully deleted (only non-zero when
    /// deletion was requested).
    pub deleted: usize,
}
27
/// Outcome of compacting one LMDB environment directory.
#[derive(Debug, Clone)]
pub struct CompactResult {
    /// Directory containing the compacted `data.mdb`.
    pub env_dir: PathBuf,
    /// Size of `data.mdb` before compaction, in bytes.
    pub before_bytes: u64,
    /// Size of the compacted copy that replaced `data.mdb`, in bytes.
    pub after_bytes: u64,
}
34
/// Options for `HashtreeStore::import_r2_to_local`.
#[derive(Debug, Clone, Default)]
pub struct R2ImportOptions {
    /// Maximum number of objects processed concurrently; `0` is treated as `1`.
    pub concurrency: usize,
    /// Only report which objects are missing locally; do not download them.
    pub check_only: bool,
    /// Continue from the `last_key` of an incomplete saved state. Ignored
    /// when `start_after` is set.
    pub resume: bool,
    /// Load and sort every local hash up front so blobs that already exist can
    /// be skipped without a per-object lookup. Ignored when `stream_merge` is set.
    pub fast_list: bool,
    /// Check each listed page against the local store in one batched call
    /// before importing.
    pub stream_merge: bool,
    /// Explicit keys or bare hashes to import. When this list (together with
    /// `keys_file`) is non-empty, the bucket is not listed at all.
    pub keys: Vec<String>,
    /// File with additional keys/hashes, one per line; blank lines and lines
    /// starting with `#` are ignored.
    pub keys_file: Option<PathBuf>,
    /// Start listing after this key (takes precedence over `resume`).
    pub start_after: Option<String>,
    /// Extra prefix appended to the configured bucket prefix to narrow the listing.
    pub scan_prefix: Option<String>,
    /// Where progress is saved. In listing mode it defaults to
    /// `r2-import-state.json` under the store's base path; in explicit-key
    /// mode it is only written when set.
    pub state_file: Option<PathBuf>,
    /// Stop once this many blob keys have been listed in this run.
    pub max_objects: Option<usize>,
    /// Print a progress line once at least this many keys have been listed
    /// since the previous one (checked after each page); `0` is treated as `1`.
    pub progress_every: usize,
    /// Delay in milliseconds before handling each listed blob key (listing mode only).
    pub scan_delay_ms: u64,
}
51
/// Counters reported by an R2 import; also persisted (flattened) in the
/// import state file. When resuming, counts carry over from the saved state.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct R2ImportResult {
    /// Blob keys accepted for processing.
    pub listed: usize,
    /// Objects that were already present locally.
    pub skipped: usize,
    /// Objects found absent locally (reported only in check mode, otherwise fetched).
    pub missing: usize,
    /// Objects newly written to the local store.
    pub imported: usize,
    /// Objects whose downloaded content did not hash to their key.
    pub corrupted: usize,
    /// Objects that could not be checked, fetched or stored, plus invalid explicit keys.
    pub failed: usize,
    /// Total size of newly imported objects, in bytes.
    pub bytes_imported: u64,
    /// Last key processed; used as the resume point for listing mode.
    pub last_key: Option<String>,
    /// Whether the run reached the end of the listing / the explicit key list.
    pub completed: bool,
}
64
/// On-disk resume state: the [`R2ImportResult`] fields flattened at the top
/// level plus the time the state was written.
#[cfg(feature = "s3")]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct R2ImportState {
    #[serde(flatten)]
    result: R2ImportResult,
    /// Seconds since the Unix epoch at which this state was written.
    updated_at_unix: u64,
}
72
/// An R2 object key together with the blob hash parsed from its name.
#[cfg(feature = "s3")]
#[derive(Debug, Clone)]
struct R2ObjectCandidate {
    /// Full object key, including the configured prefix.
    key: String,
    /// Hash the object's content is expected to have.
    hash: hashtree_core::types::Hash,
}
79
/// What happened to one object during import. The flags are not mutually
/// exclusive: e.g. an object can be both `missing` and `imported`, or
/// `missing` and `failed`.
#[cfg(feature = "s3")]
#[derive(Debug, Clone, Default)]
struct R2ObjectImportOutcome {
    /// Already present locally (or the local write reported no insert).
    skipped: bool,
    /// Not present locally when checked.
    missing: bool,
    /// Newly written to the local store.
    imported: bool,
    /// Downloaded content did not match the expected hash.
    corrupted: bool,
    /// A local lookup, fetch, or local write failed.
    failed: bool,
    /// Bytes written when `imported` is set, otherwise 0.
    bytes_imported: u64,
    /// Diagnostic line to print, if any.
    message: Option<String>,
}
91
/// Total number of GET attempts made for one object by
/// `fetch_r2_object_body_with_retries` (first try plus retries).
#[cfg(feature = "s3")]
const R2_IMPORT_OBJECT_READ_ATTEMPTS: usize = 4;
/// Delay before the first retry; it doubles for each later retry
/// (250 ms, 500 ms, 1 s with four attempts).
#[cfg(feature = "s3")]
const R2_IMPORT_OBJECT_RETRY_BASE_DELAY_MS: u64 = 250;

/// `max_dbs` used when opening an environment for compaction.
/// NOTE(review): presumably at least the number of named databases the store
/// creates — confirm against the store's own open options.
const COMPACT_MAX_DBS: u32 = 64;
/// `max_readers` used when opening an environment for compaction.
const COMPACT_MAX_READERS: u32 = 2048;
/// Minimum map size (10 MiB) for opening an environment during compaction;
/// also used as-is when `data.mdb` is empty.
const COMPACT_OPEN_MAP_SIZE_BYTES: usize = 10 * 1024 * 1024;
/// Granularity the compaction map size is rounded up to.
/// NOTE(review): assumes 4 KiB pages; hosts with larger OS pages may differ.
const COMPACT_PAGE_SIZE_BYTES: u64 = 4096;
101
102#[cfg(feature = "s3")]
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
109
/// Parses the blob hash out of a root blob key `<prefix><64 hex chars>.bin`.
///
/// The prefix is stripped when present. Returns `None` for keys that do not
/// end in `.bin`, whose remaining name contains a `/` (nested keys are not
/// root blobs), whose name is not exactly 64 characters, or that fail to
/// decode as hex.
#[cfg(feature = "s3")]
fn r2_import_key_hash(prefix: &str, key: &str) -> Option<hashtree_core::types::Hash> {
    let file_name = match key.strip_prefix(prefix) {
        Some(rest) => rest,
        None => key,
    };
    file_name
        .strip_suffix(".bin")
        .filter(|stem| !stem.contains('/') && stem.len() == 64)
        .and_then(|stem| from_hex(stem).ok())
}
122
/// Turns a user-supplied key or bare hash into a root blob candidate.
///
/// Accepted inputs (after trimming whitespace):
/// - a bare 64-character hex hash, expanded to `<prefix><hash>.bin`;
/// - a slash-free name not already carrying the prefix, which gets the prefix
///   prepended;
/// - anything else, used verbatim as the key.
///
/// Returns `None` for empty input or when the resulting key is not a valid
/// root blob key (see `r2_import_key_hash`).
#[cfg(feature = "s3")]
fn r2_import_key_candidate(prefix: &str, input: &str) -> Option<R2ObjectCandidate> {
    let trimmed = input.trim();
    if trimmed.is_empty() {
        return None;
    }

    let is_bare_hash = trimmed.len() == 64 && trimmed.bytes().all(|byte| byte.is_ascii_hexdigit());
    let needs_prefix =
        !prefix.is_empty() && !trimmed.starts_with(prefix) && !trimmed.contains('/');

    let key = match (is_bare_hash, needs_prefix) {
        (true, _) => format!("{prefix}{trimmed}.bin"),
        (false, true) => format!("{prefix}{trimmed}"),
        (false, false) => trimmed.to_owned(),
    };

    r2_import_key_hash(prefix, &key).map(|hash| R2ObjectCandidate { key, hash })
}
141
/// Reports, for each candidate, whether its hash is already in the local store.
///
/// The returned vector is parallel to `candidates`. The store query
/// (`existing_hashes_in_sorted_candidates`) takes hashes in sorted order, so
/// candidates are sorted by `(hash, original index)` for the query and the
/// answers are scattered back into input order.
///
/// # Errors
///
/// Fails if the local comparison fails, or if it returns a different number
/// of answers than hashes it was given.
#[cfg(feature = "s3")]
fn existing_r2_candidates(
    local: &super::LocalStore,
    candidates: &[R2ObjectCandidate],
) -> Result<Vec<bool>> {
    let mut indexed_hashes: Vec<(usize, hashtree_core::types::Hash)> = candidates
        .iter()
        .enumerate()
        .map(|(index, candidate)| (index, candidate.hash))
        .collect();
    indexed_hashes.sort_unstable_by(|left, right| left.1.cmp(&right.1).then(left.0.cmp(&right.0)));

    let sorted_hashes: Vec<hashtree_core::types::Hash> =
        indexed_hashes.iter().map(|(_, hash)| *hash).collect();
    let sorted_existing: Vec<bool> = local
        .existing_hashes_in_sorted_candidates(&sorted_hashes)
        .map_err(|err| anyhow::anyhow!("Failed to compare local hashes: {err}"))?
        .into_iter()
        .collect();

    // `zip` below would silently stop at the shorter side and leave the
    // remaining candidates reported as absent; treat a mismatch as an error.
    if sorted_existing.len() != sorted_hashes.len() {
        anyhow::bail!(
            "Local hash comparison returned {} results for {} candidates",
            sorted_existing.len(),
            sorted_hashes.len()
        );
    }

    let mut existing = vec![false; candidates.len()];
    for ((candidate_index, _), exists) in indexed_hashes.into_iter().zip(sorted_existing) {
        existing[candidate_index] = exists;
    }
    Ok(existing)
}
166
/// Reads explicit import keys from `path`, one per line.
///
/// Lines are trimmed; empty lines and lines starting with `#` are skipped.
///
/// # Errors
///
/// Fails if the file cannot be read. The error names the file and keeps the
/// underlying I/O error as its source.
#[cfg(feature = "s3")]
fn read_r2_import_keys_file(path: &Path) -> Result<Vec<String>> {
    let raw = std::fs::read_to_string(path).map_err(|err| {
        anyhow::Error::new(err).context(format!(
            "Failed to read R2 import keys file {}",
            path.display()
        ))
    })?;
    Ok(raw
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .map(ToOwned::to_owned)
        .collect())
}
177
/// Loads a previously saved import state, if any.
///
/// A missing or unreadable file, or contents that do not deserialize, all
/// yield `None`.
#[cfg(feature = "s3")]
fn read_r2_import_state(path: &Path) -> Option<R2ImportState> {
    let contents = match std::fs::read_to_string(path) {
        Ok(contents) => contents,
        Err(_) => return None,
    };
    serde_json::from_str::<R2ImportState>(&contents).ok()
}
183
/// Downloads the full body of `bucket/key`, retrying on failure.
///
/// Makes up to `R2_IMPORT_OBJECT_READ_ATTEMPTS` attempts. Either the GET
/// request or reading the body may fail. Between attempts it sleeps
/// `R2_IMPORT_OBJECT_RETRY_BASE_DELAY_MS`, doubling each time. On final
/// failure the last error is returned, annotated with the attempt count.
#[cfg(feature = "s3")]
async fn fetch_r2_object_body_with_retries(
    client: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
) -> Result<Vec<u8>, String> {
    let mut last_error: Option<String> = None;

    for attempt in 1..=R2_IMPORT_OBJECT_READ_ATTEMPTS {
        let failure = match client.get_object().bucket(bucket).key(key).send().await {
            Err(err) => format!("fetch failed for {key}: {err}"),
            Ok(output) => match output.body.collect().await {
                Ok(body) => return Ok(body.into_bytes().to_vec()),
                Err(err) => format!("read failed for {key}: {err}"),
            },
        };
        last_error = Some(failure);

        // No pause after the final attempt.
        if attempt < R2_IMPORT_OBJECT_READ_ATTEMPTS {
            let backoff_ms = R2_IMPORT_OBJECT_RETRY_BASE_DELAY_MS << (attempt - 1);
            tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
        }
    }

    let reason =
        last_error.unwrap_or_else(|| format!("fetch failed for {key}: unknown error"));
    Err(format!(
        "{} after {} attempt(s)",
        reason, R2_IMPORT_OBJECT_READ_ATTEMPTS
    ))
}
222
/// Persists `result` (with the current Unix timestamp) as pretty JSON at `path`.
///
/// Parent directories are created as needed. The JSON is first written to a
/// sibling `<path>.tmp` file and then renamed over `path`. An error or crash
/// part-way through writing therefore cannot leave a truncated state file
/// behind. `read_r2_import_state` treats a truncated file as "no state", which
/// would silently restart a resumed import from the beginning.
///
/// # Errors
///
/// Fails if the directory cannot be created, the state cannot be serialized,
/// or the temp file cannot be written or renamed.
#[cfg(feature = "s3")]
fn write_r2_import_state(path: &Path, result: &R2ImportResult) -> Result<()> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let state = R2ImportState {
        result: result.clone(),
        updated_at_unix: unix_timestamp_now(),
    };
    let encoded = serde_json::to_vec_pretty(&state)?;

    let mut tmp_name = path.as_os_str().to_owned();
    tmp_name.push(".tmp");
    let tmp_path = PathBuf::from(tmp_name);

    std::fs::write(&tmp_path, &encoded)?;
    if let Err(err) = std::fs::rename(&tmp_path, path) {
        // Best effort: do not leave the temp file lying around.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(err.into());
    }
    Ok(())
}
235
/// Imports a single R2 object into the local store and reports what happened.
///
/// Flow:
/// 1. Unless `prechecked_missing` is set (the caller already verified the
///    hash is absent), look the hash up locally. Present → `skipped`;
///    lookup error → `failed`.
/// 2. In `check_only` mode stop here and report `missing`.
/// 3. Download the object (with retries). Failure → `missing` + `failed`.
/// 4. Re-hash the body. Mismatch → `missing` + `corrupted`; nothing is stored.
/// 5. Write it with `put_sync`. `Ok(true)` → `imported` (with byte count);
///    `Ok(false)` → `skipped` (presumably stored concurrently — confirm);
///    error → `failed`. All three also set `missing`.
#[cfg(feature = "s3")]
async fn import_r2_object_to_local(
    client: Arc<aws_sdk_s3::Client>,
    bucket: Arc<String>,
    local: Arc<super::LocalStore>,
    candidate: R2ObjectCandidate,
    check_only: bool,
    prechecked_missing: bool,
) -> R2ObjectImportOutcome {
    let local_lookup = if prechecked_missing {
        Ok(false)
    } else {
        local.exists(&candidate.hash)
    };
    match local_lookup {
        Ok(false) => {}
        Ok(true) => {
            return R2ObjectImportOutcome {
                skipped: true,
                ..Default::default()
            };
        }
        Err(err) => {
            return R2ObjectImportOutcome {
                failed: true,
                message: Some(format!("local exists failed for {}: {err}", candidate.key)),
                ..Default::default()
            };
        }
    }

    if check_only {
        return R2ObjectImportOutcome {
            missing: true,
            ..Default::default()
        };
    }

    let fetched =
        fetch_r2_object_body_with_retries(&client, bucket.as_str(), &candidate.key).await;
    let body = match fetched {
        Ok(bytes) => bytes,
        Err(reason) => {
            return R2ObjectImportOutcome {
                missing: true,
                failed: true,
                message: Some(reason),
                ..Default::default()
            };
        }
    };

    let data = body.as_slice();
    let actual_hash = sha256(data);
    if actual_hash != candidate.hash {
        return R2ObjectImportOutcome {
            missing: true,
            corrupted: true,
            message: Some(format!(
                "hash mismatch for {}: actual {}",
                candidate.key,
                to_hex(&actual_hash)
            )),
            ..Default::default()
        };
    }

    match local.put_sync(candidate.hash, data) {
        Ok(true) => R2ObjectImportOutcome {
            missing: true,
            imported: true,
            bytes_imported: data.len() as u64,
            ..Default::default()
        },
        Ok(false) => R2ObjectImportOutcome {
            missing: true,
            skipped: true,
            ..Default::default()
        },
        Err(err) => R2ObjectImportOutcome {
            missing: true,
            failed: true,
            message: Some(format!("local put failed for {}: {err}", candidate.key)),
            ..Default::default()
        },
    }
}
316
/// Waits for the next finished import in `pending` and folds its outcome into
/// `result`, printing the outcome's message if it has one.
///
/// Does nothing when `pending` is empty.
#[cfg(feature = "s3")]
async fn settle_one_r2_import(
    pending: &mut FuturesUnordered<impl std::future::Future<Output = R2ObjectImportOutcome>>,
    result: &mut R2ImportResult,
) {
    let Some(outcome) = pending.next().await else {
        return;
    };

    result.skipped += usize::from(outcome.skipped);
    result.missing += usize::from(outcome.missing);
    if outcome.imported {
        result.imported += 1;
        result.bytes_imported = result.bytes_imported.saturating_add(outcome.bytes_imported);
    }
    result.corrupted += usize::from(outcome.corrupted);
    result.failed += usize::from(outcome.failed);

    if let Some(message) = outcome.message {
        println!(" {message}");
    }
}
344
345impl HashtreeStore {
346 pub fn gc(&self) -> Result<GcStats> {
348 let rtxn = self.env.read_txn()?;
349
350 let pinned: HashSet<[u8; 32]> = self
352 .pins
353 .iter(&rtxn)?
354 .filter_map(|item| item.ok())
355 .filter_map(|(hash_bytes, _)| {
356 if hash_bytes.len() == 32 {
357 let mut hash = [0u8; 32];
358 hash.copy_from_slice(hash_bytes);
359 Some(hash)
360 } else {
361 None
362 }
363 })
364 .collect();
365
366 drop(rtxn);
367
368 let all_hashes = self
370 .router
371 .list()
372 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
373
374 let mut deleted = 0;
376 let mut freed_bytes = 0u64;
377
378 for hash in all_hashes {
379 if !pinned.contains(&hash) {
380 if let Ok(Some(data)) = self.router.get_sync(&hash) {
381 freed_bytes += data.len() as u64;
382 let _ = self.router.delete_local_only(&hash);
384 deleted += 1;
385 }
386 }
387 }
388
389 Ok(GcStats {
390 deleted_dags: deleted,
391 freed_bytes,
392 })
393 }
394
395 pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
398 let all_hashes = self
399 .router
400 .list()
401 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
402
403 let total = all_hashes.len();
404 let mut valid = 0;
405 let mut corrupted = 0;
406 let mut deleted = 0;
407 let mut corrupted_hashes = Vec::new();
408
409 for hash in &all_hashes {
410 let hash_hex = to_hex(hash);
411
412 match self.router.get_sync(hash) {
413 Ok(Some(data)) => {
414 let actual_hash = sha256(&data);
415
416 if actual_hash == *hash {
417 valid += 1;
418 } else {
419 corrupted += 1;
420 let actual_hex = to_hex(&actual_hash);
421 println!(
422 " CORRUPTED: key={} actual={} size={}",
423 &hash_hex[..16],
424 &actual_hex[..16],
425 data.len()
426 );
427 corrupted_hashes.push(*hash);
428 }
429 }
430 Ok(None) => {
431 corrupted += 1;
432 println!(" MISSING: key={}", &hash_hex[..16]);
433 corrupted_hashes.push(*hash);
434 }
435 Err(e) => {
436 corrupted += 1;
437 println!(" ERROR: key={} err={}", &hash_hex[..16], e);
438 corrupted_hashes.push(*hash);
439 }
440 }
441 }
442
443 if delete {
444 for hash in &corrupted_hashes {
445 match self.router.delete_sync(hash) {
446 Ok(true) => deleted += 1,
447 Ok(false) => {}
448 Err(e) => {
449 let hash_hex = to_hex(hash);
450 println!(" Failed to delete {}: {}", &hash_hex[..16], e);
451 }
452 }
453 }
454 }
455
456 Ok(VerifyResult {
457 total,
458 valid,
459 corrupted,
460 deleted,
461 })
462 }
463
464 #[cfg(feature = "s3")]
467 pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
468 use aws_sdk_s3::Client as S3Client;
469
470 let config = crate::config::Config::load()?;
471 let s3_config = config
472 .storage
473 .s3
474 .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;
475
476 let aws_config = aws_config::from_env()
477 .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
478 .load()
479 .await;
480
481 let s3_client = S3Client::from_conf(
482 aws_sdk_s3::config::Builder::from(&aws_config)
483 .endpoint_url(&s3_config.endpoint)
484 .force_path_style(true)
485 .build(),
486 );
487
488 let bucket = &s3_config.bucket;
489 let prefix = s3_config.prefix.as_deref().unwrap_or("");
490
491 let mut total = 0;
492 let mut valid = 0;
493 let mut corrupted = 0;
494 let mut deleted = 0;
495 let mut corrupted_keys = Vec::new();
496
497 let mut continuation_token: Option<String> = None;
498
499 loop {
500 let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);
501
502 if let Some(ref token) = continuation_token {
503 list_req = list_req.continuation_token(token);
504 }
505
506 let list_resp = list_req
507 .send()
508 .await
509 .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;
510
511 for object in list_resp.contents() {
512 let key = object.key().unwrap_or("");
513
514 if !key.ends_with(".bin") {
515 continue;
516 }
517
518 total += 1;
519
520 let filename = key.strip_prefix(prefix).unwrap_or(key);
521 let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);
522
523 if expected_hash_hex.len() != 64 {
524 corrupted += 1;
525 println!(" INVALID KEY: {}", key);
526 corrupted_keys.push(key.to_string());
527 continue;
528 }
529
530 let expected_hash = match from_hex(expected_hash_hex) {
531 Ok(h) => h,
532 Err(_) => {
533 corrupted += 1;
534 println!(" INVALID HEX: {}", key);
535 corrupted_keys.push(key.to_string());
536 continue;
537 }
538 };
539
540 match s3_client.get_object().bucket(bucket).key(key).send().await {
541 Ok(resp) => match resp.body.collect().await {
542 Ok(bytes) => {
543 let data = bytes.into_bytes();
544 let actual_hash = sha256(&data);
545
546 if actual_hash == expected_hash {
547 valid += 1;
548 } else {
549 corrupted += 1;
550 let actual_hex = to_hex(&actual_hash);
551 println!(
552 " CORRUPTED: key={} actual={} size={}",
553 &expected_hash_hex[..16],
554 &actual_hex[..16],
555 data.len()
556 );
557 corrupted_keys.push(key.to_string());
558 }
559 }
560 Err(e) => {
561 corrupted += 1;
562 println!(" READ ERROR: {} - {}", key, e);
563 corrupted_keys.push(key.to_string());
564 }
565 },
566 Err(e) => {
567 corrupted += 1;
568 println!(" FETCH ERROR: {} - {}", key, e);
569 corrupted_keys.push(key.to_string());
570 }
571 }
572
573 if total % 100 == 0 {
574 println!(
575 " Progress: {} objects checked, {} corrupted so far",
576 total, corrupted
577 );
578 }
579 }
580
581 if list_resp.is_truncated() == Some(true) {
582 continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
583 } else {
584 break;
585 }
586 }
587
588 if delete {
589 for key in &corrupted_keys {
590 match s3_client
591 .delete_object()
592 .bucket(bucket)
593 .key(key)
594 .send()
595 .await
596 {
597 Ok(_) => deleted += 1,
598 Err(e) => {
599 println!(" Failed to delete {}: {}", key, e);
600 }
601 }
602 }
603 }
604
605 Ok(VerifyResult {
606 total,
607 valid,
608 corrupted,
609 deleted,
610 })
611 }
612
613 #[cfg(feature = "s3")]
620 pub async fn import_r2_to_local(&self, options: R2ImportOptions) -> Result<R2ImportResult> {
621 use aws_sdk_s3::Client as S3Client;
622
623 let config = crate::config::Config::load()?;
624 let s3_config = config
625 .storage
626 .s3
627 .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;
628
629 let aws_config = aws_config::from_env()
630 .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
631 .load()
632 .await;
633
634 let s3_client = S3Client::from_conf(
635 aws_sdk_s3::config::Builder::from(&aws_config)
636 .endpoint_url(&s3_config.endpoint)
637 .force_path_style(true)
638 .build(),
639 );
640
641 let bucket = Arc::new(s3_config.bucket);
642 let prefix = s3_config.prefix.unwrap_or_default();
643 let list_prefix = options
644 .scan_prefix
645 .as_ref()
646 .map(|scan_prefix| format!("{prefix}{scan_prefix}"))
647 .unwrap_or_else(|| prefix.clone());
648 let mut explicit_keys = options.keys.clone();
649 if let Some(keys_file) = options.keys_file.as_ref() {
650 explicit_keys.extend(read_r2_import_keys_file(keys_file)?);
651 }
652
653 let local = self.router.local_store();
654 let client = Arc::new(s3_client);
655 let concurrency = options.concurrency.max(1);
656 let mut pending = FuturesUnordered::new();
657
658 if !explicit_keys.is_empty() {
659 let mut result = R2ImportResult {
660 completed: false,
661 ..Default::default()
662 };
663
664 println!(
665 "R2 import {} targeted: bucket={}, prefix={}, requested_keys={}, state_file={}",
666 if options.check_only { "check" } else { "sync" },
667 bucket.as_str(),
668 prefix,
669 explicit_keys.len(),
670 options
671 .state_file
672 .as_ref()
673 .map(|path| path.display().to_string())
674 .unwrap_or_else(|| "<none>".to_string()),
675 );
676 for key in explicit_keys {
677 let Some(candidate) = r2_import_key_candidate(&prefix, &key) else {
678 result.failed += 1;
679 println!(" invalid R2 blob key/hash: {key}");
680 continue;
681 };
682
683 result.last_key = Some(candidate.key.clone());
684 result.listed += 1;
685 pending.push(import_r2_object_to_local(
686 client.clone(),
687 bucket.clone(),
688 local.clone(),
689 candidate,
690 options.check_only,
691 false,
692 ));
693
694 while pending.len() >= concurrency {
695 settle_one_r2_import(&mut pending, &mut result).await;
696 }
697 }
698
699 while !pending.is_empty() {
700 settle_one_r2_import(&mut pending, &mut result).await;
701 }
702
703 result.completed = true;
704 if let Some(state_file) = options.state_file.as_ref() {
705 write_r2_import_state(state_file, &result)?;
706 }
707 return Ok(result);
708 }
709
710 let state_file = options
711 .state_file
712 .unwrap_or_else(|| self.base_path().join("r2-import-state.json"));
713 let saved_state = read_r2_import_state(&state_file);
714 let saved_incomplete = saved_state
715 .as_ref()
716 .is_some_and(|state| !state.result.completed && state.result.last_key.is_some());
717 let start_after = options.start_after.clone().or_else(|| {
718 if options.resume && saved_incomplete {
719 saved_state
720 .as_ref()
721 .and_then(|state| state.result.last_key.clone())
722 } else {
723 None
724 }
725 });
726 let mut result = if options.resume && options.start_after.is_none() && saved_incomplete {
727 saved_state.map(|state| state.result).unwrap_or_default()
728 } else {
729 R2ImportResult::default()
730 };
731 result.completed = false;
732
733 println!(
734 "R2 import {}: bucket={}, prefix={}, list_prefix={}, start_after={}, state_file={}",
735 if options.check_only { "check" } else { "sync" },
736 bucket.as_str(),
737 prefix,
738 list_prefix,
739 start_after.as_deref().unwrap_or("<beginning>"),
740 state_file.display(),
741 );
742
743 if options.stream_merge && options.fast_list {
744 println!(" Stream merge enabled; skipping in-memory --fast-list index");
745 }
746
747 let local_hashes = if options.fast_list && !options.stream_merge {
748 println!(" Loading local hash index...");
749 let mut local_hashes = self
750 .router
751 .list()
752 .map_err(|err| anyhow::anyhow!("Failed to list local blobs: {err}"))?;
753 local_hashes.sort_unstable();
754 println!(" Local hash index loaded: {} blobs", local_hashes.len());
755 Some(local_hashes)
756 } else {
757 None
758 };
759
760 let progress_every = options.progress_every.max(1);
761 let mut continuation_token: Option<String> = None;
762 let mut listed_since_progress = 0usize;
763 let mut listed_this_run = 0usize;
764 let mut first_page = true;
765 let mut hit_max_objects = false;
766
767 loop {
768 let mut list_req = client
769 .list_objects_v2()
770 .bucket(bucket.as_str())
771 .prefix(&list_prefix);
772
773 if let Some(ref token) = continuation_token {
774 list_req = list_req.continuation_token(token);
775 } else if first_page {
776 if let Some(ref start_after) = start_after {
777 list_req = list_req.start_after(start_after);
778 }
779 }
780 first_page = false;
781
782 let list_resp = list_req
783 .send()
784 .await
785 .map_err(|err| anyhow::anyhow!("Failed to list S3 objects: {err}"))?;
786
787 let mut page_candidates = Vec::new();
788 let mut page_last_key = None;
789 for object in list_resp.contents() {
790 if options
791 .max_objects
792 .is_some_and(|max_objects| listed_this_run >= max_objects)
793 {
794 hit_max_objects = true;
795 break;
796 }
797
798 let key = object.key().unwrap_or("").to_string();
799 page_last_key = Some(key.clone());
800 if !key.ends_with(".bin") {
801 continue;
802 }
803
804 let Some(hash) = r2_import_key_hash(&prefix, &key) else {
805 continue;
806 };
807
808 result.listed += 1;
809 listed_this_run += 1;
810 listed_since_progress += 1;
811 page_candidates.push(R2ObjectCandidate { key, hash });
812 }
813
814 let page_existing = if options.stream_merge && !page_candidates.is_empty() {
815 Some(existing_r2_candidates(local.as_ref(), &page_candidates)?)
816 } else {
817 None
818 };
819
820 for (candidate_index, candidate) in page_candidates.into_iter().enumerate() {
821 let already_exists = page_existing
822 .as_ref()
823 .is_some_and(|existing| existing[candidate_index]);
824
825 if options.scan_delay_ms > 0 {
826 tokio::time::sleep(Duration::from_millis(options.scan_delay_ms)).await;
827 }
828
829 if already_exists {
830 result.skipped += 1;
831 continue;
832 }
833
834 if let Some(local_hashes) = &local_hashes {
835 if local_hashes.binary_search(&candidate.hash).is_ok() {
836 result.skipped += 1;
837 continue;
838 }
839 }
840
841 pending.push(import_r2_object_to_local(
842 client.clone(),
843 bucket.clone(),
844 local.clone(),
845 candidate,
846 options.check_only,
847 page_existing.is_some(),
848 ));
849
850 while pending.len() >= concurrency {
851 settle_one_r2_import(&mut pending, &mut result).await;
852 }
853 }
854
855 while !pending.is_empty() {
856 settle_one_r2_import(&mut pending, &mut result).await;
857 }
858 if let Some(last_key) = page_last_key {
859 result.last_key = Some(last_key);
860 }
861 if listed_since_progress >= progress_every {
862 listed_since_progress = 0;
863 println!(
864 " Progress: {} listed, {} imported, {} skipped, {} missing, {} corrupted, {} failed, {:.2} GB imported",
865 result.listed,
866 result.imported,
867 result.skipped,
868 result.missing,
869 result.corrupted,
870 result.failed,
871 result.bytes_imported as f64 / 1024.0 / 1024.0 / 1024.0,
872 );
873 }
874 write_r2_import_state(&state_file, &result)?;
875
876 if hit_max_objects {
877 break;
878 }
879 if list_resp.is_truncated() == Some(true) {
880 continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
881 } else {
882 result.completed = true;
883 break;
884 }
885 }
886
887 write_r2_import_state(&state_file, &result)?;
888 Ok(result)
889 }
890
891 #[cfg(not(feature = "s3"))]
893 pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
894 Err(anyhow::anyhow!("S3 feature not enabled"))
895 }
896
897 pub fn compact_lmdb_environments(
898 &self,
899 env_dirs: &[PathBuf],
900 keep_backup: bool,
901 ) -> Result<Vec<CompactResult>> {
902 compact_lmdb_environments_under(self.base_path(), env_dirs, keep_backup)
903 }
904}
905
906pub fn compact_lmdb_environments_under(
907 base_path: &Path,
908 env_dirs: &[PathBuf],
909 keep_backup: bool,
910) -> Result<Vec<CompactResult>> {
911 let targets = if env_dirs.is_empty() {
912 discover_lmdb_environment_dirs(base_path)?
913 } else {
914 env_dirs
915 .iter()
916 .map(|path| {
917 if path.is_absolute() {
918 path.clone()
919 } else {
920 base_path.join(path)
921 }
922 })
923 .collect()
924 };
925
926 let mut results = Vec::new();
927 for env_dir in targets {
928 results.push(compact_lmdb_environment_dir(&env_dir, keep_backup)?);
929 }
930 Ok(results)
931}
932
933fn discover_lmdb_environment_dirs(root: &Path) -> Result<Vec<PathBuf>> {
934 let mut dirs = Vec::new();
935 collect_lmdb_environment_dirs(root, &mut dirs)?;
936 dirs.sort();
937 Ok(dirs)
938}
939
/// Recursively appends to `dirs` every directory at or below `root` that
/// contains a `data.mdb` file.
///
/// Symbolic links to directories are not followed. With `Path::is_dir`, a
/// link pointing back up the tree made the recursion revisit the same
/// environments until the OS rejected the path. Environments reachable only
/// through a symlink must therefore be passed explicitly.
///
/// # Errors
///
/// Fails if a directory cannot be read or an entry's file type cannot be
/// determined.
fn collect_lmdb_environment_dirs(root: &Path, dirs: &mut Vec<PathBuf>) -> Result<()> {
    if root.join("data.mdb").exists() {
        dirs.push(root.to_path_buf());
    }

    for entry in std::fs::read_dir(root)? {
        let entry = entry?;
        // `DirEntry::file_type` does not traverse symlinks, unlike `is_dir`.
        if entry.file_type()?.is_dir() {
            collect_lmdb_environment_dirs(&entry.path(), dirs)?;
        }
    }

    Ok(())
}
955
956fn compact_lmdb_environment_dir(env_dir: &Path, keep_backup: bool) -> Result<CompactResult> {
957 let data_path = env_dir.join("data.mdb");
958 if !data_path.exists() {
959 anyhow::bail!("No data.mdb found in {}", env_dir.display());
960 }
961
962 let before_bytes = std::fs::metadata(&data_path)?.len();
963 let compact_path = env_dir.join("data.mdb.compact");
964 let backup_path = env_dir.join("data.mdb.bak");
965
966 if compact_path.exists() {
967 std::fs::remove_file(&compact_path)?;
968 }
969 if !keep_backup && backup_path.exists() {
970 std::fs::remove_file(&backup_path)?;
971 }
972
973 let open_map_size = existing_lmdb_map_size_bytes(&data_path)?;
974
975 {
976 let env = unsafe {
977 EnvOpenOptions::new()
978 .map_size(open_map_size)
979 .max_dbs(COMPACT_MAX_DBS)
980 .max_readers(COMPACT_MAX_READERS)
981 .open(env_dir)
982 }?;
983 env.force_sync()?;
984 env.copy_to_file(&compact_path, CompactionOption::Enabled)?;
985 }
986
987 let after_bytes = std::fs::metadata(&compact_path)?.len();
988
989 if backup_path.exists() {
990 std::fs::remove_file(&backup_path)?;
991 }
992
993 std::fs::rename(&data_path, &backup_path)?;
994 if let Err(error) = std::fs::rename(&compact_path, &data_path) {
995 let _ = std::fs::rename(&backup_path, &data_path);
996 return Err(error.into());
997 }
998
999 if !keep_backup {
1000 std::fs::remove_file(&backup_path)?;
1001 }
1002
1003 Ok(CompactResult {
1004 env_dir: env_dir.to_path_buf(),
1005 before_bytes,
1006 after_bytes,
1007 })
1008}
1009
1010fn existing_lmdb_map_size_bytes(data_path: &Path) -> Result<usize> {
1011 let file_bytes = std::fs::metadata(data_path)?.len();
1012 let aligned_bytes = if file_bytes == 0 {
1013 COMPACT_OPEN_MAP_SIZE_BYTES as u64
1014 } else {
1015 let remainder = file_bytes % COMPACT_PAGE_SIZE_BYTES;
1016 if remainder == 0 {
1017 file_bytes
1018 } else {
1019 file_bytes.saturating_add(COMPACT_PAGE_SIZE_BYTES - remainder)
1020 }
1021 };
1022
1023 Ok(usize::try_from(aligned_bytes)
1024 .unwrap_or(usize::MAX)
1025 .max(COMPACT_OPEN_MAP_SIZE_BYTES))
1026}
1027
#[cfg(all(test, feature = "s3"))]
mod tests {
    use super::{r2_import_key_candidate, r2_import_key_hash};

    /// A syntactically valid 64-character hex hash used as a blob name.
    const HASH: &str = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";

    #[test]
    fn r2_import_key_hash_accepts_only_root_blob_keys() {
        let accepted = [
            ("", format!("{HASH}.bin")),
            ("legacy/", format!("legacy/{HASH}.bin")),
        ];
        for (prefix, key) in &accepted {
            assert!(r2_import_key_hash(prefix, key).is_some());
        }

        let rejected = [
            format!("hot/{HASH}.bin"),
            format!("site-bytes/pubkey/tree/root/{HASH}.bin"),
            "roots/pubkey/tree.json".to_string(),
            format!("{HASH}.png"),
            "not-a-hash.bin".to_string(),
        ];
        for key in &rejected {
            assert!(r2_import_key_hash("", key).is_none());
        }
    }

    #[test]
    fn r2_import_key_candidate_accepts_hash_or_canonical_key() {
        let canonical = format!("{HASH}.bin");
        for (input, label) in [(HASH.to_string(), "bare hash"), (canonical.clone(), "hash key")] {
            let candidate = r2_import_key_candidate("", &input).expect(label);
            assert_eq!(candidate.key, canonical);
        }

        assert!(r2_import_key_candidate("", &format!("hot/{HASH}.bin")).is_none());
    }

    #[test]
    fn r2_import_key_candidate_applies_configured_prefix() {
        let expected = format!("legacy/{HASH}.bin");
        let cases = [
            (HASH.to_string(), "prefixed bare hash"),
            (format!("{HASH}.bin"), "prefixed key"),
            (expected.clone(), "key"),
        ];
        for (input, label) in cases {
            let candidate = r2_import_key_candidate("legacy/", &input).expect(label);
            assert_eq!(candidate.key, expected);
        }
    }
}