1use std::{
7 collections::{HashSet, VecDeque},
8 path::Path,
9};
10
11use anyhow::{Result, anyhow};
12use objects::{
13 object::{ActionId, ChangeId, ContentHash},
14 store::ObjectStore,
15};
16use repo::Repository;
17use wire::{
18 GitLaneTransferIntent, ObjectId, ObjectType, PlannedObject, RepositoryTransferPlan,
19 StateClosureOptions,
20};
21
/// Sync helper that reads objects from a source [`Repository`] opened from a
/// local path and copies them into a caller-supplied target repository.
pub struct LocalSync {
    /// Repository that every fetch and listing operation reads from.
    source: Repository,
}
26
27impl LocalSync {
28 pub fn open(path: &Path) -> Result<Self> {
30 let source = Repository::open(path)?;
31 Ok(Self { source })
32 }
33
34 pub fn source(&self) -> &Repository {
36 &self.source
37 }
38
39 pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
41 let mut threads = Vec::new();
42 for thread in self.source.refs().list_threads()? {
43 if let Some(state_id) = self.source.refs().get_thread(&thread)? {
44 threads.push((thread.to_string(), state_id));
45 }
46 }
47 Ok(threads)
48 }
49
50 pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
52 let mut markers = Vec::new();
53 for marker in self.source.refs().list_markers()? {
54 if let Some(state_id) = self.source.refs().get_marker(&marker)? {
55 markers.push((marker.to_string(), state_id));
56 }
57 }
58 Ok(markers)
59 }
60
61 pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
63 let transfer_plan = self.plan_state_transfer(*state_id, None)?;
64 self.copy_transfer_plan(target, &transfer_plan)
65 }
66
67 pub fn fetch_state_with_depth(
72 &self,
73 target: &Repository,
74 state_id: &ChangeId,
75 depth: u32,
76 ) -> Result<usize> {
77 let state_already_present = target.store().get_state(state_id)?.is_some();
78 let transfer_plan = self.plan_state_transfer(*state_id, Some(depth))?;
79 let copied = self.copy_transfer_plan(target, &transfer_plan)?;
80 if !state_already_present {
81 self.mark_shallow_boundaries(target, *state_id, depth)?;
82 }
83 Ok(copied)
84 }
85
86 fn plan_state_transfer(
87 &self,
88 state_id: ChangeId,
89 depth: Option<u32>,
90 ) -> Result<RepositoryTransferPlan> {
91 Ok(RepositoryTransferPlan::from_state_closure_plan(
96 self.source.store(),
97 state_id,
98 StateClosureOptions {
99 depth,
100 exclude_states: Vec::new(),
101 },
102 GitLaneTransferIntent::HeddleObjectsOnly,
103 )?)
104 }
105
106 fn copy_transfer_plan(
107 &self,
108 target: &Repository,
109 transfer_plan: &RepositoryTransferPlan,
110 ) -> Result<usize> {
111 let mut copied = 0;
112 for object in &transfer_plan.partitions.packable_objects {
113 if self.copy_planned_object(target, object)? {
114 copied += 1;
115 }
116 }
117 for object in &transfer_plan.partitions.sidecar_objects {
118 self.copy_planned_sidecar(target, object)?;
119 }
120 Ok(copied)
121 }
122
123 fn copy_planned_object(&self, target: &Repository, object: &PlannedObject) -> Result<bool> {
124 match (&object.id, object.obj_type) {
125 (ObjectId::Hash(hash), ObjectType::Blob) => self.copy_blob(target, hash),
126 (ObjectId::Hash(hash), ObjectType::Tree) => self.copy_tree(target, hash),
127 (ObjectId::Hash(hash), ObjectType::Action) => self.copy_action(target, hash),
128 (ObjectId::ChangeId(state_id), ObjectType::State) => self.copy_state(target, state_id),
129 (_, ObjectType::Redaction | ObjectType::StateVisibility) => Ok(false),
130 (id, obj_type) => Err(anyhow!(
131 "transfer plan object {id:?} has incompatible type {obj_type:?}"
132 )),
133 }
134 }
135
136 fn copy_tree(&self, target: &Repository, tree_hash: &ContentHash) -> Result<bool> {
137 if target.store().has_tree(tree_hash)? {
138 return Ok(false);
139 }
140 let tree = self
141 .source
142 .store()
143 .get_tree(tree_hash)?
144 .ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
145 target.store().put_tree(&tree)?;
146 Ok(true)
147 }
148
149 fn copy_action(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
150 let action_id = ActionId::from_hash(*hash);
151 if target.store().get_action(&action_id)?.is_some() {
152 return Ok(false);
153 }
154 let mut action = self
155 .source
156 .store()
157 .get_action(&action_id)?
158 .ok_or_else(|| anyhow!("Action {} not found in source", hash))?;
159 target.store().put_action(&mut action)?;
160 Ok(true)
161 }
162
163 fn copy_state(&self, target: &Repository, state_id: &ChangeId) -> Result<bool> {
164 let target_state = target.store().get_state(state_id)?;
165 let state_already_present = target_state.is_some();
166 let state = self
167 .source
168 .store()
169 .get_state(state_id)?
170 .ok_or_else(|| anyhow!("State {} not found in source", state_id))?;
171
172 if !state_already_present || state_metadata_roots_changed(target_state.as_ref(), &state) {
173 target.store().put_state(&state)?;
174 }
175 Ok(!state_already_present)
176 }
177
178 fn copy_planned_sidecar(&self, target: &Repository, object: &PlannedObject) -> Result<()> {
179 match (&object.id, object.obj_type) {
180 (ObjectId::Hash(hash), ObjectType::Redaction) => {
181 self.propagate_redactions_for_blob(target, hash)
182 }
183 (ObjectId::ChangeId(state_id), ObjectType::StateVisibility) => {
184 self.propagate_state_visibility_for_state(target, state_id)
185 }
186 (_, ObjectType::Blob | ObjectType::Tree | ObjectType::State | ObjectType::Action) => {
187 Ok(())
188 }
189 (id, obj_type) => Err(anyhow!(
190 "transfer plan sidecar {id:?} has incompatible type {obj_type:?}"
191 )),
192 }
193 }
194
195 fn mark_shallow_boundaries(
196 &self,
197 target: &Repository,
198 state_id: ChangeId,
199 max_depth: u32,
200 ) -> Result<()> {
201 let mut seen: HashSet<ChangeId> = HashSet::new();
202 let mut queue = VecDeque::from([(state_id, 0u32)]);
203 while let Some((id, depth)) = queue.pop_front() {
204 if !seen.insert(id) {
205 continue;
206 }
207 let state = self
208 .source
209 .store()
210 .get_state(&id)?
211 .ok_or_else(|| anyhow!("State {} not found in source", id))?;
212 if depth == max_depth {
213 if !state.parents.is_empty() {
214 target.set_shallow(&id, &state.parents)?;
215 }
216 continue;
217 }
218 for parent in &state.parents {
219 queue.push_back((*parent, depth + 1));
220 }
221 }
222 Ok(())
223 }
224
225 fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
236 let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
237 return Ok(());
238 };
239 target.accept_wire_redactions(*blob, &bytes)?;
240 Ok(())
241 }
242
243 fn propagate_state_visibility_for_state(
247 &self,
248 target: &Repository,
249 state: &ChangeId,
250 ) -> Result<()> {
251 let Some(bytes) = self.source.get_state_visibility_bytes_for_state(state)? else {
252 return Ok(());
253 };
254 target.accept_wire_state_visibility(*state, &bytes)?;
255 Ok(())
256 }
257
258 pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
260 if target.store().has_blob(hash)? {
261 return Ok(false);
262 }
263
264 let blob = self.source.require_blob(hash)?;
265
266 target.store().put_blob(&blob)?;
267 Ok(true)
268 }
269}
270
271fn state_metadata_roots_changed(
272 target_state: Option<&objects::object::State>,
273 source_state: &objects::object::State,
274) -> bool {
275 let Some(target_state) = target_state else {
276 return true;
277 };
278 target_state.risk_signals != source_state.risk_signals
279 || target_state.review_signatures != source_state.review_signatures
280 || target_state.discussions != source_state.discussions
281 || target_state.structured_conflicts != source_state.structured_conflicts
282}
283
#[cfg(test)]
mod tests {
    use objects::object::{Attribution, Principal};
    use tempfile::TempDir;

