//! syncular-testkit 0.1.0
//!
//! Rust-first test fixtures, in-memory app server, and assertions for Syncular apps.
use std::time::{Duration, Instant};

use serde_json::{json, Value};
use syncular_runtime::app_schema::{AppSchema, AppTableMetadata, EmbeddedMigration};
use syncular_runtime::error::{ErrorKind, Result, SyncularError};
use syncular_runtime::fixtures::todo;
use syncular_runtime::native::{
    NativeClientConfig, NativeClientOptions, NativeDiagnostic, NativeEvent, NativeEventKind,
    NativeEventSubscription, NativeSyncularClient,
};

use crate::temp::TempDbPath;

/// Settings consumed by the fixture helpers in this module when opening a native test client.
///
/// The [`Default`] implementation fills every field with a fixed test value.
#[derive(Debug, Clone)]
pub struct NativeFixtureOptions {
    /// Prefix handed to `TempDbPath::new` when the fixture's database path is created.
    pub db_prefix: String,
    /// Copied into `NativeClientConfig::base_url`.
    pub base_url: String,
    /// Copied into `NativeClientConfig::client_id`.
    pub client_id: String,
    /// Copied into `NativeClientConfig::actor_id`.
    pub actor_id: String,
    /// Copied into `NativeClientConfig::project_id`; `None` leaves it unset.
    pub project_id: Option<String>,
    /// Passed through as the options argument when the native client is opened.
    pub client_options: NativeClientOptions,
}

impl Default for NativeFixtureOptions {
    /// Returns the stock fixture options: fixed test identifiers, a loopback `base_url`, and
    /// client options with `auto_sync_local_writes` switched off.
    fn default() -> Self {
        let client_options = NativeClientOptions {
            auto_sync_local_writes: false,
        };
        Self {
            db_prefix: String::from("syncular-native-test"),
            base_url: String::from("http://127.0.0.1:9/sync"),
            client_id: String::from("native-test-client"),
            actor_id: String::from("user-rust"),
            project_id: Some(String::from("p0")),
            client_options,
        }
    }
}

/// Bundle returned by the `open_native_client_*` helpers: the temporary database path, the
/// opened client, and an event subscription taken from that client.
pub struct NativeFixture {
    // Path the client's database was opened at; exposed as a string via `db_path()`.
    db: TempDbPath,
    /// The opened native client.
    pub client: NativeSyncularClient,
    /// Event subscription created from `client` right after it was opened.
    pub events: NativeEventSubscription,
}

impl NativeFixture {
    /// Returns the fixture's database path as an owned string (lossy conversion, as produced
    /// by `TempDbPath::to_string_lossy`).
    pub fn db_path(&self) -> String {
        let path = &self.db;
        path.to_string_lossy()
    }

    /// Consumes the fixture, closes the client, and returns the result of that close call.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `NativeSyncularClient::close` reports.
    pub fn close(mut self) -> Result<()> {
        // The remaining fields are dropped when `self` goes out of scope after the close call.
        self.client.close()
    }
}

/// Opens a native fixture for `app_schema` using [`NativeFixtureOptions::default`].
///
/// # Errors
///
/// Returns any error produced by [`open_native_client_with_schema_options`].
pub fn open_native_client_with_schema(app_schema: AppSchema) -> Result<NativeFixture> {
    let default_options = NativeFixtureOptions::default();
    open_native_client_with_schema_options(app_schema, default_options)
}

pub fn open_native_client_with_schema_options(
    app_schema: AppSchema,
    options: NativeFixtureOptions,
) -> Result<NativeFixture> {
    let db = TempDbPath::new(&options.db_prefix);
    let config = native_config_for_db(&db, &options, None);
    let client = NativeSyncularClient::open_with_options_and_schema(
        config.into(),
        options.client_options,
        app_schema,
    )?;
    let events = client.subscribe_events(256);
    Ok(NativeFixture { db, client, events })
}

/// Opens a native fixture from a JSON-encoded schema using [`NativeFixtureOptions::default`].
///
/// # Errors
///
/// Returns any error produced by [`open_native_client_with_schema_json_options`].
pub fn open_native_client_with_schema_json(schema_json: String) -> Result<NativeFixture> {
    let default_options = NativeFixtureOptions::default();
    open_native_client_with_schema_json_options(schema_json, default_options)
}

pub fn open_native_client_with_schema_json_options(
    schema_json: String,
    options: NativeFixtureOptions,
) -> Result<NativeFixture> {
    let db = TempDbPath::new(&options.db_prefix);
    let config = native_config_for_db(&db, &options, Some(schema_json));
    let client = NativeSyncularClient::open_native_with_options(config, options.client_options)?;
    let events = client.subscribe_events(256);
    Ok(NativeFixture { db, client, events })
}

pub fn native_config_for_db(
    db: &TempDbPath,
    options: &NativeFixtureOptions,
    app_schema_json: Option<String>,
) -> NativeClientConfig {
    NativeClientConfig {
        db_path: db.to_string_lossy(),
        base_url: options.base_url.clone(),
        client_id: options.client_id.clone(),
        actor_id: options.actor_id.clone(),
        project_id: options.project_id.clone(),
        app_schema_json,
    }
}

