syncular-runtime 0.1.0

Shared Rust runtime for Syncular SQLite-backed native and browser clients.
Documentation
use serde_json::Value;
use syncular_runtime::binary_snapshot::{decode_binary_snapshot_table, BinarySnapshotColumnType};
use syncular_runtime::binary_sync_pack::{
    decode_binary_sync_pack, is_binary_sync_pack_content_type, SYNC_PACK_CONTENT_TYPE,
};
use syncular_runtime::protocol::{
    validate_pull_snapshot_manifests, CombinedRequest, CombinedResponse,
};

#[test]
fn decodes_json_combined_sync_fixture() {
    let fixture = json_combined_sync_fixture();
    assert_eq!(fixture["name"].as_str(), Some("json-combined-sync-v1"));

    let request: CombinedRequest =
        serde_json::from_value(fixture["request"].clone()).expect("fixture request");
    assert_eq!(request.client_id, "fixture-client-1");

    let push = request.push.expect("push request");
    assert_eq!(push.commits.len(), 1);
    assert_eq!(push.commits[0].client_commit_id, "fixture-commit-1");
    assert_eq!(push.commits[0].schema_version, 3);
    assert_eq!(push.commits[0].operations.len(), 2);
    assert_eq!(push.commits[0].operations[0].table, "tasks");
    assert_eq!(push.commits[0].operations[0].row_id, "task-local-1");
    assert_eq!(push.commits[0].operations[0].op, "upsert");
    assert_eq!(push.commits[0].operations[0].base_version, Some(5));
    assert_eq!(
        push.commits[0].operations[0]
            .payload
            .as_ref()
            .and_then(|payload| payload.get("title"))
            .and_then(Value::as_str),
        Some("Local edit")
    );
    assert_eq!(push.commits[0].operations[1].op, "delete");
    assert!(push.commits[0].operations[1].payload.is_none());

    let pull = request.pull.expect("pull request");
    assert_eq!(pull.limit_commits, 50);
    assert_eq!(pull.limit_snapshot_rows, 1000);
    assert_eq!(pull.max_snapshot_pages, 4);
    assert_eq!(pull.dedupe_rows, Some(true));
    assert_eq!(pull.subscriptions.len(), 1);
    assert_eq!(pull.subscriptions[0].id, "sub-tasks");
    assert_eq!(pull.subscriptions[0].table, "tasks");
    assert_eq!(pull.subscriptions[0].cursor, 41);
    assert_eq!(pull.subscriptions[0].crdt_state_vectors.len(), 1);
    assert_eq!(
        pull.subscriptions[0].crdt_state_vectors[0].state_column,
        "title_yjs_state"
    );
    assert_eq!(
        pull.subscriptions[0]
            .scopes
            .get("user_id")
            .and_then(Value::as_str),
        Some("user-1")
    );
    assert_eq!(
        pull.subscriptions[0]
            .bootstrap_state
            .as_ref()
            .map(|state| state.row_cursor.as_deref()),
        Some(Some("task-0"))
    );

    let response: CombinedResponse =
        serde_json::from_value(fixture["response"].clone()).expect("fixture response");
    assert!(response.ok);
    assert_eq!(response.required_schema_version, Some(3));
    assert_eq!(response.latest_schema_version, Some(4));

    let push = response.push.expect("push response");
    assert!(push.ok);
    assert_eq!(push.commits[0].client_commit_id, "fixture-commit-1");
    assert_eq!(push.commits[0].commit_seq, Some(42));
    assert_eq!(push.commits[0].results.len(), 2);
    assert_eq!(push.commits[0].results[1].status, "applied");

    let pull = response.pull.expect("pull response");
    assert!(pull.ok);
    let subscription = &pull.subscriptions[0];
    assert_eq!(subscription.id, "sub-tasks");
    assert_eq!(subscription.next_cursor, 42);
    assert_eq!(
        subscription
            .bootstrap_state
            .as_ref()
            .map(|state| state.row_cursor.as_deref()),
        Some(Some("task-1"))
    );
    assert_eq!(subscription.commits[0].changes[0].row_id, "task-remote-1");

    let snapshot = &subscription.snapshots.as_ref().expect("snapshots")[0];
    assert_eq!(snapshot.table, "tasks");
    assert!(snapshot.is_first_page);
    assert!(!snapshot.is_last_page);
    assert_eq!(
        snapshot
            .bootstrap_state_after
            .as_ref()
            .map(|state| state.row_cursor.as_deref()),
        Some(Some("task-snapshot-1"))
    );
}

