syncular-testkit 0.1.0

Rust-first test fixtures, in-memory app server, and assertions for Syncular apps.
Documentation
use serde_json::{Map, Value};
use syncular_runtime::protocol::{
    snapshot_manifest_digest, BootstrapState, CombinedRequest, CombinedResponse, OperationResult,
    PullResponse, PushBatchResponse, PushCommitResponse, ScopeValues, SnapshotChunkRef,
    SnapshotManifest, SnapshotManifestChunkRef, SubscriptionResponse, SyncChange, SyncCommit,
    SyncSnapshot,
};

pub fn scope_values(items: impl IntoIterator<Item = (impl Into<String>, Value)>) -> ScopeValues {
    items
        .into_iter()
        .map(|(key, value)| (key.into(), value))
        .collect::<Map<_, _>>()
}

pub fn actor_project_scopes(actor_id: &str, project_id: Option<&str>) -> ScopeValues {
    let mut scopes = ScopeValues::new();
    scopes.insert("user_id".to_string(), Value::String(actor_id.to_string()));
    if let Some(project_id) = project_id {
        scopes.insert(
            "project_id".to_string(),
            Value::String(project_id.to_string()),
        );
    }
    scopes
}

pub fn schema_required_response(required_schema_version: i32) -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: Some(required_schema_version),
        latest_schema_version: Some(required_schema_version),
        push: None,
        pull: None,
    }
}

pub fn schema_latest_response(latest_schema_version: i32) -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: Some(latest_schema_version),
        push: None,
        pull: None,
    }
}

pub fn combined_not_ok_response() -> CombinedResponse {
    CombinedResponse {
        ok: false,
        required_schema_version: None,
        latest_schema_version: None,
        push: None,
        pull: None,
    }
}

pub fn push_not_ok_response(request: &CombinedRequest) -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: None,
        push: request.push.as_ref().map(|_| PushBatchResponse {
            ok: false,
            commits: Vec::new(),
        }),
        pull: None,
    }
}

pub fn pull_not_ok_response() -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: None,
        push: None,
        pull: Some(PullResponse {
            ok: false,
            subscriptions: Vec::new(),
        }),
    }
}

pub fn snapshot_combined_response(
    subscription_id: &str,
    table: &str,
    rows: Vec<Value>,
    scopes: ScopeValues,
    next_cursor: i64,
) -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: None,
        push: None,
        pull: Some(PullResponse {
            ok: true,
            subscriptions: vec![snapshot_subscription_response(
                subscription_id,
                table,
                rows,
                scopes,
                next_cursor,
            )],
        }),
    }
}

pub fn snapshot_page_combined_response(
    subscription_id: &str,
    table: &str,
    rows: Vec<Value>,
    scopes: ScopeValues,
    next_cursor: i64,
    is_first_page: bool,
    is_last_page: bool,
    bootstrap_state: Option<BootstrapState>,
) -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: None,
        push: None,
        pull: Some(PullResponse {
            ok: true,
            subscriptions: vec![SubscriptionResponse {
                id: subscription_id.to_string(),
                status: "active".to_string(),
                scopes,
                bootstrap: true,
                bootstrap_state,
                next_cursor,
                integrity: None,
                commits: Vec::new(),
                snapshots: Some(vec![SyncSnapshot {
                    table: table.to_string(),
                    rows,
                    chunks: None,
                    artifacts: None,
                    manifest: None,
                    is_first_page,
                    is_last_page,
                    bootstrap_state_after: None,
                }]),
            }],
        }),
    }
}

pub fn snapshot_chunks_combined_response(
    subscription_id: &str,
    table: &str,
    chunks: Vec<SnapshotChunkRef>,
    scopes: ScopeValues,
    next_cursor: i64,
) -> CombinedResponse {
    let manifest = snapshot_manifest_for_chunks(table, next_cursor, &chunks);
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: None,
        push: None,
        pull: Some(PullResponse {
            ok: true,
            subscriptions: vec![SubscriptionResponse {
                id: subscription_id.to_string(),
                status: "active".to_string(),
                scopes,
                bootstrap: true,
                bootstrap_state: None,
                next_cursor,
                integrity: None,
                commits: Vec::new(),
                snapshots: Some(vec![SyncSnapshot {
                    table: table.to_string(),
                    rows: Vec::new(),
                    chunks: Some(chunks),
                    artifacts: None,
                    manifest: Some(manifest),
                    is_first_page: true,
                    is_last_page: true,
                    bootstrap_state_after: None,
                }]),
            }],
        }),
    }
}