pub fn app_schema_json(app_schema: AppSchema) -> String {
    json!({
        "schemaVersion": app_schema.current_schema_version(),
        "tables": app_schema
            .app_table_metadata
            .iter()
            .map(app_table_metadata_json)
            .collect::<Vec<_>>(),
        "migrations": app_schema
            .migrations
            .iter()
            .map(embedded_migration_json)
            .collect::<Vec<_>>()
    })
    .to_string()
}

/// Returns the JSON encoding of the `todo` fixture schema, as produced by [`app_schema_json`].
pub fn todo_app_schema_json() -> String {
    let schema = todo::app_schema();
    app_schema_json(schema)
}

/// Waits for the next event whose kind equals `kind` and returns it.
///
/// Events of other kinds received while waiting are consumed and discarded.
///
/// # Panics
///
/// Panics if [`wait_native_event_matching`] gives up before such an event arrives.
pub fn wait_native_event(
    events: &NativeEventSubscription,
    kind: NativeEventKind,
    timeout: Duration,
) -> NativeEvent {
    let found = wait_native_event_matching(events, timeout, |candidate| candidate.kind == kind);
    match found {
        Some(event) => event,
        None => panic!("timed out waiting for native event {kind:?}"),
    }
}

/// Pulls events from `events` until one satisfies `predicate`, returning that event.
///
/// `timeout` bounds the whole wait, not each individual poll: every call to
/// `next_event_timeout` is given only the time left until the overall deadline. Events that
/// do not satisfy `predicate` are consumed and discarded.
///
/// Returns `None` once the deadline has passed, or as soon as `next_event_timeout` itself
/// yields `None`.
///
/// If `timeout` is so large that the deadline cannot be represented as an [`Instant`]
/// (for example `Duration::MAX`), no overall deadline is enforced and each poll is given the
/// full `timeout` instead.
pub fn wait_native_event_matching(
    events: &NativeEventSubscription,
    timeout: Duration,
    mut predicate: impl FnMut(&NativeEvent) -> bool,
) -> Option<NativeEvent> {
    // `Instant + Duration` panics on overflow; `checked_add` turns that case into `None`.
    let deadline = Instant::now().checked_add(timeout);
    loop {
        let remaining = match deadline {
            Some(limit) => {
                let now = Instant::now();
                if now >= limit {
                    return None;
                }
                limit.saturating_duration_since(now)
            }
            // Unrepresentable deadline: fall back to the caller's per-poll timeout.
            None => timeout,
        };
        let event = events.next_event_timeout(remaining)?;
        if predicate(&event) {
            return Some(event);
        }
    }
}

/// Collects events from `subscription` until `next_event_timeout(timeout)` yields `None`.
///
/// `timeout` is applied to every poll separately, so the total time spent here is not bounded
/// by `timeout` alone while events keep arriving.
pub fn drain_native_events(
    subscription: &NativeEventSubscription,
    timeout: Duration,
) -> Vec<NativeEvent> {
    std::iter::from_fn(|| subscription.next_event_timeout(timeout)).collect()
}

/// Asserts that `event` has the `expected` kind.
///
/// # Panics
///
/// Panics with the debug rendering of `event` when the kinds differ.
pub fn assert_native_event_kind(event: &NativeEvent, expected: NativeEventKind) {
    let actual = &event.kind;
    assert_eq!(*actual, expected, "unexpected native event: {event:?}");
}

/// Asserts that `event` is a `RowsChanged` event whose `tables` equal `expected_tables`,
/// compared element by element in the given order.
///
/// # Panics
///
/// Panics if the event kind or the table list does not match.
pub fn assert_native_rows_changed(event: &NativeEvent, expected_tables: &[&str]) {
    assert_native_event_kind(event, NativeEventKind::RowsChanged);
    let expected: Vec<String> = expected_tables
        .iter()
        .map(|name| (*name).to_owned())
        .collect();
    assert_eq!(event.tables, expected, "unexpected changed tables");
}

/// Lists `table` through the client, asserts that it holds exactly `expected` rows, and
/// returns the parsed rows.
///
/// # Panics
///
/// Panics if listing the table fails, if the returned text is not a JSON array, or if the
/// row count differs from `expected`.
pub fn assert_native_table_row_count(
    client: &mut NativeSyncularClient,
    table: &str,
    expected: usize,
) -> Vec<Value> {
    let raw = client.list_table_json(table).expect("native table rows");
    let parsed: Vec<Value> = serde_json::from_str(&raw).expect("native table rows json");
    let actual = parsed.len();
    assert_eq!(actual, expected, "unexpected native row count for {table}");
    parsed
}

pub fn todo_task_upsert_operation_json(task_id: &str, title: &str) -> String {
    json!({
        "table": "tasks",
        "row_id": task_id,
        "op": "upsert",
        "payload": {
            "title": title,
            "completed": 0,
            "user_id": "user-rust",
            "project_id": "p0"
        },
        "base_version": 0
    })
    .to_string()
}

