Skip to main content

cli/client/
local_sync.rs

// SPDX-License-Identifier: Apache-2.0
//! Local repository synchronization.
//!
//! Direct access to local repositories, without network protocol overhead.
5
use std::{
    collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
    path::Path,
};

use anyhow::{Result, anyhow};
use objects::object::{ChangeId, ContentHash};
use repo::Repository;
11
12/// Synchronize objects from a local source repository to a target repository.
13pub struct LocalSync {
14    source: Repository,
15}
16
17impl LocalSync {
18    /// Open a local repository for synchronization.
19    pub fn open(path: &Path) -> Result<Self> {
20        let source = Repository::open(path)?;
21        Ok(Self { source })
22    }
23
24    /// Get the source repository.
25    pub fn source(&self) -> &Repository {
26        &self.source
27    }
28
29    /// List all threads in the source repository.
30    pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
31        let mut threads = Vec::new();
32        for thread in self.source.refs().list_threads()? {
33            if let Some(state_id) = self.source.refs().get_thread(&thread)? {
34                threads.push((thread, state_id));
35            }
36        }
37        Ok(threads)
38    }
39
40    /// List all markers in the source repository.
41    pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
42        let mut markers = Vec::new();
43        for marker in self.source.refs().list_markers()? {
44            if let Some(state_id) = self.source.refs().get_marker(&marker)? {
45                markers.push((marker, state_id));
46            }
47        }
48        Ok(markers)
49    }
50
51    /// Fetch a state and all its dependencies from source to target.
52    pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
53        let mut copied = 0;
54        let mut visited = HashSet::new();
55        self.copy_state_recursive(target, state_id, &mut visited, &mut copied, None)?;
56        Ok(copied)
57    }
58
59    /// Fetch a state with limited depth (shallow clone).
60    ///
61    /// Depth 1 means the target state and its immediate parents.
62    /// A depth of 0 should be treated by callers as "full history".
63    pub fn fetch_state_with_depth(
64        &self,
65        target: &Repository,
66        state_id: &ChangeId,
67        depth: u32,
68    ) -> Result<usize> {
69        let mut copied = 0;
70        let mut visited = HashSet::new();
71        self.copy_state_recursive(target, state_id, &mut visited, &mut copied, Some(depth))?;
72        Ok(copied)
73    }
74
75    fn copy_state_recursive(
76        &self,
77        target: &Repository,
78        state_id: &ChangeId,
79        visited: &mut HashSet<ChangeId>,
80        copied: &mut usize,
81        max_depth: Option<u32>,
82    ) -> Result<()> {
83        if visited.contains(state_id) {
84            return Ok(());
85        }
86        visited.insert(*state_id);
87
88        // Whether the target already has this state. We do NOT
89        // early-return on this — even when the object graph is fully
90        // present, an operator may have declared a redaction on the
91        // source *after* the target previously fetched the content.
92        // Subsequent syncs must still propagate the sidecar. We
93        // therefore always walk the tree(s) to surface redactions,
94        // and condition just the object-copy step on the
95        // `state_already_present` flag.
96        let state_already_present = target.store().has_state(state_id)?;
97
98        // Source-side state read drives both the object copy (when
99        // needed) and the redaction-propagation tree walk (always).
100        // If the source no longer has the state but the target does,
101        // we can't enumerate blob hashes for propagation — skip with
102        // no error in that case.
103        let state = match self.source.store().get_state(state_id)? {
104            Some(state) => state,
105            None if state_already_present => return Ok(()),
106            None => return Err(anyhow!("State {} not found in source", state_id)),
107        };
108
109        // Always propagate redactions for every blob in the state's
110        // trees, regardless of whether the trees themselves need
111        // copying. `propagate_redactions_in_tree` walks the trees
112        // without the has_tree fast-path so re-syncs after a redact
113        // still ferry the sidecar.
114        let mut propagated_trees: HashSet<ContentHash> = HashSet::new();
115        self.propagate_redactions_in_tree(target, &state.tree, &mut propagated_trees)?;
116        if let Some(provenance_root) = state.provenance {
117            self.propagate_redactions_in_tree(target, &provenance_root, &mut propagated_trees)?;
118        }
119        if let Some(context_root) = state.context {
120            self.propagate_redactions_in_tree(target, &context_root, &mut propagated_trees)?;
121        }
122
123        if !state_already_present {
124            // Copy tree recursively
125            self.copy_tree_recursive(target, &state.tree, copied)?;
126            if let Some(provenance_root) = state.provenance {
127                self.copy_tree_recursive(target, &provenance_root, copied)?;
128            }
129            if let Some(context_root) = state.context {
130                self.copy_tree_recursive(target, &context_root, copied)?;
131            }
132        }
133
134        // Copy parent states recursively (if depth allows). We recurse
135        // on parents even when the current state was already present —
136        // a redaction declared on an ancestor blob still needs to
137        // reach the target's redactions store.
138        if let Some(depth) = max_depth {
139            if depth > 0 {
140                for parent in &state.parents {
141                    self.copy_state_recursive(target, parent, visited, copied, Some(depth - 1))?;
142                }
143            } else {
144                // Shallow state - mark parents as grafted
145                if !state_already_present {
146                    target.set_shallow(state_id, &state.parents)?;
147                }
148            }
149        } else {
150            for parent in &state.parents {
151                self.copy_state_recursive(target, parent, visited, copied, None)?;
152            }
153        }
154
155        if !state_already_present {
156            target.store().put_state(&state)?;
157            *copied += 1;
158        }
159
160        Ok(())
161    }
162
163    fn copy_tree_recursive(
164        &self,
165        target: &Repository,
166        tree_hash: &ContentHash,
167        copied: &mut usize,
168    ) -> Result<()> {
169        // Check if target already has this tree
170        if target.store().has_tree(tree_hash)? {
171            return Ok(());
172        }
173
174        // Get the tree from source
175        let tree = self
176            .source
177            .store()
178            .get_tree(tree_hash)?
179            .ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
180
181        // Copy all blobs and sub-trees. Redaction propagation lives
182        // in `propagate_redactions_in_tree`, called by
183        // `copy_state_recursive` regardless of whether the tree was
184        // already present — so it's intentionally absent here.
185        for entry in tree.entries() {
186            match entry.entry_type {
187                objects::object::EntryType::Blob => {
188                    if !target.store().has_blob(&entry.hash)? {
189                        let blob = self.source.require_blob(&entry.hash)?;
190                        target.store().put_blob(&blob)?;
191                        *copied += 1;
192                    }
193                }
194                objects::object::EntryType::Tree => {
195                    self.copy_tree_recursive(target, &entry.hash, copied)?;
196                }
197                objects::object::EntryType::Symlink => {
198                    if !target.store().has_blob(&entry.hash)? {
199                        let blob = self.source.require_blob(&entry.hash)?;
200                        target.store().put_blob(&blob)?;
201                        *copied += 1;
202                    }
203                }
204            }
205        }
206
207        // Store the tree in target
208        target.store().put_tree(&tree)?;
209        *copied += 1;
210
211        Ok(())
212    }
213
214    /// Walk a source-side tree and propagate any redaction sidecars
215    /// found for the blobs it references. Runs regardless of whether
216    /// the tree (or its parent state) is already present on the
217    /// target — the whole point is to recover from the "redact-after-
218    /// peer-fetched" flow where the object graph is unchanged but a
219    /// new sidecar exists upstream.
220    ///
221    /// `propagated_trees` dedups within a single sync so we don't
222    /// re-walk the same subtree across `state.tree`, `provenance`,
223    /// and `context` roots that happen to share content.
224    fn propagate_redactions_in_tree(
225        &self,
226        target: &Repository,
227        tree_hash: &ContentHash,
228        propagated_trees: &mut HashSet<ContentHash>,
229    ) -> Result<()> {
230        if !propagated_trees.insert(*tree_hash) {
231            return Ok(());
232        }
233
234        // Tree must come from the source — if it's missing there, we
235        // can't enumerate blob hashes for sidecar lookup. Treat as a
236        // gap in propagation (best-effort), not a hard failure.
237        let Some(tree) = self.source.store().get_tree(tree_hash)? else {
238            return Ok(());
239        };
240
241        for entry in tree.entries() {
242            match entry.entry_type {
243                objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
244                    self.propagate_redactions_for_blob(target, &entry.hash)?;
245                }
246                objects::object::EntryType::Tree => {
247                    self.propagate_redactions_in_tree(target, &entry.hash, propagated_trees)?;
248                }
249            }
250        }
251        Ok(())
252    }
253
254    /// If the source repository has any redactions for `blob`, ferry
255    /// the sidecar bytes through `Repository::accept_wire_redactions`
256    /// on the target so signatures are verified and any `purged_at`
257    /// records trigger a local purge on the target.
258    ///
259    /// `LocalSync` is local→local, so we use the same wire-side
260    /// contract as the network path — same signature requirement,
261    /// same idempotency. Operators who redact unsigned locally and
262    /// expect that to propagate via a local fetch will see a clear
263    /// rejection rather than a silent skip.
264    fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
265        let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
266            return Ok(());
267        };
268        target.accept_wire_redactions(*blob, &bytes)?;
269        Ok(())
270    }
271
272    /// Copy a specific blob from source to target.
273    pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
274        if target.store().has_blob(hash)? {
275            return Ok(false);
276        }
277
278        let blob = self.source.require_blob(hash)?;
279
280        target.store().put_blob(&blob)?;
281        Ok(true)
282    }
283}