1use std::collections::{BTreeMap, BTreeSet, VecDeque};
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::{Duration, Instant};
5
6use serde_json::{Map, Value};
7use syncular_runtime::app_schema::{AppSchema, AppTableMetadata};
8use syncular_runtime::binary_snapshot::SnapshotChunkRows;
9use syncular_runtime::crdt_yjs::{transform_operation_payload_for_metadata, YJS_PAYLOAD_KEY};
10use syncular_runtime::error::{ErrorKind, Result, SyncularError};
11use syncular_runtime::protocol::{
12 BlobRef, CombinedRequest, CombinedResponse, OperationResult, PullResponse, PushBatchResponse,
13 PushCommitRequest, PushCommitResponse, ScopeValues, SnapshotChunkRef, SubscriptionResponse,
14 SyncChange, SyncCommit, SyncOperation, SyncSnapshot,
15};
16use syncular_runtime::transport::{
17 BlobTransport, RealtimeEvent, RealtimeTransport, SyncAuthHeaderStore, SyncAuthHeaders,
18 SyncTransport,
19};
20
21#[derive(Debug, Clone)]
22pub struct AppTestServerOptions {
23 pub actor_id: String,
24 pub created_at_prefix: String,
25 pub emit_realtime_sync: bool,
26 pub delivery_mode: AppTestServerDeliveryMode,
27 pub required_authorization: Option<String>,
28 pub required_schema_version: Option<i32>,
29 pub latest_schema_version: Option<i32>,
30}
31
32impl Default for AppTestServerOptions {
33 fn default() -> Self {
34 Self {
35 actor_id: "test-server".to_string(),
36 created_at_prefix: "2026-01-01T00:00:00".to_string(),
37 emit_realtime_sync: true,
38 delivery_mode: AppTestServerDeliveryMode::Normal,
39 required_authorization: None,
40 required_schema_version: None,
41 latest_schema_version: None,
42 }
43 }
44}
45
46impl AppTestServerOptions {
47 pub fn require_authorization(mut self, authorization: impl Into<String>) -> Self {
48 self.required_authorization = Some(authorization.into());
49 self
50 }
51
52 pub fn require_schema_version(mut self, schema_version: i32) -> Self {
53 self.required_schema_version = Some(schema_version);
54 self.latest_schema_version = Some(
55 self.latest_schema_version
56 .map_or(schema_version, |latest| latest.max(schema_version)),
57 );
58 self
59 }
60
61 pub fn latest_schema_version(mut self, schema_version: i32) -> Self {
62 self.latest_schema_version = Some(schema_version);
63 self
64 }
65}
66
/// Ordering applied to incremental commits returned from a pull.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppTestServerDeliveryMode {
    /// Commits are returned once, in the order they were recorded.
    Normal,
    /// Commits are returned in reverse order and then repeated a second time.
    ReverseAndDuplicate,
}
72
73#[derive(Debug, Clone)]
74pub struct AppTestServerCommit {
75 pub commit_seq: i64,
76 pub client_id: String,
77 pub changes: Vec<SyncChange>,
78}
79
80#[derive(Debug, Default)]
81struct AppTestServerState {
82 rows: BTreeMap<String, BTreeMap<String, Value>>,
83 commits: Vec<AppTestServerCommit>,
84 requests: Vec<CombinedRequest>,
85 ws_pushes: Vec<PushCommitRequest>,
86 auth_headers: Vec<SyncAuthHeaders>,
87 realtime_events: VecDeque<RealtimeEvent>,
88 blobs: BTreeMap<String, Vec<u8>>,
89 required_authorization: Option<String>,
90 revoked_subscription_ids: BTreeSet<String>,
91 required_schema_version: Option<i32>,
92 latest_schema_version: Option<i32>,
93 next_server_version: i64,
94 next_commit_seq: i64,
95 closed_realtime_count: usize,
96}
97
98#[derive(Clone)]
99pub struct AppTestServer {
100 app_schema: AppSchema,
101 options: AppTestServerOptions,
102 state: Arc<Mutex<AppTestServerState>>,
103}
104
105#[derive(Clone)]
106pub struct AppTestRealtime {
107 app_schema: AppSchema,
108 options: AppTestServerOptions,
109 state: Arc<Mutex<AppTestServerState>>,
110}
111
112impl AppTestServer {
113 pub fn new(app_schema: AppSchema) -> Self {
114 Self::with_options(app_schema, AppTestServerOptions::default())
115 }
116
117 pub fn with_options(app_schema: AppSchema, options: AppTestServerOptions) -> Self {
118 let required_authorization = options.required_authorization.clone();
119 let required_schema_version = options.required_schema_version;
120 let latest_schema_version = options.latest_schema_version;
121 Self {
122 app_schema,
123 options,
124 state: Arc::new(Mutex::new(AppTestServerState {
125 required_authorization,
126 required_schema_version,
127 latest_schema_version,
128 next_server_version: 1,
129 next_commit_seq: 1,
130 ..AppTestServerState::default()
131 })),
132 }
133 }
134
135 pub fn seed_row(&self, table: &str, row: Value) -> Result<Value> {
136 let metadata = self.table_metadata(table)?;
137 let row_id = row_id_from_row(metadata, &row)?;
138 let row = self.prepare_server_row(metadata, &row_id, row, None)?;
139 let version = row_server_version(metadata, &row).unwrap_or(0);
140 let mut state = self.state.lock().expect("app test server state");
141 bump_next_server_version_locked(&mut state, version);
142 state
143 .rows
144 .entry(table.to_string())
145 .or_default()
146 .insert(row_id, row.clone());
147 Ok(row)
148 }
149
150 pub fn set_schema_versions(
151 &self,
152 required_schema_version: Option<i32>,
153 latest_schema_version: Option<i32>,
154 ) {
155 let mut state = self.state.lock().expect("app test server state");
156 state.required_schema_version = required_schema_version;
157 state.latest_schema_version = latest_schema_version;
158 }
159
160 pub fn require_schema_version(&self, schema_version: i32) {
161 let mut state = self.state.lock().expect("app test server state");
162 state.required_schema_version = Some(schema_version);
163 state.latest_schema_version = Some(
164 state
165 .latest_schema_version
166 .map_or(schema_version, |latest| latest.max(schema_version)),
167 );
168 }
169
170 pub fn require_authorization(&self, authorization: impl Into<String>) {
171 self.state
172 .lock()
173 .expect("app test server state")
174 .required_authorization = Some(authorization.into());
175 }
176
177 pub fn clear_required_authorization(&self) {
178 self.state
179 .lock()
180 .expect("app test server state")
181 .required_authorization = None;
182 }
183
184 pub fn revoke_subscription(&self, subscription_id: impl Into<String>) {
185 self.state
186 .lock()
187 .expect("app test server state")
188 .revoked_subscription_ids
189 .insert(subscription_id.into());
190 }
191
192 pub fn restore_subscription(&self, subscription_id: &str) {
193 self.state
194 .lock()
195 .expect("app test server state")
196 .revoked_subscription_ids
197 .remove(subscription_id);
198 }
199
200 pub fn revoked_subscription_ids(&self) -> Vec<String> {
201 self.state
202 .lock()
203 .expect("app test server state")
204 .revoked_subscription_ids
205 .iter()
206 .cloned()
207 .collect()
208 }
209
210 pub fn commit_row(&self, table: &str, row: Value) -> Result<i64> {
211 let metadata = self.table_metadata(table)?;
212 let row_id = row_id_from_row(metadata, &row)?;
213 let mut state = self.state.lock().expect("app test server state");
214 let version = row
215 .get(metadata.server_version_column)
216 .and_then(Value::as_i64)
217 .unwrap_or_else(|| next_server_version_locked(&mut state));
218 let row = self.prepare_server_row(metadata, &row_id, row, Some(version))?;
219 let scopes = scopes_for_row(metadata, &row);
220 let change = SyncChange {
221 table: table.to_string(),
222 row_id: row_id.clone(),
223 op: "upsert".to_string(),
224 row_json: Some(row.clone()),
225 row_version: Some(version),
226 scopes,
227 };
228 state
229 .rows
230 .entry(table.to_string())
231 .or_default()
232 .insert(row_id, row);
233 let client_id = self.options.actor_id.clone();
234 let commit_seq = self.append_commit_locked(&mut state, client_id, vec![change]);
235 Ok(commit_seq)
236 }
237
238 pub fn delete_row(&self, table: &str, row_id: &str) -> Result<i64> {
239 let metadata = self.table_metadata(table)?;
240 let mut state = self.state.lock().expect("app test server state");
241 let old_row = state
242 .rows
243 .entry(table.to_string())
244 .or_default()
245 .remove(row_id);
246 let scopes = old_row
247 .as_ref()
248 .map(|row| scopes_for_row(metadata, row))
249 .unwrap_or_default();
250 let version = next_server_version_locked(&mut state);
251 let change = SyncChange {
252 table: table.to_string(),
253 row_id: row_id.to_string(),
254 op: "delete".to_string(),
255 row_json: None,
256 row_version: Some(version),
257 scopes,
258 };
259 let client_id = self.options.actor_id.clone();
260 Ok(self.append_commit_locked(&mut state, client_id, vec![change]))
261 }
262
263 pub fn rows(&self, table: &str) -> Vec<Value> {
264 self.state
265 .lock()
266 .expect("app test server state")
267 .rows
268 .get(table)
269 .map(|rows| rows.values().cloned().collect())
270 .unwrap_or_default()
271 }
272
273 pub fn row(&self, table: &str, row_id: &str) -> Option<Value> {
274 self.state
275 .lock()
276 .expect("app test server state")
277 .rows
278 .get(table)
279 .and_then(|rows| rows.get(row_id))
280 .cloned()
281 }
282
283 pub fn requests(&self) -> Vec<CombinedRequest> {
284 self.state
285 .lock()
286 .expect("app test server state")
287 .requests
288 .clone()
289 }
290
291 pub fn ws_pushes(&self) -> Vec<PushCommitRequest> {
292 self.state
293 .lock()
294 .expect("app test server state")
295 .ws_pushes
296 .clone()
297 }
298
299 pub fn commits(&self) -> Vec<AppTestServerCommit> {
300 self.state
301 .lock()
302 .expect("app test server state")
303 .commits
304 .clone()
305 }
306
307 pub fn auth_headers(&self) -> Vec<SyncAuthHeaders> {
308 self.state
309 .lock()
310 .expect("app test server state")
311 .auth_headers
312 .clone()
313 }
314
315 pub fn record_auth_headers(&self, headers: SyncAuthHeaders) {
316 self.state
317 .lock()
318 .expect("app test server state")
319 .auth_headers
320 .push(headers);
321 }
322
323 pub fn is_authorized_headers(&self, headers: &SyncAuthHeaders) -> bool {
324 let state = self.state.lock().expect("app test server state");
325 match state.required_authorization.as_ref() {
326 Some(required) => headers.get("authorization") == Some(required),
327 None => true,
328 }
329 }
330
331 pub fn closed_realtime_count(&self) -> usize {
332 self.state
333 .lock()
334 .expect("app test server state")
335 .closed_realtime_count
336 }
337
338 pub fn push_realtime_sync(&self) {
339 self.state
340 .lock()
341 .expect("app test server state")
342 .realtime_events
343 .push_back(RealtimeEvent::Sync);
344 }
345
346 pub fn wait_for_commit_count(
347 &self,
348 expected: usize,
349 timeout: Duration,
350 ) -> Vec<AppTestServerCommit> {
351 let deadline = Instant::now() + timeout;
352 loop {
353 let commits = self.commits();
354 if commits.len() >= expected || Instant::now() >= deadline {
355 return commits;
356 }
357 thread::sleep(Duration::from_millis(5));
358 }
359 }
360
361 fn table_metadata(&self, table: &str) -> Result<&'static AppTableMetadata> {
362 self.app_schema.table_metadata(table).ok_or_else(|| {
363 SyncularError::config(format!("unknown app table for AppTestServer: {table}"))
364 })
365 }
366
367 fn prepare_server_row(
368 &self,
369 metadata: &AppTableMetadata,
370 row_id: &str,
371 row: Value,
372 server_version: Option<i64>,
373 ) -> Result<Value> {
374 let Value::Object(mut row) = row else {
375 return Err(SyncularError::protocol_message(format!(
376 "row for table {} must be an object",
377 metadata.name
378 )));
379 };
380 row.insert(
381 metadata.primary_key_column.to_string(),
382 Value::String(row_id.to_string()),
383 );
384 let version = server_version
385 .or_else(|| {
386 row.get(metadata.server_version_column)
387 .and_then(Value::as_i64)
388 })
389 .unwrap_or(0);
390 row.insert(
391 metadata.server_version_column.to_string(),
392 Value::Number(version.into()),
393 );
394 Ok(Value::Object(row))
395 }
396
397 fn post_sync_locked(
398 &self,
399 state: &mut AppTestServerState,
400 request: &CombinedRequest,
401 ) -> Result<CombinedResponse> {
402 state.requests.push(request.clone());
403 if !self.is_authorized_locked(state) {
404 return Err(unauthorized_error());
405 }
406 let push = request
407 .push
408 .as_ref()
409 .map(|push| {
410 let mut commits = Vec::new();
411 for commit in &push.commits {
412 commits.push(self.apply_push_commit_locked(
413 state,
414 &request.client_id,
415 commit,
416 )?);
417 }
418 Ok::<PushBatchResponse, SyncularError>(PushBatchResponse { ok: true, commits })
419 })
420 .transpose()?;
421 let pull = request.pull.as_ref().map(|pull| PullResponse {
422 ok: true,
423 subscriptions: pull
424 .subscriptions
425 .iter()
426 .map(|subscription| {
427 self.subscription_response_locked(state, subscription, &request.client_id)
428 })
429 .collect(),
430 });
431 let required_schema_version = state.required_schema_version;
432 let latest_schema_version = state
433 .latest_schema_version
434 .or(required_schema_version)
435 .or(Some(self.app_schema.current_schema_version()));
436 Ok(CombinedResponse {
437 ok: true,
438 required_schema_version,
439 latest_schema_version,
440 push,
441 pull,
442 })
443 }
444
445 fn apply_push_commit_locked(
446 &self,
447 state: &mut AppTestServerState,
448 client_id: &str,
449 commit: &PushCommitRequest,
450 ) -> Result<PushCommitResponse> {
451 let conflict = commit
452 .operations
453 .iter()
454 .enumerate()
455 .find_map(|(index, operation)| {
456 self.preflight_operation_conflict(state, index, operation)
457 });
458 if let Some(response) = conflict {
459 return Ok(PushCommitResponse {
460 client_commit_id: commit.client_commit_id.clone(),
461 status: "rejected".to_string(),
462 commit_seq: None,
463 results: response,
464 });
465 }
466
467 let mut changes = Vec::new();
468 let mut results = Vec::new();
469 for (index, operation) in commit.operations.iter().enumerate() {
470 let result = self.apply_operation_locked(state, index, operation, &mut changes)?;
471 results.push(result);
472 }
473 let commit_seq = if changes.is_empty() {
474 None
475 } else {
476 Some(self.append_commit_locked(state, client_id.to_string(), changes))
477 };
478 Ok(PushCommitResponse {
479 client_commit_id: commit.client_commit_id.clone(),
480 status: "applied".to_string(),
481 commit_seq,
482 results,
483 })
484 }
485
486 fn preflight_operation_conflict(
487 &self,
488 state: &AppTestServerState,
489 index: usize,
490 operation: &SyncOperation,
491 ) -> Option<Vec<OperationResult>> {
492 let metadata = self.app_schema.table_metadata(&operation.table)?;
493 if is_server_merge_yjs_operation(operation, metadata) {
494 return None;
495 }
496 let base_version = operation.base_version?;
497 let current_row = state
498 .rows
499 .get(&operation.table)
500 .and_then(|rows| rows.get(&operation.row_id));
501 let current_version = current_row
502 .and_then(|row| row_server_version(metadata, row))
503 .unwrap_or(0);
504 if current_version == base_version {
505 return None;
506 }
507 Some(vec![OperationResult {
508 op_index: index as i32,
509 status: "conflict".to_string(),
510 message: Some("version conflict".to_string()),
511 error: None,
512 code: Some("sync.version_conflict".to_string()),
513 retriable: Some(false),
514 server_version: Some(current_version),
515 server_row: current_row.cloned(),
516 }])
517 }
518
519 fn apply_operation_locked(
520 &self,
521 state: &mut AppTestServerState,
522 index: usize,
523 operation: &SyncOperation,
524 changes: &mut Vec<SyncChange>,
525 ) -> Result<OperationResult> {
526 let metadata = self.table_metadata(&operation.table)?;
527 match operation.op.as_str() {
528 "upsert" => {
529 let existing_row = state
530 .rows
531 .get(&operation.table)
532 .and_then(|rows| rows.get(&operation.row_id))
533 .cloned();
534 let mut transformed = operation.clone();
535 transform_operation_payload_for_metadata(
536 &mut transformed,
537 existing_row.as_ref(),
538 metadata,
539 )?;
540 let version = next_server_version_locked(state);
541 let row = merged_server_row(
542 metadata,
543 &operation.row_id,
544 existing_row,
545 transformed.payload,
546 version,
547 )?;
548 let scopes = scopes_for_row(metadata, &row);
549 let change_row_json = if is_server_merge_yjs_operation(operation, metadata) {
550 operation.payload.clone()
551 } else {
552 Some(row.clone())
553 };
554 state
555 .rows
556 .entry(operation.table.clone())
557 .or_default()
558 .insert(operation.row_id.clone(), row.clone());
559 changes.push(SyncChange {
560 table: operation.table.clone(),
561 row_id: operation.row_id.clone(),
562 op: "upsert".to_string(),
563 row_json: change_row_json,
564 row_version: Some(version),
565 scopes,
566 });
567 Ok(applied_result(index, Some(version)))
568 }
569 "delete" => {
570 let old_row = state
571 .rows
572 .entry(operation.table.clone())
573 .or_default()
574 .remove(&operation.row_id);
575 let version = next_server_version_locked(state);
576 let scopes = old_row
577 .as_ref()
578 .map(|row| scopes_for_row(metadata, row))
579 .unwrap_or_default();
580 changes.push(SyncChange {
581 table: operation.table.clone(),
582 row_id: operation.row_id.clone(),
583 op: "delete".to_string(),
584 row_json: None,
585 row_version: Some(version),
586 scopes,
587 });
588 Ok(applied_result(index, Some(version)))
589 }
590 op => Ok(OperationResult {
591 op_index: index as i32,
592 status: "error".to_string(),
593 message: Some(format!("unsupported operation: {op}")),
594 error: Some(format!("unsupported operation: {op}")),
595 code: Some("sync.unsupported_operation".to_string()),
596 retriable: Some(false),
597 server_version: None,
598 server_row: None,
599 }),
600 }
601 }
602
603 fn subscription_response_locked(
604 &self,
605 state: &AppTestServerState,
606 subscription: &syncular_runtime::protocol::SubscriptionRequest,
607 request_client_id: &str,
608 ) -> SubscriptionResponse {
609 let metadata = self.app_schema.table_metadata(&subscription.table);
610 let next_cursor = state.next_commit_seq.saturating_sub(1).max(0);
611 if state.revoked_subscription_ids.contains(&subscription.id) {
612 return SubscriptionResponse {
613 id: subscription.id.clone(),
614 status: "revoked".to_string(),
615 scopes: ScopeValues::new(),
616 bootstrap: false,
617 bootstrap_state: None,
618 next_cursor,
619 integrity: None,
620 commits: Vec::new(),
621 snapshots: None,
622 };
623 }
624 if subscription.cursor < 0 {
625 let rows = metadata
626 .map(|metadata| {
627 state
628 .rows
629 .get(&subscription.table)
630 .into_iter()
631 .flat_map(|rows| rows.values())
632 .filter(|row| row_matches_scopes(metadata, row, &subscription.scopes))
633 .cloned()
634 .collect::<Vec<_>>()
635 })
636 .unwrap_or_default();
637 return SubscriptionResponse {
638 id: subscription.id.clone(),
639 status: "active".to_string(),
640 scopes: subscription.scopes.clone(),
641 bootstrap: !rows.is_empty(),
642 bootstrap_state: None,
643 next_cursor,
644 integrity: None,
645 commits: Vec::new(),
646 snapshots: Some(vec![SyncSnapshot {
647 table: subscription.table.clone(),
648 rows,
649 chunks: None,
650 artifacts: None,
651 manifest: None,
652 is_first_page: true,
653 is_last_page: true,
654 bootstrap_state_after: None,
655 }]),
656 };
657 }
658
659 let mut commits = metadata
660 .map(|metadata| {
661 state
662 .commits
663 .iter()
664 .filter(|commit| {
665 commit.commit_seq > subscription.cursor
666 && commit.client_id != request_client_id
667 })
668 .filter_map(|commit| {
669 let changes = commit
670 .changes
671 .iter()
672 .filter(|change| {
673 change.table == subscription.table
674 && change_matches_scopes(metadata, change, &subscription.scopes)
675 })
676 .cloned()
677 .collect::<Vec<_>>();
678 if changes.is_empty() {
679 None
680 } else {
681 Some(SyncCommit {
682 commit_seq: commit.commit_seq,
683 created_at: self.created_at(commit.commit_seq),
684 actor_id: self.options.actor_id.clone(),
685 changes,
686 })
687 }
688 })
689 .collect::<Vec<_>>()
690 })
691 .unwrap_or_default();
692 match self.options.delivery_mode {
693 AppTestServerDeliveryMode::Normal => {}
694 AppTestServerDeliveryMode::ReverseAndDuplicate => {
695 commits.reverse();
696 let duplicates = commits.clone();
697 commits.extend(duplicates);
698 }
699 }
700
701 SubscriptionResponse {
702 id: subscription.id.clone(),
703 status: "active".to_string(),
704 scopes: subscription.scopes.clone(),
705 bootstrap: false,
706 bootstrap_state: None,
707 next_cursor,
708 integrity: None,
709 commits,
710 snapshots: None,
711 }
712 }
713
714 fn append_commit_locked(
715 &self,
716 state: &mut AppTestServerState,
717 client_id: String,
718 changes: Vec<SyncChange>,
719 ) -> i64 {
720 let commit_seq = state.next_commit_seq;
721 state.next_commit_seq = state.next_commit_seq.saturating_add(1);
722 state.commits.push(AppTestServerCommit {
723 commit_seq,
724 client_id,
725 changes,
726 });
727 if self.options.emit_realtime_sync {
728 state.realtime_events.push_back(RealtimeEvent::Sync);
729 }
730 commit_seq
731 }
732
733 fn created_at(&self, commit_seq: i64) -> String {
734 format!("{}.{commit_seq:03}Z", self.options.created_at_prefix)
735 }
736
737 fn is_authorized_locked(&self, state: &AppTestServerState) -> bool {
738 match state.required_authorization.as_ref() {
739 Some(required) => {
740 state
741 .auth_headers
742 .last()
743 .and_then(|headers| headers.get("authorization"))
744 == Some(required)
745 }
746 None => true,
747 }
748 }
749}
750
751impl SyncAuthHeaderStore for AppTestServer {
752 fn set_auth_headers(&mut self, headers: SyncAuthHeaders) {
753 self.record_auth_headers(headers);
754 }
755}
756
757impl SyncTransport for AppTestServer {
758 type Realtime = AppTestRealtime;
759
760 fn post_sync(&self, request: &CombinedRequest) -> Result<CombinedResponse> {
761 let mut state = self.state.lock().expect("app test server state");
762 self.post_sync_locked(&mut state, request)
763 }
764
765 fn fetch_snapshot_chunk_rows(
766 &self,
767 _chunk: &SnapshotChunkRef,
768 _scopes: &Map<String, Value>,
769 ) -> Result<SnapshotChunkRows> {
770 Ok(SnapshotChunkRows::Json(Vec::new()))
771 }
772
773 fn connect_realtime(&self) -> Result<Self::Realtime> {
774 Ok(AppTestRealtime {
775 app_schema: self.app_schema,
776 options: self.options.clone(),
777 state: self.state.clone(),
778 })
779 }
780}
781
782impl RealtimeTransport for AppTestRealtime {
783 fn push_commit(&mut self, commit: PushCommitRequest) -> Result<PushCommitResponse> {
784 let server = AppTestServer {
785 app_schema: self.app_schema,
786 options: self.options.clone(),
787 state: self.state.clone(),
788 };
789 let mut state = self.state.lock().expect("app test server state");
790 state.ws_pushes.push(commit.clone());
791 let client_id = server.options.actor_id.clone();
792 server.apply_push_commit_locked(&mut state, &client_id, &commit)
793 }
794
795 fn read_event(&mut self) -> Result<Option<RealtimeEvent>> {
796 Ok(self
797 .state
798 .lock()
799 .expect("app test server state")
800 .realtime_events
801 .pop_front())
802 }
803
804 fn close(&mut self) {
805 self.state
806 .lock()
807 .expect("app test server state")
808 .closed_realtime_count += 1;
809 }
810}
811
812impl BlobTransport for AppTestServer {
813 fn upload_blob(&self, blob: &BlobRef, bytes: &[u8]) -> Result<()> {
814 self.state
815 .lock()
816 .expect("app test server state")
817 .blobs
818 .insert(blob.hash.clone(), bytes.to_vec());
819 Ok(())
820 }
821
822 fn download_blob(&self, blob: &BlobRef) -> Result<Vec<u8>> {
823 self.state
824 .lock()
825 .expect("app test server state")
826 .blobs
827 .get(&blob.hash)
828 .cloned()
829 .ok_or_else(|| {
830 SyncularError::message(
831 ErrorKind::Transport,
832 format!("app test server blob not found: {}", blob.hash),
833 )
834 })
835 }
836}
837
838fn next_server_version_locked(state: &mut AppTestServerState) -> i64 {
839 let version = state.next_server_version;
840 state.next_server_version = state.next_server_version.saturating_add(1);
841 version
842}
843
844fn bump_next_server_version_locked(state: &mut AppTestServerState, version: i64) {
845 state.next_server_version = state.next_server_version.max(version.saturating_add(1));
846}
847
848fn row_id_from_row(metadata: &AppTableMetadata, row: &Value) -> Result<String> {
849 row.get(metadata.primary_key_column)
850 .and_then(Value::as_str)
851 .map(str::to_string)
852 .ok_or_else(|| {
853 SyncularError::protocol_message(format!(
854 "row for table {} is missing text primary key {}",
855 metadata.name, metadata.primary_key_column
856 ))
857 })
858}
859
860fn row_server_version(metadata: &AppTableMetadata, row: &Value) -> Option<i64> {
861 row.get(metadata.server_version_column)
862 .and_then(Value::as_i64)
863}
864
865fn scopes_for_row(metadata: &AppTableMetadata, row: &Value) -> ScopeValues {
866 let mut scopes = Map::new();
867 for scope in metadata.scopes {
868 if let Some(value) = row.get(scope.column) {
869 scopes.insert(scope.name.to_string(), value.clone());
870 }
871 }
872 scopes
873}
874
875fn row_matches_scopes(metadata: &AppTableMetadata, row: &Value, scopes: &ScopeValues) -> bool {
876 metadata.scopes.iter().all(|scope| {
877 let Some(expected) = scopes.get(scope.name) else {
878 return !scope.required;
879 };
880 row.get(scope.column) == Some(expected)
881 })
882}
883
884fn change_matches_scopes(
885 metadata: &AppTableMetadata,
886 change: &SyncChange,
887 scopes: &ScopeValues,
888) -> bool {
889 if let Some(row) = &change.row_json {
890 if row_matches_scopes(metadata, row, scopes) {
891 return true;
892 }
893 }
894 metadata.scopes.iter().all(|scope| {
895 let Some(expected) = scopes.get(scope.name) else {
896 return !scope.required;
897 };
898 change.scopes.get(scope.name) == Some(expected)
899 })
900}
901
902fn is_server_merge_yjs_operation(operation: &SyncOperation, metadata: &AppTableMetadata) -> bool {
903 let Some(Value::Object(payload)) = &operation.payload else {
904 return false;
905 };
906 let Some(Value::Object(envelope)) = payload.get(YJS_PAYLOAD_KEY) else {
907 return false;
908 };
909 metadata.crdt_yjs_fields.iter().any(|field| {
910 (field.sync_mode == "server-merge" || field.sync_mode.is_empty())
911 && envelope.contains_key(field.field)
912 })
913}
914
915fn merged_server_row(
916 metadata: &AppTableMetadata,
917 row_id: &str,
918 existing_row: Option<Value>,
919 payload: Option<Value>,
920 version: i64,
921) -> Result<Value> {
922 let mut row = match existing_row {
923 Some(Value::Object(row)) => row,
924 Some(_) | None => Map::new(),
925 };
926 if let Some(payload) = payload {
927 let Value::Object(payload) = payload else {
928 return Err(SyncularError::protocol_message(format!(
929 "upsert payload for table {} must be an object",
930 metadata.name
931 )));
932 };
933 for (key, value) in payload {
934 row.insert(key, value);
935 }
936 }
937 row.insert(
938 metadata.primary_key_column.to_string(),
939 Value::String(row_id.to_string()),
940 );
941 row.insert(
942 metadata.server_version_column.to_string(),
943 Value::Number(version.into()),
944 );
945 Ok(Value::Object(row))
946}
947
948fn applied_result(index: usize, server_version: Option<i64>) -> OperationResult {
949 OperationResult {
950 op_index: index as i32,
951 status: "applied".to_string(),
952 message: None,
953 error: None,
954 code: None,
955 retriable: None,
956 server_version,
957 server_row: None,
958 }
959}
960
961fn unauthorized_error() -> SyncularError {
962 SyncularError::message(
963 ErrorKind::Transport,
964 "unauthorized: missing or invalid authorization header",
965 )
966}