Skip to main content

cli/client/
local_sync.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local repository synchronization.
3//!
4//! Direct access to local repositories without network protocol overhead.
5
6use std::{
7    collections::{HashSet, VecDeque},
8    path::Path,
9};
10
11use anyhow::{Result, anyhow};
12use objects::{
13    object::{ActionId, ChangeId, ContentHash},
14    store::ObjectStore,
15};
16use repo::Repository;
17use wire::{
18    GitLaneTransferIntent, ObjectId, ObjectType, PlannedObject, RepositoryTransferPlan,
19    StateClosureOptions,
20};
21
/// Synchronize objects from a local source repository to a target repository.
///
/// Owns the opened source [`Repository`]; the target repository is passed to
/// each fetch/copy method rather than being stored here.
pub struct LocalSync {
    /// Repository that objects, refs, and sidecar data are read from.
    source: Repository,
}
26
27impl LocalSync {
28    /// Open a local repository for synchronization.
29    pub fn open(path: &Path) -> Result<Self> {
30        let source = Repository::open(path)?;
31        Ok(Self { source })
32    }
33
34    /// Get the source repository.
35    pub fn source(&self) -> &Repository {
36        &self.source
37    }
38
39    /// List all threads in the source repository.
40    pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
41        let mut threads = Vec::new();
42        for thread in self.source.refs().list_threads()? {
43            if let Some(state_id) = self.source.refs().get_thread(&thread)? {
44                threads.push((thread.to_string(), state_id));
45            }
46        }
47        Ok(threads)
48    }
49
50    /// List all markers in the source repository.
51    pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
52        let mut markers = Vec::new();
53        for marker in self.source.refs().list_markers()? {
54            if let Some(state_id) = self.source.refs().get_marker(&marker)? {
55                markers.push((marker.to_string(), state_id));
56            }
57        }
58        Ok(markers)
59    }
60
61    /// Fetch a state and all its dependencies from source to target.
62    pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
63        let transfer_plan = self.plan_state_transfer(*state_id, None)?;
64        self.copy_transfer_plan(target, &transfer_plan)
65    }
66
67    /// Fetch a state with limited depth (shallow clone).
68    ///
69    /// Depth 1 means the target state and its immediate parents.
70    /// A depth of 0 should be treated by callers as "full history".
71    pub fn fetch_state_with_depth(
72        &self,
73        target: &Repository,
74        state_id: &ChangeId,
75        depth: u32,
76    ) -> Result<usize> {
77        let state_already_present = target.store().get_state(state_id)?.is_some();
78        let transfer_plan = self.plan_state_transfer(*state_id, Some(depth))?;
79        let copied = self.copy_transfer_plan(target, &transfer_plan)?;
80        if !state_already_present {
81            self.mark_shallow_boundaries(target, *state_id, depth)?;
82        }
83        Ok(copied)
84    }
85
86    fn plan_state_transfer(
87        &self,
88        state_id: ChangeId,
89        depth: Option<u32>,
90    ) -> Result<RepositoryTransferPlan> {
91        // Local sync still executes through the existing dependency-preserving
92        // recursive copy path. The shared plan gives local and hosted Heddle
93        // object sync the same partition/stats contract without introducing a
94        // second local storage executor.
95        Ok(RepositoryTransferPlan::from_state_closure_plan(
96            self.source.store(),
97            state_id,
98            StateClosureOptions {
99                depth,
100                exclude_states: Vec::new(),
101            },
102            GitLaneTransferIntent::HeddleObjectsOnly,
103        )?)
104    }
105
106    fn copy_transfer_plan(
107        &self,
108        target: &Repository,
109        transfer_plan: &RepositoryTransferPlan,
110    ) -> Result<usize> {
111        let mut copied = 0;
112        for object in &transfer_plan.partitions.packable_objects {
113            if self.copy_planned_object(target, object)? {
114                copied += 1;
115            }
116        }
117        for object in &transfer_plan.partitions.sidecar_objects {
118            self.copy_planned_sidecar(target, object)?;
119        }
120        Ok(copied)
121    }
122
123    fn copy_planned_object(&self, target: &Repository, object: &PlannedObject) -> Result<bool> {
124        match (&object.id, object.obj_type) {
125            (ObjectId::Hash(hash), ObjectType::Blob) => self.copy_blob(target, hash),
126            (ObjectId::Hash(hash), ObjectType::Tree) => self.copy_tree(target, hash),
127            (ObjectId::Hash(hash), ObjectType::Action) => self.copy_action(target, hash),
128            (ObjectId::ChangeId(state_id), ObjectType::State) => self.copy_state(target, state_id),
129            (_, ObjectType::Redaction | ObjectType::StateVisibility) => Ok(false),
130            (id, obj_type) => Err(anyhow!(
131                "transfer plan object {id:?} has incompatible type {obj_type:?}"
132            )),
133        }
134    }
135
136    fn copy_tree(&self, target: &Repository, tree_hash: &ContentHash) -> Result<bool> {
137        if target.store().has_tree(tree_hash)? {
138            return Ok(false);
139        }
140        let tree = self
141            .source
142            .store()
143            .get_tree(tree_hash)?
144            .ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
145        target.store().put_tree(&tree)?;
146        Ok(true)
147    }
148
149    fn copy_action(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
150        let action_id = ActionId::from_hash(*hash);
151        if target.store().get_action(&action_id)?.is_some() {
152            return Ok(false);
153        }
154        let mut action = self
155            .source
156            .store()
157            .get_action(&action_id)?
158            .ok_or_else(|| anyhow!("Action {} not found in source", hash))?;
159        target.store().put_action(&mut action)?;
160        Ok(true)
161    }
162
163    fn copy_state(&self, target: &Repository, state_id: &ChangeId) -> Result<bool> {
164        let target_state = target.store().get_state(state_id)?;
165        let state_already_present = target_state.is_some();
166        let state = self
167            .source
168            .store()
169            .get_state(state_id)?
170            .ok_or_else(|| anyhow!("State {} not found in source", state_id))?;
171
172        if !state_already_present || state_metadata_roots_changed(target_state.as_ref(), &state) {
173            target.store().put_state(&state)?;
174        }
175        Ok(!state_already_present)
176    }
177
178    fn copy_planned_sidecar(&self, target: &Repository, object: &PlannedObject) -> Result<()> {
179        match (&object.id, object.obj_type) {
180            (ObjectId::Hash(hash), ObjectType::Redaction) => {
181                self.propagate_redactions_for_blob(target, hash)
182            }
183            (ObjectId::ChangeId(state_id), ObjectType::StateVisibility) => {
184                self.propagate_state_visibility_for_state(target, state_id)
185            }
186            (_, ObjectType::Blob | ObjectType::Tree | ObjectType::State | ObjectType::Action) => {
187                Ok(())
188            }
189            (id, obj_type) => Err(anyhow!(
190                "transfer plan sidecar {id:?} has incompatible type {obj_type:?}"
191            )),
192        }
193    }
194
195    fn mark_shallow_boundaries(
196        &self,
197        target: &Repository,
198        state_id: ChangeId,
199        max_depth: u32,
200    ) -> Result<()> {
201        let mut seen: HashSet<ChangeId> = HashSet::new();
202        let mut queue = VecDeque::from([(state_id, 0u32)]);
203        while let Some((id, depth)) = queue.pop_front() {
204            if !seen.insert(id) {
205                continue;
206            }
207            let state = self
208                .source
209                .store()
210                .get_state(&id)?
211                .ok_or_else(|| anyhow!("State {} not found in source", id))?;
212            if depth == max_depth {
213                if !state.parents.is_empty() {
214                    target.set_shallow(&id, &state.parents)?;
215                }
216                continue;
217            }
218            for parent in &state.parents {
219                queue.push_back((*parent, depth + 1));
220            }
221        }
222        Ok(())
223    }
224
225    /// If the source repository has any redactions for `blob`, ferry
226    /// the sidecar bytes through `Repository::accept_wire_redactions`
227    /// on the target so signatures are verified and any `purged_at`
228    /// records trigger a local purge on the target.
229    ///
230    /// `LocalSync` is local→local, so we use the same wire-side
231    /// contract as the network path — same signature requirement,
232    /// same idempotency. Operators who redact unsigned locally and
233    /// expect that to propagate via a local fetch will see a clear
234    /// rejection rather than a silent skip.
235    fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
236        let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
237            return Ok(());
238        };
239        target.accept_wire_redactions(*blob, &bytes)?;
240        Ok(())
241    }
242
243    /// If the source repository has state-visibility records for `state`,
244    /// ferry the sidecar bytes through the same repository boundary used by
245    /// the network path.
246    fn propagate_state_visibility_for_state(
247        &self,
248        target: &Repository,
249        state: &ChangeId,
250    ) -> Result<()> {
251        let Some(bytes) = self.source.get_state_visibility_bytes_for_state(state)? else {
252            return Ok(());
253        };
254        target.accept_wire_state_visibility(*state, &bytes)?;
255        Ok(())
256    }
257
258    /// Copy a specific blob from source to target.
259    pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
260        if target.store().has_blob(hash)? {
261            return Ok(false);
262        }
263
264        let blob = self.source.require_blob(hash)?;
265
266        target.store().put_blob(&blob)?;
267        Ok(true)
268    }
269}
270
271fn state_metadata_roots_changed(
272    target_state: Option<&objects::object::State>,
273    source_state: &objects::object::State,
274) -> bool {
275    let Some(target_state) = target_state else {
276        return true;
277    };
278    target_state.risk_signals != source_state.risk_signals
279        || target_state.review_signatures != source_state.review_signatures
280        || target_state.discussions != source_state.discussions
281        || target_state.structured_conflicts != source_state.structured_conflicts
282}
283
#[cfg(test)]
mod tests {
    use objects::object::{Attribution, Principal};
    use tempfile::TempDir;

