Skip to main content

clayers_repo/
sync.rs

1//! Pull/push synchronization between any two `ObjectStore + RefStore` implementations.
2//!
3//! Operates via free functions rather than methods on store traits, keeping sync
4//! as an external concern. Efficiently transfers only missing objects by walking
5//! the Merkle DAG from ref tips.
6
7use std::collections::HashMap;
8use std::pin::pin;
9
10use async_trait::async_trait;
11use clayers_xml::ContentHash;
12use futures_core::Stream;
13
14use crate::error::{Error, Result};
15use crate::graph;
16use crate::object::Object;
17use crate::store::{ObjectStore, RefStore};
18
19/// Collect a stream of `Result<(K, V)>` into a `HashMap`, short-circuiting on error.
20async fn try_collect_stream<S>(stream: S) -> Result<HashMap<ContentHash, Object>>
21where
22    S: Stream<Item = Result<(ContentHash, Object)>>,
23{
24    let mut stream = pin!(stream);
25    let mut map = HashMap::new();
26    while let Some(item) = std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await {
27        let (hash, obj) = item?;
28        map.insert(hash, obj);
29    }
30    Ok(map)
31}
32
33// ---------------------------------------------------------------------------
34// Ref conflict resolution
35// ---------------------------------------------------------------------------
36
/// How to resolve a ref that already exists on the destination with a different value.
///
/// [`sync_ref`] consults the policy only when the destination ref exists and
/// differs from the source ref. A missing destination ref is simply created,
/// and an identical one is left alone, without calling `resolve`.
#[async_trait]
pub trait RefConflict: Send + Sync {
    /// Decide whether to update `ref_name` from `dst_hash` to `src_hash`.
    ///
    /// `store` is the **destination** object store, after source objects have
    /// already been transferred. It contains both the source and destination
    /// commit histories, so graph operations like `common_ancestor` will work.
    ///
    /// Returns `Ok(true)` to proceed, `Ok(false)` to skip, `Err` to abort.
    /// On `Ok(false)` or `Err` the destination ref is left unchanged, but the
    /// objects already transferred remain in `store`.
    async fn resolve(
        &self,
        store: &dyn ObjectStore,
        ref_name: &str,
        src_hash: ContentHash,
        dst_hash: ContentHash,
    ) -> Result<bool>;
}
55
56/// Update only if dst is an ancestor of src (no history loss).
57pub struct FastForwardOnly;
58
59#[async_trait]
60impl RefConflict for FastForwardOnly {
61    async fn resolve(
62        &self,
63        store: &dyn ObjectStore,
64        _ref_name: &str,
65        src_hash: ContentHash,
66        dst_hash: ContentHash,
67    ) -> Result<bool> {
68        let lca = graph::common_ancestor(store, src_hash, dst_hash).await?;
69        if lca == Some(dst_hash) {
70            Ok(true)
71        } else {
72            Err(Error::Ref(
73                "cannot fast-forward: destination is not an ancestor of source".into(),
74            ))
75        }
76    }
77}
78
79/// Always overwrite the destination ref.
80pub struct Overwrite;
81
82#[async_trait]
83impl RefConflict for Overwrite {
84    async fn resolve(
85        &self,
86        _store: &dyn ObjectStore,
87        _ref_name: &str,
88        _src_hash: ContentHash,
89        _dst_hash: ContentHash,
90    ) -> Result<bool> {
91        Ok(true)
92    }
93}
94
95/// Fail if the destination ref differs.
96pub struct Reject;
97
98#[async_trait]
99impl RefConflict for Reject {
100    async fn resolve(
101        &self,
102        _store: &dyn ObjectStore,
103        _ref_name: &str,
104        _src_hash: ContentHash,
105        _dst_hash: ContentHash,
106    ) -> Result<bool> {
107        Err(Error::Ref(
108            "destination ref already exists with a different value".into(),
109        ))
110    }
111}
112
113// ---------------------------------------------------------------------------
114// Transfer
115// ---------------------------------------------------------------------------
116
117/// Copy objects reachable from `root` that `dst` doesn't already have.
118///
119/// Uses `subtree()` to stream all reachable objects, filters to those
120/// missing on `dst`, and batch-inserts them in a single transaction.
121///
122/// Returns the number of objects transferred.
123///
124/// # Errors
125///
126/// Returns an error if objects cannot be read from `src` or written to `dst`.
127pub async fn transfer_objects(
128    src: &dyn ObjectStore,
129    dst: &dyn ObjectStore,
130    root: ContentHash,
131) -> Result<usize> {
132    let src_objects = try_collect_stream(src.subtree(&root)).await?;
133
134    // Filter to objects missing from dst.
135    let mut missing = Vec::new();
136    for (hash, obj) in &src_objects {
137        if !dst.contains(hash).await? {
138            missing.push((*hash, obj.clone()));
139        }
140    }
141
142    if missing.is_empty() {
143        return Ok(0);
144    }
145
146    // Batch into a single transaction.
147    let count = missing.len();
148    let mut tx = dst.transaction().await?;
149    for (hash, obj) in missing {
150        tx.put(hash, obj).await?;
151    }
152    tx.commit().await?;
153
154    Ok(count)
155}
156
157// ---------------------------------------------------------------------------
158// Ref sync
159// ---------------------------------------------------------------------------
160
161/// Sync a single ref: transfer objects reachable from the source ref, then
162/// update the ref on the destination.
163///
164/// Uses `on_conflict` to decide what to do when the destination already has
165/// a different value for the ref.
166///
167/// Returns `true` if the ref was updated, `false` if it was already
168/// up-to-date or the conflict policy chose to skip.
169///
170/// # Errors
171///
172/// Returns an error if the source ref is missing, objects cannot be transferred,
173/// or the conflict policy rejects the update.
174pub async fn sync_ref(
175    src_objects: &dyn ObjectStore,
176    src_refs: &dyn RefStore,
177    dst_objects: &dyn ObjectStore,
178    dst_refs: &dyn RefStore,
179    ref_name: &str,
180    on_conflict: &dyn RefConflict,
181) -> Result<bool> {
182    let src_hash = src_refs
183        .get_ref(ref_name)
184        .await?
185        .ok_or_else(|| Error::Ref(format!("source ref not found: {ref_name}")))?;
186
187    let dst_hash = dst_refs.get_ref(ref_name).await?;
188
189    if let Some(dst_hash) = dst_hash {
190        if dst_hash == src_hash {
191            // Already up-to-date.
192            return Ok(false);
193        }
194        // Transfer first so conflict resolution can walk the full graph on dst.
195        transfer_objects(src_objects, dst_objects, src_hash).await?;
196        let proceed = on_conflict
197            .resolve(dst_objects, ref_name, src_hash, dst_hash)
198            .await?;
199        if !proceed {
200            return Ok(false);
201        }
202    } else {
203        transfer_objects(src_objects, dst_objects, src_hash).await?;
204    }
205
206    dst_refs.set_ref(ref_name, src_hash).await?;
207    Ok(true)
208}
209
210/// Sync all refs matching a prefix.
211///
212/// Returns the number of refs synced.
213///
214/// # Errors
215///
216/// Returns an error if refs cannot be listed or any individual ref sync fails.
217pub async fn sync_refs(
218    src_objects: &dyn ObjectStore,
219    src_refs: &dyn RefStore,
220    dst_objects: &dyn ObjectStore,
221    dst_refs: &dyn RefStore,
222    prefix: &str,
223    on_conflict: &dyn RefConflict,
224) -> Result<usize> {
225    let refs = src_refs.list_refs(prefix).await?;
226    let mut count = 0;
227
228    for (ref_name, _) in &refs {
229        let updated = sync_ref(
230            src_objects,
231            src_refs,
232            dst_objects,
233            dst_refs,
234            ref_name,
235            on_conflict,
236        )
237        .await?;
238        if updated {
239            count += 1;
240        }
241    }
242
243    Ok(count)
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use crate::object::{
250        Author, CommentObject, CommitObject, DocumentObject, ElementObject, PIObject, TagObject,
251        TextObject, TreeEntry, TreeObject,
252    };
253    use crate::store::memory::MemoryStore;
254    use chrono::Utc;
255    use proptest::prelude::*;
256    use tokio_stream::StreamExt as _;
257
258    fn author() -> Author {
259        Author {
260            name: "Test".into(),
261            email: "test@test.com".into(),
262        }
263    }
264
265    /// Build a minimal commit chain in `store`:
266    /// text -> element -> document -> tree -> commit
267    /// Returns `(commit_hash, document_hash)`.
268    async fn build_commit(
269        store: &MemoryStore,
270        id: &[u8],
271        parents: Vec<ContentHash>,
272    ) -> (ContentHash, ContentHash) {
273        let text_hash = ContentHash::from_canonical(id);
274        let text = TextObject {
275            content: String::from_utf8_lossy(id).into(),
276        };
277
278        let elem_id: Vec<u8> = id.iter().chain(b"elem").copied().collect();
279        let elem_hash = ContentHash::from_canonical(&elem_id);
280        let elem = ElementObject {
281            local_name: "root".into(),
282            namespace_uri: None,
283            namespace_prefix: None,
284            extra_namespaces: vec![],
285            attributes: vec![],
286            children: vec![text_hash],
287            inclusive_hash: elem_hash,
288        };
289
290        let doc_id: Vec<u8> = id.iter().chain(b"doc").copied().collect();
291        let doc_hash = ContentHash::from_canonical(&doc_id);
292        let doc = DocumentObject { root: elem_hash, prologue: vec![] };
293
294        let tree_id: Vec<u8> = id.iter().chain(b"tree").copied().collect();
295        let tree_hash = ContentHash::from_canonical(&tree_id);
296        let tree = TreeObject::new(vec![
297            TreeEntry { path: "doc.xml".into(), document: doc_hash },
298        ]);
299
300        let commit_id: Vec<u8> = id.iter().chain(b"commit").copied().collect();
301        let commit_hash = ContentHash::from_canonical(&commit_id);
302        let commit = CommitObject {
303            tree: tree_hash,
304            parents,
305            author: author(),
306            timestamp: Utc::now(),
307            message: format!("commit {}", String::from_utf8_lossy(id)),
308        };
309
310        let mut tx = store.transaction().await.unwrap();
311        tx.put(text_hash, Object::Text(text)).await.unwrap();
312        tx.put(elem_hash, Object::Element(elem)).await.unwrap();
313        tx.put(doc_hash, Object::Document(doc)).await.unwrap();
314        tx.put(tree_hash, Object::Tree(tree)).await.unwrap();
315        tx.put(commit_hash, Object::Commit(commit)).await.unwrap();
316        tx.commit().await.unwrap();
317
318        (commit_hash, doc_hash)
319    }
320
321    #[tokio::test]
322    async fn sync_transfer_objects_copies_missing() {
323        let src = MemoryStore::new();
324        let dst = MemoryStore::new();
325
326        let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
327
328        // 5 objects: text, element, document, tree, commit
329        let count = transfer_objects(&src, &dst, commit_hash).await.unwrap();
330        assert_eq!(count, 5);
331
332        // All objects present on dst.
333        assert!(dst.contains(&commit_hash).await.unwrap());
334    }
335
336    #[tokio::test]
337    async fn sync_transfer_idempotent() {
338        let src = MemoryStore::new();
339        let dst = MemoryStore::new();
340
341        let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
342
343        transfer_objects(&src, &dst, commit_hash).await.unwrap();
344        let second = transfer_objects(&src, &dst, commit_hash).await.unwrap();
345        assert_eq!(second, 0, "second transfer should copy 0 objects");
346    }
347
348    #[tokio::test]
349    async fn sync_ref_fast_forward() {
350        let src = MemoryStore::new();
351        let dst = MemoryStore::new();
352
353        // Linear: c1 <- c2
354        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
355        let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
356
357        // dst has c1 on the ref.
358        transfer_objects(&src, &dst, c1).await.unwrap();
359        dst.set_ref("refs/heads/main", c1).await.unwrap();
360
361        // src has c2 on the ref.
362        src.set_ref("refs/heads/main", c2).await.unwrap();
363
364        // Fast-forward should succeed and report updated.
365        let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
366            .await
367            .unwrap();
368        assert!(updated, "fast-forward should report ref was updated");
369
370        let dst_ref = dst.get_ref("refs/heads/main").await.unwrap();
371        assert_eq!(dst_ref, Some(c2));
372    }
373
374    #[tokio::test]
375    async fn sync_ref_fast_forward_rejects_diverged() {
376        let src = MemoryStore::new();
377        let dst = MemoryStore::new();
378
379        // Diverged: c1 <- c2 (src) and c1 <- c3 (dst)
380        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
381        let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
382        let (c3, _) = build_commit(&dst, b"c3", vec![c1]).await;
383
384        // Need c1 in dst too.
385        transfer_objects(&src, &dst, c1).await.unwrap();
386
387        src.set_ref("refs/heads/main", c2).await.unwrap();
388        dst.set_ref("refs/heads/main", c3).await.unwrap();
389
390        let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly).await;
391        assert!(result.is_err(), "should reject diverged histories");
392    }
393
394    #[tokio::test]
395    async fn sync_ref_overwrite_always_succeeds() {
396        let src = MemoryStore::new();
397        let dst = MemoryStore::new();
398
399        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
400        let (c2, _) = build_commit(&src, b"c2", vec![]).await; // no parent, diverged
401
402        transfer_objects(&src, &dst, c1).await.unwrap();
403        dst.set_ref("refs/heads/main", c1).await.unwrap();
404        src.set_ref("refs/heads/main", c2).await.unwrap();
405
406        let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Overwrite)
407            .await
408            .unwrap();
409        assert!(updated, "overwrite should report ref was updated");
410
411        let dst_ref = dst.get_ref("refs/heads/main").await.unwrap();
412        assert_eq!(dst_ref, Some(c2));
413    }
414
415    #[tokio::test]
416    async fn sync_ref_reject_fails_when_different() {
417        let src = MemoryStore::new();
418        let dst = MemoryStore::new();
419
420        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
421        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
422
423        transfer_objects(&src, &dst, c1).await.unwrap();
424        dst.set_ref("refs/heads/main", c1).await.unwrap();
425        src.set_ref("refs/heads/main", c2).await.unwrap();
426
427        let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject).await;
428        assert!(result.is_err(), "should reject when refs differ");
429    }
430
431    #[tokio::test]
432    async fn sync_refs_with_prefix() {
433        let src = MemoryStore::new();
434        let dst = MemoryStore::new();
435
436        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
437        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
438
439        src.set_ref("refs/heads/main", c1).await.unwrap();
440        src.set_ref("refs/heads/feature", c2).await.unwrap();
441
442        let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
443            .await
444            .unwrap();
445
446        assert_eq!(count, 2);
447        assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
448        assert_eq!(dst.get_ref("refs/heads/feature").await.unwrap(), Some(c2));
449    }
450
451    #[tokio::test]
452    async fn sync_ref_missing_src_ref_errors() {
453        let src = MemoryStore::new();
454        let dst = MemoryStore::new();
455
456        let result =
457            sync_ref(&src, &src, &dst, &dst, "refs/heads/missing", &Overwrite).await;
458        assert!(result.is_err(), "missing source ref should error");
459    }
460
461    #[tokio::test]
462    async fn sync_ref_already_up_to_date() {
463        let src = MemoryStore::new();
464        let dst = MemoryStore::new();
465
466        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
467
468        transfer_objects(&src, &dst, c1).await.unwrap();
469        src.set_ref("refs/heads/main", c1).await.unwrap();
470        dst.set_ref("refs/heads/main", c1).await.unwrap();
471
472        // Should succeed without calling conflict resolution, report not updated.
473        let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject)
474            .await
475            .unwrap();
476        assert!(!updated, "already up-to-date should report false");
477    }
478
479    // --- Gap 1: Tag, Comment, PI reachability ---
480
481    #[tokio::test]
482    async fn sync_reachable_follows_tag_to_commit() {
483        let src = MemoryStore::new();
484        let dst = MemoryStore::new();
485
486        let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
487
488        // Create a tag pointing at the commit.
489        let tag_id = b"tag-v1";
490        let tag_hash = ContentHash::from_canonical(tag_id);
491        let tag = TagObject {
492            target: commit_hash,
493            name: "v1.0".into(),
494            tagger: author(),
495            timestamp: Utc::now(),
496            message: "release".into(),
497        };
498        let mut tx = src.transaction().await.unwrap();
499        tx.put(tag_hash, Object::Tag(tag)).await.unwrap();
500        tx.commit().await.unwrap();
501
502        // Transfer from the tag root: should pull tag + commit + tree + doc + elem + text = 6.
503        let count = transfer_objects(&src, &dst, tag_hash).await.unwrap();
504        assert_eq!(count, 6);
505        assert!(dst.contains(&tag_hash).await.unwrap());
506        assert!(dst.contains(&commit_hash).await.unwrap());
507    }
508
509    #[tokio::test]
510    async fn sync_reachable_follows_comment_and_pi_leaves() {
511        let src = MemoryStore::new();
512        let dst = MemoryStore::new();
513
514        // Build: comment + PI as children of an element -> document -> commit.
515        let comment_hash = ContentHash::from_canonical(b"comment1");
516        let comment = CommentObject {
517            content: "a comment".into(),
518        };
519
520        let pi_hash = ContentHash::from_canonical(b"pi1");
521        let pi = PIObject {
522            target: "xml-stylesheet".into(),
523            data: Some("type=\"text/xsl\"".into()),
524        };
525
526        let elem_hash = ContentHash::from_canonical(b"elem-mixed");
527        let elem = ElementObject {
528            local_name: "root".into(),
529            namespace_uri: None,
530            namespace_prefix: None,
531            extra_namespaces: vec![],
532            attributes: vec![],
533            children: vec![comment_hash, pi_hash],
534            inclusive_hash: elem_hash,
535        };
536
537        let doc_hash = ContentHash::from_canonical(b"doc-mixed");
538        let doc = DocumentObject { root: elem_hash, prologue: vec![] };
539
540        let tree_hash = ContentHash::from_canonical(b"tree-mixed");
541        let tree = TreeObject::new(vec![
542            TreeEntry { path: "doc.xml".into(), document: doc_hash },
543        ]);
544
545        let commit_hash = ContentHash::from_canonical(b"commit-mixed");
546        let commit = CommitObject {
547            tree: tree_hash,
548            parents: vec![],
549            author: author(),
550            timestamp: Utc::now(),
551            message: "mixed content".into(),
552        };
553
554        let mut tx = src.transaction().await.unwrap();
555        tx.put(comment_hash, Object::Comment(comment)).await.unwrap();
556        tx.put(pi_hash, Object::PI(pi)).await.unwrap();
557        tx.put(elem_hash, Object::Element(elem)).await.unwrap();
558        tx.put(doc_hash, Object::Document(doc)).await.unwrap();
559        tx.put(tree_hash, Object::Tree(tree)).await.unwrap();
560        tx.put(commit_hash, Object::Commit(commit)).await.unwrap();
561        tx.commit().await.unwrap();
562
563        let count = transfer_objects(&src, &dst, commit_hash).await.unwrap();
564        assert_eq!(count, 6); // commit + tree + doc + elem + comment + pi
565        assert!(dst.contains(&comment_hash).await.unwrap());
566        assert!(dst.contains(&pi_hash).await.unwrap());
567    }
568
569    // --- Gap 2: Verify all inner objects land on dst ---
570
571    #[tokio::test]
572    async fn sync_transfer_copies_all_inner_objects() {
573        let src = MemoryStore::new();
574        let dst = MemoryStore::new();
575
576        let (commit_hash, doc_hash) = build_commit(&src, b"c1", vec![]).await;
577
578        // Recover the text, element, and tree hashes used by build_commit.
579        let text_hash = ContentHash::from_canonical(b"c1");
580        let elem_hash = ContentHash::from_canonical(b"c1elem");
581        let tree_hash = ContentHash::from_canonical(b"c1tree");
582
583        transfer_objects(&src, &dst, commit_hash).await.unwrap();
584
585        // Every single object must be on dst, not just the commit.
586        assert!(dst.contains(&commit_hash).await.unwrap(), "commit missing");
587        assert!(dst.contains(&tree_hash).await.unwrap(), "tree missing");
588        assert!(dst.contains(&doc_hash).await.unwrap(), "document missing");
589        assert!(dst.contains(&elem_hash).await.unwrap(), "element missing");
590        assert!(dst.contains(&text_hash).await.unwrap(), "text missing");
591
592        // Also verify the objects are identical.
593        let src_text = src.get(&text_hash).await.unwrap().unwrap();
594        let dst_text = dst.get(&text_hash).await.unwrap().unwrap();
595        assert_eq!(src_text, dst_text);
596    }
597
598    // --- Gap 3: Multi-commit chain transfer follows parent links ---
599
600    #[tokio::test]
601    async fn sync_transfer_follows_parent_chain() {
602        let src = MemoryStore::new();
603        let dst = MemoryStore::new();
604
605        // c1 <- c2 <- c3, each with its own doc subtree (5 objects each).
606        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
607        let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
608        let (c3, _) = build_commit(&src, b"c3", vec![c2]).await;
609
610        // Transfer from c3 tip only. Must pull all 3 commits + all subtrees.
611        let count = transfer_objects(&src, &dst, c3).await.unwrap();
612        assert_eq!(count, 15); // 3 commits * 5 objects each
613
614        assert!(dst.contains(&c1).await.unwrap(), "ancestor c1 missing");
615        assert!(dst.contains(&c2).await.unwrap(), "ancestor c2 missing");
616        assert!(dst.contains(&c3).await.unwrap(), "tip c3 missing");
617    }
618
619    // --- Gap 4: sync_refs prefix actually filters ---
620
621    #[tokio::test]
622    async fn sync_refs_prefix_excludes_non_matching() {
623        let src = MemoryStore::new();
624        let dst = MemoryStore::new();
625
626        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
627        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
628
629        src.set_ref("refs/heads/main", c1).await.unwrap();
630        src.set_ref("refs/tags/v1", c2).await.unwrap();
631
632        // Sync only heads, not tags.
633        let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
634            .await
635            .unwrap();
636
637        assert_eq!(count, 1);
638        assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
639        assert_eq!(
640            dst.get_ref("refs/tags/v1").await.unwrap(),
641            None,
642            "tag ref should NOT have been synced"
643        );
644    }
645
646    // --- Gap 5: sync_refs partial failure aborts on error ---
647
648    #[tokio::test]
649    async fn sync_refs_aborts_on_conflict_error() {
650        let src = MemoryStore::new();
651        let dst = MemoryStore::new();
652
653        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
654        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
655        let (c3, _) = build_commit(&src, b"c3", vec![]).await;
656
657        // src has two branches.
658        src.set_ref("refs/heads/alpha", c1).await.unwrap();
659        src.set_ref("refs/heads/beta", c2).await.unwrap();
660
661        // dst has a conflicting value for one of them.
662        transfer_objects(&src, &dst, c3).await.unwrap();
663        dst.set_ref("refs/heads/alpha", c3).await.unwrap(); // different from src's c1
664
665        // Reject policy: the conflicting ref causes an error.
666        let result = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Reject).await;
667        assert!(result.is_err(), "should abort when a ref conflicts under Reject");
668    }
669
670    // --- Gap 6: FastForwardOnly via sync_ref (full flow, no manual pre-transfer) ---
671
672    #[tokio::test]
673    async fn sync_ref_fast_forward_full_flow() {
674        let src = MemoryStore::new();
675        let dst = MemoryStore::new();
676
677        // Build c1 <- c2 entirely in src.
678        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
679        let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
680
681        // Sync c1 to dst first (sets up the ref on both sides).
682        src.set_ref("refs/heads/main", c1).await.unwrap();
683        let created = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Overwrite)
684            .await
685            .unwrap();
686        assert!(created, "initial sync should report updated");
687
688        // Now advance src to c2. dst still at c1.
689        src.set_ref("refs/heads/main", c2).await.unwrap();
690
691        // FastForwardOnly through sync_ref: it should transfer objects first,
692        // then resolve on dst where both c1 and c2 now exist.
693        let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
694            .await
695            .unwrap();
696        assert!(updated, "fast-forward should report updated");
697
698        assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c2));
699        // Verify c1 is still reachable on dst (ancestor was preserved).
700        assert!(dst.contains(&c1).await.unwrap());
701    }
702
703    // --- Gap 7: Shared subtree deduplication ---
704
705    #[tokio::test]
706    async fn sync_transfer_deduplicates_shared_subtree() {
707        let src = MemoryStore::new();
708        let dst = MemoryStore::new();
709
710        // c1 has its own subtree (5 objects: text, elem, doc, tree, commit).
711        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
712
713        // c2 has its own subtree from build_commit = 5 new objects.
714        let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
715
716        // Transfer c1 first.
717        let first = transfer_objects(&src, &dst, c1).await.unwrap();
718        assert_eq!(first, 5);
719
720        // Transfer c2: should NOT re-transfer c1's objects.
721        // c2 adds: its own text + element + document + tree + commit = 5 new objects.
722        // c1's subtree is already on dst.
723        let second = transfer_objects(&src, &dst, c2).await.unwrap();
724        assert_eq!(second, 5, "should only transfer c2's new objects, not c1's");
725    }
726
727    // --- Gap 8: sync_ref creates ref on fresh dst ---
728
729    #[tokio::test]
730    async fn sync_ref_creates_new_ref_on_empty_dst() {
731        let src = MemoryStore::new();
732        let dst = MemoryStore::new();
733
734        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
735        src.set_ref("refs/heads/main", c1).await.unwrap();
736
737        // dst has no refs at all. sync_ref should create it.
738        let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
739            .await
740            .unwrap();
741        assert!(updated, "creating a new ref should report updated");
742
743        assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
744        assert!(dst.contains(&c1).await.unwrap(), "objects should be transferred");
745    }
746
747    // --- Skip policy: exercises the Ok(false) return path ---
748
749    /// A custom policy that skips conflicting refs instead of erroring.
750    struct Skip;
751
752    #[async_trait]
753    impl RefConflict for Skip {
754        async fn resolve(
755            &self,
756            _store: &dyn ObjectStore,
757            _ref_name: &str,
758            _src_hash: ContentHash,
759            _dst_hash: ContentHash,
760        ) -> Result<bool> {
761            Ok(false)
762        }
763    }
764
765    #[tokio::test]
766    async fn sync_ref_skip_leaves_dst_ref_unchanged() {
767        let src = MemoryStore::new();
768        let dst = MemoryStore::new();
769
770        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
771        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
772
773        transfer_objects(&src, &dst, c1).await.unwrap();
774        dst.set_ref("refs/heads/main", c1).await.unwrap();
775        src.set_ref("refs/heads/main", c2).await.unwrap();
776
777        // Skip policy: resolve returns Ok(false). Ref must NOT be updated.
778        let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Skip)
779            .await
780            .unwrap();
781        assert!(!updated, "skip should report false");
782
783        assert_eq!(
784            dst.get_ref("refs/heads/main").await.unwrap(),
785            Some(c1),
786            "ref should remain at c1 after skip"
787        );
788    }
789
790    #[tokio::test]
791    async fn sync_ref_skip_still_transfers_objects() {
792        let src = MemoryStore::new();
793        let dst = MemoryStore::new();
794
795        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
796        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
797
798        transfer_objects(&src, &dst, c1).await.unwrap();
799        dst.set_ref("refs/heads/main", c1).await.unwrap();
800        src.set_ref("refs/heads/main", c2).await.unwrap();
801
802        let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Skip)
803            .await
804            .unwrap();
805        assert!(!updated, "skip should not report updated");
806
807        // Objects are transferred before resolve is called, so c2's
808        // objects should be on dst even though the ref wasn't updated.
809        assert!(
810            dst.contains(&c2).await.unwrap(),
811            "c2 objects should be on dst even after skip"
812        );
813    }
814
815    // --- Merge commit: multi-parent DAG walking ---
816
817    #[tokio::test]
818    async fn sync_transfer_follows_merge_commit_parents() {
819        let src = MemoryStore::new();
820        let dst = MemoryStore::new();
821
822        //    c1
823        //   /  \
824        //  c2   c3
825        //   \  /
826        //    c4 (merge)
827        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
828        let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
829        let (c3, _) = build_commit(&src, b"c3", vec![c1]).await;
830        let (c4, _) = build_commit(&src, b"c4", vec![c2, c3]).await;
831
832        // Transfer from merge tip. Must follow both parents.
833        let count = transfer_objects(&src, &dst, c4).await.unwrap();
834        // c1=5, c2=5, c3=5, c4=5 = 20 total objects.
835        assert_eq!(count, 20);
836
837        assert!(dst.contains(&c1).await.unwrap(), "root c1 missing");
838        assert!(dst.contains(&c2).await.unwrap(), "left parent c2 missing");
839        assert!(dst.contains(&c3).await.unwrap(), "right parent c3 missing");
840        assert!(dst.contains(&c4).await.unwrap(), "merge c4 missing");
841    }
842
843    // --- Objects leak on conflict rejection ---
844
845    #[tokio::test]
846    async fn sync_ref_reject_still_leaves_objects_on_dst() {
847        let src = MemoryStore::new();
848        let dst = MemoryStore::new();
849
850        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
851        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
852
853        transfer_objects(&src, &dst, c1).await.unwrap();
854        dst.set_ref("refs/heads/main", c1).await.unwrap();
855        src.set_ref("refs/heads/main", c2).await.unwrap();
856
857        // Reject will error, but objects are transferred before resolve.
858        let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject).await;
859        assert!(result.is_err());
860
861        // c2's objects are on dst even though the ref update was rejected.
862        assert!(
863            dst.contains(&c2).await.unwrap(),
864            "objects should be on dst despite rejection"
865        );
866        // Ref stays at c1.
867        assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
868    }
869
870    // --- sync_refs count includes skipped refs ---
871
872    #[tokio::test]
873    async fn sync_refs_excludes_skipped_from_count() {
874        let src = MemoryStore::new();
875        let dst = MemoryStore::new();
876
877        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
878        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
879
880        src.set_ref("refs/heads/main", c1).await.unwrap();
881        src.set_ref("refs/heads/feature", c2).await.unwrap();
882
883        // dst already has main at a different hash -> Skip will skip it.
884        let (c3, _) = build_commit(&dst, b"c3", vec![]).await;
885        dst.set_ref("refs/heads/main", c3).await.unwrap();
886
887        let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Skip)
888            .await
889            .unwrap();
890
891        // Only feature was actually synced; main was skipped.
892        assert_eq!(count, 1, "count should exclude skipped refs");
893
894        // main was skipped, still at c3.
895        assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c3));
896        // feature was created.
897        assert_eq!(dst.get_ref("refs/heads/feature").await.unwrap(), Some(c2));
898    }
899
900    // --- sync_refs with empty prefix matches all ---
901
902    #[tokio::test]
903    async fn sync_refs_empty_prefix_matches_all() {
904        let src = MemoryStore::new();
905        let dst = MemoryStore::new();
906
907        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
908        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
909
910        src.set_ref("refs/heads/main", c1).await.unwrap();
911        src.set_ref("refs/tags/v1", c2).await.unwrap();
912
913        let count = sync_refs(&src, &src, &dst, &dst, "", &Overwrite)
914            .await
915            .unwrap();
916
917        assert_eq!(count, 2);
918        assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
919        assert_eq!(dst.get_ref("refs/tags/v1").await.unwrap(), Some(c2));
920    }
921
922    // --- sync_refs excludes already-up-to-date from count ---
923
924    #[tokio::test]
925    async fn sync_refs_excludes_up_to_date_from_count() {
926        let src = MemoryStore::new();
927        let dst = MemoryStore::new();
928
929        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
930        let (c2, _) = build_commit(&src, b"c2", vec![]).await;
931
932        src.set_ref("refs/heads/main", c1).await.unwrap();
933        src.set_ref("refs/heads/feature", c2).await.unwrap();
934
935        // First sync: both refs are new.
936        let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
937            .await
938            .unwrap();
939        assert_eq!(count, 2);
940
941        // Second sync: both already up-to-date.
942        let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
943            .await
944            .unwrap();
945        assert_eq!(count, 0, "nothing changed, count should be 0");
946    }
947
948    // --- sync_refs returns 0 for non-matching prefix ---
949
950    #[tokio::test]
951    async fn sync_refs_no_matching_refs_returns_zero() {
952        let src = MemoryStore::new();
953        let dst = MemoryStore::new();
954
955        let (c1, _) = build_commit(&src, b"c1", vec![]).await;
956        src.set_ref("refs/heads/main", c1).await.unwrap();
957
958        let count = sync_refs(&src, &src, &dst, &dst, "refs/remotes/", &Overwrite)
959            .await
960            .unwrap();
961
962        assert_eq!(count, 0);
963    }
964
965    /// Regression: `arb_object_dag` used to produce hash collisions when
966    /// proptest shrunk all hash pool entries to the same value. Multiple
967    /// objects (Text, Element) shared a hash, and `tx.put()` silently
968    /// overwrote earlier entries. Fixed by deriving unique hashes from
969    /// a seed + index in the generator.
970    #[tokio::test]
971    async fn regression_transfer_with_former_hash_collision_dag() {
972        use crate::object::*;
973        // Reproduce the shrunk case: a DAG where every leaf and inner
974        // element would have shared the same hash under the old generator.
975        // With the fix, each object gets a unique hash derived from a seed.
976        let seed = ContentHash::from_canonical(b"regression-seed");
977        let mut hi: u32 = 0;
978        let mut next_h = || {
979            let mut input = seed.0.to_vec();
980            input.extend_from_slice(&hi.to_le_bytes());
981            hi += 1;
982            ContentHash::from_canonical(&input)
983        };
984
985        let mut objects = Vec::new();
986
987        // 4 text leaves (would all have been hash-colliding before fix).
988        let mut leaf_hashes = Vec::new();
989        for _ in 0..4 {
990            let h = next_h();
991            objects.push((h, Object::Text(TextObject { content: String::new() })));
992            leaf_hashes.push(h);
993        }
994
995        // 2 inner elements referencing leaves.
996        let inner1_h = next_h();
997        objects.push((
998            inner1_h,
999            Object::Element(ElementObject {
1000                local_name: "a".into(),
1001                namespace_uri: None,
1002                namespace_prefix: None,
1003                extra_namespaces: vec![],
1004                attributes: vec![],
1005                children: vec![leaf_hashes[0], leaf_hashes[1]],
1006                inclusive_hash: inner1_h,
1007            }),
1008        ));
1009        let inner2_h = next_h();
1010        objects.push((
1011            inner2_h,
1012            Object::Element(ElementObject {
1013                local_name: "b".into(),
1014                namespace_uri: None,
1015                namespace_prefix: None,
1016                extra_namespaces: vec![],
1017                attributes: vec![],
1018                children: vec![leaf_hashes[2], leaf_hashes[3]],
1019                inclusive_hash: inner2_h,
1020            }),
1021        ));
1022
1023        // Root element.
1024        let root_h = next_h();
1025        objects.push((
1026            root_h,
1027            Object::Element(ElementObject {
1028                local_name: "root".into(),
1029                namespace_uri: None,
1030                namespace_prefix: None,
1031                extra_namespaces: vec![],
1032                attributes: vec![],
1033                children: vec![inner1_h, inner2_h],
1034                inclusive_hash: root_h,
1035            }),
1036        ));
1037
1038        // Document.
1039        let doc_h = next_h();
1040        objects.push((
1041            doc_h,
1042            Object::Document(DocumentObject {
1043                root: root_h,
1044                prologue: vec![],
1045            }),
1046        ));
1047
1048        // All hashes must be unique.
1049        let unique: std::collections::HashSet<_> = objects.iter().map(|(h, _)| *h).collect();
1050        assert_eq!(
1051            unique.len(),
1052            objects.len(),
1053            "all hashes must be unique (this was the bug)"
1054        );
1055
1056        // Transfer and verify completeness.
1057        let src = MemoryStore::new();
1058        let dst = MemoryStore::new();
1059
1060        let mut tx = src.transaction().await.unwrap();
1061        for (h, o) in &objects {
1062            tx.put(*h, o.clone()).await.unwrap();
1063        }
1064        tx.commit().await.unwrap();
1065
1066        transfer_objects(&src, &dst, doc_h).await.unwrap();
1067
1068        for (h, _) in &objects {
1069            assert!(
1070                dst.contains(h).await.unwrap(),
1071                "hash {h} should be on dst after transfer"
1072            );
1073            let src_obj = src.get(h).await.unwrap();
1074            let dst_obj = dst.get(h).await.unwrap();
1075            assert_eq!(src_obj, dst_obj, "object at {h} should match");
1076        }
1077    }
1078
1079    // --- Group E: Sync Properties (proptest) ---
1080
1081    proptest! {
1082        #![proptest_config(ProptestConfig::with_cases(100))]
1083
1084        /// E1: Transfer idempotency - transfer_objects twice; second returns 0.
1085        #[test]
1086        fn prop_transfer_idempotent((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
1087            let rt = crate::store::prop_strategies::runtime();
1088            rt.block_on(async {
1089                let src = MemoryStore::new();
1090                let dst = MemoryStore::new();
1091
1092                // Store all DAG objects in src
1093                let mut tx = src.transaction().await.unwrap();
1094                for (h, o) in &dag {
1095                    tx.put(*h, o.clone()).await.unwrap();
1096                }
1097                tx.commit().await.unwrap();
1098
1099                let first = transfer_objects(&src, &dst, root).await.unwrap();
1100                assert!(first > 0, "first transfer should copy objects");
1101
1102                let second = transfer_objects(&src, &dst, root).await.unwrap();
1103                assert_eq!(second, 0, "second transfer should be a no-op");
1104            });
1105        }
1106
1107        /// E2: Transfer completeness - after transfer, every hash from the DAG is on dst.
1108        #[test]
1109        fn prop_transfer_complete((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
1110            let rt = crate::store::prop_strategies::runtime();
1111            rt.block_on(async {
1112                let src = MemoryStore::new();
1113                let dst = MemoryStore::new();
1114
1115                let mut tx = src.transaction().await.unwrap();
1116                for (h, o) in &dag {
1117                    tx.put(*h, o.clone()).await.unwrap();
1118                }
1119                tx.commit().await.unwrap();
1120
1121                transfer_objects(&src, &dst, root).await.unwrap();
1122
1123                for (h, _) in &dag {
1124                    assert!(
1125                        dst.contains(h).await.unwrap(),
1126                        "hash {h} should be on dst after transfer"
1127                    );
1128                }
1129            });
1130        }
1131
1132        /// E3: Transfer preserves objects - src.get(h) == dst.get(h) for all h.
1133        #[test]
1134        fn prop_transfer_preserves((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
1135            let rt = crate::store::prop_strategies::runtime();
1136            rt.block_on(async {
1137                let src = MemoryStore::new();
1138                let dst = MemoryStore::new();
1139
1140                let mut tx = src.transaction().await.unwrap();
1141                for (h, o) in &dag {
1142                    tx.put(*h, o.clone()).await.unwrap();
1143                }
1144                tx.commit().await.unwrap();
1145
1146                transfer_objects(&src, &dst, root).await.unwrap();
1147
1148                for (h, _) in &dag {
1149                    let src_obj = src.get(h).await.unwrap();
1150                    let dst_obj = dst.get(h).await.unwrap();
1151                    assert_eq!(
1152                        src_obj, dst_obj,
1153                        "object at {h} should be identical on src and dst"
1154                    );
1155                }
1156            });
1157        }
1158
1159        /// E4: Transfer with commit DAGs - use arb_commit_dag() for more complex
1160        /// topologies with trees, commits, and merge parents.
1161        #[test]
1162        fn prop_transfer_commit_dag((dag, root, _order) in crate::store::prop_strategies::arb_commit_dag()) {
1163            let rt = crate::store::prop_strategies::runtime();
1164            rt.block_on(async {
1165                let src = MemoryStore::new();
1166                let dst = MemoryStore::new();
1167
1168                let mut tx = src.transaction().await.unwrap();
1169                for (h, o) in &dag {
1170                    tx.put(*h, o.clone()).await.unwrap();
1171                }
1172                tx.commit().await.unwrap();
1173
1174                // First transfer should move all objects
1175                let first = transfer_objects(&src, &dst, root).await.unwrap();
1176                assert!(first > 0, "first transfer should copy objects");
1177
1178                // Second transfer should be idempotent
1179                let second = transfer_objects(&src, &dst, root).await.unwrap();
1180                assert_eq!(second, 0, "second transfer should be a no-op");
1181
1182                // Every object reachable from root on src must be on dst and identical.
1183                // (We check reachable objects, not all dag entries, because proptest
1184                // shrinking can produce duplicate hashes that overwrite earlier objects.)
1185                let src_reachable: Vec<_> = src.subtree(&root)
1186                    .map(|r| r.unwrap())
1187                    .collect()
1188                    .await;
1189                for (h, src_obj) in &src_reachable {
1190                    assert!(
1191                        dst.contains(h).await.unwrap(),
1192                        "hash {h} should be on dst after commit-dag transfer"
1193                    );
1194                    let dst_obj = dst.get(h).await.unwrap();
1195                    assert_eq!(
1196                        dst_obj.as_ref(), Some(src_obj),
1197                        "object at {h} should be identical on src and dst"
1198                    );
1199                }
1200            });
1201        }
1202    }
1203}