Skip to main content

syncular_testkit/
native.rs

1use std::time::{Duration, Instant};
2
3use serde_json::{json, Value};
4use syncular_runtime::app_schema::{AppSchema, AppTableMetadata, EmbeddedMigration};
5use syncular_runtime::error::{ErrorKind, Result, SyncularError};
6use syncular_runtime::fixtures::todo;
7use syncular_runtime::native::{
8    NativeClientConfig, NativeClientOptions, NativeDiagnostic, NativeEvent, NativeEventKind,
9    NativeEventSubscription, NativeSyncularClient,
10};
11
12use crate::temp::TempDbPath;
13
14#[derive(Debug, Clone)]
15pub struct NativeFixtureOptions {
16    pub db_prefix: String,
17    pub base_url: String,
18    pub client_id: String,
19    pub actor_id: String,
20    pub project_id: Option<String>,
21    pub client_options: NativeClientOptions,
22}
23
24impl Default for NativeFixtureOptions {
25    fn default() -> Self {
26        Self {
27            db_prefix: "syncular-native-test".to_string(),
28            base_url: "http://127.0.0.1:9/sync".to_string(),
29            client_id: "native-test-client".to_string(),
30            actor_id: "user-rust".to_string(),
31            project_id: Some("p0".to_string()),
32            client_options: NativeClientOptions {
33                auto_sync_local_writes: false,
34            },
35        }
36    }
37}
38
39pub struct NativeFixture {
40    db: TempDbPath,
41    pub client: NativeSyncularClient,
42    pub events: NativeEventSubscription,
43}
44
45impl NativeFixture {
46    pub fn db_path(&self) -> String {
47        self.db.to_string_lossy()
48    }
49
50    pub fn close(mut self) -> Result<()> {
51        self.client.close()
52    }
53}
54
55pub fn open_native_client_with_schema(app_schema: AppSchema) -> Result<NativeFixture> {
56    open_native_client_with_schema_options(app_schema, NativeFixtureOptions::default())
57}
58
59pub fn open_native_client_with_schema_options(
60    app_schema: AppSchema,
61    options: NativeFixtureOptions,
62) -> Result<NativeFixture> {
63    let db = TempDbPath::new(&options.db_prefix);
64    let config = native_config_for_db(&db, &options, None);
65    let client = NativeSyncularClient::open_with_options_and_schema(
66        config.into(),
67        options.client_options,
68        app_schema,
69    )?;
70    let events = client.subscribe_events(256);
71    Ok(NativeFixture { db, client, events })
72}
73
74pub fn open_native_client_with_schema_json(schema_json: String) -> Result<NativeFixture> {
75    open_native_client_with_schema_json_options(schema_json, NativeFixtureOptions::default())
76}
77
78pub fn open_native_client_with_schema_json_options(
79    schema_json: String,
80    options: NativeFixtureOptions,
81) -> Result<NativeFixture> {
82    let db = TempDbPath::new(&options.db_prefix);
83    let config = native_config_for_db(&db, &options, Some(schema_json));
84    let client = NativeSyncularClient::open_native_with_options(config, options.client_options)?;
85    let events = client.subscribe_events(256);
86    Ok(NativeFixture { db, client, events })
87}
88
89pub fn native_config_for_db(
90    db: &TempDbPath,
91    options: &NativeFixtureOptions,
92    app_schema_json: Option<String>,
93) -> NativeClientConfig {
94    NativeClientConfig {
95        db_path: db.to_string_lossy(),
96        base_url: options.base_url.clone(),
97        client_id: options.client_id.clone(),
98        actor_id: options.actor_id.clone(),
99        project_id: options.project_id.clone(),
100        app_schema_json,
101    }
102}
103
104pub fn app_schema_json(app_schema: AppSchema) -> String {
105    json!({
106        "schemaVersion": app_schema.current_schema_version(),
107        "tables": app_schema
108            .app_table_metadata
109            .iter()
110            .map(app_table_metadata_json)
111            .collect::<Vec<_>>(),
112        "migrations": app_schema
113            .migrations
114            .iter()
115            .map(embedded_migration_json)
116            .collect::<Vec<_>>()
117    })
118    .to_string()
119}
120
121pub fn todo_app_schema_json() -> String {
122    app_schema_json(todo::app_schema())
123}
124
125pub fn wait_native_event(
126    events: &NativeEventSubscription,
127    kind: NativeEventKind,
128    timeout: Duration,
129) -> NativeEvent {
130    wait_native_event_matching(events, timeout, |event| event.kind == kind)
131        .unwrap_or_else(|| panic!("timed out waiting for native event {kind:?}"))
132}
133
134pub fn wait_native_event_matching(
135    events: &NativeEventSubscription,
136    timeout: Duration,
137    mut predicate: impl FnMut(&NativeEvent) -> bool,
138) -> Option<NativeEvent> {
139    let deadline = Instant::now() + timeout;
140    loop {
141        let now = Instant::now();
142        if now >= deadline {
143            return None;
144        }
145        let remaining = deadline.saturating_duration_since(now);
146        let event = events.next_event_timeout(remaining)?;
147        if predicate(&event) {
148            return Some(event);
149        }
150    }
151}
152
153pub fn drain_native_events(
154    subscription: &NativeEventSubscription,
155    timeout: Duration,
156) -> Vec<NativeEvent> {
157    let mut events = Vec::new();
158    while let Some(event) = subscription.next_event_timeout(timeout) {
159        events.push(event);
160    }
161    events
162}
163
164pub fn assert_native_event_kind(event: &NativeEvent, expected: NativeEventKind) {
165    assert_eq!(event.kind, expected, "unexpected native event: {event:?}");
166}
167
168pub fn assert_native_rows_changed(event: &NativeEvent, expected_tables: &[&str]) {
169    assert_native_event_kind(event, NativeEventKind::RowsChanged);
170    let expected = expected_tables
171        .iter()
172        .map(|table| table.to_string())
173        .collect::<Vec<_>>();
174    assert_eq!(event.tables, expected, "unexpected changed tables");
175}
176
177pub fn assert_native_table_row_count(
178    client: &mut NativeSyncularClient,
179    table: &str,
180    expected: usize,
181) -> Vec<Value> {
182    let rows_json = client.list_table_json(table).expect("native table rows");
183    let rows: Vec<Value> = serde_json::from_str(&rows_json).expect("native table rows json");
184    assert_eq!(
185        rows.len(),
186        expected,
187        "unexpected native row count for {table}"
188    );
189    rows
190}
191
192pub fn todo_task_upsert_operation_json(task_id: &str, title: &str) -> String {
193    json!({
194        "table": "tasks",
195        "row_id": task_id,
196        "op": "upsert",
197        "payload": {
198            "title": title,
199            "completed": 0,
200            "user_id": "user-rust",
201            "project_id": "p0"
202        },
203        "base_version": 0
204    })
205    .to_string()
206}
207
208pub fn apply_native_todo_task_upsert(
209    client: &mut NativeSyncularClient,
210    task_id: &str,
211    title: &str,
212) -> Result<String> {
213    client.apply_mutation_json(&todo_task_upsert_operation_json(task_id, title), None)
214}
215
216pub fn assert_native_error_kind(event: &NativeEvent, expected: ErrorKind) {
217    assert_eq!(
218        event.error.as_ref().map(|error| error.kind),
219        Some(expected),
220        "unexpected native event error: {event:?}"
221    );
222}
223
224pub fn assert_native_error_code(event: &NativeEvent, expected: &str) {
225    assert_eq!(
226        event.error.as_ref().map(|error| error.code.as_str()),
227        Some(expected),
228        "unexpected native event error code: {event:?}"
229    );
230}
231
232pub fn assert_native_diagnostic_code<'a>(
233    event: &'a NativeEvent,
234    expected: &str,
235) -> &'a NativeDiagnostic {
236    let diagnostic = event
237        .diagnostic
238        .as_ref()
239        .unwrap_or_else(|| panic!("expected native diagnostic on event: {event:?}"));
240    assert_eq!(
241        diagnostic.code, expected,
242        "unexpected native diagnostic code on event: {event:?}"
243    );
244    diagnostic
245}
246
247pub fn assert_native_diagnostic_detail(event: &NativeEvent, key: &str, expected: Value) {
248    let diagnostic = event
249        .diagnostic
250        .as_ref()
251        .unwrap_or_else(|| panic!("expected native diagnostic on event: {event:?}"));
252    assert_eq!(
253        diagnostic.details.get(key),
254        Some(&expected),
255        "unexpected native diagnostic detail {key} on event: {event:?}"
256    );
257}
258
259pub fn parse_native_event_json(event_json: &str) -> Result<NativeEvent> {
260    serde_json::from_str(event_json).map_err(SyncularError::from)
261}
262
263fn app_table_metadata_json(table: &AppTableMetadata) -> Value {
264    json!({
265        "name": table.name,
266        "primaryKeyColumn": table.primary_key_column,
267        "serverVersionColumn": table.server_version_column,
268        "softDeleteColumn": table.soft_delete_column,
269        "subscriptionId": table.subscription_id,
270        "columns": table.columns.iter().map(|column| {
271            json!({
272                "name": column.name,
273                "typeFamily": column.type_family,
274                "notnullRequired": column.notnull_required,
275                "primaryKey": column.primary_key
276            })
277        }).collect::<Vec<_>>(),
278        "blobColumns": table.blob_columns,
279        "crdtYjsFields": table.crdt_yjs_fields.iter().map(|field| {
280            json!({
281                "field": field.field,
282                "stateColumn": field.state_column,
283                "containerKey": field.container_key,
284                "rowIdField": field.row_id_field,
285                "kind": field.kind,
286                "syncMode": field.sync_mode
287            })
288        }).collect::<Vec<_>>(),
289        "encryptedFields": table.encrypted_fields.iter().map(|field| {
290            json!({
291                "field": field.field,
292                "scope": field.scope,
293                "rowIdField": field.row_id_field
294            })
295        }).collect::<Vec<_>>(),
296        "scopes": table.scopes.iter().map(|scope| {
297            json!({
298                "name": scope.name,
299                "column": scope.column,
300                "source": scope.source,
301                "required": scope.required
302            })
303        }).collect::<Vec<_>>()
304    })
305}
306
307fn embedded_migration_json(migration: &EmbeddedMigration) -> Value {
308    json!({
309        "version": migration.version,
310        "schemaVersion": migration.schema_version,
311        "name": migration.name,
312        "upSql": migration.up_sql
313    })
314}