use std::collections::HashMap;
use std::io;
use bincode::{config::standard, encode_to_vec};
use crate::infinitedb_core::{
address::{DimensionVector, RevisionId, SpaceId},
block::Record,
branch::BranchId,
merge::MergeStrategy,
};
use crate::InfiniteDb;
use super::merkle::{self, MerkleTree};
/// Snapshot of a branch's sync-relevant state, as produced by `branch_sync_state`.
#[derive(Debug, Clone)]
pub struct BranchSyncState {
/// Branch this state describes.
pub branch: BranchId,
/// Root hash of the Merkle tree built over the branch's records
/// (see `snapshot_merkle`).
pub merkle_root: [u8; 32],
/// Value reported by `InfiniteDb::revision` when the state was captured.
pub revision: RevisionId,
}
fn latest_per_address(mut records: Vec<Record>) -> Vec<Record> {
let mut map: HashMap<Vec<u32>, Record> = HashMap::new();
for record in records.drain(..) {
let key = record.address.point.coords.clone();
map.entry(key)
.and_modify(|existing| {
if record.revision > existing.revision {
*existing = record.clone();
}
})
.or_insert(record);
}
let mut latest: Vec<Record> = map.into_values().collect();
latest.sort_by_key(|r| (r.address.point.coords.clone(), r.revision.0));
latest
}
/// Builds a Merkle tree over the current contents of `branch` within `space`.
///
/// Only the newest revision per address contributes a leaf, and leaves are
/// ordered as returned by `latest_per_address`. Each leaf is the hash of the
/// bincode encoding of `(address, data, tombstone)`; the revision itself is
/// not part of the hashed payload.
///
/// # Errors
///
/// Returns the error from the branch query, or an `io::ErrorKind::Other`
/// error wrapping a bincode encoding failure.
pub fn snapshot_merkle(
    db: &InfiniteDb,
    space: SpaceId,
    branch: BranchId,
) -> io::Result<MerkleTree> {
    let newest = latest_per_address(db.query_on_branch(branch, space, None)?);

    // Collecting into `io::Result` stops at the first encoding failure.
    let leaves = newest
        .iter()
        .map(|entry| {
            encode_to_vec((&entry.address, &entry.data, entry.tombstone), standard())
                .map(|bytes| merkle::hash_record(&bytes))
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
        })
        .collect::<io::Result<Vec<_>>>()?;

    Ok(MerkleTree::build(&leaves))
}
/// Captures the sync state of `branch` within `space`.
///
/// The Merkle root comes from `snapshot_merkle`; the revision is whatever
/// `db.revision()` reports after the tree has been built.
///
/// # Errors
///
/// Propagates any error from `snapshot_merkle`.
pub fn branch_sync_state(
    db: &InfiniteDb,
    space: SpaceId,
    branch: BranchId,
) -> io::Result<BranchSyncState> {
    let tree = snapshot_merkle(db, space, branch)?;
    let merkle_root = tree.root();
    let revision = RevisionId(db.revision());

    Ok(BranchSyncState {
        branch,
        merkle_root,
        revision,
    })
}
/// Replays the overlay of `remote_branch` onto a new local branch called `name`.
///
/// The remote database is synced first. A branch named `name` is then created
/// locally on top of `BranchId::MAIN`, and every record yielded by the remote
/// overlay is applied to it: tombstones as deletes, everything else as inserts.
///
/// Returns the id of the newly created local branch.
///
/// # Errors
///
/// Returns the stringified error of the first failing step (remote sync,
/// branch creation, delete or insert). Records applied before the failure
/// are not rolled back by this function.
pub fn import_branch_overlay(
    local: &InfiniteDb,
    remote: &InfiniteDb,
    remote_branch: BranchId,
    name: &str,
) -> Result<BranchId, String> {
    remote.sync().map_err(|err| err.to_string())?;
    let imported = local.create_branch(name, BranchId::MAIN)?;

    for entry in remote.branch_overlays.all_live_records(remote_branch) {
        if !entry.tombstone {
            local
                .insert_on_branch(
                    imported,
                    entry.address.space,
                    entry.address.point,
                    entry.data,
                )
                .map_err(|err| err.to_string())?;
            continue;
        }

        // Tombstones are replayed as deletions on the imported branch.
        local
            .delete_on_branch(imported, entry.address.space, entry.address.point)
            .map_err(|err| err.to_string())?;
    }

    Ok(imported)
}
pub fn converge_main_records(
local: &InfiniteDb,
remote: &InfiniteDb,
space: SpaceId,
) -> io::Result<()> {
remote.sync()?;
let remote_records = remote.query(space, None)?;
let local_records = local.query(space, None)?;
let mut local_latest: std::collections::HashMap<Vec<u32>, Record> =
std::collections::HashMap::new();
for r in local_records {
let key = r.address.point.coords.clone();
local_latest
.entry(key)
.and_modify(|e| {
if r.revision > e.revision {
*e = r.clone();
}
})
.or_insert(r);
}
for record in remote_records {
let coords = record.address.point.coords.clone();
let insert = match local_latest.get(&coords) {
None => true,
Some(existing) => record.revision > existing.revision,
};
if insert {
local.insert(space, DimensionVector::new(coords), record.data)?;
}
}
local.sync()
}
/// Converges `local` with `remote` and merges a remote branch into local main.
///
/// Steps, in order:
/// 1. `converge_main_records` brings over newer main-line records for `space`.
/// 2. `remote_branch`'s overlay is imported as a local branch named
///    `"sync-import"`.
/// 3. That branch is merged into `BranchId::MAIN` using `strategy`.
/// 4. If the merge reports no conflicts, `space` is flushed and `local` synced.
///
/// # Errors
///
/// Propagates errors from any step. An import failure and a merge that leaves
/// conflicts are both reported as `io::ErrorKind::Other`; in the conflict case
/// the function returns before flushing or syncing.
pub fn converge_with_branch_merge(
    local: &InfiniteDb,
    remote: &InfiniteDb,
    space: SpaceId,
    remote_branch: BranchId,
    strategy: MergeStrategy,
) -> io::Result<()> {
    converge_main_records(local, remote, space)?;

    let imported = match import_branch_overlay(local, remote, remote_branch, "sync-import") {
        Ok(branch) => branch,
        Err(message) => return Err(io::Error::new(io::ErrorKind::Other, message)),
    };

    let merged = local.merge_branch(BranchId::MAIN, imported, strategy, None)?;
    let unresolved = merged.conflicts.len();
    if unresolved != 0 {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            format!("sync merge left {} conflicts", unresolved),
        ));
    }

    local.flush(space)?;
    local.sync()
}