Skip to main content

syncular_testkit/
app_server.rs

1use std::collections::{BTreeMap, BTreeSet, VecDeque};
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::{Duration, Instant};
5
6use serde_json::{Map, Value};
7use syncular_runtime::app_schema::{AppSchema, AppTableMetadata};
8use syncular_runtime::binary_snapshot::SnapshotChunkRows;
9use syncular_runtime::crdt_yjs::{transform_operation_payload_for_metadata, YJS_PAYLOAD_KEY};
10use syncular_runtime::error::{ErrorKind, Result, SyncularError};
11use syncular_runtime::protocol::{
12    BlobRef, CombinedRequest, CombinedResponse, OperationResult, PullResponse, PushBatchResponse,
13    PushCommitRequest, PushCommitResponse, ScopeValues, SnapshotChunkRef, SubscriptionResponse,
14    SyncChange, SyncCommit, SyncOperation, SyncSnapshot,
15};
16use syncular_runtime::transport::{
17    BlobTransport, RealtimeEvent, RealtimeTransport, SyncAuthHeaderStore, SyncAuthHeaders,
18    SyncTransport,
19};
20
/// Construction-time configuration for an [`AppTestServer`].
#[derive(Debug, Clone)]
pub struct AppTestServerOptions {
    /// Client id recorded on commits created through the server-side helpers
    /// (`commit_row`, `delete_row`) and the actor id reported on pulled commits.
    pub actor_id: String,
    /// Prefix of the `created_at` string on pulled commits; the commit sequence
    /// is appended as `.{commit_seq:03}Z`.
    pub created_at_prefix: String,
    /// When `true`, every appended commit also queues a `RealtimeEvent::Sync`.
    pub emit_realtime_sync: bool,
    /// How incremental commits are ordered inside a pull response.
    pub delivery_mode: AppTestServerDeliveryMode,
    /// Initial value the `authorization` header must equal; `None` disables the check.
    pub required_authorization: Option<String>,
    /// Initial `required_schema_version` reported in sync responses.
    pub required_schema_version: Option<i32>,
    /// Initial `latest_schema_version` reported in sync responses.
    pub latest_schema_version: Option<i32>,
}
31
32impl Default for AppTestServerOptions {
33    fn default() -> Self {
34        Self {
35            actor_id: "test-server".to_string(),
36            created_at_prefix: "2026-01-01T00:00:00".to_string(),
37            emit_realtime_sync: true,
38            delivery_mode: AppTestServerDeliveryMode::Normal,
39            required_authorization: None,
40            required_schema_version: None,
41            latest_schema_version: None,
42        }
43    }
44}
45
46impl AppTestServerOptions {
47    pub fn require_authorization(mut self, authorization: impl Into<String>) -> Self {
48        self.required_authorization = Some(authorization.into());
49        self
50    }
51
52    pub fn require_schema_version(mut self, schema_version: i32) -> Self {
53        self.required_schema_version = Some(schema_version);
54        self.latest_schema_version = Some(
55            self.latest_schema_version
56                .map_or(schema_version, |latest| latest.max(schema_version)),
57        );
58        self
59    }
60
61    pub fn latest_schema_version(mut self, schema_version: i32) -> Self {
62        self.latest_schema_version = Some(schema_version);
63        self
64    }
65}
66
/// Ordering applied to the commits of an incremental pull response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppTestServerDeliveryMode {
    /// Commits are returned in the order they were appended.
    Normal,
    /// Commits are returned in reverse order and then the reversed list is
    /// repeated once, so every commit appears twice.
    ReverseAndDuplicate,
}
72
/// A commit recorded in the test server's commit log.
#[derive(Debug, Clone)]
pub struct AppTestServerCommit {
    /// Sequence number assigned when the commit was appended (starts at 1).
    pub commit_seq: i64,
    /// Id of the client the commit is attributed to; pulls skip commits whose
    /// `client_id` equals the requesting client's id.
    pub client_id: String,
    /// Row-level changes contained in the commit.
    pub changes: Vec<SyncChange>,
}
79
/// Mutable state shared (behind a mutex) by `AppTestServer` and `AppTestRealtime`.
#[derive(Debug, Default)]
struct AppTestServerState {
    /// Current rows: table name -> row id -> row JSON.
    rows: BTreeMap<String, BTreeMap<String, Value>>,
    /// Commit log in append order.
    commits: Vec<AppTestServerCommit>,
    /// Every sync request received, recorded before the authorization check.
    requests: Vec<CombinedRequest>,
    /// Every commit pushed through the realtime transport.
    ws_pushes: Vec<PushCommitRequest>,
    /// History of recorded auth headers; the last entry is the one checked.
    auth_headers: Vec<SyncAuthHeaders>,
    /// Pending realtime events, consumed front-first by `read_event`.
    realtime_events: VecDeque<RealtimeEvent>,
    /// Uploaded blobs keyed by blob hash.
    blobs: BTreeMap<String, Vec<u8>>,
    /// Value the `authorization` header must equal, if any.
    required_authorization: Option<String>,
    /// Subscription ids that are answered with status `"revoked"`.
    revoked_subscription_ids: BTreeSet<String>,
    /// `required_schema_version` reported in sync responses.
    required_schema_version: Option<i32>,
    /// `latest_schema_version` reported in sync responses.
    latest_schema_version: Option<i32>,
    /// Next row version to hand out.
    next_server_version: i64,
    /// Next commit sequence number to hand out.
    next_commit_seq: i64,
    /// Number of times a realtime connection was closed.
    closed_realtime_count: usize,
}
97
/// In-memory sync server for tests. Clones share the same underlying state
/// because `state` is reference-counted.
#[derive(Clone)]
pub struct AppTestServer {
    /// Schema used to look up per-table metadata.
    app_schema: AppSchema,
    /// Options supplied at construction.
    options: AppTestServerOptions,
    /// Shared mutable server state.
    state: Arc<Mutex<AppTestServerState>>,
}
104
/// Realtime connection handle returned by `AppTestServer::connect_realtime`;
/// it shares the server's state.
#[derive(Clone)]
pub struct AppTestRealtime {
    /// Schema copied from the originating server.
    app_schema: AppSchema,
    /// Options cloned from the originating server.
    options: AppTestServerOptions,
    /// State shared with the originating server.
    state: Arc<Mutex<AppTestServerState>>,
}
111
112impl AppTestServer {
113    pub fn new(app_schema: AppSchema) -> Self {
114        Self::with_options(app_schema, AppTestServerOptions::default())
115    }
116
117    pub fn with_options(app_schema: AppSchema, options: AppTestServerOptions) -> Self {
118        let required_authorization = options.required_authorization.clone();
119        let required_schema_version = options.required_schema_version;
120        let latest_schema_version = options.latest_schema_version;
121        Self {
122            app_schema,
123            options,
124            state: Arc::new(Mutex::new(AppTestServerState {
125                required_authorization,
126                required_schema_version,
127                latest_schema_version,
128                next_server_version: 1,
129                next_commit_seq: 1,
130                ..AppTestServerState::default()
131            })),
132        }
133    }
134
135    pub fn seed_row(&self, table: &str, row: Value) -> Result<Value> {
136        let metadata = self.table_metadata(table)?;
137        let row_id = row_id_from_row(metadata, &row)?;
138        let row = self.prepare_server_row(metadata, &row_id, row, None)?;
139        let version = row_server_version(metadata, &row).unwrap_or(0);
140        let mut state = self.state.lock().expect("app test server state");
141        bump_next_server_version_locked(&mut state, version);
142        state
143            .rows
144            .entry(table.to_string())
145            .or_default()
146            .insert(row_id, row.clone());
147        Ok(row)
148    }
149
150    pub fn set_schema_versions(
151        &self,
152        required_schema_version: Option<i32>,
153        latest_schema_version: Option<i32>,
154    ) {
155        let mut state = self.state.lock().expect("app test server state");
156        state.required_schema_version = required_schema_version;
157        state.latest_schema_version = latest_schema_version;
158    }
159
160    pub fn require_schema_version(&self, schema_version: i32) {
161        let mut state = self.state.lock().expect("app test server state");
162        state.required_schema_version = Some(schema_version);
163        state.latest_schema_version = Some(
164            state
165                .latest_schema_version
166                .map_or(schema_version, |latest| latest.max(schema_version)),
167        );
168    }
169
170    pub fn require_authorization(&self, authorization: impl Into<String>) {
171        self.state
172            .lock()
173            .expect("app test server state")
174            .required_authorization = Some(authorization.into());
175    }
176
177    pub fn clear_required_authorization(&self) {
178        self.state
179            .lock()
180            .expect("app test server state")
181            .required_authorization = None;
182    }
183
184    pub fn revoke_subscription(&self, subscription_id: impl Into<String>) {
185        self.state
186            .lock()
187            .expect("app test server state")
188            .revoked_subscription_ids
189            .insert(subscription_id.into());
190    }
191
192    pub fn restore_subscription(&self, subscription_id: &str) {
193        self.state
194            .lock()
195            .expect("app test server state")
196            .revoked_subscription_ids
197            .remove(subscription_id);
198    }
199
200    pub fn revoked_subscription_ids(&self) -> Vec<String> {
201        self.state
202            .lock()
203            .expect("app test server state")
204            .revoked_subscription_ids
205            .iter()
206            .cloned()
207            .collect()
208    }
209
210    pub fn commit_row(&self, table: &str, row: Value) -> Result<i64> {
211        let metadata = self.table_metadata(table)?;
212        let row_id = row_id_from_row(metadata, &row)?;
213        let mut state = self.state.lock().expect("app test server state");
214        let version = row
215            .get(metadata.server_version_column)
216            .and_then(Value::as_i64)
217            .unwrap_or_else(|| next_server_version_locked(&mut state));
218        let row = self.prepare_server_row(metadata, &row_id, row, Some(version))?;
219        let scopes = scopes_for_row(metadata, &row);
220        let change = SyncChange {
221            table: table.to_string(),
222            row_id: row_id.clone(),
223            op: "upsert".to_string(),
224            row_json: Some(row.clone()),
225            row_version: Some(version),
226            scopes,
227        };
228        state
229            .rows
230            .entry(table.to_string())
231            .or_default()
232            .insert(row_id, row);
233        let client_id = self.options.actor_id.clone();
234        let commit_seq = self.append_commit_locked(&mut state, client_id, vec![change]);
235        Ok(commit_seq)
236    }
237
238    pub fn delete_row(&self, table: &str, row_id: &str) -> Result<i64> {
239        let metadata = self.table_metadata(table)?;
240        let mut state = self.state.lock().expect("app test server state");
241        let old_row = state
242            .rows
243            .entry(table.to_string())
244            .or_default()
245            .remove(row_id);
246        let scopes = old_row
247            .as_ref()
248            .map(|row| scopes_for_row(metadata, row))
249            .unwrap_or_default();
250        let version = next_server_version_locked(&mut state);
251        let change = SyncChange {
252            table: table.to_string(),
253            row_id: row_id.to_string(),
254            op: "delete".to_string(),
255            row_json: None,
256            row_version: Some(version),
257            scopes,
258        };
259        let client_id = self.options.actor_id.clone();
260        Ok(self.append_commit_locked(&mut state, client_id, vec![change]))
261    }
262
263    pub fn rows(&self, table: &str) -> Vec<Value> {
264        self.state
265            .lock()
266            .expect("app test server state")
267            .rows
268            .get(table)
269            .map(|rows| rows.values().cloned().collect())
270            .unwrap_or_default()
271    }
272
273    pub fn row(&self, table: &str, row_id: &str) -> Option<Value> {
274        self.state
275            .lock()
276            .expect("app test server state")
277            .rows
278            .get(table)
279            .and_then(|rows| rows.get(row_id))
280            .cloned()
281    }
282
283    pub fn requests(&self) -> Vec<CombinedRequest> {
284        self.state
285            .lock()
286            .expect("app test server state")
287            .requests
288            .clone()
289    }
290
291    pub fn ws_pushes(&self) -> Vec<PushCommitRequest> {
292        self.state
293            .lock()
294            .expect("app test server state")
295            .ws_pushes
296            .clone()
297    }
298
299    pub fn commits(&self) -> Vec<AppTestServerCommit> {
300        self.state
301            .lock()
302            .expect("app test server state")
303            .commits
304            .clone()
305    }
306
307    pub fn auth_headers(&self) -> Vec<SyncAuthHeaders> {
308        self.state
309            .lock()
310            .expect("app test server state")
311            .auth_headers
312            .clone()
313    }
314
315    pub fn record_auth_headers(&self, headers: SyncAuthHeaders) {
316        self.state
317            .lock()
318            .expect("app test server state")
319            .auth_headers
320            .push(headers);
321    }
322
323    pub fn is_authorized_headers(&self, headers: &SyncAuthHeaders) -> bool {
324        let state = self.state.lock().expect("app test server state");
325        match state.required_authorization.as_ref() {
326            Some(required) => headers.get("authorization") == Some(required),
327            None => true,
328        }
329    }
330
331    pub fn closed_realtime_count(&self) -> usize {
332        self.state
333            .lock()
334            .expect("app test server state")
335            .closed_realtime_count
336    }
337
338    pub fn push_realtime_sync(&self) {
339        self.state
340            .lock()
341            .expect("app test server state")
342            .realtime_events
343            .push_back(RealtimeEvent::Sync);
344    }
345
346    pub fn wait_for_commit_count(
347        &self,
348        expected: usize,
349        timeout: Duration,
350    ) -> Vec<AppTestServerCommit> {
351        let deadline = Instant::now() + timeout;
352        loop {
353            let commits = self.commits();
354            if commits.len() >= expected || Instant::now() >= deadline {
355                return commits;
356            }
357            thread::sleep(Duration::from_millis(5));
358        }
359    }
360
361    fn table_metadata(&self, table: &str) -> Result<&'static AppTableMetadata> {
362        self.app_schema.table_metadata(table).ok_or_else(|| {
363            SyncularError::config(format!("unknown app table for AppTestServer: {table}"))
364        })
365    }
366
367    fn prepare_server_row(
368        &self,
369        metadata: &AppTableMetadata,
370        row_id: &str,
371        row: Value,
372        server_version: Option<i64>,
373    ) -> Result<Value> {
374        let Value::Object(mut row) = row else {
375            return Err(SyncularError::protocol_message(format!(
376                "row for table {} must be an object",
377                metadata.name
378            )));
379        };
380        row.insert(
381            metadata.primary_key_column.to_string(),
382            Value::String(row_id.to_string()),
383        );
384        let version = server_version
385            .or_else(|| {
386                row.get(metadata.server_version_column)
387                    .and_then(Value::as_i64)
388            })
389            .unwrap_or(0);
390        row.insert(
391            metadata.server_version_column.to_string(),
392            Value::Number(version.into()),
393        );
394        Ok(Value::Object(row))
395    }
396
397    fn post_sync_locked(
398        &self,
399        state: &mut AppTestServerState,
400        request: &CombinedRequest,
401    ) -> Result<CombinedResponse> {
402        state.requests.push(request.clone());
403        if !self.is_authorized_locked(state) {
404            return Err(unauthorized_error());
405        }
406        let push = request
407            .push
408            .as_ref()
409            .map(|push| {
410                let mut commits = Vec::new();
411                for commit in &push.commits {
412                    commits.push(self.apply_push_commit_locked(
413                        state,
414                        &request.client_id,
415                        commit,
416                    )?);
417                }
418                Ok::<PushBatchResponse, SyncularError>(PushBatchResponse { ok: true, commits })
419            })
420            .transpose()?;
421        let pull = request.pull.as_ref().map(|pull| PullResponse {
422            ok: true,
423            subscriptions: pull
424                .subscriptions
425                .iter()
426                .map(|subscription| {
427                    self.subscription_response_locked(state, subscription, &request.client_id)
428                })
429                .collect(),
430        });
431        let required_schema_version = state.required_schema_version;
432        let latest_schema_version = state
433            .latest_schema_version
434            .or(required_schema_version)
435            .or(Some(self.app_schema.current_schema_version()));
436        Ok(CombinedResponse {
437            ok: true,
438            required_schema_version,
439            latest_schema_version,
440            push,
441            pull,
442        })
443    }
444
445    fn apply_push_commit_locked(
446        &self,
447        state: &mut AppTestServerState,
448        client_id: &str,
449        commit: &PushCommitRequest,
450    ) -> Result<PushCommitResponse> {
451        let conflict = commit
452            .operations
453            .iter()
454            .enumerate()
455            .find_map(|(index, operation)| {
456                self.preflight_operation_conflict(state, index, operation)
457            });
458        if let Some(response) = conflict {
459            return Ok(PushCommitResponse {
460                client_commit_id: commit.client_commit_id.clone(),
461                status: "rejected".to_string(),
462                commit_seq: None,
463                results: response,
464            });
465        }
466
467        let mut changes = Vec::new();
468        let mut results = Vec::new();
469        for (index, operation) in commit.operations.iter().enumerate() {
470            let result = self.apply_operation_locked(state, index, operation, &mut changes)?;
471            results.push(result);
472        }
473        let commit_seq = if changes.is_empty() {
474            None
475        } else {
476            Some(self.append_commit_locked(state, client_id.to_string(), changes))
477        };
478        Ok(PushCommitResponse {
479            client_commit_id: commit.client_commit_id.clone(),
480            status: "applied".to_string(),
481            commit_seq,
482            results,
483        })
484    }
485
486    fn preflight_operation_conflict(
487        &self,
488        state: &AppTestServerState,
489        index: usize,
490        operation: &SyncOperation,
491    ) -> Option<Vec<OperationResult>> {
492        let metadata = self.app_schema.table_metadata(&operation.table)?;
493        if is_server_merge_yjs_operation(operation, metadata) {
494            return None;
495        }
496        let base_version = operation.base_version?;
497        let current_row = state
498            .rows
499            .get(&operation.table)
500            .and_then(|rows| rows.get(&operation.row_id));
501        let current_version = current_row
502            .and_then(|row| row_server_version(metadata, row))
503            .unwrap_or(0);
504        if current_version == base_version {
505            return None;
506        }
507        Some(vec![OperationResult {
508            op_index: index as i32,
509            status: "conflict".to_string(),
510            message: Some("version conflict".to_string()),
511            error: None,
512            code: Some("sync.version_conflict".to_string()),
513            retriable: Some(false),
514            server_version: Some(current_version),
515            server_row: current_row.cloned(),
516        }])
517    }
518
519    fn apply_operation_locked(
520        &self,
521        state: &mut AppTestServerState,
522        index: usize,
523        operation: &SyncOperation,
524        changes: &mut Vec<SyncChange>,
525    ) -> Result<OperationResult> {
526        let metadata = self.table_metadata(&operation.table)?;
527        match operation.op.as_str() {
528            "upsert" => {
529                let existing_row = state
530                    .rows
531                    .get(&operation.table)
532                    .and_then(|rows| rows.get(&operation.row_id))
533                    .cloned();
534                let mut transformed = operation.clone();
535                transform_operation_payload_for_metadata(
536                    &mut transformed,
537                    existing_row.as_ref(),
538                    metadata,
539                )?;
540                let version = next_server_version_locked(state);
541                let row = merged_server_row(
542                    metadata,
543                    &operation.row_id,
544                    existing_row,
545                    transformed.payload,
546                    version,
547                )?;
548                let scopes = scopes_for_row(metadata, &row);
549                let change_row_json = if is_server_merge_yjs_operation(operation, metadata) {
550                    operation.payload.clone()
551                } else {
552                    Some(row.clone())
553                };
554                state
555                    .rows
556                    .entry(operation.table.clone())
557                    .or_default()
558                    .insert(operation.row_id.clone(), row.clone());
559                changes.push(SyncChange {
560                    table: operation.table.clone(),
561                    row_id: operation.row_id.clone(),
562                    op: "upsert".to_string(),
563                    row_json: change_row_json,
564                    row_version: Some(version),
565                    scopes,
566                });
567                Ok(applied_result(index, Some(version)))
568            }
569            "delete" => {
570                let old_row = state
571                    .rows
572                    .entry(operation.table.clone())
573                    .or_default()
574                    .remove(&operation.row_id);
575                let version = next_server_version_locked(state);
576                let scopes = old_row
577                    .as_ref()
578                    .map(|row| scopes_for_row(metadata, row))
579                    .unwrap_or_default();
580                changes.push(SyncChange {
581                    table: operation.table.clone(),
582                    row_id: operation.row_id.clone(),
583                    op: "delete".to_string(),
584                    row_json: None,
585                    row_version: Some(version),
586                    scopes,
587                });
588                Ok(applied_result(index, Some(version)))
589            }
590            op => Ok(OperationResult {
591                op_index: index as i32,
592                status: "error".to_string(),
593                message: Some(format!("unsupported operation: {op}")),
594                error: Some(format!("unsupported operation: {op}")),
595                code: Some("sync.unsupported_operation".to_string()),
596                retriable: Some(false),
597                server_version: None,
598                server_row: None,
599            }),
600        }
601    }
602
603    fn subscription_response_locked(
604        &self,
605        state: &AppTestServerState,
606        subscription: &syncular_runtime::protocol::SubscriptionRequest,
607        request_client_id: &str,
608    ) -> SubscriptionResponse {
609        let metadata = self.app_schema.table_metadata(&subscription.table);
610        let next_cursor = state.next_commit_seq.saturating_sub(1).max(0);
611        if state.revoked_subscription_ids.contains(&subscription.id) {
612            return SubscriptionResponse {
613                id: subscription.id.clone(),
614                status: "revoked".to_string(),
615                scopes: ScopeValues::new(),
616                bootstrap: false,
617                bootstrap_state: None,
618                next_cursor,
619                integrity: None,
620                commits: Vec::new(),
621                snapshots: None,
622            };
623        }
624        if subscription.cursor < 0 {
625            let rows = metadata
626                .map(|metadata| {
627                    state
628                        .rows
629                        .get(&subscription.table)
630                        .into_iter()
631                        .flat_map(|rows| rows.values())
632                        .filter(|row| row_matches_scopes(metadata, row, &subscription.scopes))
633                        .cloned()
634                        .collect::<Vec<_>>()
635                })
636                .unwrap_or_default();
637            return SubscriptionResponse {
638                id: subscription.id.clone(),
639                status: "active".to_string(),
640                scopes: subscription.scopes.clone(),
641                bootstrap: !rows.is_empty(),
642                bootstrap_state: None,
643                next_cursor,
644                integrity: None,
645                commits: Vec::new(),
646                snapshots: Some(vec![SyncSnapshot {
647                    table: subscription.table.clone(),
648                    rows,
649                    chunks: None,
650                    artifacts: None,
651                    manifest: None,
652                    is_first_page: true,
653                    is_last_page: true,
654                    bootstrap_state_after: None,
655                }]),
656            };
657        }
658
659        let mut commits = metadata
660            .map(|metadata| {
661                state
662                    .commits
663                    .iter()
664                    .filter(|commit| {
665                        commit.commit_seq > subscription.cursor
666                            && commit.client_id != request_client_id
667                    })
668                    .filter_map(|commit| {
669                        let changes = commit
670                            .changes
671                            .iter()
672                            .filter(|change| {
673                                change.table == subscription.table
674                                    && change_matches_scopes(metadata, change, &subscription.scopes)
675                            })
676                            .cloned()
677                            .collect::<Vec<_>>();
678                        if changes.is_empty() {
679                            None
680                        } else {
681                            Some(SyncCommit {
682                                commit_seq: commit.commit_seq,
683                                created_at: self.created_at(commit.commit_seq),
684                                actor_id: self.options.actor_id.clone(),
685                                changes,
686                            })
687                        }
688                    })
689                    .collect::<Vec<_>>()
690            })
691            .unwrap_or_default();
692        match self.options.delivery_mode {
693            AppTestServerDeliveryMode::Normal => {}
694            AppTestServerDeliveryMode::ReverseAndDuplicate => {
695                commits.reverse();
696                let duplicates = commits.clone();
697                commits.extend(duplicates);
698            }
699        }
700
701        SubscriptionResponse {
702            id: subscription.id.clone(),
703            status: "active".to_string(),
704            scopes: subscription.scopes.clone(),
705            bootstrap: false,
706            bootstrap_state: None,
707            next_cursor,
708            integrity: None,
709            commits,
710            snapshots: None,
711        }
712    }
713
714    fn append_commit_locked(
715        &self,
716        state: &mut AppTestServerState,
717        client_id: String,
718        changes: Vec<SyncChange>,
719    ) -> i64 {
720        let commit_seq = state.next_commit_seq;
721        state.next_commit_seq = state.next_commit_seq.saturating_add(1);
722        state.commits.push(AppTestServerCommit {
723            commit_seq,
724            client_id,
725            changes,
726        });
727        if self.options.emit_realtime_sync {
728            state.realtime_events.push_back(RealtimeEvent::Sync);
729        }
730        commit_seq
731    }
732
733    fn created_at(&self, commit_seq: i64) -> String {
734        format!("{}.{commit_seq:03}Z", self.options.created_at_prefix)
735    }
736
737    fn is_authorized_locked(&self, state: &AppTestServerState) -> bool {
738        match state.required_authorization.as_ref() {
739            Some(required) => {
740                state
741                    .auth_headers
742                    .last()
743                    .and_then(|headers| headers.get("authorization"))
744                    == Some(required)
745            }
746            None => true,
747        }
748    }
749}
750
751impl SyncAuthHeaderStore for AppTestServer {
752    fn set_auth_headers(&mut self, headers: SyncAuthHeaders) {
753        self.record_auth_headers(headers);
754    }
755}
756
757impl SyncTransport for AppTestServer {
758    type Realtime = AppTestRealtime;
759
760    fn post_sync(&self, request: &CombinedRequest) -> Result<CombinedResponse> {
761        let mut state = self.state.lock().expect("app test server state");
762        self.post_sync_locked(&mut state, request)
763    }
764
765    fn fetch_snapshot_chunk_rows(
766        &self,
767        _chunk: &SnapshotChunkRef,
768        _scopes: &Map<String, Value>,
769    ) -> Result<SnapshotChunkRows> {
770        Ok(SnapshotChunkRows::Json(Vec::new()))
771    }
772
773    fn connect_realtime(&self) -> Result<Self::Realtime> {
774        Ok(AppTestRealtime {
775            app_schema: self.app_schema,
776            options: self.options.clone(),
777            state: self.state.clone(),
778        })
779    }
780}
781
782impl RealtimeTransport for AppTestRealtime {
783    fn push_commit(&mut self, commit: PushCommitRequest) -> Result<PushCommitResponse> {
784        let server = AppTestServer {
785            app_schema: self.app_schema,
786            options: self.options.clone(),
787            state: self.state.clone(),
788        };
789        let mut state = self.state.lock().expect("app test server state");
790        state.ws_pushes.push(commit.clone());
791        let client_id = server.options.actor_id.clone();
792        server.apply_push_commit_locked(&mut state, &client_id, &commit)
793    }
794
795    fn read_event(&mut self) -> Result<Option<RealtimeEvent>> {
796        Ok(self
797            .state
798            .lock()
799            .expect("app test server state")
800            .realtime_events
801            .pop_front())
802    }
803
804    fn close(&mut self) {
805        self.state
806            .lock()
807            .expect("app test server state")
808            .closed_realtime_count += 1;
809    }
810}
811
812impl BlobTransport for AppTestServer {
813    fn upload_blob(&self, blob: &BlobRef, bytes: &[u8]) -> Result<()> {
814        self.state
815            .lock()
816            .expect("app test server state")
817            .blobs
818            .insert(blob.hash.clone(), bytes.to_vec());
819        Ok(())
820    }
821
822    fn download_blob(&self, blob: &BlobRef) -> Result<Vec<u8>> {
823        self.state
824            .lock()
825            .expect("app test server state")
826            .blobs
827            .get(&blob.hash)
828            .cloned()
829            .ok_or_else(|| {
830                SyncularError::message(
831                    ErrorKind::Transport,
832                    format!("app test server blob not found: {}", blob.hash),
833                )
834            })
835    }
836}
837
838fn next_server_version_locked(state: &mut AppTestServerState) -> i64 {
839    let version = state.next_server_version;
840    state.next_server_version = state.next_server_version.saturating_add(1);
841    version
842}
843
844fn bump_next_server_version_locked(state: &mut AppTestServerState, version: i64) {
845    state.next_server_version = state.next_server_version.max(version.saturating_add(1));
846}
847
848fn row_id_from_row(metadata: &AppTableMetadata, row: &Value) -> Result<String> {
849    row.get(metadata.primary_key_column)
850        .and_then(Value::as_str)
851        .map(str::to_string)
852        .ok_or_else(|| {
853            SyncularError::protocol_message(format!(
854                "row for table {} is missing text primary key {}",
855                metadata.name, metadata.primary_key_column
856            ))
857        })
858}
859
860fn row_server_version(metadata: &AppTableMetadata, row: &Value) -> Option<i64> {
861    row.get(metadata.server_version_column)
862        .and_then(Value::as_i64)
863}
864
865fn scopes_for_row(metadata: &AppTableMetadata, row: &Value) -> ScopeValues {
866    let mut scopes = Map::new();
867    for scope in metadata.scopes {
868        if let Some(value) = row.get(scope.column) {
869            scopes.insert(scope.name.to_string(), value.clone());
870        }
871    }
872    scopes
873}
874
875fn row_matches_scopes(metadata: &AppTableMetadata, row: &Value, scopes: &ScopeValues) -> bool {
876    metadata.scopes.iter().all(|scope| {
877        let Some(expected) = scopes.get(scope.name) else {
878            return !scope.required;
879        };
880        row.get(scope.column) == Some(expected)
881    })
882}
883
884fn change_matches_scopes(
885    metadata: &AppTableMetadata,
886    change: &SyncChange,
887    scopes: &ScopeValues,
888) -> bool {
889    if let Some(row) = &change.row_json {
890        if row_matches_scopes(metadata, row, scopes) {
891            return true;
892        }
893    }
894    metadata.scopes.iter().all(|scope| {
895        let Some(expected) = scopes.get(scope.name) else {
896            return !scope.required;
897        };
898        change.scopes.get(scope.name) == Some(expected)
899    })
900}
901
902fn is_server_merge_yjs_operation(operation: &SyncOperation, metadata: &AppTableMetadata) -> bool {
903    let Some(Value::Object(payload)) = &operation.payload else {
904        return false;
905    };
906    let Some(Value::Object(envelope)) = payload.get(YJS_PAYLOAD_KEY) else {
907        return false;
908    };
909    metadata.crdt_yjs_fields.iter().any(|field| {
910        (field.sync_mode == "server-merge" || field.sync_mode.is_empty())
911            && envelope.contains_key(field.field)
912    })
913}
914
915fn merged_server_row(
916    metadata: &AppTableMetadata,
917    row_id: &str,
918    existing_row: Option<Value>,
919    payload: Option<Value>,
920    version: i64,
921) -> Result<Value> {
922    let mut row = match existing_row {
923        Some(Value::Object(row)) => row,
924        Some(_) | None => Map::new(),
925    };
926    if let Some(payload) = payload {
927        let Value::Object(payload) = payload else {
928            return Err(SyncularError::protocol_message(format!(
929                "upsert payload for table {} must be an object",
930                metadata.name
931            )));
932        };
933        for (key, value) in payload {
934            row.insert(key, value);
935        }
936    }
937    row.insert(
938        metadata.primary_key_column.to_string(),
939        Value::String(row_id.to_string()),
940    );
941    row.insert(
942        metadata.server_version_column.to_string(),
943        Value::Number(version.into()),
944    );
945    Ok(Value::Object(row))
946}
947
948fn applied_result(index: usize, server_version: Option<i64>) -> OperationResult {
949    OperationResult {
950        op_index: index as i32,
951        status: "applied".to_string(),
952        message: None,
953        error: None,
954        code: None,
955        retriable: None,
956        server_version,
957        server_row: None,
958    }
959}
960
961fn unauthorized_error() -> SyncularError {
962    SyncularError::message(
963        ErrorKind::Transport,
964        "unauthorized: missing or invalid authorization header",
965    )
966}