Skip to main content

cli/client/
local_sync.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local repository synchronization.
3//!
4//! Direct access to local repositories without network protocol overhead.
5
use std::{
    collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
    path::Path,
};

use anyhow::{Result, anyhow};
use objects::{
    object::{ChangeId, ContentHash},
    store::ObjectStore,
};
use repo::Repository;
14
15/// Synchronize objects from a local source repository to a target repository.
16pub struct LocalSync {
17    source: Repository,
18}
19
20impl LocalSync {
21    /// Open a local repository for synchronization.
22    pub fn open(path: &Path) -> Result<Self> {
23        let source = Repository::open(path)?;
24        Ok(Self { source })
25    }
26
27    /// Get the source repository.
28    pub fn source(&self) -> &Repository {
29        &self.source
30    }
31
32    /// List all threads in the source repository.
33    pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
34        let mut threads = Vec::new();
35        for thread in self.source.refs().list_threads()? {
36            if let Some(state_id) = self.source.refs().get_thread(&thread)? {
37                threads.push((thread.to_string(), state_id));
38            }
39        }
40        Ok(threads)
41    }
42
43    /// List all markers in the source repository.
44    pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
45        let mut markers = Vec::new();
46        for marker in self.source.refs().list_markers()? {
47            if let Some(state_id) = self.source.refs().get_marker(&marker)? {
48                markers.push((marker.to_string(), state_id));
49            }
50        }
51        Ok(markers)
52    }
53
54    /// Fetch a state and all its dependencies from source to target.
55    pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
56        let mut copied = 0;
57        let mut visited = HashSet::new();
58        self.copy_state_recursive(target, state_id, &mut visited, &mut copied, None)?;
59        Ok(copied)
60    }
61
62    /// Fetch a state with limited depth (shallow clone).
63    ///
64    /// Depth 1 means the target state and its immediate parents.
65    /// A depth of 0 should be treated by callers as "full history".
66    pub fn fetch_state_with_depth(
67        &self,
68        target: &Repository,
69        state_id: &ChangeId,
70        depth: u32,
71    ) -> Result<usize> {
72        let mut copied = 0;
73        let mut visited = HashSet::new();
74        self.copy_state_recursive(target, state_id, &mut visited, &mut copied, Some(depth))?;
75        Ok(copied)
76    }
77
78    fn copy_state_recursive(
79        &self,
80        target: &Repository,
81        state_id: &ChangeId,
82        visited: &mut HashSet<ChangeId>,
83        copied: &mut usize,
84        max_depth: Option<u32>,
85    ) -> Result<()> {
86        if visited.contains(state_id) {
87            return Ok(());
88        }
89        visited.insert(*state_id);
90
91        // Whether the target already has this state. We do NOT
92        // early-return on this — even when the object graph is fully
93        // present, an operator may have declared a redaction on the
94        // source *after* the target previously fetched the content.
95        // Subsequent syncs must still propagate the sidecar. We
96        // therefore always walk the tree(s) to surface redactions,
97        // and condition just the object-copy step on the
98        // `state_already_present` flag.
99        let state_already_present = target.store().has_state(state_id)?;
100
101        // Source-side state read drives both the object copy (when
102        // needed) and sidecar propagation (always).
103        // If the source no longer has the state but the target does,
104        // we can't enumerate sidecars for propagation — skip with
105        // no error in that case.
106        let state = match self.source.store().get_state(state_id)? {
107            Some(state) => state,
108            None if state_already_present => return Ok(()),
109            None => return Err(anyhow!("State {} not found in source", state_id)),
110        };
111
112        // Always propagate per-state visibility and per-blob redactions,
113        // regardless of whether the objects themselves need copying.
114        self.propagate_state_visibility_for_state(target, state_id)?;
115        let mut propagated_trees: HashSet<ContentHash> = HashSet::new();
116        self.propagate_redactions_in_tree(target, &state.tree, &mut propagated_trees)?;
117        if let Some(provenance_root) = state.provenance {
118            self.propagate_redactions_in_tree(target, &provenance_root, &mut propagated_trees)?;
119        }
120        if let Some(context_root) = state.context {
121            self.propagate_redactions_in_tree(target, &context_root, &mut propagated_trees)?;
122        }
123
124        if !state_already_present {
125            // Copy tree recursively
126            self.copy_tree_recursive(target, &state.tree, copied)?;
127            if let Some(provenance_root) = state.provenance {
128                self.copy_tree_recursive(target, &provenance_root, copied)?;
129            }
130            if let Some(context_root) = state.context {
131                self.copy_tree_recursive(target, &context_root, copied)?;
132            }
133        }
134
135        // Copy parent states recursively (if depth allows). We recurse
136        // on parents even when the current state was already present —
137        // a redaction declared on an ancestor blob still needs to
138        // reach the target's redactions store.
139        if let Some(depth) = max_depth {
140            if depth > 0 {
141                for parent in &state.parents {
142                    self.copy_state_recursive(target, parent, visited, copied, Some(depth - 1))?;
143                }
144            } else {
145                // Shallow state - mark parents as grafted
146                if !state_already_present {
147                    target.set_shallow(state_id, &state.parents)?;
148                }
149            }
150        } else {
151            for parent in &state.parents {
152                self.copy_state_recursive(target, parent, visited, copied, None)?;
153            }
154        }
155
156        if !state_already_present {
157            target.store().put_state(&state)?;
158            *copied += 1;
159        }
160
161        Ok(())
162    }
163
164    fn copy_tree_recursive(
165        &self,
166        target: &Repository,
167        tree_hash: &ContentHash,
168        copied: &mut usize,
169    ) -> Result<()> {
170        // Check if target already has this tree
171        if target.store().has_tree(tree_hash)? {
172            return Ok(());
173        }
174
175        // Get the tree from source
176        let tree = self
177            .source
178            .store()
179            .get_tree(tree_hash)?
180            .ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
181
182        // Copy all blobs and sub-trees. Redaction propagation lives
183        // in `propagate_redactions_in_tree`, called by
184        // `copy_state_recursive` regardless of whether the tree was
185        // already present — so it's intentionally absent here.
186        for entry in tree.entries() {
187            match entry.entry_type {
188                objects::object::EntryType::Blob => {
189                    if !target.store().has_blob(&entry.hash)? {
190                        let blob = self.source.require_blob(&entry.hash)?;
191                        target.store().put_blob(&blob)?;
192                        *copied += 1;
193                    }
194                }
195                objects::object::EntryType::Tree => {
196                    self.copy_tree_recursive(target, &entry.hash, copied)?;
197                }
198                objects::object::EntryType::Symlink => {
199                    if !target.store().has_blob(&entry.hash)? {
200                        let blob = self.source.require_blob(&entry.hash)?;
201                        target.store().put_blob(&blob)?;
202                        *copied += 1;
203                    }
204                }
205            }
206        }
207
208        // Store the tree in target
209        target.store().put_tree(&tree)?;
210        *copied += 1;
211
212        Ok(())
213    }
214
215    /// Walk a source-side tree and propagate any redaction sidecars
216    /// found for the blobs it references. Runs regardless of whether
217    /// the tree (or its parent state) is already present on the
218    /// target — the whole point is to recover from the "redact-after-
219    /// peer-fetched" flow where the object graph is unchanged but a
220    /// new sidecar exists upstream.
221    ///
222    /// `propagated_trees` dedups within a single sync so we don't
223    /// re-walk the same subtree across `state.tree`, `provenance`,
224    /// and `context` roots that happen to share content.
225    fn propagate_redactions_in_tree(
226        &self,
227        target: &Repository,
228        tree_hash: &ContentHash,
229        propagated_trees: &mut HashSet<ContentHash>,
230    ) -> Result<()> {
231        if !propagated_trees.insert(*tree_hash) {
232            return Ok(());
233        }
234
235        // Tree must come from the source — if it's missing there, we
236        // can't enumerate blob hashes for sidecar lookup. Treat as a
237        // gap in propagation (best-effort), not a hard failure.
238        let Some(tree) = self.source.store().get_tree(tree_hash)? else {
239            return Ok(());
240        };
241
242        for entry in tree.entries() {
243            match entry.entry_type {
244                objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
245                    self.propagate_redactions_for_blob(target, &entry.hash)?;
246                }
247                objects::object::EntryType::Tree => {
248                    self.propagate_redactions_in_tree(target, &entry.hash, propagated_trees)?;
249                }
250            }
251        }
252        Ok(())
253    }
254
255    /// If the source repository has any redactions for `blob`, ferry
256    /// the sidecar bytes through `Repository::accept_wire_redactions`
257    /// on the target so signatures are verified and any `purged_at`
258    /// records trigger a local purge on the target.
259    ///
260    /// `LocalSync` is local→local, so we use the same wire-side
261    /// contract as the network path — same signature requirement,
262    /// same idempotency. Operators who redact unsigned locally and
263    /// expect that to propagate via a local fetch will see a clear
264    /// rejection rather than a silent skip.
265    fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
266        let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
267            return Ok(());
268        };
269        target.accept_wire_redactions(*blob, &bytes)?;
270        Ok(())
271    }
272
273    /// If the source repository has state-visibility records for `state`,
274    /// ferry the sidecar bytes through the same repository boundary used by
275    /// the network path.
276    fn propagate_state_visibility_for_state(
277        &self,
278        target: &Repository,
279        state: &ChangeId,
280    ) -> Result<()> {
281        let Some(bytes) = self.source.get_state_visibility_bytes_for_state(state)? else {
282            return Ok(());
283        };
284        target.accept_wire_state_visibility(*state, &bytes)?;
285        Ok(())
286    }
287
288    /// Copy a specific blob from source to target.
289    pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
290        if target.store().has_blob(hash)? {
291            return Ok(false);
292        }
293
294        let blob = self.source.require_blob(hash)?;
295
296        target.store().put_blob(&blob)?;
297        Ok(true)
298    }
299}