use std::{
collections::{HashSet, VecDeque},
path::Path,
};
use anyhow::{Result, anyhow};
use objects::{
object::{ActionId, ChangeId, ContentHash},
store::ObjectStore,
};
use repo::Repository;
use wire::{
GitLaneTransferIntent, ObjectId, ObjectType, PlannedObject, RepositoryTransferPlan,
StateClosureOptions,
};
pub struct LocalSync {
source: Repository,
}
impl LocalSync {
pub fn open(path: &Path) -> Result<Self> {
let source = Repository::open(path)?;
Ok(Self { source })
}
pub fn source(&self) -> &Repository {
&self.source
}
pub fn list_threads(&self) -> Result<Vec<(String, ChangeId)>> {
let mut threads = Vec::new();
for thread in self.source.refs().list_threads()? {
if let Some(state_id) = self.source.refs().get_thread(&thread)? {
threads.push((thread.to_string(), state_id));
}
}
Ok(threads)
}
pub fn list_markers(&self) -> Result<Vec<(String, ChangeId)>> {
let mut markers = Vec::new();
for marker in self.source.refs().list_markers()? {
if let Some(state_id) = self.source.refs().get_marker(&marker)? {
markers.push((marker.to_string(), state_id));
}
}
Ok(markers)
}
pub fn fetch_state(&self, target: &Repository, state_id: &ChangeId) -> Result<usize> {
let transfer_plan = self.plan_state_transfer(*state_id, None)?;
self.copy_transfer_plan(target, &transfer_plan)
}
pub fn fetch_state_with_depth(
&self,
target: &Repository,
state_id: &ChangeId,
depth: u32,
) -> Result<usize> {
let state_already_present = target.store().get_state(state_id)?.is_some();
let transfer_plan = self.plan_state_transfer(*state_id, Some(depth))?;
let copied = self.copy_transfer_plan(target, &transfer_plan)?;
if !state_already_present {
self.mark_shallow_boundaries(target, *state_id, depth)?;
}
Ok(copied)
}
fn plan_state_transfer(
&self,
state_id: ChangeId,
depth: Option<u32>,
) -> Result<RepositoryTransferPlan> {
Ok(RepositoryTransferPlan::from_state_closure_plan(
self.source.store(),
state_id,
StateClosureOptions {
depth,
exclude_states: Vec::new(),
},
GitLaneTransferIntent::HeddleObjectsOnly,
)?)
}
fn copy_transfer_plan(
&self,
target: &Repository,
transfer_plan: &RepositoryTransferPlan,
) -> Result<usize> {
let mut copied = 0;
for object in &transfer_plan.partitions.packable_objects {
if self.copy_planned_object(target, object)? {
copied += 1;
}
}
for object in &transfer_plan.partitions.sidecar_objects {
self.copy_planned_sidecar(target, object)?;
}
Ok(copied)
}
fn copy_planned_object(&self, target: &Repository, object: &PlannedObject) -> Result<bool> {
match (&object.id, object.obj_type) {
(ObjectId::Hash(hash), ObjectType::Blob) => self.copy_blob(target, hash),
(ObjectId::Hash(hash), ObjectType::Tree) => self.copy_tree(target, hash),
(ObjectId::Hash(hash), ObjectType::Action) => self.copy_action(target, hash),
(ObjectId::ChangeId(state_id), ObjectType::State) => self.copy_state(target, state_id),
(_, ObjectType::Redaction | ObjectType::StateVisibility) => Ok(false),
(id, obj_type) => Err(anyhow!(
"transfer plan object {id:?} has incompatible type {obj_type:?}"
)),
}
}
fn copy_tree(&self, target: &Repository, tree_hash: &ContentHash) -> Result<bool> {
if target.store().has_tree(tree_hash)? {
return Ok(false);
}
let tree = self
.source
.store()
.get_tree(tree_hash)?
.ok_or_else(|| anyhow!("Tree {} not found in source", tree_hash))?;
target.store().put_tree(&tree)?;
Ok(true)
}
fn copy_action(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
let action_id = ActionId::from_hash(*hash);
if target.store().get_action(&action_id)?.is_some() {
return Ok(false);
}
let mut action = self
.source
.store()
.get_action(&action_id)?
.ok_or_else(|| anyhow!("Action {} not found in source", hash))?;
target.store().put_action(&mut action)?;
Ok(true)
}
fn copy_state(&self, target: &Repository, state_id: &ChangeId) -> Result<bool> {
let target_state = target.store().get_state(state_id)?;
let state_already_present = target_state.is_some();
let state = self
.source
.store()
.get_state(state_id)?
.ok_or_else(|| anyhow!("State {} not found in source", state_id))?;
if !state_already_present || state_metadata_roots_changed(target_state.as_ref(), &state) {
target.store().put_state(&state)?;
}
Ok(!state_already_present)
}
fn copy_planned_sidecar(&self, target: &Repository, object: &PlannedObject) -> Result<()> {
match (&object.id, object.obj_type) {
(ObjectId::Hash(hash), ObjectType::Redaction) => {
self.propagate_redactions_for_blob(target, hash)
}
(ObjectId::ChangeId(state_id), ObjectType::StateVisibility) => {
self.propagate_state_visibility_for_state(target, state_id)
}
(_, ObjectType::Blob | ObjectType::Tree | ObjectType::State | ObjectType::Action) => {
Ok(())
}
(id, obj_type) => Err(anyhow!(
"transfer plan sidecar {id:?} has incompatible type {obj_type:?}"
)),
}
}
fn mark_shallow_boundaries(
&self,
target: &Repository,
state_id: ChangeId,
max_depth: u32,
) -> Result<()> {
let mut seen: HashSet<ChangeId> = HashSet::new();
let mut queue = VecDeque::from([(state_id, 0u32)]);
while let Some((id, depth)) = queue.pop_front() {
if !seen.insert(id) {
continue;
}
let state = self
.source
.store()
.get_state(&id)?
.ok_or_else(|| anyhow!("State {} not found in source", id))?;
if depth == max_depth {
if !state.parents.is_empty() {
target.set_shallow(&id, &state.parents)?;
}
continue;
}
for parent in &state.parents {
queue.push_back((*parent, depth + 1));
}
}
Ok(())
}
fn propagate_redactions_for_blob(&self, target: &Repository, blob: &ContentHash) -> Result<()> {
let Some(bytes) = self.source.store().get_redactions_bytes_for_blob(blob)? else {
return Ok(());
};
target.accept_wire_redactions(*blob, &bytes)?;
Ok(())
}
fn propagate_state_visibility_for_state(
&self,
target: &Repository,
state: &ChangeId,
) -> Result<()> {
let Some(bytes) = self.source.get_state_visibility_bytes_for_state(state)? else {
return Ok(());
};
target.accept_wire_state_visibility(*state, &bytes)?;
Ok(())
}
pub fn copy_blob(&self, target: &Repository, hash: &ContentHash) -> Result<bool> {
if target.store().has_blob(hash)? {
return Ok(false);
}
let blob = self.source.require_blob(hash)?;
target.store().put_blob(&blob)?;
Ok(true)
}
}
fn state_metadata_roots_changed(
target_state: Option<&objects::object::State>,
source_state: &objects::object::State,
) -> bool {
let Some(target_state) = target_state else {
return true;
};
target_state.risk_signals != source_state.risk_signals
|| target_state.review_signatures != source_state.review_signatures
|| target_state.discussions != source_state.discussions
|| target_state.structured_conflicts != source_state.structured_conflicts
}
#[cfg(test)]
mod tests {
use objects::object::{Attribution, Principal};
use tempfile::TempDir;
use super::*;
fn attribution() -> Attribution {
Attribution::human(Principal::new("Test User", "test@example.com"))
}
fn capture(repo: &Repository, file_content: &str, message: &str) -> ChangeId {
std::fs::write(repo.root().join("file.txt"), file_content).unwrap();
repo.snapshot_with_attribution(Some(message.to_string()), None, attribution())
.unwrap()
.change_id
}
#[test]
fn shallow_refetch_does_not_graft_already_present_history() {
let source_dir = TempDir::new().unwrap();
let target_dir = TempDir::new().unwrap();
let source = Repository::init_default(source_dir.path()).unwrap();
let target = Repository::init_default(target_dir.path()).unwrap();
let _first = capture(&source, "one\n", "one");
let second = capture(&source, "two\n", "two");
let third = capture(&source, "three\n", "three");
let sync = LocalSync::open(source_dir.path()).unwrap();
sync.fetch_state(&target, &third).unwrap();
assert!(
target.store().get_state(&second).unwrap().is_some(),
"full fetch should copy the parent state before the shallow re-fetch"
);
assert!(
!target.is_shallow(&second),
"parent starts as normal visible history"
);
sync.fetch_state_with_depth(&target, &third, 1).unwrap();
assert!(
!target.is_shallow(&second),
"incremental shallow re-fetch of an already-present tip must not graft its parent"
);
}
}