fn snapshot_manifest_for_chunks(
    table: &str,
    as_of_commit_seq: i64,
    chunks: &[SnapshotChunkRef],
) -> SnapshotManifest {
    let mut manifest = SnapshotManifest {
        version: 1,
        digest: String::new(),
        table: table.to_string(),
        as_of_commit_seq,
        scope_digest: "0".repeat(64),
        row_cursor: None,
        row_limit: 1000,
        next_row_cursor: None,
        is_first_page: true,
        is_last_page: true,
        chunks: chunks
            .iter()
            .map(|chunk| SnapshotManifestChunkRef {
                id: chunk.id.clone(),
                byte_length: chunk.byte_length,
                sha256: chunk.sha256.clone(),
                encoding: chunk.encoding.clone(),
                compression: chunk.compression.clone(),
            })
            .collect(),
    };
    manifest.digest = snapshot_manifest_digest(&manifest).expect("test snapshot manifest digest");
    manifest
}

pub fn snapshot_subscription_response(
    subscription_id: &str,
    table: &str,
    rows: Vec<Value>,
    scopes: ScopeValues,
    next_cursor: i64,
) -> SubscriptionResponse {
    SubscriptionResponse {
        id: subscription_id.to_string(),
        status: "active".to_string(),
        scopes,
        bootstrap: true,
        bootstrap_state: None,
        next_cursor,
        integrity: None,
        commits: Vec::new(),
        snapshots: Some(vec![SyncSnapshot {
            table: table.to_string(),
            rows,
            chunks: None,
            artifacts: None,
            manifest: None,
            is_first_page: true,
            is_last_page: true,
            bootstrap_state_after: None,
        }]),
    }
}

pub fn revoked_subscription_response(
    subscription_id: &str,
    scopes: ScopeValues,
    next_cursor: i64,
) -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: None,
        push: None,
        pull: Some(PullResponse {
            ok: true,
            subscriptions: vec![SubscriptionResponse {
                id: subscription_id.to_string(),
                status: "revoked".to_string(),
                scopes,
                bootstrap: false,
                bootstrap_state: None,
                next_cursor,
                integrity: None,
                commits: Vec::new(),
                snapshots: None,
            }],
        }),
    }
}

pub fn commit_combined_response(
    subscription_id: &str,
    scopes: ScopeValues,
    next_cursor: i64,
    commit_seq: i64,
    changes: Vec<SyncChange>,
) -> CombinedResponse {
    commits_combined_response(
        subscription_id,
        scopes,
        next_cursor,
        vec![SyncCommit {
            commit_seq,
            created_at: "2026-01-01T00:00:00.000Z".to_string(),
            actor_id: "test-server".to_string(),
            changes,
        }],
    )
}

pub fn commits_combined_response(
    subscription_id: &str,
    scopes: ScopeValues,
    next_cursor: i64,
    commits: Vec<SyncCommit>,
) -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: None,
        push: None,
        pull: Some(PullResponse {
            ok: true,
            subscriptions: vec![SubscriptionResponse {
                id: subscription_id.to_string(),
                status: "active".to_string(),
                scopes,
                bootstrap: false,
                bootstrap_state: None,
                next_cursor,
                integrity: None,
                commits,
                snapshots: None,
            }],
        }),
    }
}

pub fn push_conflict_response(
    request: &CombinedRequest,
    message: &str,
    code: &str,
    server_row: Value,
    server_version: i64,
) -> CombinedResponse {
    CombinedResponse {
        ok: true,
        required_schema_version: None,
        latest_schema_version: None,
        push: request.push.as_ref().map(|push| PushBatchResponse {
            ok: true,
            commits: push
                .commits
                .iter()
                .map(|commit| PushCommitResponse {
                    client_commit_id: commit.client_commit_id.clone(),
                    status: "rejected".to_string(),
                    commit_seq: None,
                    results: vec![OperationResult {
                        op_index: 0,
                        status: "conflict".to_string(),
                        message: Some(message.to_string()),
                        error: None,
                        code: Some(code.to_string()),
                        retriable: Some(false),
                        server_version: Some(server_version),
                        server_row: Some(server_row.clone()),
                    }],
                })
                .collect(),
        }),
        pull: Some(PullResponse {
            ok: true,
            subscriptions: Vec::new(),
        }),
    }
}

pub fn upsert_change(table: &str, row_id: &str, row: Value, row_version: i64) -> SyncChange {
    SyncChange {
        table: table.to_string(),
        row_id: row_id.to_string(),
        op: "upsert".to_string(),
        row_json: Some(row),
        row_version: Some(row_version),
        scopes: ScopeValues::new(),
    }
}

pub fn delete_change(table: &str, row_id: &str, row_version: i64) -> SyncChange {
    SyncChange {
        table: table.to_string(),
        row_id: row_id.to_string(),
        op: "delete".to_string(),
        row_json: None,
        row_version: Some(row_version),
        scopes: ScopeValues::new(),
    }
}