1use super::memory_pool::{default_max_memory_bytes, MemoryPool};
6use super::rate::SlidingWindowRate;
7use crate::data_cache::{AsyncDataCache, CopyResult, MultipartDataCache, RangeReadDataCache};
8use crate::manifest::ManifestRef;
9use std::collections::HashSet;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex};
12
13#[derive(Default)]
14pub struct CacheSyncOptions {
15 pub max_workers: Option<usize>,
16 pub max_memory_bytes: Option<usize>,
17 pub on_progress: Option<Box<super::ProgressFn<CacheSyncStatistics>>>,
18}
19
20#[derive(Debug)]
21pub struct CacheSyncResult {
22 pub statistics: CacheSyncStatistics,
23}
24
/// Counters and timing reported while a cache sync runs and once it has finished.
#[derive(Clone, Debug, Default)]
pub struct CacheSyncStatistics {
    /// Number of unique objects scheduled for the sync.
    pub total_objects: usize,
    /// Sum of the estimated sizes of all scheduled objects.
    pub total_bytes: u64,
    /// Number of objects written to the destination so far.
    pub copied_objects: usize,
    /// Bytes credited to copied objects so far.
    pub copied_bytes: u64,
    /// Number of objects that already existed at the destination.
    pub skipped_objects: usize,
    /// Estimated bytes credited to skipped objects.
    pub skipped_bytes: u64,
    /// Seconds elapsed since the sync started.
    pub total_time: f64,
    /// Most recent value produced by the sliding-window rate calculator
    /// (rendered as "<size>/s" in the final message, so presumably bytes per second).
    pub rate: f64,
    /// Completion percentage derived from processed bytes versus `total_bytes`.
    pub progress: f64,
    /// Human-readable summary; filled in when the sync completes.
    pub progress_message: String,
}
38
39async fn transfer_object(
43 src: &dyn AsyncDataCache,
44 dst: &dyn AsyncDataCache,
45 hash: &str,
46 alg: &str,
47 size_est: u64,
48 memory_pool: &MemoryPool,
49) -> crate::Result<u64> {
50 let part_size = dst.multipart_part_size();
51 let multipart_threshold = 2 * part_size as u64;
52
53 if size_est >= multipart_threshold {
54 if let (Some(dst_mp), Some(src_rr)) = (dst.as_multipart(), src.as_range_read()) {
55 return transfer_object_multipart(
56 src_rr,
57 dst_mp,
58 hash,
59 alg,
60 size_est,
61 part_size,
62 memory_pool,
63 )
64 .await;
65 }
66 }
67 let _mem_permit = memory_pool.acquire(size_est as usize).await;
68 let data = src
69 .get_object(hash, alg)
70 .await
71 .map_err(crate::SnapshotError::Io)?;
72 let actual_size = data.len() as u64;
73 dst.put_object(hash, alg, data)
74 .await
75 .map_err(crate::SnapshotError::Io)?;
76 Ok(actual_size)
77}
78
79async fn transfer_object_multipart(
80 src: &dyn RangeReadDataCache,
81 dst: &dyn MultipartDataCache,
82 hash: &str,
83 alg: &str,
84 size_est: u64,
85 part_size: usize,
86 memory_pool: &MemoryPool,
87) -> crate::Result<u64> {
88 use futures_util::stream::{FuturesUnordered, StreamExt};
89
90 let upload_id = dst
91 .create_multipart_upload(hash, alg)
92 .await
93 .map_err(crate::SnapshotError::Io)?;
94
95 let num_parts = (size_est as usize).div_ceil(part_size) as i32;
96 let mut futures = FuturesUnordered::new();
97 let mut parts = Vec::with_capacity(num_parts as usize);
98 let uid: &str = &upload_id;
99
100 for part_num in 1..=num_parts {
101 let start = (part_num as u64 - 1) * part_size as u64;
102 let end = std::cmp::min(start + part_size as u64 - 1, size_est.saturating_sub(1));
103
104 futures.push(async move {
105 let _permit = memory_pool.acquire(part_size).await;
106 let data = src
107 .get_object_range(hash, alg, start, end)
108 .await
109 .map_err(crate::SnapshotError::Io)?;
110 let etag = dst
111 .upload_part(hash, alg, uid, part_num, data)
112 .await
113 .map_err(crate::SnapshotError::Io)?;
114 Ok::<_, crate::SnapshotError>((part_num, etag))
115 });
116 }
117
118 let result: crate::Result<Vec<(i32, String)>> = async {
119 while let Some(res) = futures.next().await {
120 parts.push(res?);
121 }
122 parts.sort_by_key(|(num, _)| *num);
123 Ok(parts)
124 }
125 .await;
126
127 match result {
128 Ok(parts) => {
129 dst.complete_multipart_upload(hash, alg, &upload_id, parts)
130 .await
131 .map_err(crate::SnapshotError::Io)?;
132 Ok(size_est)
133 }
134 Err(e) => {
135 let _ = dst.abort_multipart_upload(hash, alg, &upload_id).await;
136 Err(e)
137 }
138 }
139}
140
141pub async fn cache_sync_manifest(
143 manifests: &[&dyn ManifestRef],
144 source: Arc<dyn AsyncDataCache>,
145 destination: Arc<dyn AsyncDataCache>,
146 options: CacheSyncOptions,
147) -> crate::Result<CacheSyncResult> {
148 let start_time = std::time::Instant::now();
149
150 let mut seen = HashSet::new();
152 let mut work_items: Vec<(String, String, u64)> = Vec::new();
153
154 for manifest in manifests {
155 let alg_ext = manifest.hash_alg().extension().to_string();
156 let file_chunk_size_bytes = manifest.file_chunk_size_bytes();
157
158 for file in manifest.files() {
159 if file.symlink_target.is_some() || file.deleted {
160 continue;
161 }
162 if let Some(ref hash) = file.hash {
163 let key = (hash.clone(), alg_ext.clone());
164 if seen.insert(key) {
165 work_items.push((hash.clone(), alg_ext.clone(), file.size.unwrap_or(0)));
166 }
167 } else if let Some(ref chunks) = file.chunk_hashes {
168 let num_chunks = chunks.len().max(1) as u64;
169 let chunk_est = if file_chunk_size_bytes > 0 {
170 file_chunk_size_bytes as u64
171 } else {
172 file.size.unwrap_or(0) / num_chunks
173 };
174 for ch in chunks {
175 let key = (ch.clone(), alg_ext.clone());
176 if seen.insert(key) {
177 work_items.push((ch.clone(), alg_ext.clone(), chunk_est));
178 }
179 }
180 }
181 }
182 }
183
184 let mut stats = CacheSyncStatistics {
185 total_objects: work_items.len(),
186 total_bytes: work_items.iter().map(|(_, _, s)| *s).sum(),
187 ..Default::default()
188 };
189
190 let on_progress: Option<Arc<super::ProgressFn<CacheSyncStatistics>>> =
191 options.on_progress.map(|f| Arc::from(f));
192
193 if work_items.is_empty() {
194 stats.progress = 100.0;
195 stats.progress_message = "Synced 0 B (0 objects) in 0.00s".into();
196 if let Some(ref cb) = on_progress {
197 let _ = cb(&stats);
198 }
199 return Ok(CacheSyncResult { statistics: stats });
200 }
201
202 let num_workers = options.max_workers.unwrap_or(10);
203 let max_memory = options
204 .max_memory_bytes
205 .unwrap_or_else(default_max_memory_bytes);
206
207 let cancelled = Arc::new(AtomicBool::new(false));
208 let progress_stats = Arc::new(Mutex::new(stats.clone()));
209 let rate_calc = Arc::new(Mutex::new(SlidingWindowRate::new()));
210 let memory_pool = Arc::new(MemoryPool::new(max_memory));
211 let worker_semaphore = Arc::new(tokio::sync::Semaphore::new(num_workers));
212
213 let mut handles = Vec::new();
214
215 for (hash, alg, size_est) in work_items {
216 let src = source.clone();
217 let dst = destination.clone();
218 let pool = memory_pool.clone();
219 let cancelled = cancelled.clone();
220 let progress_stats = progress_stats.clone();
221 let rate_calc = rate_calc.clone();
222 let on_progress = on_progress.clone();
223 let worker_sem = worker_semaphore.clone();
224 let start = start_time;
225
226 handles.push(tokio::spawn(async move {
227 let _worker_permit = worker_sem
228 .acquire_owned()
229 .await
230 .map_err(|e| crate::SnapshotError::Task(e.to_string()))?;
231
232 if cancelled.load(Ordering::Relaxed) {
233 return Err(crate::SnapshotError::Cancelled);
234 }
235
236 if dst.object_exists(&hash, &alg).await.unwrap_or(false) {
238 let mut s = progress_stats.lock().unwrap();
239 s.skipped_objects += 1;
240 s.skipped_bytes += size_est;
241 let elapsed = start.elapsed().as_secs_f64();
242 s.total_time = elapsed;
243 {
244 let mut rc = rate_calc.lock().unwrap();
245 s.rate = rc.update(elapsed, s.copied_bytes + s.skipped_bytes);
246 }
247 if s.total_bytes > 0 {
248 s.progress =
249 ((s.copied_bytes + s.skipped_bytes) as f64 / s.total_bytes as f64) * 100.0;
250 }
251 if let Some(ref cb) = on_progress {
252 if !cb(&s) {
253 cancelled.store(true, Ordering::Relaxed);
254 return Err(crate::SnapshotError::Cancelled);
255 }
256 }
257 return Ok(());
258 }
259
260 match dst.copy_from(src.as_ref(), &hash, &alg).await {
262 Ok(CopyResult::ServerSideCopy) => {
263 let mut s = progress_stats.lock().unwrap();
264 s.copied_objects += 1;
265 s.copied_bytes += size_est;
266 let elapsed = start.elapsed().as_secs_f64();
267 s.total_time = elapsed;
268 {
269 let mut rc = rate_calc.lock().unwrap();
270 s.rate = rc.update(elapsed, s.copied_bytes + s.skipped_bytes);
271 }
272 if s.total_bytes > 0 {
273 s.progress = ((s.copied_bytes + s.skipped_bytes) as f64
274 / s.total_bytes as f64)
275 * 100.0;
276 }
277 if let Some(ref cb) = on_progress {
278 if !cb(&s) {
279 cancelled.store(true, Ordering::Relaxed);
280 return Err(crate::SnapshotError::Cancelled);
281 }
282 }
283 return Ok(());
284 }
285 Ok(CopyResult::NotSupported) | Err(_) => {
286 }
288 }
289
290 let actual_size =
291 transfer_object(src.as_ref(), dst.as_ref(), &hash, &alg, size_est, &pool).await?;
292
293 {
294 let mut s = progress_stats.lock().unwrap();
295 s.copied_objects += 1;
296 s.copied_bytes += actual_size;
297 let elapsed = start.elapsed().as_secs_f64();
298 s.total_time = elapsed;
299 {
300 let mut rc = rate_calc.lock().unwrap();
301 s.rate = rc.update(elapsed, s.copied_bytes + s.skipped_bytes);
302 }
303 if s.total_bytes > 0 {
304 s.progress =
305 ((s.copied_bytes + s.skipped_bytes) as f64 / s.total_bytes as f64) * 100.0;
306 }
307 if let Some(ref cb) = on_progress {
308 if !cb(&s) {
309 cancelled.store(true, Ordering::Relaxed);
310 return Err(crate::SnapshotError::Cancelled);
311 }
312 }
313 }
314
315 Ok(())
316 }));
317 }
318
319 let mut results = Vec::new();
320 for handle in handles {
321 match handle.await {
322 Ok(r) => results.push(r),
323 Err(e) => results.push(Err(crate::SnapshotError::Task(e.to_string()))),
324 }
325 }
326
327 for r in results {
329 r?;
330 }
331
332 stats = progress_stats.lock().unwrap().clone();
333 stats.total_time = start_time.elapsed().as_secs_f64();
334 {
335 let mut rc = rate_calc.lock().unwrap();
336 stats.rate = rc.update(stats.total_time, stats.copied_bytes + stats.skipped_bytes);
337 }
338 if stats.total_bytes > 0 {
339 stats.progress =
340 ((stats.copied_bytes + stats.skipped_bytes) as f64 / stats.total_bytes as f64) * 100.0;
341 }
342
343 let mut parts = vec![
344 format!(
345 "Synced {}",
346 crate::hash::human_readable_file_size(stats.total_bytes)
347 ),
348 format!("({} objects)", stats.total_objects),
349 format!("in {:.2}s", stats.total_time),
350 ];
351 if stats.total_time > 0.0 {
352 parts.push(format!(
353 "({}/s)",
354 crate::hash::human_readable_file_size(stats.rate as u64)
355 ));
356 }
357 stats.progress_message = parts.join(" ");
358
359 if let Some(ref cb) = on_progress {
360 let _ = cb(&stats);
361 }
362
363 Ok(CacheSyncResult { statistics: stats })
364}
365
366#[cfg(test)]
367mod tests {
368 use super::*;
369 use crate::data_cache::FileSystemDataCache;
370 use crate::hash::HashAlgorithm;
371 use crate::manifest::{FileEntry, Manifest, RelManifest};
372 use tempfile::TempDir;
373
374 fn test_manifest(files: Vec<FileEntry>) -> RelManifest {
375 RelManifest::Snapshot(Manifest::new(HashAlgorithm::Xxh128, -1).with_files(files))
376 }
377
378 fn make_caches() -> (
379 TempDir,
380 TempDir,
381 Arc<dyn AsyncDataCache>,
382 Arc<dyn AsyncDataCache>,
383 ) {
384 let src_dir = TempDir::new().unwrap();
385 let dst_dir = TempDir::new().unwrap();
386 let src: Arc<dyn AsyncDataCache> =
387 Arc::new(FileSystemDataCache::new(src_dir.path().join("data")).unwrap());
388 let dst: Arc<dyn AsyncDataCache> =
389 Arc::new(FileSystemDataCache::new(dst_dir.path().join("data")).unwrap());
390 (src_dir, dst_dir, src, dst)
391 }
392
393 async fn put(cache: &Arc<dyn AsyncDataCache>, hash: &str, alg: &str, data: &[u8]) {
394 cache.put_object(hash, alg, data.to_vec()).await.unwrap();
395 }
396
397 async fn exists(cache: &Arc<dyn AsyncDataCache>, hash: &str, alg: &str) -> bool {
398 cache.object_exists(hash, alg).await.unwrap()
399 }
400
401 async fn get(cache: &Arc<dyn AsyncDataCache>, hash: &str, alg: &str) -> Vec<u8> {
402 cache.get_object(hash, alg).await.unwrap()
403 }
404
405 #[tokio::test]
406 async fn sync_copies_data() {
407 let (_sd, _dd, src, dst) = make_caches();
408 put(&src, "hash_a", "xxh128", b"hello").await;
409 put(&src, "hash_b", "xxh128", b"world").await;
410
411 let manifest = test_manifest(vec![
412 {
413 let mut e = FileEntry::new("a.txt");
414 e.hash = Some("hash_a".into());
415 e.size = Some(5);
416 e
417 },
418 {
419 let mut e = FileEntry::new("b.txt");
420 e.hash = Some("hash_b".into());
421 e.size = Some(5);
422 e
423 },
424 ]);
425
426 let result = cache_sync_manifest(
427 &[&manifest as &dyn ManifestRef],
428 src,
429 dst.clone(),
430 CacheSyncOptions::default(),
431 )
432 .await
433 .unwrap();
434
435 assert_eq!(result.statistics.copied_objects, 2);
436 assert!(exists(&dst, "hash_a", "xxh128").await);
437 assert_eq!(get(&dst, "hash_b", "xxh128").await, b"world");
438 }
439
440 #[tokio::test]
441 async fn sync_skips_existing() {
442 let (_sd, _dd, src, dst) = make_caches();
443 put(&src, "hash_a", "xxh128", b"hello").await;
444 put(&src, "hash_b", "xxh128", b"world").await;
445 put(&dst, "hash_a", "xxh128", b"hello").await;
446
447 let manifest = test_manifest(vec![
448 {
449 let mut e = FileEntry::new("a.txt");
450 e.hash = Some("hash_a".into());
451 e.size = Some(5);
452 e
453 },
454 {
455 let mut e = FileEntry::new("b.txt");
456 e.hash = Some("hash_b".into());
457 e.size = Some(5);
458 e
459 },
460 ]);
461
462 let result = cache_sync_manifest(
463 &[&manifest as &dyn ManifestRef],
464 src,
465 dst,
466 CacheSyncOptions::default(),
467 )
468 .await
469 .unwrap();
470
471 assert_eq!(result.statistics.skipped_objects, 1);
472 assert_eq!(result.statistics.copied_objects, 1);
473 }
474
475 #[tokio::test]
476 async fn sync_handles_chunk_hashes() {
477 let (_sd, _dd, src, dst) = make_caches();
478 put(&src, "chunk_0", "xxh128", b"aaa").await;
479 put(&src, "chunk_1", "xxh128", b"bbb").await;
480
481 let manifest =
482 RelManifest::Snapshot(Manifest::new(HashAlgorithm::Xxh128, 3).with_files(vec![{
483 let mut e = FileEntry::new("big.bin");
484 e.size = Some(6);
485 e.chunk_hashes = Some(vec!["chunk_0".into(), "chunk_1".into()]);
486 e
487 }]));
488
489 let result = cache_sync_manifest(
490 &[&manifest as &dyn ManifestRef],
491 src,
492 dst.clone(),
493 CacheSyncOptions::default(),
494 )
495 .await
496 .unwrap();
497
498 assert_eq!(result.statistics.copied_objects, 2);
499 assert!(exists(&dst, "chunk_0", "xxh128").await);
500 assert!(exists(&dst, "chunk_1", "xxh128").await);
501 }
502
503 #[tokio::test]
504 async fn sync_skips_symlinks_deleted_unhashed() {
505 let (_sd, _dd, src, dst) = make_caches();
506 put(&src, "hash_real", "xxh128", b"data").await;
507
508 let manifest = test_manifest(vec![
509 {
510 let mut e = FileEntry::new("real.txt");
511 e.hash = Some("hash_real".into());
512 e.size = Some(4);
513 e
514 },
515 FileEntry::symlink("link", "target"),
516 FileEntry::deleted("gone"),
517 FileEntry::new("unhashed.txt"),
518 ]);
519
520 let result = cache_sync_manifest(
521 &[&manifest as &dyn ManifestRef],
522 src,
523 dst,
524 CacheSyncOptions::default(),
525 )
526 .await
527 .unwrap();
528
529 assert_eq!(result.statistics.total_objects, 1);
530 assert_eq!(result.statistics.copied_objects, 1);
531 }
532
533 #[tokio::test]
534 async fn sync_deduplicates_hashes() {
535 let (_sd, _dd, src, dst) = make_caches();
536 put(&src, "same_hash", "xxh128", b"dup").await;
537
538 let manifest = test_manifest(vec![
539 {
540 let mut e = FileEntry::new("a.txt");
541 e.hash = Some("same_hash".into());
542 e.size = Some(3);
543 e
544 },
545 {
546 let mut e = FileEntry::new("b.txt");
547 e.hash = Some("same_hash".into());
548 e.size = Some(3);
549 e
550 },
551 ]);
552
553 let result = cache_sync_manifest(
554 &[&manifest as &dyn ManifestRef],
555 src,
556 dst,
557 CacheSyncOptions::default(),
558 )
559 .await
560 .unwrap();
561
562 assert_eq!(result.statistics.total_objects, 1);
563 assert_eq!(result.statistics.copied_objects, 1);
564 }
565
566 #[tokio::test]
567 async fn sync_uses_multipart_for_large_objects() {
568 use std::collections::HashMap;
569 use std::sync::atomic::{AtomicUsize, Ordering};
570
571 struct SmallPartCache {
573 inner: Arc<dyn AsyncDataCache>,
574 upload_part_calls: Arc<AtomicUsize>,
575 #[allow(clippy::type_complexity)]
576 parts: Arc<Mutex<HashMap<String, Vec<(i32, Vec<u8>)>>>>,
577 }
578
579 #[async_trait::async_trait]
580 impl AsyncDataCache for SmallPartCache {
581 fn object_key(&self, h: &str, a: &str) -> String {
582 self.inner.object_key(h, a)
583 }
584 fn as_any(&self) -> &dyn std::any::Any {
585 self
586 }
587 fn multipart_part_size(&self) -> usize {
588 5
589 }
590 fn as_multipart(&self) -> Option<&dyn MultipartDataCache> {
591 Some(self)
592 }
593 fn as_range_read(&self) -> Option<&dyn RangeReadDataCache> {
594 Some(self)
595 }
596 async fn object_exists(&self, h: &str, a: &str) -> std::io::Result<bool> {
597 self.inner.object_exists(h, a).await
598 }
599 async fn put_object(&self, h: &str, a: &str, d: Vec<u8>) -> std::io::Result<String> {
600 self.inner.put_object(h, a, d).await
601 }
602 async fn get_object(&self, h: &str, a: &str) -> std::io::Result<Vec<u8>> {
603 self.inner.get_object(h, a).await
604 }
605 }
606
607 #[async_trait::async_trait]
608 impl MultipartDataCache for SmallPartCache {
609 async fn create_multipart_upload(&self, _h: &str, _a: &str) -> std::io::Result<String> {
610 Ok("test-upload-id".into())
611 }
612 async fn upload_part(
613 &self,
614 h: &str,
615 a: &str,
616 _uid: &str,
617 pn: i32,
618 data: Vec<u8>,
619 ) -> std::io::Result<String> {
620 self.upload_part_calls.fetch_add(1, Ordering::Relaxed);
621 let key = format!("{h}.{a}");
622 self.parts
623 .lock()
624 .unwrap()
625 .entry(key)
626 .or_default()
627 .push((pn, data));
628 Ok(format!("etag-{pn}"))
629 }
630 async fn complete_multipart_upload(
631 &self,
632 h: &str,
633 a: &str,
634 _uid: &str,
635 parts: Vec<(i32, String)>,
636 ) -> std::io::Result<()> {
637 let key = format!("{h}.{a}");
638 let combined = {
639 let stored = self.parts.lock().unwrap();
640 let part_data = stored.get(&key).unwrap();
641 assert_eq!(parts.len(), part_data.len());
642 let mut sorted: Vec<_> = part_data.clone();
643 sorted.sort_by_key(|(n, _)| *n);
644 sorted.into_iter().flat_map(|(_, d)| d).collect::<Vec<u8>>()
645 };
646 self.inner.put_object(h, a, combined).await?;
647 Ok(())
648 }
649 async fn abort_multipart_upload(
650 &self,
651 _h: &str,
652 _a: &str,
653 _uid: &str,
654 ) -> std::io::Result<()> {
655 Ok(())
656 }
657 }
658
659 #[async_trait::async_trait]
660 impl RangeReadDataCache for SmallPartCache {
661 async fn get_object_range(
662 &self,
663 h: &str,
664 a: &str,
665 s: u64,
666 e: u64,
667 ) -> std::io::Result<Vec<u8>> {
668 let data = self.inner.get_object(h, a).await?;
669 let end = std::cmp::min(e as usize + 1, data.len());
670 Ok(data[s as usize..end].to_vec())
671 }
672 }
673
674 let (_sd, _dd, src, dst_inner) = make_caches();
675 put(&src, "big_hash", "xxh128", b"01234567890123456789").await;
677
678 let upload_part_calls = Arc::new(AtomicUsize::new(0));
679 let dst: Arc<dyn AsyncDataCache> = Arc::new(SmallPartCache {
680 inner: dst_inner.clone(),
681 upload_part_calls: upload_part_calls.clone(),
682 parts: Arc::new(Mutex::new(HashMap::new())),
683 });
684
685 let manifest = test_manifest(vec![{
686 let mut e = FileEntry::new("big.bin");
687 e.hash = Some("big_hash".into());
688 e.size = Some(20);
689 e
690 }]);
691
692 let src_wrapped: Arc<dyn AsyncDataCache> = Arc::new(SmallPartCache {
694 inner: src.clone(),
695 upload_part_calls: Arc::new(AtomicUsize::new(0)),
696 parts: Arc::new(Mutex::new(HashMap::new())),
697 });
698
699 let result = cache_sync_manifest(
700 &[&manifest as &dyn ManifestRef],
701 src_wrapped,
702 dst,
703 CacheSyncOptions::default(),
704 )
705 .await
706 .unwrap();
707
708 assert_eq!(result.statistics.copied_objects, 1);
709 assert_eq!(upload_part_calls.load(Ordering::Relaxed), 4);
710 assert_eq!(
712 get(&dst_inner, "big_hash", "xxh128").await,
713 b"01234567890123456789"
714 );
715 }
716
717 #[tokio::test]
718 async fn sync_uses_copy_from_when_server_side_copy() {
719 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
720
721 struct MockServerSideCopyCache {
722 inner: Arc<dyn AsyncDataCache>,
723 copy_from_called: Arc<AtomicBool>,
724 get_object_calls: Arc<AtomicUsize>,
725 }
726
727 #[async_trait::async_trait]
728 impl AsyncDataCache for MockServerSideCopyCache {
729 fn object_key(&self, hash: &str, algorithm: &str) -> String {
730 self.inner.object_key(hash, algorithm)
731 }
732 fn as_any(&self) -> &dyn std::any::Any {
733 self
734 }
735 async fn object_exists(&self, hash: &str, algorithm: &str) -> std::io::Result<bool> {
736 self.inner.object_exists(hash, algorithm).await
737 }
738 async fn put_object(
739 &self,
740 hash: &str,
741 algorithm: &str,
742 data: Vec<u8>,
743 ) -> std::io::Result<String> {
744 self.inner.put_object(hash, algorithm, data).await
745 }
746 async fn get_object(&self, hash: &str, algorithm: &str) -> std::io::Result<Vec<u8>> {
747 self.get_object_calls.fetch_add(1, Ordering::Relaxed);
748 self.inner.get_object(hash, algorithm).await
749 }
750 async fn copy_from(
751 &self,
752 _source: &dyn AsyncDataCache,
753 hash: &str,
754 algorithm: &str,
755 ) -> std::io::Result<CopyResult> {
756 self.copy_from_called.store(true, Ordering::Relaxed);
757 let data = _source.get_object(hash, algorithm).await?;
759 self.inner.put_object(hash, algorithm, data).await?;
760 Ok(CopyResult::ServerSideCopy)
761 }
762 }
763
764 let (_sd, _dd, src, dst_inner) = make_caches();
765 put(&src, "hash_a", "xxh128", b"hello").await;
766
767 let copy_from_called = Arc::new(AtomicBool::new(false));
768 let get_object_calls = Arc::new(AtomicUsize::new(0));
769 let dst: Arc<dyn AsyncDataCache> = Arc::new(MockServerSideCopyCache {
770 inner: dst_inner,
771 copy_from_called: copy_from_called.clone(),
772 get_object_calls: get_object_calls.clone(),
773 });
774
775 let manifest = test_manifest(vec![{
776 let mut e = FileEntry::new("a.txt");
777 e.hash = Some("hash_a".into());
778 e.size = Some(5);
779 e
780 }]);
781
782 let result = cache_sync_manifest(
783 &[&manifest as &dyn ManifestRef],
784 src,
785 dst.clone(),
786 CacheSyncOptions::default(),
787 )
788 .await
789 .unwrap();
790
791 assert!(
792 copy_from_called.load(Ordering::Relaxed),
793 "copy_from should have been called"
794 );
795 assert_eq!(
796 get_object_calls.load(Ordering::Relaxed),
797 0,
798 "get_object on dst should not be called"
799 );
800 assert_eq!(result.statistics.copied_objects, 1);
801 assert!(exists(&dst, "hash_a", "xxh128").await);
802 }
803}