1use std::time::{Duration, Instant};
2
3use serde_json::{json, Value};
4use syncular_runtime::app_schema::{AppSchema, AppTableMetadata, EmbeddedMigration};
5use syncular_runtime::error::{ErrorKind, Result, SyncularError};
6use syncular_runtime::fixtures::todo;
7use syncular_runtime::native::{
8 NativeClientConfig, NativeClientOptions, NativeDiagnostic, NativeEvent, NativeEventKind,
9 NativeEventSubscription, NativeSyncularClient,
10};
11
12use crate::temp::TempDbPath;
13
14#[derive(Debug, Clone)]
15pub struct NativeFixtureOptions {
16 pub db_prefix: String,
17 pub base_url: String,
18 pub client_id: String,
19 pub actor_id: String,
20 pub project_id: Option<String>,
21 pub client_options: NativeClientOptions,
22}
23
24impl Default for NativeFixtureOptions {
25 fn default() -> Self {
26 Self {
27 db_prefix: "syncular-native-test".to_string(),
28 base_url: "http://127.0.0.1:9/sync".to_string(),
29 client_id: "native-test-client".to_string(),
30 actor_id: "user-rust".to_string(),
31 project_id: Some("p0".to_string()),
32 client_options: NativeClientOptions {
33 auto_sync_local_writes: false,
34 },
35 }
36 }
37}
38
39pub struct NativeFixture {
40 db: TempDbPath,
41 pub client: NativeSyncularClient,
42 pub events: NativeEventSubscription,
43}
44
45impl NativeFixture {
46 pub fn db_path(&self) -> String {
47 self.db.to_string_lossy()
48 }
49
50 pub fn close(mut self) -> Result<()> {
51 self.client.close()
52 }
53}
54
55pub fn open_native_client_with_schema(app_schema: AppSchema) -> Result<NativeFixture> {
56 open_native_client_with_schema_options(app_schema, NativeFixtureOptions::default())
57}
58
59pub fn open_native_client_with_schema_options(
60 app_schema: AppSchema,
61 options: NativeFixtureOptions,
62) -> Result<NativeFixture> {
63 let db = TempDbPath::new(&options.db_prefix);
64 let config = native_config_for_db(&db, &options, None);
65 let client = NativeSyncularClient::open_with_options_and_schema(
66 config.into(),
67 options.client_options,
68 app_schema,
69 )?;
70 let events = client.subscribe_events(256);
71 Ok(NativeFixture { db, client, events })
72}
73
74pub fn open_native_client_with_schema_json(schema_json: String) -> Result<NativeFixture> {
75 open_native_client_with_schema_json_options(schema_json, NativeFixtureOptions::default())
76}
77
78pub fn open_native_client_with_schema_json_options(
79 schema_json: String,
80 options: NativeFixtureOptions,
81) -> Result<NativeFixture> {
82 let db = TempDbPath::new(&options.db_prefix);
83 let config = native_config_for_db(&db, &options, Some(schema_json));
84 let client = NativeSyncularClient::open_native_with_options(config, options.client_options)?;
85 let events = client.subscribe_events(256);
86 Ok(NativeFixture { db, client, events })
87}
88
89pub fn native_config_for_db(
90 db: &TempDbPath,
91 options: &NativeFixtureOptions,
92 app_schema_json: Option<String>,
93) -> NativeClientConfig {
94 NativeClientConfig {
95 db_path: db.to_string_lossy(),
96 base_url: options.base_url.clone(),
97 client_id: options.client_id.clone(),
98 actor_id: options.actor_id.clone(),
99 project_id: options.project_id.clone(),
100 app_schema_json,
101 }
102}
103
104pub fn app_schema_json(app_schema: AppSchema) -> String {
105 json!({
106 "schemaVersion": app_schema.current_schema_version(),
107 "tables": app_schema
108 .app_table_metadata
109 .iter()
110 .map(app_table_metadata_json)
111 .collect::<Vec<_>>(),
112 "migrations": app_schema
113 .migrations
114 .iter()
115 .map(embedded_migration_json)
116 .collect::<Vec<_>>()
117 })
118 .to_string()
119}
120
121pub fn todo_app_schema_json() -> String {
122 app_schema_json(todo::app_schema())
123}
124
125pub fn wait_native_event(
126 events: &NativeEventSubscription,
127 kind: NativeEventKind,
128 timeout: Duration,
129) -> NativeEvent {
130 wait_native_event_matching(events, timeout, |event| event.kind == kind)
131 .unwrap_or_else(|| panic!("timed out waiting for native event {kind:?}"))
132}
133
134pub fn wait_native_event_matching(
135 events: &NativeEventSubscription,
136 timeout: Duration,
137 mut predicate: impl FnMut(&NativeEvent) -> bool,
138) -> Option<NativeEvent> {
139 let deadline = Instant::now() + timeout;
140 loop {
141 let now = Instant::now();
142 if now >= deadline {
143 return None;
144 }
145 let remaining = deadline.saturating_duration_since(now);
146 let event = events.next_event_timeout(remaining)?;
147 if predicate(&event) {
148 return Some(event);
149 }
150 }
151}
152
153pub fn drain_native_events(
154 subscription: &NativeEventSubscription,
155 timeout: Duration,
156) -> Vec<NativeEvent> {
157 let mut events = Vec::new();
158 while let Some(event) = subscription.next_event_timeout(timeout) {
159 events.push(event);
160 }
161 events
162}
163
164pub fn assert_native_event_kind(event: &NativeEvent, expected: NativeEventKind) {
165 assert_eq!(event.kind, expected, "unexpected native event: {event:?}");
166}
167
168pub fn assert_native_rows_changed(event: &NativeEvent, expected_tables: &[&str]) {
169 assert_native_event_kind(event, NativeEventKind::RowsChanged);
170 let expected = expected_tables
171 .iter()
172 .map(|table| table.to_string())
173 .collect::<Vec<_>>();
174 assert_eq!(event.tables, expected, "unexpected changed tables");
175}
176
177pub fn assert_native_table_row_count(
178 client: &mut NativeSyncularClient,
179 table: &str,
180 expected: usize,
181) -> Vec<Value> {
182 let rows_json = client.list_table_json(table).expect("native table rows");
183 let rows: Vec<Value> = serde_json::from_str(&rows_json).expect("native table rows json");
184 assert_eq!(
185 rows.len(),
186 expected,
187 "unexpected native row count for {table}"
188 );
189 rows
190}
191
192pub fn todo_task_upsert_operation_json(task_id: &str, title: &str) -> String {
193 json!({
194 "table": "tasks",
195 "row_id": task_id,
196 "op": "upsert",
197 "payload": {
198 "title": title,
199 "completed": 0,
200 "user_id": "user-rust",
201 "project_id": "p0"
202 },
203 "base_version": 0
204 })
205 .to_string()
206}
207
208pub fn apply_native_todo_task_upsert(
209 client: &mut NativeSyncularClient,
210 task_id: &str,
211 title: &str,
212) -> Result<String> {
213 client.apply_mutation_json(&todo_task_upsert_operation_json(task_id, title), None)
214}
215
216pub fn assert_native_error_kind(event: &NativeEvent, expected: ErrorKind) {
217 assert_eq!(
218 event.error.as_ref().map(|error| error.kind),
219 Some(expected),
220 "unexpected native event error: {event:?}"
221 );
222}
223
224pub fn assert_native_error_code(event: &NativeEvent, expected: &str) {
225 assert_eq!(
226 event.error.as_ref().map(|error| error.code.as_str()),
227 Some(expected),
228 "unexpected native event error code: {event:?}"
229 );
230}
231
232pub fn assert_native_diagnostic_code<'a>(
233 event: &'a NativeEvent,
234 expected: &str,
235) -> &'a NativeDiagnostic {
236 let diagnostic = event
237 .diagnostic
238 .as_ref()
239 .unwrap_or_else(|| panic!("expected native diagnostic on event: {event:?}"));
240 assert_eq!(
241 diagnostic.code, expected,
242 "unexpected native diagnostic code on event: {event:?}"
243 );
244 diagnostic
245}
246
247pub fn assert_native_diagnostic_detail(event: &NativeEvent, key: &str, expected: Value) {
248 let diagnostic = event
249 .diagnostic
250 .as_ref()
251 .unwrap_or_else(|| panic!("expected native diagnostic on event: {event:?}"));
252 assert_eq!(
253 diagnostic.details.get(key),
254 Some(&expected),
255 "unexpected native diagnostic detail {key} on event: {event:?}"
256 );
257}
258
259pub fn parse_native_event_json(event_json: &str) -> Result<NativeEvent> {
260 serde_json::from_str(event_json).map_err(SyncularError::from)
261}
262
263fn app_table_metadata_json(table: &AppTableMetadata) -> Value {
264 json!({
265 "name": table.name,
266 "primaryKeyColumn": table.primary_key_column,
267 "serverVersionColumn": table.server_version_column,
268 "softDeleteColumn": table.soft_delete_column,
269 "subscriptionId": table.subscription_id,
270 "columns": table.columns.iter().map(|column| {
271 json!({
272 "name": column.name,
273 "typeFamily": column.type_family,
274 "notnullRequired": column.notnull_required,
275 "primaryKey": column.primary_key
276 })
277 }).collect::<Vec<_>>(),
278 "blobColumns": table.blob_columns,
279 "crdtYjsFields": table.crdt_yjs_fields.iter().map(|field| {
280 json!({
281 "field": field.field,
282 "stateColumn": field.state_column,
283 "containerKey": field.container_key,
284 "rowIdField": field.row_id_field,
285 "kind": field.kind,
286 "syncMode": field.sync_mode
287 })
288 }).collect::<Vec<_>>(),
289 "encryptedFields": table.encrypted_fields.iter().map(|field| {
290 json!({
291 "field": field.field,
292 "scope": field.scope,
293 "rowIdField": field.row_id_field
294 })
295 }).collect::<Vec<_>>(),
296 "scopes": table.scopes.iter().map(|scope| {
297 json!({
298 "name": scope.name,
299 "column": scope.column,
300 "source": scope.source,
301 "required": scope.required
302 })
303 }).collect::<Vec<_>>()
304 })
305}
306
307fn embedded_migration_json(migration: &EmbeddedMigration) -> Value {
308 json!({
309 "version": migration.version,
310 "schemaVersion": migration.schema_version,
311 "name": migration.name,
312 "upSql": migration.up_sql
313 })
314}