/// Applies a todo-task upsert (see [`todo_task_upsert_operation_json`]) through
/// `client.apply_mutation_json`, passing `None` as the second argument.
///
/// # Errors
///
/// Returns whatever error `apply_mutation_json` reports.
pub fn apply_native_todo_task_upsert(
    client: &mut NativeSyncularClient,
    task_id: &str,
    title: &str,
) -> Result<String> {
    let operation = todo_task_upsert_operation_json(task_id, title);
    client.apply_mutation_json(&operation, None)
}

/// Asserts that `event` carries an error whose kind is `expected`.
///
/// # Panics
///
/// Panics if the event has no error or the error kind differs.
pub fn assert_native_error_kind(event: &NativeEvent, expected: ErrorKind) {
    let actual = event.error.as_ref().map(|failure| failure.kind);
    assert_eq!(
        actual,
        Some(expected),
        "unexpected native event error: {event:?}"
    );
}

/// Asserts that `event` carries an error whose code is `expected`.
///
/// # Panics
///
/// Panics if the event has no error or the error code differs.
pub fn assert_native_error_code(event: &NativeEvent, expected: &str) {
    let actual = event.error.as_ref().map(|failure| failure.code.as_str());
    assert_eq!(
        actual,
        Some(expected),
        "unexpected native event error code: {event:?}"
    );
}

/// Asserts that `event` carries a diagnostic whose code is `expected` and returns a
/// reference to that diagnostic.
///
/// # Panics
///
/// Panics if the event has no diagnostic or the diagnostic code differs.
pub fn assert_native_diagnostic_code<'a>(
    event: &'a NativeEvent,
    expected: &str,
) -> &'a NativeDiagnostic {
    let diagnostic = match event.diagnostic.as_ref() {
        Some(found) => found,
        None => panic!("expected native diagnostic on event: {event:?}"),
    };
    assert_eq!(
        diagnostic.code, expected,
        "unexpected native diagnostic code on event: {event:?}"
    );
    diagnostic
}

/// Asserts that the diagnostic on `event` has a detail entry `key` equal to `expected`.
///
/// # Panics
///
/// Panics if the event has no diagnostic, or if the entry is missing or differs.
pub fn assert_native_diagnostic_detail(event: &NativeEvent, key: &str, expected: Value) {
    let diagnostic = match event.diagnostic.as_ref() {
        Some(found) => found,
        None => panic!("expected native diagnostic on event: {event:?}"),
    };
    let actual = diagnostic.details.get(key);
    assert_eq!(
        actual,
        Some(&expected),
        "unexpected native diagnostic detail {key} on event: {event:?}"
    );
}

/// Deserializes a [`NativeEvent`] from its JSON text.
///
/// # Errors
///
/// Returns the `serde_json` failure converted into a [`SyncularError`].
pub fn parse_native_event_json(event_json: &str) -> Result<NativeEvent> {
    match serde_json::from_str::<NativeEvent>(event_json) {
        Ok(event) => Ok(event),
        Err(source) => Err(SyncularError::from(source)),
    }
}

/// Converts one table's metadata into the camelCase JSON object used inside the schema
/// document built by `app_schema_json`.
fn app_table_metadata_json(table: &AppTableMetadata) -> Value {
    // Nested collections are rendered first so the final object literal stays flat.
    let columns: Vec<Value> = table
        .columns
        .iter()
        .map(|col| {
            json!({
                "name": col.name,
                "typeFamily": col.type_family,
                "notnullRequired": col.notnull_required,
                "primaryKey": col.primary_key
            })
        })
        .collect();

    let crdt_yjs_fields: Vec<Value> = table
        .crdt_yjs_fields
        .iter()
        .map(|crdt| {
            json!({
                "field": crdt.field,
                "stateColumn": crdt.state_column,
                "containerKey": crdt.container_key,
                "rowIdField": crdt.row_id_field,
                "kind": crdt.kind,
                "syncMode": crdt.sync_mode
            })
        })
        .collect();

    let encrypted_fields: Vec<Value> = table
        .encrypted_fields
        .iter()
        .map(|encrypted| {
            json!({
                "field": encrypted.field,
                "scope": encrypted.scope,
                "rowIdField": encrypted.row_id_field
            })
        })
        .collect();

    let scopes: Vec<Value> = table
        .scopes
        .iter()
        .map(|entry| {
            json!({
                "name": entry.name,
                "column": entry.column,
                "source": entry.source,
                "required": entry.required
            })
        })
        .collect();

    json!({
        "name": table.name,
        "primaryKeyColumn": table.primary_key_column,
        "serverVersionColumn": table.server_version_column,
        "softDeleteColumn": table.soft_delete_column,
        "subscriptionId": table.subscription_id,
        "columns": columns,
        "blobColumns": table.blob_columns,
        "crdtYjsFields": crdt_yjs_fields,
        "encryptedFields": encrypted_fields,
        "scopes": scopes
    })
}

fn embedded_migration_json(migration: &EmbeddedMigration) -> Value {
    json!({
        "version": migration.version,
        "schemaVersion": migration.schema_version,
        "name": migration.name,
        "upSql": migration.up_sql
    })
}