1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
// SPDX-License-Identifier: Apache-2.0
//! Local repository synchronization.
//!
//! Direct access to local repositories without network protocol overhead.
use std::{collections::HashSet, path::Path};
use anyhow::{Result, anyhow};
use objects::object::{ChangeId, ContentHash};
use repo::Repository;
/// Synchronize objects from a local source repository to a target repository.
pub struct LocalSync {
source: Repository,
}
impl LocalSync {
/// Open a local repository for synchronization.
pub fn open(path: &Path) -> Result<Self> {
let source = Repository::open(path)?;
Ok(Self { source })
}
/// Get the source repository.
pub fn source(&self) -> &Repository {
&self.source
}
/// List all threads in the source repository.
pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
let mut threads = Vec::new();
for thread in self.source.refs().list_threads()? {
if let Some(state_id) = self.source.refs().get_thread(&thread)? {
threads.push((thread, state_id));
}
}
Ok(threads)
}
/// List all markers in the source repository.
pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
let mut markers = Vec::new();
for marker in self.source.refs().list_markers()? {
if let Some(state_id) = self.source.refs().get_marker(&marker)? {
markers.push((marker, state_id));
}
}
Ok(markers)
}
/// Fetch a state and all its dependencies from source to target.
pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
let mut copied = 0;
let mut visited = HashSet::new();
self.copy_state_recursive(target, state_id, &mut visited, &mut copied, None)?;
Ok(copied)
}
/// Fetch a state with limited depth (shallow clone).
///
/// Depth 1 means the target state and its immediate parents.
/// A depth of 0 should be treated by callers as "full history".
pub fn fetch_state_with_depth(
&self,
target: &Repository,
state_id: &ChangeId,
depth: u32,
) -> Result<usize> {
let mut copied = 0;
let mut visited = HashSet::new();
self.copy_state_recursive(target, state_id, &mut visited, &mut copied, Some(depth))?;
Ok(copied)
}
fn copy_state_recursive(
&self,
target: &Repository,
state_id: &ChangeId,
visited: &mut HashSet<ChangeId>,
copied: &mut usize,
max_depth: Option<u32>,
) -> Result<()> {
if visited.contains(state_id) {
return Ok(());
}
visited.insert(*state_id);
// Whether the target already has this state. We do NOT
// early-return on this — even when the object graph is fully
// present, an operator may have declared a redaction on the
// source *after* the target previously fetched the content.
// Subsequent syncs must still propagate the sidecar. We
// therefore always walk the tree(s) to surface redactions,
// and condition just the object-copy step on the
// `state_already_present` flag.
let state_already_present = target.store().has_state(state_id)?;
// Source-side state read drives both the object copy (when
// needed) and the redaction-propagation tree walk (always).
// If the source no longer has the state but the target does,
// we can't enumerate blob hashes for propagation — skip with
// no error in that case.
let state = match self.source.store().get_state(state_id)? {
Some(state) => state,
None if state_already_present => return Ok(()),
None => return Err(anyhow!("State {} not found in source", state_id)),
};
// Always propagate redactions for every blob in the state's
// trees, regardless of whether the trees themselves need
// copying. `propagate_redactions_in_tree` walks the trees
// without the has_tree fast-path so re-syncs after a redact
// still ferry the sidecar.
let mut propagated_trees: HashSet<ContentHash> = HashSet::new();
self.propagate_redactions_in_tree(target, &state.tree, &mut propagated_trees)?;
if let Some(provenance_root) = state.provenance {
self.propagate_redactions_in_tree(target, &provenance_root, &mut propagated_trees)?;
}
if let Some(context_root) = state.context {
self.propagate_redactions_in_tree(target, &context_root, &mut propagated_trees)?;
}
if !state_already_present {
// Copy tree recursively
self.copy_tree_recursive(target, &state.tree, copied)?;
if let Some(provenance_root) = state.provenance {
self.copy_tree_recursive(target, &provenance_root, copied)?;
}
if let Some(context_root) = state.context {
self.copy_tree_recursive(target, &context_root, copied)?;
}
}
// Copy parent states recursively (if depth allows). We recurse
// on parents even when the current state was already present —
// a redaction declared on an ancestor blob still needs to
// reach the target's redactions store.
if let Some(depth) = max_depth {
if depth > 0 {
for parent in &state.parents {
self.copy_state_recursive(target, parent, visited, copied, Some(depth - 1))?;
}
} else {
// Shallow state - mark parents as grafted
if !state_already_present {
target.set_shallow(state_id, &state.parents)?;
}
}
} else {
for parent in &state.parents {
self.copy_state_recursive(target, parent, visited, copied, None)?;
}
}
if !state_already_present {
target.store().put_state(&state)?;
*copied += 1;
}
Ok(())
}
fn copy_tree_recursive(
&self,
target: &Repository,
tree_hash: &ContentHash,
copied: &mut usize,
) -> Result<()> {
// Check if target already has this tree
if target.store().has_tree(tree_hash)? {
return Ok(());
}
// Get the tree from source
let tree = self
.source
.store()
.get_tree(tree_hash)?
.ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
// Copy all blobs and sub-trees. Redaction propagation lives
// in `propagate_redactions_in_tree`, called by
// `copy_state_recursive` regardless of whether the tree was
// already present — so it's intentionally absent here.
for entry in tree.entries() {
match entry.entry_type {
objects::object::EntryType::Blob => {
if !target.store().has_blob(&entry.hash)? {
let blob = self.source.require_blob(&entry.hash)?;
target.store().put_blob(&blob)?;
*copied += 1;
}
}
objects::object::EntryType::Tree => {
self.copy_tree_recursive(target, &entry.hash, copied)?;
}
objects::object::EntryType::Symlink => {
if !target.store().has_blob(&entry.hash)? {
let blob = self.source.require_blob(&entry.hash)?;
target.store().put_blob(&blob)?;
*copied += 1;
}
}
}
}
// Store the tree in target
target.store().put_tree(&tree)?;
*copied += 1;
Ok(())
}
/// Walk a source-side tree and propagate any redaction sidecars
/// found for the blobs it references. Runs regardless of whether
/// the tree (or its parent state) is already present on the
/// target — the whole point is to recover from the "redact-after-
/// peer-fetched" flow where the object graph is unchanged but a
/// new sidecar exists upstream.
///
/// `propagated_trees` dedups within a single sync so we don't
/// re-walk the same subtree across `state.tree`, `provenance`,
/// and `context` roots that happen to share content.
fn propagate_redactions_in_tree(
&self,
target: &Repository,
tree_hash: &ContentHash,
propagated_trees: &mut HashSet<ContentHash>,
) -> Result<()> {
if !propagated_trees.insert(*tree_hash) {
return Ok(());
}
// Tree must come from the source — if it's missing there, we
// can't enumerate blob hashes for sidecar lookup. Treat as a
// gap in propagation (best-effort), not a hard failure.
let Some(tree) = self.source.store().get_tree(tree_hash)? else {
return Ok(());
};
for entry in tree.entries() {
match entry.entry_type {
objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
self.propagate_redactions_for_blob(target, &entry.hash)?;
}
objects::object::EntryType::Tree => {
self.propagate_redactions_in_tree(target, &entry.hash, propagated_trees)?;
}
}
}
Ok(())
}
/// If the source repository has any redactions for `blob`, ferry
/// the sidecar bytes through `Repository::accept_wire_redactions`
/// on the target so signatures are verified and any `purged_at`
/// records trigger a local purge on the target.
///
/// `LocalSync` is local→local, so we use the same wire-side
/// contract as the network path — same signature requirement,
/// same idempotency. Operators who redact unsigned locally and
/// expect that to propagate via a local fetch will see a clear
/// rejection rather than a silent skip.
fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
return Ok(());
};
target.accept_wire_redactions(*blob, &bytes)?;
Ok(())
}
/// Copy a specific blob from source to target.
pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
if target.store().has_blob(hash)? {
return Ok(false);
}
let blob = self.source.require_blob(hash)?;
target.store().put_blob(&blob)?;
Ok(true)
}
}