1use std::collections::HashMap;
8use std::pin::pin;
9
10use async_trait::async_trait;
11use clayers_xml::ContentHash;
12use futures_core::Stream;
13
14use crate::error::{Error, Result};
15use crate::graph;
16use crate::object::Object;
17use crate::store::{ObjectStore, RefStore};
18
19async fn try_collect_stream<S>(stream: S) -> Result<HashMap<ContentHash, Object>>
21where
22 S: Stream<Item = Result<(ContentHash, Object)>>,
23{
24 let mut stream = pin!(stream);
25 let mut map = HashMap::new();
26 while let Some(item) = std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await {
27 let (hash, obj) = item?;
28 map.insert(hash, obj);
29 }
30 Ok(map)
31}
32
/// Policy consulted by [`sync_ref`] when the destination already has the ref
/// at a hash different from the source's.
#[async_trait]
pub trait RefConflict: Send + Sync {
    /// Decides whether the destination ref may be moved to `src_hash`.
    ///
    /// [`sync_ref`] passes the destination object store as `store` and calls
    /// this only after it has transferred the source objects into it.
    ///
    /// * `ref_name` - name of the ref being synced.
    /// * `src_hash` - hash the source ref points at.
    /// * `dst_hash` - hash the destination ref currently points at.
    ///
    /// Returning `Ok(true)` lets [`sync_ref`] update the destination ref;
    /// `Ok(false)` makes it leave the ref alone and report "not updated".
    ///
    /// # Errors
    ///
    /// Any error returned here is propagated by [`sync_ref`] unchanged.
    async fn resolve(
        &self,
        store: &dyn ObjectStore,
        ref_name: &str,
        src_hash: ContentHash,
        dst_hash: ContentHash,
    ) -> Result<bool>;
}
55
56pub struct FastForwardOnly;
58
59#[async_trait]
60impl RefConflict for FastForwardOnly {
61 async fn resolve(
62 &self,
63 store: &dyn ObjectStore,
64 _ref_name: &str,
65 src_hash: ContentHash,
66 dst_hash: ContentHash,
67 ) -> Result<bool> {
68 let lca = graph::common_ancestor(store, src_hash, dst_hash).await?;
69 if lca == Some(dst_hash) {
70 Ok(true)
71 } else {
72 Err(Error::Ref(
73 "cannot fast-forward: destination is not an ancestor of source".into(),
74 ))
75 }
76 }
77}
78
79pub struct Overwrite;
81
82#[async_trait]
83impl RefConflict for Overwrite {
84 async fn resolve(
85 &self,
86 _store: &dyn ObjectStore,
87 _ref_name: &str,
88 _src_hash: ContentHash,
89 _dst_hash: ContentHash,
90 ) -> Result<bool> {
91 Ok(true)
92 }
93}
94
95pub struct Reject;
97
98#[async_trait]
99impl RefConflict for Reject {
100 async fn resolve(
101 &self,
102 _store: &dyn ObjectStore,
103 _ref_name: &str,
104 _src_hash: ContentHash,
105 _dst_hash: ContentHash,
106 ) -> Result<bool> {
107 Err(Error::Ref(
108 "destination ref already exists with a different value".into(),
109 ))
110 }
111}
112
113pub async fn transfer_objects(
128 src: &dyn ObjectStore,
129 dst: &dyn ObjectStore,
130 root: ContentHash,
131) -> Result<usize> {
132 let src_objects = try_collect_stream(src.subtree(&root)).await?;
133
134 let mut missing = Vec::new();
136 for (hash, obj) in &src_objects {
137 if !dst.contains(hash).await? {
138 missing.push((*hash, obj.clone()));
139 }
140 }
141
142 if missing.is_empty() {
143 return Ok(0);
144 }
145
146 let count = missing.len();
148 let mut tx = dst.transaction().await?;
149 for (hash, obj) in missing {
150 tx.put(hash, obj).await?;
151 }
152 tx.commit().await?;
153
154 Ok(count)
155}
156
157pub async fn sync_ref(
175 src_objects: &dyn ObjectStore,
176 src_refs: &dyn RefStore,
177 dst_objects: &dyn ObjectStore,
178 dst_refs: &dyn RefStore,
179 ref_name: &str,
180 on_conflict: &dyn RefConflict,
181) -> Result<bool> {
182 let src_hash = src_refs
183 .get_ref(ref_name)
184 .await?
185 .ok_or_else(|| Error::Ref(format!("source ref not found: {ref_name}")))?;
186
187 let dst_hash = dst_refs.get_ref(ref_name).await?;
188
189 if let Some(dst_hash) = dst_hash {
190 if dst_hash == src_hash {
191 return Ok(false);
193 }
194 transfer_objects(src_objects, dst_objects, src_hash).await?;
196 let proceed = on_conflict
197 .resolve(dst_objects, ref_name, src_hash, dst_hash)
198 .await?;
199 if !proceed {
200 return Ok(false);
201 }
202 } else {
203 transfer_objects(src_objects, dst_objects, src_hash).await?;
204 }
205
206 dst_refs.set_ref(ref_name, src_hash).await?;
207 Ok(true)
208}
209
210pub async fn sync_refs(
218 src_objects: &dyn ObjectStore,
219 src_refs: &dyn RefStore,
220 dst_objects: &dyn ObjectStore,
221 dst_refs: &dyn RefStore,
222 prefix: &str,
223 on_conflict: &dyn RefConflict,
224) -> Result<usize> {
225 let refs = src_refs.list_refs(prefix).await?;
226 let mut count = 0;
227
228 for (ref_name, _) in &refs {
229 let updated = sync_ref(
230 src_objects,
231 src_refs,
232 dst_objects,
233 dst_refs,
234 ref_name,
235 on_conflict,
236 )
237 .await?;
238 if updated {
239 count += 1;
240 }
241 }
242
243 Ok(count)
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use crate::object::{
250 Author, CommentObject, CommitObject, DocumentObject, ElementObject, PIObject, TagObject,
251 TextObject, TreeEntry, TreeObject,
252 };
253 use crate::store::memory::MemoryStore;
254 use chrono::Utc;
255 use proptest::prelude::*;
256 use tokio_stream::StreamExt as _;
257
258 fn author() -> Author {
259 Author {
260 name: "Test".into(),
261 email: "test@test.com".into(),
262 }
263 }
264
265 async fn build_commit(
269 store: &MemoryStore,
270 id: &[u8],
271 parents: Vec<ContentHash>,
272 ) -> (ContentHash, ContentHash) {
273 let text_hash = ContentHash::from_canonical(id);
274 let text = TextObject {
275 content: String::from_utf8_lossy(id).into(),
276 };
277
278 let elem_id: Vec<u8> = id.iter().chain(b"elem").copied().collect();
279 let elem_hash = ContentHash::from_canonical(&elem_id);
280 let elem = ElementObject {
281 local_name: "root".into(),
282 namespace_uri: None,
283 namespace_prefix: None,
284 extra_namespaces: vec![],
285 attributes: vec![],
286 children: vec![text_hash],
287 inclusive_hash: elem_hash,
288 };
289
290 let doc_id: Vec<u8> = id.iter().chain(b"doc").copied().collect();
291 let doc_hash = ContentHash::from_canonical(&doc_id);
292 let doc = DocumentObject { root: elem_hash, prologue: vec![] };
293
294 let tree_id: Vec<u8> = id.iter().chain(b"tree").copied().collect();
295 let tree_hash = ContentHash::from_canonical(&tree_id);
296 let tree = TreeObject::new(vec![
297 TreeEntry { path: "doc.xml".into(), document: doc_hash },
298 ]);
299
300 let commit_id: Vec<u8> = id.iter().chain(b"commit").copied().collect();
301 let commit_hash = ContentHash::from_canonical(&commit_id);
302 let commit = CommitObject {
303 tree: tree_hash,
304 parents,
305 author: author(),
306 timestamp: Utc::now(),
307 message: format!("commit {}", String::from_utf8_lossy(id)),
308 };
309
310 let mut tx = store.transaction().await.unwrap();
311 tx.put(text_hash, Object::Text(text)).await.unwrap();
312 tx.put(elem_hash, Object::Element(elem)).await.unwrap();
313 tx.put(doc_hash, Object::Document(doc)).await.unwrap();
314 tx.put(tree_hash, Object::Tree(tree)).await.unwrap();
315 tx.put(commit_hash, Object::Commit(commit)).await.unwrap();
316 tx.commit().await.unwrap();
317
318 (commit_hash, doc_hash)
319 }
320
321 #[tokio::test]
322 async fn sync_transfer_objects_copies_missing() {
323 let src = MemoryStore::new();
324 let dst = MemoryStore::new();
325
326 let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
327
328 let count = transfer_objects(&src, &dst, commit_hash).await.unwrap();
330 assert_eq!(count, 5);
331
332 assert!(dst.contains(&commit_hash).await.unwrap());
334 }
335
336 #[tokio::test]
337 async fn sync_transfer_idempotent() {
338 let src = MemoryStore::new();
339 let dst = MemoryStore::new();
340
341 let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
342
343 transfer_objects(&src, &dst, commit_hash).await.unwrap();
344 let second = transfer_objects(&src, &dst, commit_hash).await.unwrap();
345 assert_eq!(second, 0, "second transfer should copy 0 objects");
346 }
347
348 #[tokio::test]
349 async fn sync_ref_fast_forward() {
350 let src = MemoryStore::new();
351 let dst = MemoryStore::new();
352
353 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
355 let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
356
357 transfer_objects(&src, &dst, c1).await.unwrap();
359 dst.set_ref("refs/heads/main", c1).await.unwrap();
360
361 src.set_ref("refs/heads/main", c2).await.unwrap();
363
364 let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
366 .await
367 .unwrap();
368 assert!(updated, "fast-forward should report ref was updated");
369
370 let dst_ref = dst.get_ref("refs/heads/main").await.unwrap();
371 assert_eq!(dst_ref, Some(c2));
372 }
373
374 #[tokio::test]
375 async fn sync_ref_fast_forward_rejects_diverged() {
376 let src = MemoryStore::new();
377 let dst = MemoryStore::new();
378
379 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
381 let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
382 let (c3, _) = build_commit(&dst, b"c3", vec![c1]).await;
383
384 transfer_objects(&src, &dst, c1).await.unwrap();
386
387 src.set_ref("refs/heads/main", c2).await.unwrap();
388 dst.set_ref("refs/heads/main", c3).await.unwrap();
389
390 let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly).await;
391 assert!(result.is_err(), "should reject diverged histories");
392 }
393
394 #[tokio::test]
395 async fn sync_ref_overwrite_always_succeeds() {
396 let src = MemoryStore::new();
397 let dst = MemoryStore::new();
398
399 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
400 let (c2, _) = build_commit(&src, b"c2", vec![]).await; transfer_objects(&src, &dst, c1).await.unwrap();
403 dst.set_ref("refs/heads/main", c1).await.unwrap();
404 src.set_ref("refs/heads/main", c2).await.unwrap();
405
406 let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Overwrite)
407 .await
408 .unwrap();
409 assert!(updated, "overwrite should report ref was updated");
410
411 let dst_ref = dst.get_ref("refs/heads/main").await.unwrap();
412 assert_eq!(dst_ref, Some(c2));
413 }
414
415 #[tokio::test]
416 async fn sync_ref_reject_fails_when_different() {
417 let src = MemoryStore::new();
418 let dst = MemoryStore::new();
419
420 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
421 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
422
423 transfer_objects(&src, &dst, c1).await.unwrap();
424 dst.set_ref("refs/heads/main", c1).await.unwrap();
425 src.set_ref("refs/heads/main", c2).await.unwrap();
426
427 let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject).await;
428 assert!(result.is_err(), "should reject when refs differ");
429 }
430
431 #[tokio::test]
432 async fn sync_refs_with_prefix() {
433 let src = MemoryStore::new();
434 let dst = MemoryStore::new();
435
436 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
437 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
438
439 src.set_ref("refs/heads/main", c1).await.unwrap();
440 src.set_ref("refs/heads/feature", c2).await.unwrap();
441
442 let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
443 .await
444 .unwrap();
445
446 assert_eq!(count, 2);
447 assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
448 assert_eq!(dst.get_ref("refs/heads/feature").await.unwrap(), Some(c2));
449 }
450
451 #[tokio::test]
452 async fn sync_ref_missing_src_ref_errors() {
453 let src = MemoryStore::new();
454 let dst = MemoryStore::new();
455
456 let result =
457 sync_ref(&src, &src, &dst, &dst, "refs/heads/missing", &Overwrite).await;
458 assert!(result.is_err(), "missing source ref should error");
459 }
460
461 #[tokio::test]
462 async fn sync_ref_already_up_to_date() {
463 let src = MemoryStore::new();
464 let dst = MemoryStore::new();
465
466 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
467
468 transfer_objects(&src, &dst, c1).await.unwrap();
469 src.set_ref("refs/heads/main", c1).await.unwrap();
470 dst.set_ref("refs/heads/main", c1).await.unwrap();
471
472 let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject)
474 .await
475 .unwrap();
476 assert!(!updated, "already up-to-date should report false");
477 }
478
479 #[tokio::test]
482 async fn sync_reachable_follows_tag_to_commit() {
483 let src = MemoryStore::new();
484 let dst = MemoryStore::new();
485
486 let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
487
488 let tag_id = b"tag-v1";
490 let tag_hash = ContentHash::from_canonical(tag_id);
491 let tag = TagObject {
492 target: commit_hash,
493 name: "v1.0".into(),
494 tagger: author(),
495 timestamp: Utc::now(),
496 message: "release".into(),
497 };
498 let mut tx = src.transaction().await.unwrap();
499 tx.put(tag_hash, Object::Tag(tag)).await.unwrap();
500 tx.commit().await.unwrap();
501
502 let count = transfer_objects(&src, &dst, tag_hash).await.unwrap();
504 assert_eq!(count, 6);
505 assert!(dst.contains(&tag_hash).await.unwrap());
506 assert!(dst.contains(&commit_hash).await.unwrap());
507 }
508
509 #[tokio::test]
510 async fn sync_reachable_follows_comment_and_pi_leaves() {
511 let src = MemoryStore::new();
512 let dst = MemoryStore::new();
513
514 let comment_hash = ContentHash::from_canonical(b"comment1");
516 let comment = CommentObject {
517 content: "a comment".into(),
518 };
519
520 let pi_hash = ContentHash::from_canonical(b"pi1");
521 let pi = PIObject {
522 target: "xml-stylesheet".into(),
523 data: Some("type=\"text/xsl\"".into()),
524 };
525
526 let elem_hash = ContentHash::from_canonical(b"elem-mixed");
527 let elem = ElementObject {
528 local_name: "root".into(),
529 namespace_uri: None,
530 namespace_prefix: None,
531 extra_namespaces: vec![],
532 attributes: vec![],
533 children: vec![comment_hash, pi_hash],
534 inclusive_hash: elem_hash,
535 };
536
537 let doc_hash = ContentHash::from_canonical(b"doc-mixed");
538 let doc = DocumentObject { root: elem_hash, prologue: vec![] };
539
540 let tree_hash = ContentHash::from_canonical(b"tree-mixed");
541 let tree = TreeObject::new(vec![
542 TreeEntry { path: "doc.xml".into(), document: doc_hash },
543 ]);
544
545 let commit_hash = ContentHash::from_canonical(b"commit-mixed");
546 let commit = CommitObject {
547 tree: tree_hash,
548 parents: vec![],
549 author: author(),
550 timestamp: Utc::now(),
551 message: "mixed content".into(),
552 };
553
554 let mut tx = src.transaction().await.unwrap();
555 tx.put(comment_hash, Object::Comment(comment)).await.unwrap();
556 tx.put(pi_hash, Object::PI(pi)).await.unwrap();
557 tx.put(elem_hash, Object::Element(elem)).await.unwrap();
558 tx.put(doc_hash, Object::Document(doc)).await.unwrap();
559 tx.put(tree_hash, Object::Tree(tree)).await.unwrap();
560 tx.put(commit_hash, Object::Commit(commit)).await.unwrap();
561 tx.commit().await.unwrap();
562
563 let count = transfer_objects(&src, &dst, commit_hash).await.unwrap();
564 assert_eq!(count, 6); assert!(dst.contains(&comment_hash).await.unwrap());
566 assert!(dst.contains(&pi_hash).await.unwrap());
567 }
568
569 #[tokio::test]
572 async fn sync_transfer_copies_all_inner_objects() {
573 let src = MemoryStore::new();
574 let dst = MemoryStore::new();
575
576 let (commit_hash, doc_hash) = build_commit(&src, b"c1", vec![]).await;
577
578 let text_hash = ContentHash::from_canonical(b"c1");
580 let elem_hash = ContentHash::from_canonical(b"c1elem");
581 let tree_hash = ContentHash::from_canonical(b"c1tree");
582
583 transfer_objects(&src, &dst, commit_hash).await.unwrap();
584
585 assert!(dst.contains(&commit_hash).await.unwrap(), "commit missing");
587 assert!(dst.contains(&tree_hash).await.unwrap(), "tree missing");
588 assert!(dst.contains(&doc_hash).await.unwrap(), "document missing");
589 assert!(dst.contains(&elem_hash).await.unwrap(), "element missing");
590 assert!(dst.contains(&text_hash).await.unwrap(), "text missing");
591
592 let src_text = src.get(&text_hash).await.unwrap().unwrap();
594 let dst_text = dst.get(&text_hash).await.unwrap().unwrap();
595 assert_eq!(src_text, dst_text);
596 }
597
598 #[tokio::test]
601 async fn sync_transfer_follows_parent_chain() {
602 let src = MemoryStore::new();
603 let dst = MemoryStore::new();
604
605 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
607 let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
608 let (c3, _) = build_commit(&src, b"c3", vec![c2]).await;
609
610 let count = transfer_objects(&src, &dst, c3).await.unwrap();
612 assert_eq!(count, 15); assert!(dst.contains(&c1).await.unwrap(), "ancestor c1 missing");
615 assert!(dst.contains(&c2).await.unwrap(), "ancestor c2 missing");
616 assert!(dst.contains(&c3).await.unwrap(), "tip c3 missing");
617 }
618
619 #[tokio::test]
622 async fn sync_refs_prefix_excludes_non_matching() {
623 let src = MemoryStore::new();
624 let dst = MemoryStore::new();
625
626 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
627 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
628
629 src.set_ref("refs/heads/main", c1).await.unwrap();
630 src.set_ref("refs/tags/v1", c2).await.unwrap();
631
632 let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
634 .await
635 .unwrap();
636
637 assert_eq!(count, 1);
638 assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
639 assert_eq!(
640 dst.get_ref("refs/tags/v1").await.unwrap(),
641 None,
642 "tag ref should NOT have been synced"
643 );
644 }
645
646 #[tokio::test]
649 async fn sync_refs_aborts_on_conflict_error() {
650 let src = MemoryStore::new();
651 let dst = MemoryStore::new();
652
653 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
654 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
655 let (c3, _) = build_commit(&src, b"c3", vec![]).await;
656
657 src.set_ref("refs/heads/alpha", c1).await.unwrap();
659 src.set_ref("refs/heads/beta", c2).await.unwrap();
660
661 transfer_objects(&src, &dst, c3).await.unwrap();
663 dst.set_ref("refs/heads/alpha", c3).await.unwrap(); let result = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Reject).await;
667 assert!(result.is_err(), "should abort when a ref conflicts under Reject");
668 }
669
670 #[tokio::test]
673 async fn sync_ref_fast_forward_full_flow() {
674 let src = MemoryStore::new();
675 let dst = MemoryStore::new();
676
677 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
679 let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
680
681 src.set_ref("refs/heads/main", c1).await.unwrap();
683 let created = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Overwrite)
684 .await
685 .unwrap();
686 assert!(created, "initial sync should report updated");
687
688 src.set_ref("refs/heads/main", c2).await.unwrap();
690
691 let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
694 .await
695 .unwrap();
696 assert!(updated, "fast-forward should report updated");
697
698 assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c2));
699 assert!(dst.contains(&c1).await.unwrap());
701 }
702
703 #[tokio::test]
706 async fn sync_transfer_deduplicates_shared_subtree() {
707 let src = MemoryStore::new();
708 let dst = MemoryStore::new();
709
710 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
712
713 let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
715
716 let first = transfer_objects(&src, &dst, c1).await.unwrap();
718 assert_eq!(first, 5);
719
720 let second = transfer_objects(&src, &dst, c2).await.unwrap();
724 assert_eq!(second, 5, "should only transfer c2's new objects, not c1's");
725 }
726
727 #[tokio::test]
730 async fn sync_ref_creates_new_ref_on_empty_dst() {
731 let src = MemoryStore::new();
732 let dst = MemoryStore::new();
733
734 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
735 src.set_ref("refs/heads/main", c1).await.unwrap();
736
737 let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
739 .await
740 .unwrap();
741 assert!(updated, "creating a new ref should report updated");
742
743 assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
744 assert!(dst.contains(&c1).await.unwrap(), "objects should be transferred");
745 }
746
747 struct Skip;
751
752 #[async_trait]
753 impl RefConflict for Skip {
754 async fn resolve(
755 &self,
756 _store: &dyn ObjectStore,
757 _ref_name: &str,
758 _src_hash: ContentHash,
759 _dst_hash: ContentHash,
760 ) -> Result<bool> {
761 Ok(false)
762 }
763 }
764
765 #[tokio::test]
766 async fn sync_ref_skip_leaves_dst_ref_unchanged() {
767 let src = MemoryStore::new();
768 let dst = MemoryStore::new();
769
770 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
771 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
772
773 transfer_objects(&src, &dst, c1).await.unwrap();
774 dst.set_ref("refs/heads/main", c1).await.unwrap();
775 src.set_ref("refs/heads/main", c2).await.unwrap();
776
777 let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Skip)
779 .await
780 .unwrap();
781 assert!(!updated, "skip should report false");
782
783 assert_eq!(
784 dst.get_ref("refs/heads/main").await.unwrap(),
785 Some(c1),
786 "ref should remain at c1 after skip"
787 );
788 }
789
790 #[tokio::test]
791 async fn sync_ref_skip_still_transfers_objects() {
792 let src = MemoryStore::new();
793 let dst = MemoryStore::new();
794
795 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
796 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
797
798 transfer_objects(&src, &dst, c1).await.unwrap();
799 dst.set_ref("refs/heads/main", c1).await.unwrap();
800 src.set_ref("refs/heads/main", c2).await.unwrap();
801
802 let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Skip)
803 .await
804 .unwrap();
805 assert!(!updated, "skip should not report updated");
806
807 assert!(
810 dst.contains(&c2).await.unwrap(),
811 "c2 objects should be on dst even after skip"
812 );
813 }
814
815 #[tokio::test]
818 async fn sync_transfer_follows_merge_commit_parents() {
819 let src = MemoryStore::new();
820 let dst = MemoryStore::new();
821
822 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
828 let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
829 let (c3, _) = build_commit(&src, b"c3", vec![c1]).await;
830 let (c4, _) = build_commit(&src, b"c4", vec![c2, c3]).await;
831
832 let count = transfer_objects(&src, &dst, c4).await.unwrap();
834 assert_eq!(count, 20);
836
837 assert!(dst.contains(&c1).await.unwrap(), "root c1 missing");
838 assert!(dst.contains(&c2).await.unwrap(), "left parent c2 missing");
839 assert!(dst.contains(&c3).await.unwrap(), "right parent c3 missing");
840 assert!(dst.contains(&c4).await.unwrap(), "merge c4 missing");
841 }
842
843 #[tokio::test]
846 async fn sync_ref_reject_still_leaves_objects_on_dst() {
847 let src = MemoryStore::new();
848 let dst = MemoryStore::new();
849
850 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
851 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
852
853 transfer_objects(&src, &dst, c1).await.unwrap();
854 dst.set_ref("refs/heads/main", c1).await.unwrap();
855 src.set_ref("refs/heads/main", c2).await.unwrap();
856
857 let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject).await;
859 assert!(result.is_err());
860
861 assert!(
863 dst.contains(&c2).await.unwrap(),
864 "objects should be on dst despite rejection"
865 );
866 assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
868 }
869
870 #[tokio::test]
873 async fn sync_refs_excludes_skipped_from_count() {
874 let src = MemoryStore::new();
875 let dst = MemoryStore::new();
876
877 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
878 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
879
880 src.set_ref("refs/heads/main", c1).await.unwrap();
881 src.set_ref("refs/heads/feature", c2).await.unwrap();
882
883 let (c3, _) = build_commit(&dst, b"c3", vec![]).await;
885 dst.set_ref("refs/heads/main", c3).await.unwrap();
886
887 let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Skip)
888 .await
889 .unwrap();
890
891 assert_eq!(count, 1, "count should exclude skipped refs");
893
894 assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c3));
896 assert_eq!(dst.get_ref("refs/heads/feature").await.unwrap(), Some(c2));
898 }
899
900 #[tokio::test]
903 async fn sync_refs_empty_prefix_matches_all() {
904 let src = MemoryStore::new();
905 let dst = MemoryStore::new();
906
907 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
908 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
909
910 src.set_ref("refs/heads/main", c1).await.unwrap();
911 src.set_ref("refs/tags/v1", c2).await.unwrap();
912
913 let count = sync_refs(&src, &src, &dst, &dst, "", &Overwrite)
914 .await
915 .unwrap();
916
917 assert_eq!(count, 2);
918 assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
919 assert_eq!(dst.get_ref("refs/tags/v1").await.unwrap(), Some(c2));
920 }
921
922 #[tokio::test]
925 async fn sync_refs_excludes_up_to_date_from_count() {
926 let src = MemoryStore::new();
927 let dst = MemoryStore::new();
928
929 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
930 let (c2, _) = build_commit(&src, b"c2", vec![]).await;
931
932 src.set_ref("refs/heads/main", c1).await.unwrap();
933 src.set_ref("refs/heads/feature", c2).await.unwrap();
934
935 let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
937 .await
938 .unwrap();
939 assert_eq!(count, 2);
940
941 let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
943 .await
944 .unwrap();
945 assert_eq!(count, 0, "nothing changed, count should be 0");
946 }
947
948 #[tokio::test]
951 async fn sync_refs_no_matching_refs_returns_zero() {
952 let src = MemoryStore::new();
953 let dst = MemoryStore::new();
954
955 let (c1, _) = build_commit(&src, b"c1", vec![]).await;
956 src.set_ref("refs/heads/main", c1).await.unwrap();
957
958 let count = sync_refs(&src, &src, &dst, &dst, "refs/remotes/", &Overwrite)
959 .await
960 .unwrap();
961
962 assert_eq!(count, 0);
963 }
964
965 #[tokio::test]
971 async fn regression_transfer_with_former_hash_collision_dag() {
972 use crate::object::*;
973 let seed = ContentHash::from_canonical(b"regression-seed");
977 let mut hi: u32 = 0;
978 let mut next_h = || {
979 let mut input = seed.0.to_vec();
980 input.extend_from_slice(&hi.to_le_bytes());
981 hi += 1;
982 ContentHash::from_canonical(&input)
983 };
984
985 let mut objects = Vec::new();
986
987 let mut leaf_hashes = Vec::new();
989 for _ in 0..4 {
990 let h = next_h();
991 objects.push((h, Object::Text(TextObject { content: String::new() })));
992 leaf_hashes.push(h);
993 }
994
995 let inner1_h = next_h();
997 objects.push((
998 inner1_h,
999 Object::Element(ElementObject {
1000 local_name: "a".into(),
1001 namespace_uri: None,
1002 namespace_prefix: None,
1003 extra_namespaces: vec![],
1004 attributes: vec![],
1005 children: vec![leaf_hashes[0], leaf_hashes[1]],
1006 inclusive_hash: inner1_h,
1007 }),
1008 ));
1009 let inner2_h = next_h();
1010 objects.push((
1011 inner2_h,
1012 Object::Element(ElementObject {
1013 local_name: "b".into(),
1014 namespace_uri: None,
1015 namespace_prefix: None,
1016 extra_namespaces: vec![],
1017 attributes: vec![],
1018 children: vec![leaf_hashes[2], leaf_hashes[3]],
1019 inclusive_hash: inner2_h,
1020 }),
1021 ));
1022
1023 let root_h = next_h();
1025 objects.push((
1026 root_h,
1027 Object::Element(ElementObject {
1028 local_name: "root".into(),
1029 namespace_uri: None,
1030 namespace_prefix: None,
1031 extra_namespaces: vec![],
1032 attributes: vec![],
1033 children: vec![inner1_h, inner2_h],
1034 inclusive_hash: root_h,
1035 }),
1036 ));
1037
1038 let doc_h = next_h();
1040 objects.push((
1041 doc_h,
1042 Object::Document(DocumentObject {
1043 root: root_h,
1044 prologue: vec![],
1045 }),
1046 ));
1047
1048 let unique: std::collections::HashSet<_> = objects.iter().map(|(h, _)| *h).collect();
1050 assert_eq!(
1051 unique.len(),
1052 objects.len(),
1053 "all hashes must be unique (this was the bug)"
1054 );
1055
1056 let src = MemoryStore::new();
1058 let dst = MemoryStore::new();
1059
1060 let mut tx = src.transaction().await.unwrap();
1061 for (h, o) in &objects {
1062 tx.put(*h, o.clone()).await.unwrap();
1063 }
1064 tx.commit().await.unwrap();
1065
1066 transfer_objects(&src, &dst, doc_h).await.unwrap();
1067
1068 for (h, _) in &objects {
1069 assert!(
1070 dst.contains(h).await.unwrap(),
1071 "hash {h} should be on dst after transfer"
1072 );
1073 let src_obj = src.get(h).await.unwrap();
1074 let dst_obj = dst.get(h).await.unwrap();
1075 assert_eq!(src_obj, dst_obj, "object at {h} should match");
1076 }
1077 }
1078
1079 proptest! {
1082 #![proptest_config(ProptestConfig::with_cases(100))]
1083
1084 #[test]
1086 fn prop_transfer_idempotent((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
1087 let rt = crate::store::prop_strategies::runtime();
1088 rt.block_on(async {
1089 let src = MemoryStore::new();
1090 let dst = MemoryStore::new();
1091
1092 let mut tx = src.transaction().await.unwrap();
1094 for (h, o) in &dag {
1095 tx.put(*h, o.clone()).await.unwrap();
1096 }
1097 tx.commit().await.unwrap();
1098
1099 let first = transfer_objects(&src, &dst, root).await.unwrap();
1100 assert!(first > 0, "first transfer should copy objects");
1101
1102 let second = transfer_objects(&src, &dst, root).await.unwrap();
1103 assert_eq!(second, 0, "second transfer should be a no-op");
1104 });
1105 }
1106
1107 #[test]
1109 fn prop_transfer_complete((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
1110 let rt = crate::store::prop_strategies::runtime();
1111 rt.block_on(async {
1112 let src = MemoryStore::new();
1113 let dst = MemoryStore::new();
1114
1115 let mut tx = src.transaction().await.unwrap();
1116 for (h, o) in &dag {
1117 tx.put(*h, o.clone()).await.unwrap();
1118 }
1119 tx.commit().await.unwrap();
1120
1121 transfer_objects(&src, &dst, root).await.unwrap();
1122
1123 for (h, _) in &dag {
1124 assert!(
1125 dst.contains(h).await.unwrap(),
1126 "hash {h} should be on dst after transfer"
1127 );
1128 }
1129 });
1130 }
1131
1132 #[test]
1134 fn prop_transfer_preserves((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
1135 let rt = crate::store::prop_strategies::runtime();
1136 rt.block_on(async {
1137 let src = MemoryStore::new();
1138 let dst = MemoryStore::new();
1139
1140 let mut tx = src.transaction().await.unwrap();
1141 for (h, o) in &dag {
1142 tx.put(*h, o.clone()).await.unwrap();
1143 }
1144 tx.commit().await.unwrap();
1145
1146 transfer_objects(&src, &dst, root).await.unwrap();
1147
1148 for (h, _) in &dag {
1149 let src_obj = src.get(h).await.unwrap();
1150 let dst_obj = dst.get(h).await.unwrap();
1151 assert_eq!(
1152 src_obj, dst_obj,
1153 "object at {h} should be identical on src and dst"
1154 );
1155 }
1156 });
1157 }
1158
1159 #[test]
1162 fn prop_transfer_commit_dag((dag, root, _order) in crate::store::prop_strategies::arb_commit_dag()) {
1163 let rt = crate::store::prop_strategies::runtime();
1164 rt.block_on(async {
1165 let src = MemoryStore::new();
1166 let dst = MemoryStore::new();
1167
1168 let mut tx = src.transaction().await.unwrap();
1169 for (h, o) in &dag {
1170 tx.put(*h, o.clone()).await.unwrap();
1171 }
1172 tx.commit().await.unwrap();
1173
1174 let first = transfer_objects(&src, &dst, root).await.unwrap();
1176 assert!(first > 0, "first transfer should copy objects");
1177
1178 let second = transfer_objects(&src, &dst, root).await.unwrap();
1180 assert_eq!(second, 0, "second transfer should be a no-op");
1181
1182 let src_reachable: Vec<_> = src.subtree(&root)
1186 .map(|r| r.unwrap())
1187 .collect()
1188 .await;
1189 for (h, src_obj) in &src_reachable {
1190 assert!(
1191 dst.contains(h).await.unwrap(),
1192 "hash {h} should be on dst after commit-dag transfer"
1193 );
1194 let dst_obj = dst.get(h).await.unwrap();
1195 assert_eq!(
1196 dst_obj.as_ref(), Some(src_obj),
1197 "object at {h} should be identical on src and dst"
1198 );
1199 }
1200 });
1201 }
1202 }
1203}