use std::collections::HashMap;
use bincode::{config::standard, encode_to_vec};
use crate::engine::error::EngineError;
use crate::engine::query::address_key;
use crate::infinitedb_core::{
address::{DimensionVector, RevisionId, SpaceId},
block::Record,
branch::BranchId,
merge::MergeStrategy,
record_identity::AddressKey,
space::SpaceRegistry,
};
use crate::InfiniteDb;
use super::merkle::{self, MerkleTree};
/// Summary of one branch's state, as produced by `branch_sync_state`.
#[derive(Debug, Clone)]
pub struct BranchSyncState {
/// Branch this summary describes.
pub branch: BranchId,
/// Root hash of the Merkle tree built by `snapshot_merkle` for the branch.
pub merkle_root: [u8; 32],
/// Value of `InfiniteDb::revision()` read when the summary was assembled.
pub revision: RevisionId,
}
fn latest_per_address(spaces: &SpaceRegistry, mut records: Vec<Record>) -> Vec<Record> {
let mut map: HashMap<AddressKey, Record> = HashMap::new();
for record in records.drain(..) {
let key = address_key(spaces, &record);
map.entry(key)
.and_modify(|existing| {
if record.revision > existing.revision {
*existing = record.clone();
}
})
.or_insert(record);
}
let mut latest: Vec<Record> = map.into_values().collect();
latest.sort_by_key(|r| (r.address.point.coords.clone(), r.revision.legacy_sequence()));
latest
}
/// Builds a Merkle tree over the current contents of `space` on `branch`.
///
/// The branch is queried, the result is reduced with `latest_per_address`, and each
/// surviving record contributes one leaf: `merkle::hash_record` applied to the
/// bincode (standard configuration) encoding of `(address, data, tombstone)`.
/// The record's revision is not part of the encoded tuple.
///
/// # Errors
/// Propagates the error from `query_on_branch`. An encoding failure is reported as
/// `EngineError::Other` carrying the encoder's message.
pub fn snapshot_merkle(
    db: &InfiniteDb,
    space: SpaceId,
    branch: BranchId,
) -> Result<MerkleTree, EngineError> {
    // The registry guard is taken before the query and lives until return.
    let registry = db.spaces.read();
    let queried = db.query_on_branch(branch, space, None)?;
    let current = latest_per_address(&registry, queried);

    let hashes = current
        .iter()
        .map(|rec| {
            encode_to_vec((&rec.address, &rec.data, rec.tombstone), standard())
                .map(|bytes| merkle::hash_record(&bytes))
                .map_err(|e| EngineError::Other {
                    message: e.to_string(),
                })
        })
        .collect::<Result<Vec<_>, EngineError>>()?;

    Ok(MerkleTree::build(&hashes))
}
/// Assembles a `BranchSyncState` for `branch` in `space`.
///
/// The Merkle root comes from `snapshot_merkle`; the revision is read from
/// `db.revision()` after the tree has been built.
///
/// # Errors
/// Propagates any error returned by `snapshot_merkle`.
pub fn branch_sync_state(
    db: &InfiniteDb,
    space: SpaceId,
    branch: BranchId,
) -> Result<BranchSyncState, EngineError> {
    let tree = snapshot_merkle(db, space, branch)?;
    let merkle_root = tree.root();
    let revision = db.revision();
    Ok(BranchSyncState {
        branch,
        merkle_root,
        revision,
    })
}
/// Copies the live overlay records of `remote_branch` into a new local branch.
///
/// `remote` is synced first. A branch called `name` is then created on `local`
/// with `BranchId::MAIN` as its parent argument, and everything returned by
/// `remote.branch_overlays.all_live_records(remote_branch)` is applied to it.
///
/// Returns the id of the newly created local branch.
///
/// # Errors
/// Returns the stringified error from `remote.sync()` or from applying the
/// records, or the error from `create_branch`. If applying the records fails, the
/// branch that was just created is not removed here.
pub fn import_branch_overlay(
    local: &InfiniteDb,
    remote: &InfiniteDb,
    remote_branch: BranchId,
    name: &str,
) -> Result<BranchId, String> {
    if let Err(sync_err) = remote.sync() {
        return Err(sync_err.to_string());
    }

    let imported = local.create_branch(name, BranchId::MAIN)?;
    let overlay = remote.branch_overlays.all_live_records(remote_branch);

    local
        .apply_records_on_branch(imported, overlay)
        .map(|_| imported)
        .map_err(|apply_err| apply_err.to_string())
}
pub fn converge_main_records(
local: &InfiniteDb,
remote: &InfiniteDb,
space: SpaceId,
) -> Result<(), EngineError> {
remote.sync()?;
let remote_records = remote.query(space, None)?;
let local_records = local.query(space, None)?;
let mut local_latest: std::collections::HashMap<Vec<u32>, Record> =
std::collections::HashMap::new();
for r in local_records {
let key = r.address.point.coords.clone();
local_latest
.entry(key)
.and_modify(|e| {
if r.revision > e.revision {
*e = r.clone();
}
})
.or_insert(r);
}
let mut rows: Vec<(DimensionVector, Vec<u8>)> = Vec::new();
for record in remote_records {
let coords = record.address.point.coords.clone();
let insert = match local_latest.get(&coords) {
None => true,
Some(existing) => record.revision > existing.revision,
};
if insert && !record.tombstone {
rows.push((DimensionVector::new(coords), record.data));
}
}
if !rows.is_empty() {
local.insert_many(space, rows)?;
}
local.sync()
}
/// Converges `local` with `remote` and then merges one remote branch into it.
///
/// Runs `converge_main_records` for `space`, imports `remote_branch` into a local
/// branch named `"sync-import"` via `import_branch_overlay`, and calls
/// `local.merge_branch(BranchId::MAIN, imported, strategy, None)`. When the merge
/// reports no conflicts, `space` is flushed and `local` is synced.
///
/// # Errors
/// Propagates errors from convergence, merge, flush and sync. An import failure
/// is wrapped in `EngineError::Other`. If the merge result lists any conflicts,
/// returns `EngineError::Other` stating how many, without flushing or syncing.
pub fn converge_with_branch_merge(
    local: &InfiniteDb,
    remote: &InfiniteDb,
    space: SpaceId,
    remote_branch: BranchId,
    strategy: MergeStrategy,
) -> Result<(), EngineError> {
    converge_main_records(local, remote, space)?;

    let imported = match import_branch_overlay(local, remote, remote_branch, "sync-import") {
        Ok(branch) => branch,
        Err(message) => return Err(EngineError::Other { message }),
    };

    let outcome = local.merge_branch(BranchId::MAIN, imported, strategy, None)?;

    // Clean merge: persist and finish.
    if outcome.conflicts.is_empty() {
        local.flush(space)?;
        return local.sync();
    }

    Err(EngineError::Other {
        message: format!("sync merge left {} conflicts", outcome.conflicts.len()),
    })
}