Skip to main content

openjd_snapshots/ops/
cache_sync.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// Copyright by contributors to this project.
3// SPDX-License-Identifier: (Apache-2.0 OR MIT)
4
5use super::memory_pool::{default_max_memory_bytes, MemoryPool};
6use super::rate::SlidingWindowRate;
7use crate::data_cache::{AsyncDataCache, CopyResult, MultipartDataCache, RangeReadDataCache};
8use crate::manifest::ManifestRef;
9use std::collections::HashSet;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::{Arc, Mutex};
12
13#[derive(Default)]
14pub struct CacheSyncOptions {
15    pub max_workers: Option<usize>,
16    pub max_memory_bytes: Option<usize>,
17    pub on_progress: Option<Box<super::ProgressFn<CacheSyncStatistics>>>,
18}
19
20#[derive(Debug)]
21pub struct CacheSyncResult {
22    pub statistics: CacheSyncStatistics,
23}
24
/// Counters and derived metrics describing the progress of a cache sync.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct CacheSyncStatistics {
    /// Number of unique `(hash, algorithm)` objects referenced by the manifests.
    pub total_objects: usize,
    /// Sum of the estimated sizes of all referenced objects.
    pub total_bytes: u64,
    /// Objects written to the destination (server-side copy or read-and-write transfer).
    pub copied_objects: usize,
    /// Bytes counted for copied objects: the estimated size for server-side copies,
    /// otherwise the byte count reported by the transfer.
    pub copied_bytes: u64,
    /// Objects that were already present at the destination.
    pub skipped_objects: usize,
    /// Estimated bytes of the skipped objects.
    pub skipped_bytes: u64,
    /// Seconds elapsed since the sync started.
    pub total_time: f64,
    /// Transfer rate in bytes per second over copied + skipped bytes, as reported by the
    /// sliding-window rate calculator.
    pub rate: f64,
    /// Percent complete: `(copied_bytes + skipped_bytes) / total_bytes * 100`. Because
    /// sizes are partly estimates, this may not land exactly on 100.
    pub progress: f64,
    /// Human-readable summary, filled in when the sync finishes.
    pub progress_message: String,
}
38
39/// Transfer a single object from source to destination cache.
40/// Uses multipart upload for objects >= 2 * part_size when both sides support it;
41/// otherwise falls back to single get+put.
42async fn transfer_object(
43    src: &dyn AsyncDataCache,
44    dst: &dyn AsyncDataCache,
45    hash: &str,
46    alg: &str,
47    size_est: u64,
48    memory_pool: &MemoryPool,
49) -> crate::Result<u64> {
50    let part_size = dst.multipart_part_size();
51    let multipart_threshold = 2 * part_size as u64;
52
53    if size_est >= multipart_threshold {
54        if let (Some(dst_mp), Some(src_rr)) = (dst.as_multipart(), src.as_range_read()) {
55            return transfer_object_multipart(
56                src_rr,
57                dst_mp,
58                hash,
59                alg,
60                size_est,
61                part_size,
62                memory_pool,
63            )
64            .await;
65        }
66    }
67    let _mem_permit = memory_pool.acquire(size_est as usize).await;
68    let data = src
69        .get_object(hash, alg)
70        .await
71        .map_err(crate::SnapshotError::Io)?;
72    let actual_size = data.len() as u64;
73    dst.put_object(hash, alg, data)
74        .await
75        .map_err(crate::SnapshotError::Io)?;
76    Ok(actual_size)
77}
78
79async fn transfer_object_multipart(
80    src: &dyn RangeReadDataCache,
81    dst: &dyn MultipartDataCache,
82    hash: &str,
83    alg: &str,
84    size_est: u64,
85    part_size: usize,
86    memory_pool: &MemoryPool,
87) -> crate::Result<u64> {
88    use futures_util::stream::{FuturesUnordered, StreamExt};
89
90    let upload_id = dst
91        .create_multipart_upload(hash, alg)
92        .await
93        .map_err(crate::SnapshotError::Io)?;
94
95    let num_parts = (size_est as usize).div_ceil(part_size) as i32;
96    let mut futures = FuturesUnordered::new();
97    let mut parts = Vec::with_capacity(num_parts as usize);
98    let uid: &str = &upload_id;
99
100    for part_num in 1..=num_parts {
101        let start = (part_num as u64 - 1) * part_size as u64;
102        let end = std::cmp::min(start + part_size as u64 - 1, size_est.saturating_sub(1));
103
104        futures.push(async move {
105            let _permit = memory_pool.acquire(part_size).await;
106            let data = src
107                .get_object_range(hash, alg, start, end)
108                .await
109                .map_err(crate::SnapshotError::Io)?;
110            let etag = dst
111                .upload_part(hash, alg, uid, part_num, data)
112                .await
113                .map_err(crate::SnapshotError::Io)?;
114            Ok::<_, crate::SnapshotError>((part_num, etag))
115        });
116    }
117
118    let result: crate::Result<Vec<(i32, String)>> = async {
119        while let Some(res) = futures.next().await {
120            parts.push(res?);
121        }
122        parts.sort_by_key(|(num, _)| *num);
123        Ok(parts)
124    }
125    .await;
126
127    match result {
128        Ok(parts) => {
129            dst.complete_multipart_upload(hash, alg, &upload_id, parts)
130                .await
131                .map_err(crate::SnapshotError::Io)?;
132            Ok(size_est)
133        }
134        Err(e) => {
135            let _ = dst.abort_multipart_upload(hash, alg, &upload_id).await;
136            Err(e)
137        }
138    }
139}
140
141/// Copies data between two AsyncDataCache instances for all hashes referenced by manifest files.
142pub async fn cache_sync_manifest(
143    manifests: &[&dyn ManifestRef],
144    source: Arc<dyn AsyncDataCache>,
145    destination: Arc<dyn AsyncDataCache>,
146    options: CacheSyncOptions,
147) -> crate::Result<CacheSyncResult> {
148    let start_time = std::time::Instant::now();
149
150    // Extract unique (hash, alg_ext, size_estimate) triples across all manifests
151    let mut seen = HashSet::new();
152    let mut work_items: Vec<(String, String, u64)> = Vec::new();
153
154    for manifest in manifests {
155        let alg_ext = manifest.hash_alg().extension().to_string();
156        let file_chunk_size_bytes = manifest.file_chunk_size_bytes();
157
158        for file in manifest.files() {
159            if file.symlink_target.is_some() || file.deleted {
160                continue;
161            }
162            if let Some(ref hash) = file.hash {
163                let key = (hash.clone(), alg_ext.clone());
164                if seen.insert(key) {
165                    work_items.push((hash.clone(), alg_ext.clone(), file.size.unwrap_or(0)));
166                }
167            } else if let Some(ref chunks) = file.chunk_hashes {
168                let num_chunks = chunks.len().max(1) as u64;
169                let chunk_est = if file_chunk_size_bytes > 0 {
170                    file_chunk_size_bytes as u64
171                } else {
172                    file.size.unwrap_or(0) / num_chunks
173                };
174                for ch in chunks {
175                    let key = (ch.clone(), alg_ext.clone());
176                    if seen.insert(key) {
177                        work_items.push((ch.clone(), alg_ext.clone(), chunk_est));
178                    }
179                }
180            }
181        }
182    }
183
184    let mut stats = CacheSyncStatistics {
185        total_objects: work_items.len(),
186        total_bytes: work_items.iter().map(|(_, _, s)| *s).sum(),
187        ..Default::default()
188    };
189
190    let on_progress: Option<Arc<super::ProgressFn<CacheSyncStatistics>>> =
191        options.on_progress.map(|f| Arc::from(f));
192
193    if work_items.is_empty() {
194        stats.progress = 100.0;
195        stats.progress_message = "Synced 0 B (0 objects) in 0.00s".into();
196        if let Some(ref cb) = on_progress {
197            let _ = cb(&stats);
198        }
199        return Ok(CacheSyncResult { statistics: stats });
200    }
201
202    let num_workers = options.max_workers.unwrap_or(10);
203    let max_memory = options
204        .max_memory_bytes
205        .unwrap_or_else(default_max_memory_bytes);
206
207    let cancelled = Arc::new(AtomicBool::new(false));
208    let progress_stats = Arc::new(Mutex::new(stats.clone()));
209    let rate_calc = Arc::new(Mutex::new(SlidingWindowRate::new()));
210    let memory_pool = Arc::new(MemoryPool::new(max_memory));
211    let worker_semaphore = Arc::new(tokio::sync::Semaphore::new(num_workers));
212
213    let mut handles = Vec::new();
214
215    for (hash, alg, size_est) in work_items {
216        let src = source.clone();
217        let dst = destination.clone();
218        let pool = memory_pool.clone();
219        let cancelled = cancelled.clone();
220        let progress_stats = progress_stats.clone();
221        let rate_calc = rate_calc.clone();
222        let on_progress = on_progress.clone();
223        let worker_sem = worker_semaphore.clone();
224        let start = start_time;
225
226        handles.push(tokio::spawn(async move {
227            let _worker_permit = worker_sem
228                .acquire_owned()
229                .await
230                .map_err(|e| crate::SnapshotError::Task(e.to_string()))?;
231
232            if cancelled.load(Ordering::Relaxed) {
233                return Err(crate::SnapshotError::Cancelled);
234            }
235
236            // Check if destination already has this object
237            if dst.object_exists(&hash, &alg).await.unwrap_or(false) {
238                let mut s = progress_stats.lock().unwrap();
239                s.skipped_objects += 1;
240                s.skipped_bytes += size_est;
241                let elapsed = start.elapsed().as_secs_f64();
242                s.total_time = elapsed;
243                {
244                    let mut rc = rate_calc.lock().unwrap();
245                    s.rate = rc.update(elapsed, s.copied_bytes + s.skipped_bytes);
246                }
247                if s.total_bytes > 0 {
248                    s.progress =
249                        ((s.copied_bytes + s.skipped_bytes) as f64 / s.total_bytes as f64) * 100.0;
250                }
251                if let Some(ref cb) = on_progress {
252                    if !cb(&s) {
253                        cancelled.store(true, Ordering::Relaxed);
254                        return Err(crate::SnapshotError::Cancelled);
255                    }
256                }
257                return Ok(());
258            }
259
260            // Try server-side copy first (no memory permit needed)
261            match dst.copy_from(src.as_ref(), &hash, &alg).await {
262                Ok(CopyResult::ServerSideCopy) => {
263                    let mut s = progress_stats.lock().unwrap();
264                    s.copied_objects += 1;
265                    s.copied_bytes += size_est;
266                    let elapsed = start.elapsed().as_secs_f64();
267                    s.total_time = elapsed;
268                    {
269                        let mut rc = rate_calc.lock().unwrap();
270                        s.rate = rc.update(elapsed, s.copied_bytes + s.skipped_bytes);
271                    }
272                    if s.total_bytes > 0 {
273                        s.progress = ((s.copied_bytes + s.skipped_bytes) as f64
274                            / s.total_bytes as f64)
275                            * 100.0;
276                    }
277                    if let Some(ref cb) = on_progress {
278                        if !cb(&s) {
279                            cancelled.store(true, Ordering::Relaxed);
280                            return Err(crate::SnapshotError::Cancelled);
281                        }
282                    }
283                    return Ok(());
284                }
285                Ok(CopyResult::NotSupported) | Err(_) => {
286                    // Fall through to get+put
287                }
288            }
289
290            let actual_size =
291                transfer_object(src.as_ref(), dst.as_ref(), &hash, &alg, size_est, &pool).await?;
292
293            {
294                let mut s = progress_stats.lock().unwrap();
295                s.copied_objects += 1;
296                s.copied_bytes += actual_size;
297                let elapsed = start.elapsed().as_secs_f64();
298                s.total_time = elapsed;
299                {
300                    let mut rc = rate_calc.lock().unwrap();
301                    s.rate = rc.update(elapsed, s.copied_bytes + s.skipped_bytes);
302                }
303                if s.total_bytes > 0 {
304                    s.progress =
305                        ((s.copied_bytes + s.skipped_bytes) as f64 / s.total_bytes as f64) * 100.0;
306                }
307                if let Some(ref cb) = on_progress {
308                    if !cb(&s) {
309                        cancelled.store(true, Ordering::Relaxed);
310                        return Err(crate::SnapshotError::Cancelled);
311                    }
312                }
313            }
314
315            Ok(())
316        }));
317    }
318
319    let mut results = Vec::new();
320    for handle in handles {
321        match handle.await {
322            Ok(r) => results.push(r),
323            Err(e) => results.push(Err(crate::SnapshotError::Task(e.to_string()))),
324        }
325    }
326
327    // Check for errors
328    for r in results {
329        r?;
330    }
331
332    stats = progress_stats.lock().unwrap().clone();
333    stats.total_time = start_time.elapsed().as_secs_f64();
334    {
335        let mut rc = rate_calc.lock().unwrap();
336        stats.rate = rc.update(stats.total_time, stats.copied_bytes + stats.skipped_bytes);
337    }
338    if stats.total_bytes > 0 {
339        stats.progress =
340            ((stats.copied_bytes + stats.skipped_bytes) as f64 / stats.total_bytes as f64) * 100.0;
341    }
342
343    let mut parts = vec![
344        format!(
345            "Synced {}",
346            crate::hash::human_readable_file_size(stats.total_bytes)
347        ),
348        format!("({} objects)", stats.total_objects),
349        format!("in {:.2}s", stats.total_time),
350    ];
351    if stats.total_time > 0.0 {
352        parts.push(format!(
353            "({}/s)",
354            crate::hash::human_readable_file_size(stats.rate as u64)
355        ));
356    }
357    stats.progress_message = parts.join(" ");
358
359    if let Some(ref cb) = on_progress {
360        let _ = cb(&stats);
361    }
362
363    Ok(CacheSyncResult { statistics: stats })
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_cache::FileSystemDataCache;
    use crate::hash::HashAlgorithm;
    use crate::manifest::{FileEntry, Manifest, RelManifest};
    use tempfile::TempDir;

    /// Wraps `files` in an xxh128 snapshot manifest with a file chunk size of `-1`.
    fn test_manifest(files: Vec<FileEntry>) -> RelManifest {
        RelManifest::Snapshot(Manifest::new(HashAlgorithm::Xxh128, -1).with_files(files))
    }

    /// Builds a regular file entry identified by a whole-file `hash`.
    fn hashed_file(path: &str, hash: &str, size: u64) -> FileEntry {
        let mut entry = FileEntry::new(path);
        entry.hash = Some(hash.into());
        entry.size = Some(size);
        entry
    }

    /// Creates filesystem-backed source and destination caches in fresh temp dirs.
    /// The `TempDir`s are returned so the directories live as long as the test needs them.
    fn make_caches() -> (
        TempDir,
        TempDir,
        Arc<dyn AsyncDataCache>,
        Arc<dyn AsyncDataCache>,
    ) {
        let src_dir = TempDir::new().unwrap();
        let dst_dir = TempDir::new().unwrap();
        let src: Arc<dyn AsyncDataCache> =
            Arc::new(FileSystemDataCache::new(src_dir.path().join("data")).unwrap());
        let dst: Arc<dyn AsyncDataCache> =
            Arc::new(FileSystemDataCache::new(dst_dir.path().join("data")).unwrap());
        (src_dir, dst_dir, src, dst)
    }

    async fn put(cache: &Arc<dyn AsyncDataCache>, hash: &str, alg: &str, data: &[u8]) {
        cache.put_object(hash, alg, data.to_vec()).await.unwrap();
    }

    async fn exists(cache: &Arc<dyn AsyncDataCache>, hash: &str, alg: &str) -> bool {
        cache.object_exists(hash, alg).await.unwrap()
    }

    async fn get(cache: &Arc<dyn AsyncDataCache>, hash: &str, alg: &str) -> Vec<u8> {
        cache.get_object(hash, alg).await.unwrap()
    }

    /// Syncs a single manifest with default options, panicking on error.
    async fn run_sync(
        manifest: &RelManifest,
        src: Arc<dyn AsyncDataCache>,
        dst: Arc<dyn AsyncDataCache>,
    ) -> CacheSyncResult {
        cache_sync_manifest(
            &[manifest as &dyn ManifestRef],
            src,
            dst,
            CacheSyncOptions::default(),
        )
        .await
        .unwrap()
    }

    #[tokio::test]
    async fn sync_copies_data() {
        let (_sd, _dd, src, dst) = make_caches();
        put(&src, "hash_a", "xxh128", b"hello").await;
        put(&src, "hash_b", "xxh128", b"world").await;

        let manifest = test_manifest(vec![
            hashed_file("a.txt", "hash_a", 5),
            hashed_file("b.txt", "hash_b", 5),
        ]);

        let result = run_sync(&manifest, src, dst.clone()).await;

        assert_eq!(result.statistics.copied_objects, 2);
        assert!(exists(&dst, "hash_a", "xxh128").await);
        assert_eq!(get(&dst, "hash_b", "xxh128").await, b"world");
    }

    #[tokio::test]
    async fn sync_skips_existing() {
        let (_sd, _dd, src, dst) = make_caches();
        put(&src, "hash_a", "xxh128", b"hello").await;
        put(&src, "hash_b", "xxh128", b"world").await;
        put(&dst, "hash_a", "xxh128", b"hello").await;

        let manifest = test_manifest(vec![
            hashed_file("a.txt", "hash_a", 5),
            hashed_file("b.txt", "hash_b", 5),
        ]);

        let result = run_sync(&manifest, src, dst).await;

        assert_eq!(result.statistics.skipped_objects, 1);
        assert_eq!(result.statistics.copied_objects, 1);
    }

    #[tokio::test]
    async fn sync_handles_chunk_hashes() {
        let (_sd, _dd, src, dst) = make_caches();
        put(&src, "chunk_0", "xxh128", b"aaa").await;
        put(&src, "chunk_1", "xxh128", b"bbb").await;

        let mut entry = FileEntry::new("big.bin");
        entry.size = Some(6);
        entry.chunk_hashes = Some(vec!["chunk_0".into(), "chunk_1".into()]);
        let manifest =
            RelManifest::Snapshot(Manifest::new(HashAlgorithm::Xxh128, 3).with_files(vec![entry]));

        let result = run_sync(&manifest, src, dst.clone()).await;

        assert_eq!(result.statistics.copied_objects, 2);
        assert!(exists(&dst, "chunk_0", "xxh128").await);
        assert!(exists(&dst, "chunk_1", "xxh128").await);
    }

    #[tokio::test]
    async fn sync_skips_symlinks_deleted_unhashed() {
        let (_sd, _dd, src, dst) = make_caches();
        put(&src, "hash_real", "xxh128", b"data").await;

        let manifest = test_manifest(vec![
            hashed_file("real.txt", "hash_real", 4),
            FileEntry::symlink("link", "target"),
            FileEntry::deleted("gone"),
            FileEntry::new("unhashed.txt"),
        ]);

        let result = run_sync(&manifest, src, dst).await;

        assert_eq!(result.statistics.total_objects, 1);
        assert_eq!(result.statistics.copied_objects, 1);
    }

    #[tokio::test]
    async fn sync_deduplicates_hashes() {
        let (_sd, _dd, src, dst) = make_caches();
        put(&src, "same_hash", "xxh128", b"dup").await;

        let manifest = test_manifest(vec![
            hashed_file("a.txt", "same_hash", 3),
            hashed_file("b.txt", "same_hash", 3),
        ]);

        let result = run_sync(&manifest, src, dst).await;

        assert_eq!(result.statistics.total_objects, 1);
        assert_eq!(result.statistics.copied_objects, 1);
    }

    #[tokio::test]
    async fn sync_empty_manifest_reports_complete() {
        let (_sd, _dd, src, dst) = make_caches();
        let manifest = test_manifest(vec![]);

        let result = run_sync(&manifest, src, dst).await;

        assert_eq!(result.statistics.total_objects, 0);
        assert_eq!(result.statistics.progress, 100.0);
        assert_eq!(
            result.statistics.progress_message,
            "Synced 0 B (0 objects) in 0.00s"
        );
    }

    #[tokio::test]
    async fn sync_cancels_when_progress_callback_returns_false() {
        let (_sd, _dd, src, dst) = make_caches();
        put(&src, "hash_a", "xxh128", b"hello").await;

        let manifest = test_manifest(vec![hashed_file("a.txt", "hash_a", 5)]);
        let options = CacheSyncOptions {
            on_progress: Some(Box::new(|_: &CacheSyncStatistics| false)),
            ..Default::default()
        };

        let result =
            cache_sync_manifest(&[&manifest as &dyn ManifestRef], src, dst, options).await;

        assert!(matches!(result, Err(crate::SnapshotError::Cancelled)));
    }

    #[tokio::test]
    async fn sync_uses_multipart_for_large_objects() {
        use std::collections::HashMap;
        use std::sync::atomic::{AtomicUsize, Ordering};

        /// Uploaded parts keyed by `"{hash}.{alg}"`, as `(part number, data)` pairs.
        type PartStore = Arc<Mutex<HashMap<String, Vec<(i32, Vec<u8>)>>>>;

        /// Wrapper that sets a small part_size so small test data triggers multipart.
        struct SmallPartCache {
            inner: Arc<dyn AsyncDataCache>,
            upload_part_calls: Arc<AtomicUsize>,
            parts: PartStore,
        }

        #[async_trait::async_trait]
        impl AsyncDataCache for SmallPartCache {
            fn object_key(&self, h: &str, a: &str) -> String {
                self.inner.object_key(h, a)
            }
            fn as_any(&self) -> &dyn std::any::Any {
                self
            }
            fn multipart_part_size(&self) -> usize {
                5
            }
            fn as_multipart(&self) -> Option<&dyn MultipartDataCache> {
                Some(self)
            }
            fn as_range_read(&self) -> Option<&dyn RangeReadDataCache> {
                Some(self)
            }
            async fn object_exists(&self, h: &str, a: &str) -> std::io::Result<bool> {
                self.inner.object_exists(h, a).await
            }
            async fn put_object(&self, h: &str, a: &str, d: Vec<u8>) -> std::io::Result<String> {
                self.inner.put_object(h, a, d).await
            }
            async fn get_object(&self, h: &str, a: &str) -> std::io::Result<Vec<u8>> {
                self.inner.get_object(h, a).await
            }
        }

        #[async_trait::async_trait]
        impl MultipartDataCache for SmallPartCache {
            async fn create_multipart_upload(&self, _h: &str, _a: &str) -> std::io::Result<String> {
                Ok("test-upload-id".into())
            }
            async fn upload_part(
                &self,
                h: &str,
                a: &str,
                _uid: &str,
                pn: i32,
                data: Vec<u8>,
            ) -> std::io::Result<String> {
                self.upload_part_calls.fetch_add(1, Ordering::Relaxed);
                let key = format!("{h}.{a}");
                self.parts
                    .lock()
                    .unwrap()
                    .entry(key)
                    .or_default()
                    .push((pn, data));
                Ok(format!("etag-{pn}"))
            }
            async fn complete_multipart_upload(
                &self,
                h: &str,
                a: &str,
                _uid: &str,
                parts: Vec<(i32, String)>,
            ) -> std::io::Result<()> {
                let key = format!("{h}.{a}");
                let combined = {
                    let stored = self.parts.lock().unwrap();
                    let part_data = stored.get(&key).unwrap();
                    assert_eq!(parts.len(), part_data.len());
                    let mut sorted: Vec<_> = part_data.clone();
                    sorted.sort_by_key(|(n, _)| *n);
                    sorted.into_iter().flat_map(|(_, d)| d).collect::<Vec<u8>>()
                };
                self.inner.put_object(h, a, combined).await?;
                Ok(())
            }
            async fn abort_multipart_upload(
                &self,
                _h: &str,
                _a: &str,
                _uid: &str,
            ) -> std::io::Result<()> {
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl RangeReadDataCache for SmallPartCache {
            async fn get_object_range(
                &self,
                h: &str,
                a: &str,
                s: u64,
                e: u64,
            ) -> std::io::Result<Vec<u8>> {
                let data = self.inner.get_object(h, a).await?;
                // Clamp both ends so an out-of-range request yields a short/empty slice
                // instead of panicking inside the mock.
                let end = std::cmp::min(e as usize + 1, data.len());
                let start = std::cmp::min(s as usize, end);
                Ok(data[start..end].to_vec())
            }
        }

        let (_sd, _dd, src, dst_inner) = make_caches();
        // 20 bytes, part_size=5, threshold=10 → triggers multipart (4 parts)
        put(&src, "big_hash", "xxh128", b"01234567890123456789").await;

        let upload_part_calls = Arc::new(AtomicUsize::new(0));
        let dst: Arc<dyn AsyncDataCache> = Arc::new(SmallPartCache {
            inner: dst_inner.clone(),
            upload_part_calls: upload_part_calls.clone(),
            parts: Arc::new(Mutex::new(HashMap::new())),
        });

        let manifest = test_manifest(vec![hashed_file("big.bin", "big_hash", 20)]);

        // Use src as the source but wrap it to support get_object_range
        let src_wrapped: Arc<dyn AsyncDataCache> = Arc::new(SmallPartCache {
            inner: src.clone(),
            upload_part_calls: Arc::new(AtomicUsize::new(0)),
            parts: Arc::new(Mutex::new(HashMap::new())),
        });

        let result = run_sync(&manifest, src_wrapped, dst).await;

        assert_eq!(result.statistics.copied_objects, 1);
        assert_eq!(upload_part_calls.load(Ordering::Relaxed), 4);
        // Verify data arrived correctly via the inner cache
        assert_eq!(
            get(&dst_inner, "big_hash", "xxh128").await,
            b"01234567890123456789"
        );
    }

    #[tokio::test]
    async fn sync_uses_copy_from_when_server_side_copy() {
        use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

        /// Destination that reports every `copy_from` as a server-side copy and counts
        /// `get_object` calls made on itself.
        struct MockServerSideCopyCache {
            inner: Arc<dyn AsyncDataCache>,
            copy_from_called: Arc<AtomicBool>,
            get_object_calls: Arc<AtomicUsize>,
        }

        #[async_trait::async_trait]
        impl AsyncDataCache for MockServerSideCopyCache {
            fn object_key(&self, hash: &str, algorithm: &str) -> String {
                self.inner.object_key(hash, algorithm)
            }
            fn as_any(&self) -> &dyn std::any::Any {
                self
            }
            async fn object_exists(&self, hash: &str, algorithm: &str) -> std::io::Result<bool> {
                self.inner.object_exists(hash, algorithm).await
            }
            async fn put_object(
                &self,
                hash: &str,
                algorithm: &str,
                data: Vec<u8>,
            ) -> std::io::Result<String> {
                self.inner.put_object(hash, algorithm, data).await
            }
            async fn get_object(&self, hash: &str, algorithm: &str) -> std::io::Result<Vec<u8>> {
                self.get_object_calls.fetch_add(1, Ordering::Relaxed);
                self.inner.get_object(hash, algorithm).await
            }
            async fn copy_from(
                &self,
                source: &dyn AsyncDataCache,
                hash: &str,
                algorithm: &str,
            ) -> std::io::Result<CopyResult> {
                self.copy_from_called.store(true, Ordering::Relaxed);
                // Simulate server-side copy by doing the actual copy via inner
                let data = source.get_object(hash, algorithm).await?;
                self.inner.put_object(hash, algorithm, data).await?;
                Ok(CopyResult::ServerSideCopy)
            }
        }

        let (_sd, _dd, src, dst_inner) = make_caches();
        put(&src, "hash_a", "xxh128", b"hello").await;

        let copy_from_called = Arc::new(AtomicBool::new(false));
        let get_object_calls = Arc::new(AtomicUsize::new(0));
        let dst: Arc<dyn AsyncDataCache> = Arc::new(MockServerSideCopyCache {
            inner: dst_inner,
            copy_from_called: copy_from_called.clone(),
            get_object_calls: get_object_calls.clone(),
        });

        let manifest = test_manifest(vec![hashed_file("a.txt", "hash_a", 5)]);

        let result = run_sync(&manifest, src, dst.clone()).await;

        assert!(
            copy_from_called.load(Ordering::Relaxed),
            "copy_from should have been called"
        );
        assert_eq!(
            get_object_calls.load(Ordering::Relaxed),
            0,
            "get_object on dst should not be called"
        );
        assert_eq!(result.statistics.copied_objects, 1);
        assert!(exists(&dst, "hash_a", "xxh128").await);
    }
}