1use std::{collections::HashSet, path::Path};
7
8use anyhow::{Result, anyhow};
9use objects::{
10 object::{ChangeId, ContentHash},
11 store::ObjectStore,
12};
13use repo::Repository;
14
15pub struct LocalSync {
17 source: Repository,
18}
19
20impl LocalSync {
21 pub fn open(path: &Path) -> Result<Self> {
23 let source = Repository::open(path)?;
24 Ok(Self { source })
25 }
26
27 pub fn source(&self) -> &Repository {
29 &self.source
30 }
31
32 pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
34 let mut threads = Vec::new();
35 for thread in self.source.refs().list_threads()? {
36 if let Some(state_id) = self.source.refs().get_thread(&thread)? {
37 threads.push((thread.to_string(), state_id));
38 }
39 }
40 Ok(threads)
41 }
42
43 pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
45 let mut markers = Vec::new();
46 for marker in self.source.refs().list_markers()? {
47 if let Some(state_id) = self.source.refs().get_marker(&marker)? {
48 markers.push((marker.to_string(), state_id));
49 }
50 }
51 Ok(markers)
52 }
53
54 pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
56 let mut copied = 0;
57 let mut visited = HashSet::new();
58 self.copy_state_recursive(target, state_id, &mut visited, &mut copied, None)?;
59 Ok(copied)
60 }
61
62 pub fn fetch_state_with_depth(
67 &self,
68 target: &Repository,
69 state_id: &ChangeId,
70 depth: u32,
71 ) -> Result<usize> {
72 let mut copied = 0;
73 let mut visited = HashSet::new();
74 self.copy_state_recursive(target, state_id, &mut visited, &mut copied, Some(depth))?;
75 Ok(copied)
76 }
77
78 fn copy_state_recursive(
79 &self,
80 target: &Repository,
81 state_id: &ChangeId,
82 visited: &mut HashSet<ChangeId>,
83 copied: &mut usize,
84 max_depth: Option<u32>,
85 ) -> Result<()> {
86 if visited.contains(state_id) {
87 return Ok(());
88 }
89 visited.insert(*state_id);
90
91 let target_state = target.store().get_state(state_id)?;
100 let state_already_present = target_state.is_some();
101
102 let state = match self.source.store().get_state(state_id)? {
108 Some(state) => state,
109 None if state_already_present => return Ok(()),
110 None => return Err(anyhow!("State {} not found in source", state_id)),
111 };
112
113 self.propagate_state_visibility_for_state(target, state_id)?;
116 let mut propagated_trees: HashSet<ContentHash> = HashSet::new();
117 self.propagate_redactions_in_tree(target, &state.tree, &mut propagated_trees)?;
118 if let Some(provenance_root) = state.provenance {
119 self.propagate_redactions_in_tree(target, &provenance_root, &mut propagated_trees)?;
120 }
121 if let Some(context_root) = state.context {
122 self.propagate_redactions_in_tree(target, &context_root, &mut propagated_trees)?;
123 }
124
125 if !state_already_present {
126 self.copy_tree_recursive(target, &state.tree, copied)?;
128 if let Some(provenance_root) = state.provenance {
129 self.copy_tree_recursive(target, &provenance_root, copied)?;
130 }
131 if let Some(context_root) = state.context {
132 self.copy_tree_recursive(target, &context_root, copied)?;
133 }
134 }
135 self.copy_state_blob_dependencies(target, &state, copied)?;
136
137 if let Some(depth) = max_depth {
142 if depth > 0 {
143 for parent in &state.parents {
144 self.copy_state_recursive(target, parent, visited, copied, Some(depth - 1))?;
145 }
146 } else {
147 if !state_already_present {
149 target.set_shallow(state_id, &state.parents)?;
150 }
151 }
152 } else {
153 for parent in &state.parents {
154 self.copy_state_recursive(target, parent, visited, copied, None)?;
155 }
156 }
157
158 if !state_already_present || state_metadata_roots_changed(target_state.as_ref(), &state) {
159 target.store().put_state(&state)?;
160 if !state_already_present {
161 *copied += 1;
162 }
163 }
164
165 Ok(())
166 }
167
168 fn copy_state_blob_dependencies(
169 &self,
170 target: &Repository,
171 state: &objects::object::State,
172 copied: &mut usize,
173 ) -> Result<()> {
174 for hash in [
175 state.risk_signals,
176 state.review_signatures,
177 state.discussions,
178 state.structured_conflicts,
179 ]
180 .into_iter()
181 .flatten()
182 {
183 self.copy_blob_dependency(target, &hash, copied)?;
184 }
185 Ok(())
186 }
187
188 fn copy_blob_dependency(
189 &self,
190 target: &Repository,
191 hash: &ContentHash,
192 copied: &mut usize,
193 ) -> Result<()> {
194 if target.store().has_blob(hash)? {
195 return Ok(());
196 }
197 let blob = self.source.require_blob(hash)?;
198 target.store().put_blob(&blob)?;
199 *copied += 1;
200 Ok(())
201 }
202
203 fn copy_tree_recursive(
204 &self,
205 target: &Repository,
206 tree_hash: &ContentHash,
207 copied: &mut usize,
208 ) -> Result<()> {
209 if target.store().has_tree(tree_hash)? {
211 return Ok(());
212 }
213
214 let tree = self
216 .source
217 .store()
218 .get_tree(tree_hash)?
219 .ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
220
221 for entry in tree.entries() {
226 match entry.entry_type {
227 objects::object::EntryType::Blob => {
228 if !target.store().has_blob(&entry.hash)? {
229 let blob = self.source.require_blob(&entry.hash)?;
230 target.store().put_blob(&blob)?;
231 *copied += 1;
232 }
233 }
234 objects::object::EntryType::Tree => {
235 self.copy_tree_recursive(target, &entry.hash, copied)?;
236 }
237 objects::object::EntryType::Symlink => {
238 if !target.store().has_blob(&entry.hash)? {
239 let blob = self.source.require_blob(&entry.hash)?;
240 target.store().put_blob(&blob)?;
241 *copied += 1;
242 }
243 }
244 }
245 }
246
247 target.store().put_tree(&tree)?;
249 *copied += 1;
250
251 Ok(())
252 }
253
254 fn propagate_redactions_in_tree(
265 &self,
266 target: &Repository,
267 tree_hash: &ContentHash,
268 propagated_trees: &mut HashSet<ContentHash>,
269 ) -> Result<()> {
270 if !propagated_trees.insert(*tree_hash) {
271 return Ok(());
272 }
273
274 let Some(tree) = self.source.store().get_tree(tree_hash)? else {
278 return Ok(());
279 };
280
281 for entry in tree.entries() {
282 match entry.entry_type {
283 objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
284 self.propagate_redactions_for_blob(target, &entry.hash)?;
285 }
286 objects::object::EntryType::Tree => {
287 self.propagate_redactions_in_tree(target, &entry.hash, propagated_trees)?;
288 }
289 }
290 }
291 Ok(())
292 }
293
294 fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
305 let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
306 return Ok(());
307 };
308 target.accept_wire_redactions(*blob, &bytes)?;
309 Ok(())
310 }
311
312 fn propagate_state_visibility_for_state(
316 &self,
317 target: &Repository,
318 state: &ChangeId,
319 ) -> Result<()> {
320 let Some(bytes) = self.source.get_state_visibility_bytes_for_state(state)? else {
321 return Ok(());
322 };
323 target.accept_wire_state_visibility(*state, &bytes)?;
324 Ok(())
325 }
326
327 pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
329 if target.store().has_blob(hash)? {
330 return Ok(false);
331 }
332
333 let blob = self.source.require_blob(hash)?;
334
335 target.store().put_blob(&blob)?;
336 Ok(true)
337 }
338}
339
340fn state_metadata_roots_changed(
341 target_state: Option<&objects::object::State>,
342 source_state: &objects::object::State,
343) -> bool {
344 let Some(target_state) = target_state else {
345 return true;
346 };
347 target_state.risk_signals != source_state.risk_signals
348 || target_state.review_signatures != source_state.review_signatures
349 || target_state.discussions != source_state.discussions
350 || target_state.structured_conflicts != source_state.structured_conflicts
351}