nodedb 0.0.0-beta.1

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
//! Integration tests verifying Origin and Lite speak the exact same sync protocol.
//!
//! Uses shared types from `nodedb-types` to build frames and verify roundtrip
//! compatibility between what Lite sends and what Origin parses.

use std::collections::HashMap;

use nodedb_types::sync::compensation::CompensationHint;
use nodedb_types::sync::shape::{ShapeDefinition, ShapeType};
use nodedb_types::sync::wire::*;

#[test]
fn lite_handshake_frame_roundtrips() {
    let msg = HandshakeMsg {
        jwt_token: "eyJ.test.token".into(),
        vector_clock: {
            let mut m = HashMap::new();
            let mut inner = HashMap::new();
            inner.insert("0000000000000001".into(), 42u64);
            m.insert("_global".into(), inner);
            m
        },
        subscribed_shapes: vec!["shape-1".into(), "shape-2".into()],
        client_version: "0.1.0-lite".into(),
        lite_id: String::new(),
        epoch: 0,
        wire_version: 1,
    };

    let frame = SyncFrame::new_msgpack(SyncMessageType::Handshake, &msg).unwrap();
    let bytes = frame.to_bytes();
    let parsed: HandshakeMsg = SyncFrame::from_bytes(&bytes)
        .unwrap()
        .decode_body()
        .unwrap();

    assert_eq!(parsed.jwt_token, "eyJ.test.token");
    assert_eq!(parsed.subscribed_shapes.len(), 2);
    assert_eq!(parsed.client_version, "0.1.0-lite");
}

#[test]
fn lite_delta_push_frame_roundtrips() {
    let msg = DeltaPushMsg {
        collection: "knowledge_base".into(),
        document_id: "doc-42".into(),
        delta: vec![0xDE, 0xAD, 0xBE, 0xEF],
        peer_id: 12345,
        mutation_id: 7,
        checksum: 0,
    };

    let frame = SyncFrame::new_msgpack(SyncMessageType::DeltaPush, &msg).unwrap();
    let parsed: DeltaPushMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    assert_eq!(parsed.collection, "knowledge_base");
    assert_eq!(parsed.delta, vec![0xDE, 0xAD, 0xBE, 0xEF]);
    assert_eq!(parsed.peer_id, 12345);
    assert_eq!(parsed.mutation_id, 7);
}

#[test]
fn origin_reject_unique_violation_hint() {
    let reject = DeltaRejectMsg {
        mutation_id: 42,
        reason: "UNIQUE(users.email)".into(),
        compensation: Some(CompensationHint::UniqueViolation {
            field: "email".into(),
            conflicting_value: "alice@example.com".into(),
        }),
    };

    let frame = SyncFrame::encode_or_empty(SyncMessageType::DeltaReject, &reject);
    let parsed: DeltaRejectMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    assert_eq!(parsed.mutation_id, 42);
    match parsed.compensation.unwrap() {
        CompensationHint::UniqueViolation {
            field,
            conflicting_value,
        } => {
            assert_eq!(field, "email");
            assert_eq!(conflicting_value, "alice@example.com");
        }
        other => panic!("expected UniqueViolation, got {other:?}"),
    }
}

#[test]
fn origin_reject_fk_missing_hint() {
    let reject = DeltaRejectMsg {
        mutation_id: 99,
        reason: "FK missing".into(),
        compensation: Some(CompensationHint::ForeignKeyMissing {
            referenced_id: "user-999".into(),
        }),
    };

    let frame = SyncFrame::encode_or_empty(SyncMessageType::DeltaReject, &reject);
    let parsed: DeltaRejectMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    assert!(matches!(
        parsed.compensation,
        Some(CompensationHint::ForeignKeyMissing { .. })
    ));
}

#[test]
fn origin_reject_permission_denied_no_leak() {
    let reject = DeltaRejectMsg {
        mutation_id: 5,
        reason: "security".into(),
        compensation: Some(CompensationHint::PermissionDenied),
    };

    let frame = SyncFrame::encode_or_empty(SyncMessageType::DeltaReject, &reject);
    let parsed: DeltaRejectMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    assert!(matches!(
        parsed.compensation,
        Some(CompensationHint::PermissionDenied)
    ));
    assert!(!parsed.reason.contains("rls_policy"));
}

#[test]
fn origin_reject_rate_limited_hint() {
    let reject = DeltaRejectMsg {
        mutation_id: 10,
        reason: "rate limited".into(),
        compensation: Some(CompensationHint::RateLimited {
            retry_after_ms: 5000,
        }),
    };

    let frame = SyncFrame::encode_or_empty(SyncMessageType::DeltaReject, &reject);
    let parsed: DeltaRejectMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    match parsed.compensation.unwrap() {
        CompensationHint::RateLimited { retry_after_ms } => assert_eq!(retry_after_ms, 5000),
        other => panic!("expected RateLimited, got {other:?}"),
    }
}

