cli/client/local_sync.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Local repository synchronization.
3//!
4//! Direct access to local repositories without network protocol overhead.
5
use std::{
    collections::{HashMap, HashSet, VecDeque},
    path::Path,
};

use anyhow::{Result, anyhow};
use objects::object::{ChangeId, ContentHash};
use repo::Repository;
11
/// Synchronize objects from a local source repository to a target repository.
///
/// Holds the opened source [`Repository`]; the target repository is passed
/// to each fetch/copy call rather than stored here.
pub struct LocalSync {
    /// Repository that refs, states, trees, blobs, and redaction sidecars
    /// are read from.
    source: Repository,
}
16
17impl LocalSync {
18 /// Open a local repository for synchronization.
19 pub fn open(path: &Path) -> Result<Self> {
20 let source = Repository::open(path)?;
21 Ok(Self { source })
22 }
23
24 /// Get the source repository.
25 pub fn source(&self) -> &Repository {
26 &self.source
27 }
28
29 /// List all threads in the source repository.
30 pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
31 let mut threads = Vec::new();
32 for thread in self.source.refs().list_threads()? {
33 if let Some(state_id) = self.source.refs().get_thread(&thread)? {
34 threads.push((thread, state_id));
35 }
36 }
37 Ok(threads)
38 }
39
40 /// List all markers in the source repository.
41 pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
42 let mut markers = Vec::new();
43 for marker in self.source.refs().list_markers()? {
44 if let Some(state_id) = self.source.refs().get_marker(&marker)? {
45 markers.push((marker, state_id));
46 }
47 }
48 Ok(markers)
49 }
50
51 /// Fetch a state and all its dependencies from source to target.
52 pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
53 let mut copied = 0;
54 let mut visited = HashSet::new();
55 self.copy_state_recursive(target, state_id, &mut visited, &mut copied, None)?;
56 Ok(copied)
57 }
58
59 /// Fetch a state with limited depth (shallow clone).
60 ///
61 /// Depth 1 means the target state and its immediate parents.
62 /// A depth of 0 should be treated by callers as "full history".
63 pub fn fetch_state_with_depth(
64 &self,
65 target: &Repository,
66 state_id: &ChangeId,
67 depth: u32,
68 ) -> Result<usize> {
69 let mut copied = 0;
70 let mut visited = HashSet::new();
71 self.copy_state_recursive(target, state_id, &mut visited, &mut copied, Some(depth))?;
72 Ok(copied)
73 }
74
75 fn copy_state_recursive(
76 &self,
77 target: &Repository,
78 state_id: &ChangeId,
79 visited: &mut HashSet<ChangeId>,
80 copied: &mut usize,
81 max_depth: Option<u32>,
82 ) -> Result<()> {
83 if visited.contains(state_id) {
84 return Ok(());
85 }
86 visited.insert(*state_id);
87
88 // Whether the target already has this state. We do NOT
89 // early-return on this — even when the object graph is fully
90 // present, an operator may have declared a redaction on the
91 // source *after* the target previously fetched the content.
92 // Subsequent syncs must still propagate the sidecar. We
93 // therefore always walk the tree(s) to surface redactions,
94 // and condition just the object-copy step on the
95 // `state_already_present` flag.
96 let state_already_present = target.store().has_state(state_id)?;
97
98 // Source-side state read drives both the object copy (when
99 // needed) and the redaction-propagation tree walk (always).
100 // If the source no longer has the state but the target does,
101 // we can't enumerate blob hashes for propagation — skip with
102 // no error in that case.
103 let state = match self.source.store().get_state(state_id)? {
104 Some(state) => state,
105 None if state_already_present => return Ok(()),
106 None => return Err(anyhow!("State {} not found in source", state_id)),
107 };
108
109 // Always propagate redactions for every blob in the state's
110 // trees, regardless of whether the trees themselves need
111 // copying. `propagate_redactions_in_tree` walks the trees
112 // without the has_tree fast-path so re-syncs after a redact
113 // still ferry the sidecar.
114 let mut propagated_trees: HashSet<ContentHash> = HashSet::new();
115 self.propagate_redactions_in_tree(target, &state.tree, &mut propagated_trees)?;
116 if let Some(provenance_root) = state.provenance {
117 self.propagate_redactions_in_tree(target, &provenance_root, &mut propagated_trees)?;
118 }
119 if let Some(context_root) = state.context {
120 self.propagate_redactions_in_tree(target, &context_root, &mut propagated_trees)?;
121 }
122
123 if !state_already_present {
124 // Copy tree recursively
125 self.copy_tree_recursive(target, &state.tree, copied)?;
126 if let Some(provenance_root) = state.provenance {
127 self.copy_tree_recursive(target, &provenance_root, copied)?;
128 }
129 if let Some(context_root) = state.context {
130 self.copy_tree_recursive(target, &context_root, copied)?;
131 }
132 }
133
134 // Copy parent states recursively (if depth allows). We recurse
135 // on parents even when the current state was already present —
136 // a redaction declared on an ancestor blob still needs to
137 // reach the target's redactions store.
138 if let Some(depth) = max_depth {
139 if depth > 0 {
140 for parent in &state.parents {
141 self.copy_state_recursive(target, parent, visited, copied, Some(depth - 1))?;
142 }
143 } else {
144 // Shallow state - mark parents as grafted
145 if !state_already_present {
146 target.set_shallow(state_id, &state.parents)?;
147 }
148 }
149 } else {
150 for parent in &state.parents {
151 self.copy_state_recursive(target, parent, visited, copied, None)?;
152 }
153 }
154
155 if !state_already_present {
156 target.store().put_state(&state)?;
157 *copied += 1;
158 }
159
160 Ok(())
161 }
162
163 fn copy_tree_recursive(
164 &self,
165 target: &Repository,
166 tree_hash: &ContentHash,
167 copied: &mut usize,
168 ) -> Result<()> {
169 // Check if target already has this tree
170 if target.store().has_tree(tree_hash)? {
171 return Ok(());
172 }
173
174 // Get the tree from source
175 let tree = self
176 .source
177 .store()
178 .get_tree(tree_hash)?
179 .ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
180
181 // Copy all blobs and sub-trees. Redaction propagation lives
182 // in `propagate_redactions_in_tree`, called by
183 // `copy_state_recursive` regardless of whether the tree was
184 // already present — so it's intentionally absent here.
185 for entry in tree.entries() {
186 match entry.entry_type {
187 objects::object::EntryType::Blob => {
188 if !target.store().has_blob(&entry.hash)? {
189 let blob = self.source.require_blob(&entry.hash)?;
190 target.store().put_blob(&blob)?;
191 *copied += 1;
192 }
193 }
194 objects::object::EntryType::Tree => {
195 self.copy_tree_recursive(target, &entry.hash, copied)?;
196 }
197 objects::object::EntryType::Symlink => {
198 if !target.store().has_blob(&entry.hash)? {
199 let blob = self.source.require_blob(&entry.hash)?;
200 target.store().put_blob(&blob)?;
201 *copied += 1;
202 }
203 }
204 }
205 }
206
207 // Store the tree in target
208 target.store().put_tree(&tree)?;
209 *copied += 1;
210
211 Ok(())
212 }
213
214 /// Walk a source-side tree and propagate any redaction sidecars
215 /// found for the blobs it references. Runs regardless of whether
216 /// the tree (or its parent state) is already present on the
217 /// target — the whole point is to recover from the "redact-after-
218 /// peer-fetched" flow where the object graph is unchanged but a
219 /// new sidecar exists upstream.
220 ///
221 /// `propagated_trees` dedups within a single sync so we don't
222 /// re-walk the same subtree across `state.tree`, `provenance`,
223 /// and `context` roots that happen to share content.
224 fn propagate_redactions_in_tree(
225 &self,
226 target: &Repository,
227 tree_hash: &ContentHash,
228 propagated_trees: &mut HashSet<ContentHash>,
229 ) -> Result<()> {
230 if !propagated_trees.insert(*tree_hash) {
231 return Ok(());
232 }
233
234 // Tree must come from the source — if it's missing there, we
235 // can't enumerate blob hashes for sidecar lookup. Treat as a
236 // gap in propagation (best-effort), not a hard failure.
237 let Some(tree) = self.source.store().get_tree(tree_hash)? else {
238 return Ok(());
239 };
240
241 for entry in tree.entries() {
242 match entry.entry_type {
243 objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
244 self.propagate_redactions_for_blob(target, &entry.hash)?;
245 }
246 objects::object::EntryType::Tree => {
247 self.propagate_redactions_in_tree(target, &entry.hash, propagated_trees)?;
248 }
249 }
250 }
251 Ok(())
252 }
253
254 /// If the source repository has any redactions for `blob`, ferry
255 /// the sidecar bytes through `Repository::accept_wire_redactions`
256 /// on the target so signatures are verified and any `purged_at`
257 /// records trigger a local purge on the target.
258 ///
259 /// `LocalSync` is local→local, so we use the same wire-side
260 /// contract as the network path — same signature requirement,
261 /// same idempotency. Operators who redact unsigned locally and
262 /// expect that to propagate via a local fetch will see a clear
263 /// rejection rather than a silent skip.
264 fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
265 let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
266 return Ok(());
267 };
268 target.accept_wire_redactions(*blob, &bytes)?;
269 Ok(())
270 }
271
272 /// Copy a specific blob from source to target.
273 pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
274 if target.store().has_blob(hash)? {
275 return Ok(false);
276 }
277
278 let blob = self.source.require_blob(hash)?;
279
280 target.store().put_blob(&blob)?;
281 Ok(true)
282 }
283}