    use super::*;

    /// Writes `file_content` to `file.txt` in the repository root, snapshots
    /// the repository with `message` under a fixed human test attribution, and
    /// returns the new snapshot's change id.
    fn capture(repo: &Repository, file_content: &str, message: &str) -> ChangeId {
        let tracked_file = repo.root().join("file.txt");
        std::fs::write(tracked_file, file_content).unwrap();

        let author = Attribution::human(Principal::new("Test User", "test@example.com"));
        let snapshot = repo
            .snapshot_with_attribution(Some(message.to_string()), None, author)
            .unwrap();
        snapshot.change_id
    }

    /// A full fetch followed by a depth-1 re-fetch of the same tip must leave
    /// the already-copied parent state unmarked as shallow.
    #[test]
    fn shallow_refetch_does_not_graft_already_present_history() {
        let source_dir = TempDir::new().unwrap();
        let target_dir = TempDir::new().unwrap();
        let source = Repository::init_default(source_dir.path()).unwrap();
        let target = Repository::init_default(target_dir.path()).unwrap();

        // Three snapshots taken in sequence; only the last two ids are used.
        let _root = capture(&source, "one\n", "one");
        let parent = capture(&source, "two\n", "two");
        let tip = capture(&source, "three\n", "three");

        let sync = LocalSync::open(source_dir.path()).unwrap();
        sync.fetch_state(&target, &tip).unwrap();

        let parent_copied = target.store().get_state(&parent).unwrap().is_some();
        assert!(
            parent_copied,
            "full fetch should copy the parent state before the shallow re-fetch"
        );
        let shallow_before_refetch = target.is_shallow(&parent);
        assert!(
            !shallow_before_refetch,
            "parent starts as normal visible history"
        );

        sync.fetch_state_with_depth(&target, &tip, 1).unwrap();

        let shallow_after_refetch = target.is_shallow(&parent);
        assert!(
            !shallow_after_refetch,
            "incremental shallow re-fetch of an already-present tip must not graft its parent"
        );
    }
}