Skip to main content

cli/client/
local_sync.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local repository synchronization.
3//!
4//! Direct access to local repositories without network protocol overhead.
5
6use std::{collections::HashSet, path::Path};
7
8use anyhow::{Result, anyhow};
9use objects::{
10    object::{ChangeId, ContentHash},
11    store::ObjectStore,
12};
13use repo::Repository;
14
/// Synchronize objects from a local source repository to a target repository.
///
/// The value owns the opened *source* repository. The *target* repository is
/// never stored; it is passed by reference to each method that writes objects.
pub struct LocalSync {
    /// Repository that states, trees, blobs and sidecars are read from.
    source: Repository,
}
19
20impl LocalSync {
21    /// Open a local repository for synchronization.
22    pub fn open(path: &Path) -> Result<Self> {
23        let source = Repository::open(path)?;
24        Ok(Self { source })
25    }
26
27    /// Get the source repository.
28    pub fn source(&self) -> &Repository {
29        &self.source
30    }
31
32    /// List all threads in the source repository.
33    pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
34        let mut threads = Vec::new();
35        for thread in self.source.refs().list_threads()? {
36            if let Some(state_id) = self.source.refs().get_thread(&thread)? {
37                threads.push((thread.to_string(), state_id));
38            }
39        }
40        Ok(threads)
41    }
42
43    /// List all markers in the source repository.
44    pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
45        let mut markers = Vec::new();
46        for marker in self.source.refs().list_markers()? {
47            if let Some(state_id) = self.source.refs().get_marker(&marker)? {
48                markers.push((marker.to_string(), state_id));
49            }
50        }
51        Ok(markers)
52    }
53
54    /// Fetch a state and all its dependencies from source to target.
55    pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
56        let mut copied = 0;
57        let mut visited = HashSet::new();
58        self.copy_state_recursive(target, state_id, &mut visited, &mut copied, None)?;
59        Ok(copied)
60    }
61
62    /// Fetch a state with limited depth (shallow clone).
63    ///
64    /// Depth 1 means the target state and its immediate parents.
65    /// A depth of 0 should be treated by callers as "full history".
66    pub fn fetch_state_with_depth(
67        &self,
68        target: &Repository,
69        state_id: &ChangeId,
70        depth: u32,
71    ) -> Result<usize> {
72        let mut copied = 0;
73        let mut visited = HashSet::new();
74        self.copy_state_recursive(target, state_id, &mut visited, &mut copied, Some(depth))?;
75        Ok(copied)
76    }
77
78    fn copy_state_recursive(
79        &self,
80        target: &Repository,
81        state_id: &ChangeId,
82        visited: &mut HashSet<ChangeId>,
83        copied: &mut usize,
84        max_depth: Option<u32>,
85    ) -> Result<()> {
86        if visited.contains(state_id) {
87            return Ok(());
88        }
89        visited.insert(*state_id);
90
91        // Whether the target already has this state. We do NOT
92        // early-return on this — even when the object graph is fully
93        // present, an operator may have declared a redaction on the
94        // source *after* the target previously fetched the content.
95        // Subsequent syncs must still propagate the sidecar. We
96        // therefore always walk the tree(s) to surface redactions,
97        // and condition just the object-copy step on the
98        // `state_already_present` flag.
99        let target_state = target.store().get_state(state_id)?;
100        let state_already_present = target_state.is_some();
101
102        // Source-side state read drives both the object copy (when
103        // needed) and sidecar propagation (always).
104        // If the source no longer has the state but the target does,
105        // we can't enumerate sidecars for propagation — skip with
106        // no error in that case.
107        let state = match self.source.store().get_state(state_id)? {
108            Some(state) => state,
109            None if state_already_present => return Ok(()),
110            None => return Err(anyhow!("State {} not found in source", state_id)),
111        };
112
113        // Always propagate per-state visibility and per-blob redactions,
114        // regardless of whether the objects themselves need copying.
115        self.propagate_state_visibility_for_state(target, state_id)?;
116        let mut propagated_trees: HashSet<ContentHash> = HashSet::new();
117        self.propagate_redactions_in_tree(target, &state.tree, &mut propagated_trees)?;
118        if let Some(provenance_root) = state.provenance {
119            self.propagate_redactions_in_tree(target, &provenance_root, &mut propagated_trees)?;
120        }
121        if let Some(context_root) = state.context {
122            self.propagate_redactions_in_tree(target, &context_root, &mut propagated_trees)?;
123        }
124
125        if !state_already_present {
126            // Copy tree recursively
127            self.copy_tree_recursive(target, &state.tree, copied)?;
128            if let Some(provenance_root) = state.provenance {
129                self.copy_tree_recursive(target, &provenance_root, copied)?;
130            }
131            if let Some(context_root) = state.context {
132                self.copy_tree_recursive(target, &context_root, copied)?;
133            }
134        }
135        self.copy_state_blob_dependencies(target, &state, copied)?;
136
137        // Copy parent states recursively (if depth allows). We recurse
138        // on parents even when the current state was already present —
139        // a redaction declared on an ancestor blob still needs to
140        // reach the target's redactions store.
141        if let Some(depth) = max_depth {
142            if depth > 0 {
143                for parent in &state.parents {
144                    self.copy_state_recursive(target, parent, visited, copied, Some(depth - 1))?;
145                }
146            } else {
147                // Shallow state - mark parents as grafted
148                if !state_already_present {
149                    target.set_shallow(state_id, &state.parents)?;
150                }
151            }
152        } else {
153            for parent in &state.parents {
154                self.copy_state_recursive(target, parent, visited, copied, None)?;
155            }
156        }
157
158        if !state_already_present || state_metadata_roots_changed(target_state.as_ref(), &state) {
159            target.store().put_state(&state)?;
160            if !state_already_present {
161                *copied += 1;
162            }
163        }
164
165        Ok(())
166    }
167
168    fn copy_state_blob_dependencies(
169        &self,
170        target: &Repository,
171        state: &objects::object::State,
172        copied: &mut usize,
173    ) -> Result<()> {
174        for hash in [
175            state.risk_signals,
176            state.review_signatures,
177            state.discussions,
178            state.structured_conflicts,
179        ]
180        .into_iter()
181        .flatten()
182        {
183            self.copy_blob_dependency(target, &hash, copied)?;
184        }
185        Ok(())
186    }
187
188    fn copy_blob_dependency(
189        &self,
190        target: &Repository,
191        hash: &ContentHash,
192        copied: &mut usize,
193    ) -> Result<()> {
194        if target.store().has_blob(hash)? {
195            return Ok(());
196        }
197        let blob = self.source.require_blob(hash)?;
198        target.store().put_blob(&blob)?;
199        *copied += 1;
200        Ok(())
201    }
202
203    fn copy_tree_recursive(
204        &self,
205        target: &Repository,
206        tree_hash: &ContentHash,
207        copied: &mut usize,
208    ) -> Result<()> {
209        // Check if target already has this tree
210        if target.store().has_tree(tree_hash)? {
211            return Ok(());
212        }
213
214        // Get the tree from source
215        let tree = self
216            .source
217            .store()
218            .get_tree(tree_hash)?
219            .ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
220
221        // Copy all blobs and sub-trees. Redaction propagation lives
222        // in `propagate_redactions_in_tree`, called by
223        // `copy_state_recursive` regardless of whether the tree was
224        // already present — so it's intentionally absent here.
225        for entry in tree.entries() {
226            match entry.entry_type {
227                objects::object::EntryType::Blob => {
228                    if !target.store().has_blob(&entry.hash)? {
229                        let blob = self.source.require_blob(&entry.hash)?;
230                        target.store().put_blob(&blob)?;
231                        *copied += 1;
232                    }
233                }
234                objects::object::EntryType::Tree => {
235                    self.copy_tree_recursive(target, &entry.hash, copied)?;
236                }
237                objects::object::EntryType::Symlink => {
238                    if !target.store().has_blob(&entry.hash)? {
239                        let blob = self.source.require_blob(&entry.hash)?;
240                        target.store().put_blob(&blob)?;
241                        *copied += 1;
242                    }
243                }
244            }
245        }
246
247        // Store the tree in target
248        target.store().put_tree(&tree)?;
249        *copied += 1;
250
251        Ok(())
252    }
253
254    /// Walk a source-side tree and propagate any redaction sidecars
255    /// found for the blobs it references. Runs regardless of whether
256    /// the tree (or its parent state) is already present on the
257    /// target — the whole point is to recover from the "redact-after-
258    /// peer-fetched" flow where the object graph is unchanged but a
259    /// new sidecar exists upstream.
260    ///
261    /// `propagated_trees` dedups within a single sync so we don't
262    /// re-walk the same subtree across `state.tree`, `provenance`,
263    /// and `context` roots that happen to share content.
264    fn propagate_redactions_in_tree(
265        &self,
266        target: &Repository,
267        tree_hash: &ContentHash,
268        propagated_trees: &mut HashSet<ContentHash>,
269    ) -> Result<()> {
270        if !propagated_trees.insert(*tree_hash) {
271            return Ok(());
272        }
273
274        // Tree must come from the source — if it's missing there, we
275        // can't enumerate blob hashes for sidecar lookup. Treat as a
276        // gap in propagation (best-effort), not a hard failure.
277        let Some(tree) = self.source.store().get_tree(tree_hash)? else {
278            return Ok(());
279        };
280
281        for entry in tree.entries() {
282            match entry.entry_type {
283                objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
284                    self.propagate_redactions_for_blob(target, &entry.hash)?;
285                }
286                objects::object::EntryType::Tree => {
287                    self.propagate_redactions_in_tree(target, &entry.hash, propagated_trees)?;
288                }
289            }
290        }
291        Ok(())
292    }
293
294    /// If the source repository has any redactions for `blob`, ferry
295    /// the sidecar bytes through `Repository::accept_wire_redactions`
296    /// on the target so signatures are verified and any `purged_at`
297    /// records trigger a local purge on the target.
298    ///
299    /// `LocalSync` is local→local, so we use the same wire-side
300    /// contract as the network path — same signature requirement,
301    /// same idempotency. Operators who redact unsigned locally and
302    /// expect that to propagate via a local fetch will see a clear
303    /// rejection rather than a silent skip.
304    fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
305        let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
306            return Ok(());
307        };
308        target.accept_wire_redactions(*blob, &bytes)?;
309        Ok(())
310    }
311
312    /// If the source repository has state-visibility records for `state`,
313    /// ferry the sidecar bytes through the same repository boundary used by
314    /// the network path.
315    fn propagate_state_visibility_for_state(
316        &self,
317        target: &Repository,
318        state: &ChangeId,
319    ) -> Result<()> {
320        let Some(bytes) = self.source.get_state_visibility_bytes_for_state(state)? else {
321            return Ok(());
322        };
323        target.accept_wire_state_visibility(*state, &bytes)?;
324        Ok(())
325    }
326
327    /// Copy a specific blob from source to target.
328    pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
329        if target.store().has_blob(hash)? {
330            return Ok(false);
331        }
332
333        let blob = self.source.require_blob(hash)?;
334
335        target.store().put_blob(&blob)?;
336        Ok(true)
337    }
338}
339
340fn state_metadata_roots_changed(
341    target_state: Option<&objects::object::State>,
342    source_state: &objects::object::State,
343) -> bool {
344    let Some(target_state) = target_state else {
345        return true;
346    };
347    target_state.risk_signals != source_state.risk_signals
348        || target_state.review_signatures != source_state.review_signatures
349        || target_state.discussions != source_state.discussions
350        || target_state.structured_conflicts != source_state.structured_conflicts
351}