#[test]
fn decodes_typescript_encoded_binary_sync_pack_fixture() {
    let fixture = binary_sync_pack_fixture();
    assert_eq!(
        fixture["contentType"].as_str(),
        Some(SYNC_PACK_CONTENT_TYPE)
    );
    assert!(is_binary_sync_pack_content_type(Some(
        "application/vnd.syncular.sync-pack.v1; charset=binary"
    )));

    let expected: CombinedResponse =
        serde_json::from_value(fixture["decodedResponse"].clone()).expect("fixture response");
    let encoded_hex = fixture["encodedHex"]
        .as_str()
        .expect("fixture encodedHex must be a string");
    let encoded = hex::decode(encoded_hex).expect("fixture encodedHex must be valid hex");
    assert_eq!(
        fixture["wireVersion"].as_u64(),
        Some(sync_pack_wire_version(&encoded) as u64)
    );
    let decoded = decode_binary_sync_pack(&encoded).expect("decode fixture");

    assert_eq!(decoded.ok, expected.ok);
    assert_eq!(decoded.required_schema_version, Some(2));
    assert_eq!(decoded.latest_schema_version, Some(3));

    let push = decoded.push.expect("push response");
    let expected_push = expected.push.expect("expected push response");
    assert!(push.ok);
    assert_eq!(push.commits.len(), expected_push.commits.len());
    assert_eq!(push.commits[0].client_commit_id, "fixture-local-1");
    assert_eq!(push.commits[0].status, "applied");
    assert_eq!(push.commits[0].commit_seq, Some(41));
    assert_eq!(push.commits[0].results[0].status, "applied");

    let conflict_commit = &push.commits[1];
    assert_eq!(conflict_commit.client_commit_id, "fixture-local-2");
    assert_eq!(conflict_commit.status, "rejected");
    assert_eq!(conflict_commit.results[0].status, "conflict");
    assert_eq!(
        conflict_commit.results[0].message.as_deref(),
        Some("server row changed")
    );
    assert_eq!(
        conflict_commit.results[0].code.as_deref(),
        Some("sync.version_conflict")
    );
    assert_eq!(conflict_commit.results[0].server_version, Some(7));
    assert_eq!(
        conflict_commit.results[0]
            .server_row
            .as_ref()
            .and_then(|row| row.get("title"))
            .and_then(Value::as_str),
        Some("Server")
    );

    let pull = decoded.pull.expect("pull response");
    let expected_pull = expected.pull.expect("expected pull response");
    assert!(pull.ok);
    assert_eq!(pull.subscriptions.len(), expected_pull.subscriptions.len());
    let subscription = &pull.subscriptions[0];
    assert_eq!(subscription.id, "sub-tasks");
    assert_eq!(subscription.status, "active");
    assert_eq!(subscription.next_cursor, 42);
    assert_eq!(
        subscription.scopes.get("user_id").and_then(Value::as_str),
        Some("user-1")
    );

    let commit = &subscription.commits[0];
    assert_eq!(commit.commit_seq, 42);
    assert_eq!(commit.actor_id, "user-2");
    let integrity = subscription
        .integrity
        .as_ref()
        .expect("subscription integrity");
    assert_eq!(
        integrity.previous_chain_root,
        "0000000000000000000000000000000000000000000000000000000000000000"
    );
    assert_eq!(
        integrity.commit_chain_root,
        "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
    );
    assert_eq!(integrity.commit_seq, 42);
    assert_eq!(commit.changes.len(), 1);
    assert_eq!(commit.changes[0].table, "tasks");
    assert_eq!(commit.changes[0].row_id, "task-1");
    assert_eq!(commit.changes[0].op, "upsert");
    assert_eq!(commit.changes[0].row_version, Some(42));
    assert_eq!(
        commit.changes[0]
            .row_json
            .as_ref()
            .and_then(|row| row.get("title"))
            .and_then(Value::as_str),
        Some("Remote")
    );

    let snapshots = subscription.snapshots.as_ref().expect("snapshots");
    let chunk = &snapshots[0].chunks.as_ref().expect("chunks")[0];
    let manifest = snapshots[0].manifest.as_ref().expect("manifest");
    assert_eq!(snapshots[0].table, "tasks");
    assert!(snapshots[0].is_first_page);
    assert!(snapshots[0].is_last_page);
    assert_eq!(manifest.version, 1);
    assert_eq!(manifest.table, "tasks");
    assert_eq!(manifest.chunks[0].id, chunk.id);
    assert_eq!(chunk.id, "chunk-1");
    assert_eq!(chunk.byte_length, 128);
    assert_eq!(chunk.encoding, "binary-table-v1");
    assert_eq!(chunk.compression, "gzip");
    validate_pull_snapshot_manifests(&pull).expect("snapshot manifest validation");
}