#[test]
fn lite_document_shape_subscribe() {
    let msg = ShapeSubscribeMsg {
        shape: ShapeDefinition {
            shape_id: "user-data".into(),
            tenant_id: 5,
            shape_type: ShapeType::Document {
                collection: "users".into(),
                predicate: Vec::new(),
            },
            description: "all users".into(),
            field_filter: vec![],
        },
    };

    let frame = SyncFrame::new_msgpack(SyncMessageType::ShapeSubscribe, &msg).unwrap();
    let parsed: ShapeSubscribeMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    assert_eq!(parsed.shape.shape_id, "user-data");
    assert!(parsed.shape.could_match("users", "u1"));
    assert!(!parsed.shape.could_match("orders", "o1"));
}

#[test]
fn lite_graph_shape_subscribe() {
    let msg = ShapeSubscribeMsg {
        shape: ShapeDefinition {
            shape_id: "alice-net".into(),
            tenant_id: 1,
            shape_type: ShapeType::Graph {
                root_nodes: vec!["alice".into()],
                max_depth: 2,
                edge_label: Some("KNOWS".into()),
            },
            description: "alice network".into(),
            field_filter: vec![],
        },
    };

    let frame = SyncFrame::new_msgpack(SyncMessageType::ShapeSubscribe, &msg).unwrap();
    let parsed: ShapeSubscribeMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    match &parsed.shape.shape_type {
        ShapeType::Graph {
            root_nodes,
            max_depth,
            edge_label,
        } => {
            assert_eq!(root_nodes, &["alice"]);
            assert_eq!(*max_depth, 2);
            assert_eq!(edge_label.as_deref(), Some("KNOWS"));
        }
        _ => panic!("expected Graph shape"),
    }
}

#[test]
fn lite_vector_shape_subscribe() {
    let msg = ShapeSubscribeMsg {
        shape: ShapeDefinition {
            shape_id: "emb-all".into(),
            tenant_id: 1,
            shape_type: ShapeType::Vector {
                collection: "embeddings".into(),
                field_name: None,
            },
            description: "all embeddings".into(),
            field_filter: vec![],
        },
    };

    let frame = SyncFrame::new_msgpack(SyncMessageType::ShapeSubscribe, &msg).unwrap();
    let parsed: ShapeSubscribeMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    assert!(parsed.shape.could_match("embeddings", "e1"));
}

#[test]
fn vector_clock_sync_roundtrip() {
    let msg = VectorClockSyncMsg {
        clocks: {
            let mut m = HashMap::new();
            m.insert("0000000000000001".into(), 100u64);
            m.insert("0000000000000002".into(), 200u64);
            m
        },
        sender_id: 1,
    };

    let frame = SyncFrame::new_msgpack(SyncMessageType::VectorClockSync, &msg).unwrap();
    let parsed: VectorClockSyncMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    assert_eq!(parsed.clocks.len(), 2);
    assert_eq!(*parsed.clocks.get("0000000000000001").unwrap(), 100);
}

#[test]
fn ping_pong_roundtrip() {
    let ping = PingPongMsg {
        timestamp_ms: 1711000000000,
        is_pong: false,
    };

    let frame = SyncFrame::new_msgpack(SyncMessageType::PingPong, &ping).unwrap();
    let parsed: PingPongMsg = SyncFrame::from_bytes(&frame.to_bytes())
        .unwrap()
        .decode_body()
        .unwrap();

    assert_eq!(parsed.timestamp_ms, 1711000000000);
    assert!(!parsed.is_pong);
}

#[test]
fn all_11_message_types_valid() {
    for (code, expected) in [
        (0x01u8, SyncMessageType::Handshake),
        (0x02, SyncMessageType::HandshakeAck),
        (0x10, SyncMessageType::DeltaPush),
        (0x11, SyncMessageType::DeltaAck),
        (0x12, SyncMessageType::DeltaReject),
        (0x20, SyncMessageType::ShapeSubscribe),
        (0x21, SyncMessageType::ShapeSnapshot),
        (0x22, SyncMessageType::ShapeDelta),
        (0x23, SyncMessageType::ShapeUnsubscribe),
        (0x30, SyncMessageType::VectorClockSync),
        (0xFF, SyncMessageType::PingPong),
    ] {
        let parsed = SyncMessageType::from_u8(code).unwrap();
        assert_eq!(parsed, expected);
        assert_eq!(parsed as u8, code);
    }
    assert!(SyncMessageType::from_u8(0x99).is_none());
}