    use super::*;

    /// Attribution attached to every snapshot taken by these tests.
    fn attribution() -> Attribution {
        let author = Principal::new("Test User", "test@example.com");
        Attribution::human(author)
    }

    /// Write `file_content` to `file.txt` under the repository root, snapshot
    /// it with `message`, and return the resulting change id.
    fn capture(repo: &Repository, file_content: &str, message: &str) -> ChangeId {
        let tracked_file = repo.root().join("file.txt");
        std::fs::write(tracked_file, file_content).unwrap();
        let snapshot = repo
            .snapshot_with_attribution(Some(message.to_string()), None, attribution())
            .unwrap();
        snapshot.change_id
    }

    /// A depth-limited re-fetch of a tip the target already holds must not
    /// mark that tip's parent as shallow.
    #[test]
    fn shallow_refetch_does_not_graft_already_present_history() {
        let source_dir = TempDir::new().unwrap();
        let target_dir = TempDir::new().unwrap();
        let source = Repository::init_default(source_dir.path()).unwrap();
        let target = Repository::init_default(target_dir.path()).unwrap();

        // Three successive snapshots of the same file, taken in order.
        let [_first, second, third] = [("one\n", "one"), ("two\n", "two"), ("three\n", "three")]
            .map(|(content, message)| capture(&source, content, message));

        let sync = LocalSync::open(source_dir.path()).unwrap();

        // Full fetch first, so the target holds the tip and its parent.
        sync.fetch_state(&target, &third).unwrap();

        let parent_copied = target.store().get_state(&second).unwrap().is_some();
        assert!(
            parent_copied,
            "full fetch should copy the parent state before the shallow re-fetch"
        );
        assert!(
            !target.is_shallow(&second),
            "parent starts as normal visible history"
        );

        // Re-fetch the same tip with a depth limit.
        sync.fetch_state_with_depth(&target, &third, 1).unwrap();

        assert!(
            !target.is_shallow(&second),
            "incremental shallow re-fetch of an already-present tip must not graft its parent"
        );
    }
}