#[test]
fn decodes_typescript_encoded_binary_snapshot_table_fixture() {
    let fixture = binary_snapshot_table_fixture();
    assert_eq!(fixture["encoding"].as_str(), Some("binary-table-v1"));
    assert_eq!(fixture["wireVersion"].as_u64(), Some(1));

    let encoded_hex = fixture["encodedHex"]
        .as_str()
        .expect("fixture encodedHex must be a string");
    let encoded = hex::decode(encoded_hex).expect("fixture encodedHex must be valid hex");
    let decoded = decode_binary_snapshot_table(&encoded).expect("decode fixture");

    assert_eq!(decoded.table, "tasks");
    assert_eq!(decoded.columns.len(), 6);
    assert_eq!(decoded.columns[0].name, "id");
    assert_eq!(
        decoded.columns[0].column_type,
        BinarySnapshotColumnType::String
    );
    assert!(!decoded.columns[0].nullable);
    assert_eq!(decoded.columns[2].name, "server_version");
    assert_eq!(
        decoded.columns[2].column_type,
        BinarySnapshotColumnType::Integer
    );
    assert_eq!(decoded.columns[3].name, "score");
    assert_eq!(
        decoded.columns[3].column_type,
        BinarySnapshotColumnType::Float
    );
    assert_eq!(decoded.columns[4].name, "done");
    assert_eq!(
        decoded.columns[4].column_type,
        BinarySnapshotColumnType::Boolean
    );
    assert_eq!(decoded.columns[5].name, "metadata");
    assert_eq!(
        decoded.columns[5].column_type,
        BinarySnapshotColumnType::Json
    );
    assert!(decoded.columns[5].nullable);

    assert_eq!(decoded.rows.len(), 2);
    assert_eq!(
        decoded.rows[0].get("id").and_then(Value::as_str),
        Some("task-1")
    );
    assert_eq!(
        decoded.rows[0].get("title").and_then(Value::as_str),
        Some("Remote")
    );
    assert_eq!(
        decoded.rows[0]
            .get("server_version")
            .and_then(Value::as_i64),
        Some(42)
    );
    assert_eq!(
        decoded.rows[0].get("score").and_then(Value::as_f64),
        Some(1.5)
    );
    assert_eq!(
        decoded.rows[0].get("done").and_then(Value::as_bool),
        Some(false)
    );
    assert_eq!(
        decoded.rows[0]
            .get("metadata")
            .and_then(|metadata| metadata.get("priority"))
            .and_then(Value::as_str),
        Some("high")
    );
    assert_eq!(
        decoded.rows[1].get("id").and_then(Value::as_str),
        Some("task-2")
    );
    assert_eq!(
        decoded.rows[1].get("score").and_then(Value::as_f64),
        Some(-2.25)
    );
    assert_eq!(
        decoded.rows[1].get("done").and_then(Value::as_bool),
        Some(true)
    );
    assert!(decoded.rows[1].get("metadata").is_some_and(Value::is_null));
}

fn binary_sync_pack_fixture() -> Value {
    serde_json::from_str(include_str!(
        "fixtures/binary-sync-pack-v1-combined-response.json"
    ))
    .expect("binary sync pack fixture JSON")
}

fn sync_pack_wire_version(bytes: &[u8]) -> u16 {
    u16::from_le_bytes([bytes[4], bytes[5]])
}

fn json_combined_sync_fixture() -> Value {
    serde_json::from_str(include_str!("fixtures/json-combined-sync-v1.json"))
        .expect("JSON combined sync fixture")
}

fn binary_snapshot_table_fixture() -> Value {
    serde_json::from_str(include_str!("fixtures/binary-snapshot-table-v1-tasks.json"))
        .expect("binary snapshot table fixture JSON")
}