infinite-db 0.1.6

A spatial-graph database using n-dimensional curves and hyperedges for engineering logic.
Documentation
//! End-to-end block-level sync session driver.
//!
//! Negotiates Merkle roots over a framed byte stream (`serial::SyncMessage`),
//! transfers a [`Delta`] when roots differ, and applies it to the local database.

use std::io::{self, Read, Write};

use crate::infinitedb_core::{
    address::SpaceId,
    block::Block,
    snapshot::Snapshot,
};
use crate::infinitedb_sync::{
    delta::Delta,
    serial::{read_message, write_message, SyncMessage},
};


/// Run a sync session as the initiator (sends our root first, receives delta to apply).
pub fn run_sync_session_initiator<R: Read, W: Write>(
    local: &mut crate::InfiniteDb,
    space: SpaceId,
    reader: &mut R,
    writer: &mut W,
) -> io::Result<()> {
    let local_tree = local.snapshot_merkle(space)?;
    write_message(writer, &SyncMessage::MerkleRoot { root: local_tree.root() })?;

    let remote_root = match read_message(reader)? {
        SyncMessage::MerkleRoot { root } => root,
        SyncMessage::Error { message } => {
            return Err(io::Error::new(io::ErrorKind::Other, message));
        }
        other => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("expected MerkleRoot, got {other:?}"),
            ));
        }
    };

    if local_tree.root() == remote_root {
        return Ok(());
    }

    let remote_tree = match read_message(reader)? {
        SyncMessage::MerkleTree(t) => t,
        SyncMessage::Error { message } => {
            return Err(io::Error::new(io::ErrorKind::Other, message));
        }
        other => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("expected MerkleTree, got {other:?}"),
            ));
        }
    };

    let _diffs = local_tree.diff_leaves(&remote_tree);

    let delta = match read_message(reader)? {
        SyncMessage::Delta(d) => d,
        SyncMessage::Error { message } => {
            return Err(io::Error::new(io::ErrorKind::Other, message));
        }
        other => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("expected Delta, got {other:?}"),
            ));
        }
    };

    local.apply_delta(space, &delta)?;
    write_message(
        writer,
        &SyncMessage::Ack {
            applied_revision: local.revision(),
        },
    )?;
    Ok(())
}

/// Run a sync session as the responder (reads initiator root, may send tree + delta).
pub fn run_sync_session_responder<R: Read, W: Write>(
    local: &mut crate::InfiniteDb,
    space: SpaceId,
    reader: &mut R,
    writer: &mut W,
    supply_delta: impl FnOnce(&Snapshot) -> io::Result<Delta>,
) -> io::Result<()> {
    let local_tree = local.snapshot_merkle(space)?;

    let initiator_root = match read_message(reader)? {
        SyncMessage::MerkleRoot { root } => root,
        other => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("expected MerkleRoot, got {other:?}"),
            ));
        }
    };

    write_message(writer, &SyncMessage::MerkleRoot { root: local_tree.root() })?;

    if initiator_root == local_tree.root() {
        return Ok(());
    }

    write_message(writer, &SyncMessage::MerkleTree(local_tree.clone()))?;

    let local_snap = local
        .snapshot_for_space(space)
        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no snapshot"))?;
    let delta = supply_delta(&local_snap)?;

    write_message(writer, &SyncMessage::Delta(delta))?;

    match read_message(reader)? {
        SyncMessage::Ack { .. } => Ok(()),
        SyncMessage::Error { message } => Err(io::Error::new(io::ErrorKind::Other, message)),
        other => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("expected Ack, got {other:?}"),
        )),
    }
}

/// In-memory convergence helper used by integration tests (no wire I/O).
///
/// Takes the snapshot of `space` from `remote`, reads every block that snapshot
/// lists, computes a [`Delta`] against the snapshot of `space` in `local`, and
/// applies that delta to `local` unless it is empty. When `local` has no
/// snapshot for `space`, `Snapshot::root(remote_snapshot.id, space)` stands in
/// for it.
///
/// # Errors
///
/// - `io::ErrorKind::NotFound` ("remote snapshot") when `remote` has no
///   snapshot for `space`.
/// - Any error returned by `read_block` or `apply_delta` is propagated.
pub fn converge_spaces(
    local: &mut crate::InfiniteDb,
    remote: &mut crate::InfiniteDb,
    space: SpaceId,
) -> io::Result<()> {
    let source = match remote.snapshot_for_space(space) {
        Some(snapshot) => snapshot,
        None => {
            return Err(io::Error::new(io::ErrorKind::NotFound, "remote snapshot"));
        }
    };
    let target = match local.snapshot_for_space(space) {
        Some(snapshot) => snapshot,
        None => Snapshot::root(source.id, space),
    };

    // Load every block the remote snapshot refers to; the first read failure
    // aborts the whole operation.
    let mut payload: Vec<Block> = Vec::new();
    for entry in source.blocks.values() {
        payload.push(remote.read_block(entry.block_id)?);
    }

    let delta = Delta::compute(&source, &target, payload);
    if delta.is_empty() {
        Ok(())
    } else {
        local.apply_delta(space, &delta)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::infinitedb_core::address::DimensionVector;
    use crate::infinitedb_core::space::SpaceConfig;
    use tempfile::TempDir;

    // An empty local database converged against a populated remote must end up
    // with the remote's Merkle root.
    #[test]
    fn converge_brings_diverged_databases_to_same_merkle_root() {
        let local_dir = TempDir::new().unwrap();
        let remote_dir = TempDir::new().unwrap();
        let space = SpaceId(1);

        let mut local_db = crate::InfiniteDb::open(local_dir.path()).unwrap();
        let mut remote_db = crate::InfiniteDb::open(remote_dir.path()).unwrap();
        for db in [&mut local_db, &mut remote_db] {
            db.register_space(SpaceConfig::new(space, "s", 2)).unwrap();
        }

        // Only the remote side gets data, and it is flushed before comparing.
        remote_db
            .insert(space, DimensionVector::new(vec![2, 2]), vec![2])
            .unwrap();
        remote_db.flush(space).unwrap();

        let expected_root = remote_db.snapshot_merkle(space).unwrap().root();
        // Precondition: the local side has no snapshot for this space yet.
        assert!(local_db.snapshot_for_space(space).is_none());

        converge_spaces(&mut local_db, &mut remote_db, space).unwrap();

        let converged_root = local_db.snapshot_merkle(space).unwrap().root();
        assert_eq!(converged_root, expected_root);
    }
}