1#[cfg(feature = "native")]
2use crate::app_schema::validate_app_schema_runtime_features;
3use crate::app_schema::{
4 default_app_schema, validate_auth_lease_payload_against_app_schema,
5 validate_blob_encryption_against_app_schema, validate_encrypted_crdt_against_app_schema,
6 validate_field_encryption_rules_against_app_schema, AppSchema, AppTableMetadata,
7};
8#[cfg(feature = "native")]
9use crate::app_schema::{
10 validate_auth_lease_issue_request_against_app_schema,
11 validate_auth_lease_issue_response_against_app_schema,
12 validate_blob_runtime_against_app_schema,
13};
14use crate::binary_snapshot::{
15 BinarySnapshotCell, BinarySnapshotPayload, BorrowedBinarySnapshotCell,
16 DecodedBinarySnapshotRows, SnapshotChunkRows,
17};
18use crate::command_history::{CommandHistoryEntry, CommandHistoryRecord, CommandHistoryState};
19#[cfg(feature = "native")]
20use crate::crdt_field::{
21 validate_crdt_field, CrdtDocumentSnapshot, CrdtField, CrdtFieldCompactionStats, CrdtFieldId,
22 CrdtFieldSyncMode, CrdtUpdateLogEntry,
23};
24#[cfg(feature = "native")]
25use crate::crdt_yjs::{
26 build_yjs_text_update, materialize_yjs_state, validate_crdt_request_json_size,
27 validate_yjs_text_input_size, validate_yjs_update_envelope_size, yjs_state_vector_base64,
28 BuildYjsTextUpdateArgs,
29};
30use crate::crdt_yjs::{YjsUpdateEnvelope, YJS_PAYLOAD_KEY};
31#[cfg(feature = "native")]
32use crate::diesel_sqlite::DieselSqliteStore;
33#[cfg(feature = "native")]
34use crate::encrypted_crdt::{
35 encrypted_crdt_stream_id, encrypted_field_metadata, BuildEncryptedCrdtCheckpointArgs,
36 BuildEncryptedCrdtTextUpdateArgs, BuildEncryptedCrdtYjsUpdateArgs, EncryptedCrdtStreamStats,
37};
38use crate::encrypted_crdt::{is_encrypted_crdt_system_table, EncryptedCrdt};
39use crate::encryption::{BlobEncryption, FieldEncryption, FieldEncryptionContext};
40use crate::error::{ErrorKind, Result, SyncularError};
41#[cfg(feature = "demo-todo-native-fixture")]
42use crate::fixtures::todo::rusqlite_sqlite::RusqliteStore;
43#[cfg(feature = "native")]
44use crate::limits::{DEFAULT_BLOB_UPLOAD_BATCH_LIMIT, DEFAULT_CRDT_UPDATE_QUEUE_CAPACITY};
45use crate::limits::{
46 DEFAULT_CRDT_STATE_VECTOR_HINT_LIMIT, DEFAULT_OUTBOX_PUSH_BATCH_LIMIT,
47 DEFAULT_PULL_LIMIT_COMMITS, DEFAULT_PULL_LIMIT_SNAPSHOT_ROWS, DEFAULT_PULL_MAX_SNAPSHOT_PAGES,
48 MAX_SCOPE_KEYS_PER_SUBSCRIPTION, MAX_SCOPE_VALUES_PER_CLIENT,
49 MAX_SCOPE_VALUES_PER_SUBSCRIPTION, MAX_SUBSCRIPTIONS_PER_CLIENT,
50 MAX_SUBSCRIPTION_PARAMS_PER_SUBSCRIPTION,
51};
52use crate::protocol::*;
53#[cfg(feature = "native")]
54use crate::store::next_blob_upload_retry_at;
55#[cfg(feature = "native")]
56use crate::store::MAX_BLOB_UPLOAD_RETRIES;
57use crate::store::{
58 next_retry_at, now_ms, OutboxCommit, SubscriptionState, SyncStateStore, SyncStore, SyncStoreTx,
59 VerifiedRoot, MAX_SYNC_RETRIES,
60};
61#[cfg(feature = "demo-todo-fixture")]
62use crate::store::{DemoTaskStore, Task};
63#[cfg(feature = "native")]
64use crate::transport::BlobTransport;
65#[cfg(feature = "native")]
66use crate::transport::{HttpSyncTransport, SyncTransportConfig};
67use crate::transport::{
68 RealtimeEvent, RealtimeTransport, SyncAuthHeaderStore, SyncAuthHeaders, SyncTransport,
69};
70#[cfg(feature = "native")]
71use crate::transport::{SyncAuthSigner, SyncAuthSignerStore};
72use serde::{Deserialize, Serialize};
73#[cfg(feature = "native")]
74use serde_json::json;
75use serde_json::{Map, Value};
76#[cfg(feature = "native")]
77use std::collections::BTreeMap;
78use std::collections::{HashMap, HashSet};
79use std::fmt;
80#[cfg(feature = "native")]
81use std::fs;
82#[cfg(feature = "native")]
83use std::fs::File;
84#[cfg(feature = "native")]
85use std::path::Path;
86use std::sync::{Mutex, OnceLock};
87use std::time::{Duration, SystemTime};
88#[cfg(feature = "demo-todo-fixture")]
89use uuid::Uuid;
90
/// The literal state identifier `"default"`.
// NOTE(review): presumably used when no explicit sync-state id is supplied; the use
// sites are outside this part of the file — confirm.
const DEFAULT_STATE_ID: &str = "default";

/// Sentinel base URL written by [`SyncularClientConfig::local_sync_compatible`] and
/// compared against by [`SyncularClientConfig::is_local_sync_compatible`].
pub const LOCAL_SYNC_DISABLED_BASE_URL: &str = "syncular-local://disabled";

/// Numeric bound of 20 pull rounds.
// NOTE(review): the loop that honours this bound is not visible here — confirm semantics.
const MAX_PULL_ROUNDS: usize = 20;

/// Lazily initialised, mutex-guarded set of string keys with `'static` lifetime.
// NOTE(review): presumably the keys of syncs currently in flight; verify against callers.
static ACTIVE_SYNC_KEYS: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
95
/// Configuration values for a Syncular client.
///
/// Every field is public, so the struct can be built with a struct literal as well as
/// through [`SyncularClientConfig::local_sync_compatible`].
#[derive(Debug, Clone)]
pub struct SyncularClientConfig {
    /// Database path.
    // NOTE(review): presumably the local SQLite file; confirm against the store code.
    pub db_path: String,
    /// Base URL; equals [`LOCAL_SYNC_DISABLED_BASE_URL`] for configs produced by
    /// `local_sync_compatible`.
    pub base_url: String,
    /// Identifier of this client.
    pub client_id: String,
    /// Identifier of the acting user/actor.
    pub actor_id: String,
    /// Optional project identifier.
    pub project_id: Option<String>,
}
104
105impl SyncularClientConfig {
106 pub fn local_sync_compatible(
107 db_path: impl Into<String>,
108 client_id: impl Into<String>,
109 actor_id: impl Into<String>,
110 project_id: Option<String>,
111 ) -> Self {
112 Self {
113 db_path: db_path.into(),
114 base_url: LOCAL_SYNC_DISABLED_BASE_URL.to_string(),
115 client_id: client_id.into(),
116 actor_id: actor_id.into(),
117 project_id,
118 }
119 }
120
121 pub fn is_local_sync_compatible(&self) -> bool {
122 self.base_url == LOCAL_SYNC_DISABLED_BASE_URL
123 }
124}
125
126pub trait SyncularMutationExecutor {
127 fn apply_mutation<M>(&mut self, mutation: M) -> Result<MutationReceipt>
128 where
129 M: IntoSyncularMutation;
130
131 fn apply_mutation_batch(&mut self, batch: SyncularMutationBatch) -> Result<MutationReceipt>;
132
133 fn commit_mutations<R>(
134 &mut self,
135 f: impl FnOnce(&mut SyncularMutationBatch) -> Result<R>,
136 ) -> Result<MutationCommit<R>> {
137 let mut batch = SyncularMutationBatch::new();
138 let result = f(&mut batch)?;
139 let commit = self.apply_mutation_batch(batch)?;
140 Ok(MutationCommit { result, commit })
141 }
142}
143
144pub trait SyncularLeasedMutationExecutor: SyncularMutationExecutor {
145 fn apply_leased_mutation<M>(&mut self, mutation: M) -> Result<MutationReceipt>
146 where
147 M: IntoSyncularMutation;
148
149 fn apply_leased_mutation_batch(
150 &mut self,
151 batch: SyncularMutationBatch,
152 ) -> Result<MutationReceipt>;
153
154 fn commit_leased_mutations<R>(
155 &mut self,
156 f: impl FnOnce(&mut SyncularMutationBatch) -> Result<R>,
157 ) -> Result<MutationCommit<R>> {
158 let mut batch = SyncularMutationBatch::new();
159 let result = f(&mut batch)?;
160 let commit = self.apply_leased_mutation_batch(batch)?;
161 Ok(MutationCommit { result, commit })
162 }
163}
164
165pub trait SyncularCommandHistoryExecutor: SyncularMutationExecutor {
166 fn command_history_current_row_json(
167 &mut self,
168 table: &str,
169 row_id: &str,
170 ) -> Result<Option<Value>>;
171
172 fn command_history_record(
173 &mut self,
174 mutation_scope: &str,
175 entries: &[CommandHistoryEntry],
176 receipt: &MutationReceipt,
177 ) -> Result<CommandHistoryRecord>;
178
179 fn command_history_latest(
180 &mut self,
181 state: CommandHistoryState,
182 ) -> Result<Option<CommandHistoryRecord>>;
183
184 fn command_history_mark(
185 &mut self,
186 id: &str,
187 state: CommandHistoryState,
188 receipt: &MutationReceipt,
189 ) -> Result<()>;
190
191 fn apply_command_history_batch(
192 &mut self,
193 mutation_scope: &str,
194 batch: SyncularMutationBatch,
195 ) -> Result<MutationReceipt>;
196
197 fn apply_command_history_tracked_batch(
198 &mut self,
199 mutation_scope: &str,
200 batch: SyncularMutationBatch,
201 ) -> Result<MutationReceipt> {
202 let pending = command_history_pending_entries(self, batch.mutations())?;
203 let receipt = self.apply_command_history_batch(mutation_scope, batch)?;
204 let entries = command_history_committed_entries(self, pending)?;
205 if !entries.is_empty() {
206 self.command_history_record(mutation_scope, &entries, &receipt)?;
207 }
208 Ok(receipt)
209 }
210}
211
212#[derive(Debug, Clone)]
213struct CommandHistoryPendingEntry {
214 table: String,
215 row_id: String,
216 before: Option<Value>,
217}
218
219fn command_history_pending_entries<C>(
220 client: &mut C,
221 mutations: &[PendingSyncularMutation],
222) -> Result<Vec<CommandHistoryPendingEntry>>
223where
224 C: SyncularCommandHistoryExecutor + ?Sized,
225{
226 let mut entries = Vec::new();
227 for mutation in mutations {
228 if entries.iter().any(|entry: &CommandHistoryPendingEntry| {
229 entry.table == mutation.table && entry.row_id == mutation.row_id
230 }) {
231 continue;
232 }
233 let before = client.command_history_current_row_json(&mutation.table, &mutation.row_id)?;
234 entries.push(CommandHistoryPendingEntry {
235 table: mutation.table.clone(),
236 row_id: mutation.row_id.clone(),
237 before,
238 });
239 }
240 Ok(entries)
241}
242
243fn command_history_committed_entries<C>(
244 client: &mut C,
245 pending: Vec<CommandHistoryPendingEntry>,
246) -> Result<Vec<CommandHistoryEntry>>
247where
248 C: SyncularCommandHistoryExecutor + ?Sized,
249{
250 let mut entries = Vec::new();
251 for entry in pending {
252 let after = client.command_history_current_row_json(&entry.table, &entry.row_id)?;
253 if entry.before == after {
254 continue;
255 }
256 entries.push(CommandHistoryEntry {
257 table: entry.table,
258 row_id: entry.row_id,
259 before: entry.before,
260 after,
261 });
262 }
263 Ok(entries)
264}
265
266pub trait SyncularEncryptedCrdtMutationExecutor {
267 fn apply_encrypted_crdt_text_update(
268 &mut self,
269 metadata: &'static AppTableMetadata,
270 field: &'static str,
271 row_id: &str,
272 next_text: &str,
273 ) -> Result<MutationReceipt>;
274
275 fn apply_encrypted_crdt_yjs_update(
276 &mut self,
277 metadata: &'static AppTableMetadata,
278 field: &'static str,
279 row_id: &str,
280 update: YjsUpdateEnvelope,
281 ) -> Result<MutationReceipt>;
282
283 fn apply_encrypted_crdt_checkpoint(
284 &mut self,
285 metadata: &'static AppTableMetadata,
286 field: &'static str,
287 row_id: &str,
288 min_uncheckpointed_updates: i64,
289 ) -> Result<Option<MutationReceipt>>;
290}
291
/// Receipt type for a CRDT field write (only built with the `native` feature).
///
/// Serialized and deserialized with camelCase keys.
#[cfg(feature = "native")]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CrdtFieldWriteReceipt {
    /// Client commit identifier; serialized as `clientCommitId`.
    pub client_commit_id: String,
    /// Sync mode of the field; serialized as `syncMode`.
    pub sync_mode: CrdtFieldSyncMode,
}
299
/// Materialized view of a CRDT field (only built with the `native` feature).
///
/// Serialized and deserialized with camelCase keys.
#[cfg(feature = "native")]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CrdtFieldMaterialization {
    /// Materialized JSON value.
    pub value: Value,
    /// Optional state string; serialized as `stateBase64`.
    // NOTE(review): presumably the base64-encoded CRDT document state — confirm.
    pub state_base64: Option<String>,
    /// State-vector string; serialized as `stateVectorBase64`.
    pub state_vector_base64: String,
}
308
/// Outcome of compacting a CRDT field (only built with the `native` feature).
///
/// Serialized and deserialized with camelCase keys.
#[cfg(feature = "native")]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CrdtFieldCompactionReceipt {
    /// Whether a checkpoint was created.
    pub checkpoint_created: bool,
    /// Client commit identifier, when one exists.
    pub client_commit_id: Option<String>,
    /// Compaction statistics taken before the operation.
    pub before: CrdtFieldCompactionStats,
    /// Compaction statistics taken after the operation.
    pub after: CrdtFieldCompactionStats,
    /// Encrypted-stream statistics before the operation, when available.
    pub encrypted_stream_before: Option<EncryptedCrdtStreamStats>,
    /// Encrypted-stream statistics after the operation, when available.
    pub encrypted_stream_after: Option<EncryptedCrdtStreamStats>,
}
320
/// JSON request shape for an encrypted CRDT update (only built with the `native`
/// feature). Keys are camelCase.
///
/// `nextText` and `update` are both optional and default to `None` when absent.
// NOTE(review): presumably exactly one of the two is expected; the validation is not
// visible here.
#[cfg(feature = "native")]
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EncryptedCrdtUpdateJsonRequest {
    /// Target table name.
    table: String,
    /// Target field name.
    field: String,
    /// Target row identifier (`rowId`).
    row_id: String,
    /// Desired text (`nextText`), if the request carries one.
    #[serde(default)]
    next_text: Option<String>,
    /// Yjs update envelope, if the request carries one.
    #[serde(default)]
    update: Option<YjsUpdateEnvelope>,
}
333
/// JSON request shape for an encrypted CRDT checkpoint (only built with the `native`
/// feature). Keys are camelCase.
#[cfg(feature = "native")]
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EncryptedCrdtCheckpointJsonRequest {
    /// Target table name.
    table: String,
    /// Target field name.
    field: String,
    /// Target row identifier (`rowId`).
    row_id: String,
    /// Optional threshold (`minUncheckpointedUpdates`); `None` when absent.
    #[serde(default)]
    min_uncheckpointed_updates: Option<i64>,
}
344
345#[derive(Debug, Clone, Serialize, Deserialize)]
346pub struct SubscriptionSpec {
347 pub id: String,
348 pub table: String,
349 pub scopes: ScopeValues,
350 pub params: Map<String, Value>,
351 #[serde(default, rename = "bootstrapPhase")]
352 pub bootstrap_phase: i64,
353}
354
355pub fn validate_subscription_limits(subscriptions: &[SubscriptionSpec]) -> Result<()> {
356 if subscriptions.len() > MAX_SUBSCRIPTIONS_PER_CLIENT {
357 return Err(subscription_limit_error(
358 "maxSubscriptionsPerClient",
359 subscriptions.len(),
360 MAX_SUBSCRIPTIONS_PER_CLIENT,
361 "too many Syncular subscriptions configured",
362 ));
363 }
364
365 let mut total_scope_values = 0usize;
366 for subscription in subscriptions {
367 if subscription.scopes.len() > MAX_SCOPE_KEYS_PER_SUBSCRIPTION {
368 return Err(subscription_limit_error(
369 "maxScopeKeysPerSubscription",
370 subscription.scopes.len(),
371 MAX_SCOPE_KEYS_PER_SUBSCRIPTION,
372 format!(
373 "too many scope keys configured for subscription {}",
374 subscription.id
375 ),
376 ));
377 }
378 if subscription.params.len() > MAX_SUBSCRIPTION_PARAMS_PER_SUBSCRIPTION {
379 return Err(subscription_limit_error(
380 "maxSubscriptionParamsPerSubscription",
381 subscription.params.len(),
382 MAX_SUBSCRIPTION_PARAMS_PER_SUBSCRIPTION,
383 format!(
384 "too many params configured for subscription {}",
385 subscription.id
386 ),
387 ));
388 }
389
390 let subscription_scope_values = count_scope_values(&subscription.scopes);
391 if subscription_scope_values > MAX_SCOPE_VALUES_PER_SUBSCRIPTION {
392 return Err(subscription_limit_error(
393 "maxScopeValuesPerSubscription",
394 subscription_scope_values,
395 MAX_SCOPE_VALUES_PER_SUBSCRIPTION,
396 format!(
397 "too many scope values configured for subscription {}",
398 subscription.id
399 ),
400 ));
401 }
402 total_scope_values = total_scope_values.saturating_add(subscription_scope_values);
403 if total_scope_values > MAX_SCOPE_VALUES_PER_CLIENT {
404 return Err(subscription_limit_error(
405 "maxScopeValuesPerClient",
406 total_scope_values,
407 MAX_SCOPE_VALUES_PER_CLIENT,
408 "too many total scope values configured for Syncular client",
409 ));
410 }
411 }
412
413 Ok(())
414}
415
416fn count_scope_values(scopes: &ScopeValues) -> usize {
417 scopes
418 .values()
419 .map(|value| value.as_array().map_or(1, Vec::len))
420 .sum()
421}
422
423fn subscription_limit_error(
424 limit: &'static str,
425 observed: usize,
426 max: usize,
427 message: impl fmt::Display,
428) -> SyncularError {
429 SyncularError::limit_exceeded(limit, observed, max, message)
430}
431
432#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
433#[serde(rename_all = "camelCase")]
434pub struct SyncChangedCrdtField {
435 pub field: String,
436 pub state_column: String,
437 pub container_key: String,
438 pub row_id_field: String,
439 pub kind: String,
440 pub sync_mode: String,
441}
442
443#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
444#[serde(rename_all = "camelCase")]
445pub struct SyncChangedRow {
446 pub table: String,
447 #[serde(skip_serializing_if = "Option::is_none")]
448 pub row_id: Option<String>,
449 pub operation: String,
450 #[serde(default)]
451 pub changed_fields: Vec<String>,
452 #[serde(default)]
453 pub crdt_fields: Vec<String>,
454 #[serde(default, skip_serializing_if = "Vec::is_empty")]
455 pub crdt_field_changes: Vec<SyncChangedCrdtField>,
456 #[serde(skip_serializing_if = "Option::is_none")]
457 pub commit_id: Option<String>,
458 #[serde(skip_serializing_if = "Option::is_none")]
459 pub commit_seq: Option<i64>,
460 #[serde(skip_serializing_if = "Option::is_none")]
461 pub subscription_id: Option<String>,
462 #[serde(skip_serializing_if = "Option::is_none")]
463 pub server_version: Option<i64>,
464}
465
466#[derive(Debug, Clone, Default, PartialEq, Eq)]
467pub struct SyncReport {
468 pub changed_tables: Vec<String>,
469 pub changed_rows: Vec<SyncChangedRow>,
470 pub conflicts_changed: bool,
471}
472
/// Overall bootstrap status (only built with the `native` feature).
///
/// (De)serialized with camelCase keys.
// NOTE(review): the code that fills this struct is outside this view; field notes
// below restate names and types only.
#[cfg(feature = "native")]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BootstrapStatus {
    /// Channel phase, as a string.
    pub channel_phase: String,
    /// Progress as an integer percentage.
    pub progress_percent: i64,
    /// Whether bootstrapping is in progress.
    pub is_bootstrapping: bool,
    /// Readiness flag for the "critical" level.
    pub critical_ready: bool,
    /// Readiness flag for the "interactive" level.
    pub interactive_ready: bool,
    /// Whether bootstrap is complete.
    pub complete: bool,
    /// Currently active phase, if any.
    pub active_phase: Option<i64>,
    /// Ids of the subscriptions that are expected.
    pub expected_subscription_ids: Vec<String>,
    /// Ids of the subscriptions that are ready.
    pub ready_subscription_ids: Vec<String>,
    /// Ids of the subscriptions that are still pending.
    pub pending_subscription_ids: Vec<String>,
    /// Per-subscription status entries.
    pub subscriptions: Vec<BootstrapSubscriptionStatus>,
    /// Per-phase status entries.
    pub phases: Vec<BootstrapPhaseStatus>,
}
490
/// Bootstrap status of one subscription (only built with the `native` feature).
///
/// (De)serialized with camelCase keys.
#[cfg(feature = "native")]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BootstrapSubscriptionStatus {
    /// Subscription identifier.
    pub id: String,
    /// Table the subscription covers.
    pub table: String,
    /// Whether the subscription is expected.
    pub expected: bool,
    /// Whether the subscription is ready.
    pub ready: bool,
    /// Optional status string.
    pub status: Option<String>,
    /// Phase label, as a string.
    pub phase: String,
    /// Progress as an integer percentage.
    pub progress_percent: i64,
    /// Optional cursor value.
    pub cursor: Option<i64>,
    /// Optional bootstrap state.
    pub bootstrap_state: Option<BootstrapState>,
    /// Numeric bootstrap phase.
    pub bootstrap_phase: i64,
}
506
/// Bootstrap status of one phase (only built with the `native` feature).
///
/// (De)serialized with camelCase keys.
#[cfg(feature = "native")]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BootstrapPhaseStatus {
    /// Numeric phase.
    pub phase: i64,
    /// Ids of the subscriptions expected in this phase.
    pub expected_subscription_ids: Vec<String>,
    /// Ids of the subscriptions that are ready.
    pub ready_subscription_ids: Vec<String>,
    /// Ids of the subscriptions that are still pending.
    pub pending_subscription_ids: Vec<String>,
    /// Whether the phase is ready.
    pub is_ready: bool,
    /// Progress as an integer percentage.
    pub progress_percent: i64,
}
518
519#[derive(Debug, Clone)]
520struct PreparedSnapshot {
521 snapshot: SyncSnapshot,
522 chunk_batches: Vec<SnapshotChunkRows>,
523 artifact_rows: Vec<Value>,
524}
525
526impl SyncReport {
527 pub fn table_changed(table: impl Into<String>) -> Self {
528 Self {
529 changed_tables: vec![table.into()],
530 changed_rows: Vec::new(),
531 conflicts_changed: false,
532 }
533 }
534
535 pub fn tables_changed<I, Table>(tables: I) -> Self
536 where
537 I: IntoIterator<Item = Table>,
538 Table: Into<String>,
539 {
540 let mut report = Self::default();
541 for table in tables {
542 report.add_changed_table(&table.into());
543 }
544 report
545 }
546
547 pub fn conflicts_changed() -> Self {
548 Self {
549 changed_tables: Vec::new(),
550 changed_rows: Vec::new(),
551 conflicts_changed: true,
552 }
553 }
554
555 fn add_changed_table(&mut self, table: &str) {
556 if !self.changed_tables.iter().any(|existing| existing == table) {
557 self.changed_tables.push(table.to_string());
558 }
559 }
560
561 pub fn changes_table(&self, table: &str) -> bool {
562 self.changed_tables.iter().any(|existing| existing == table)
563 }
564
565 pub fn changes_any_table<'a>(&self, tables: impl IntoIterator<Item = &'a str>) -> bool {
566 tables.into_iter().any(|table| self.changes_table(table))
567 }
568
569 fn add_changed_row(&mut self, row: SyncChangedRow) {
570 self.add_changed_table(&row.table);
571 self.changed_rows.push(row);
572 }
573
574 fn merge(&mut self, other: SyncReport) {
575 self.conflicts_changed |= other.conflicts_changed;
576 for table in other.changed_tables {
577 self.add_changed_table(&table);
578 }
579 self.changed_rows.extend(other.changed_rows);
580 }
581}
582
583pub fn sync_changed_row_for_operation(
584 app_schema: AppSchema,
585 operation: &SyncOperation,
586 commit_id: Option<String>,
587) -> Option<SyncChangedRow> {
588 sync_changed_row_for_local_operation(app_schema, operation, None, None, commit_id)
589}
590
591pub fn sync_changed_row_for_local_operation(
592 app_schema: AppSchema,
593 operation: &SyncOperation,
594 previous_row: Option<&Value>,
595 local_row: Option<&Value>,
596 commit_id: Option<String>,
597) -> Option<SyncChangedRow> {
598 let metadata = app_schema.table_metadata(&operation.table)?;
599 let changed_fields = if operation.op == "delete" {
600 Vec::new()
601 } else if let Some(local_row) = local_row {
602 changed_fields_from_row_diff(metadata, previous_row, Some(local_row))
603 } else {
604 changed_fields_from_payload(metadata, operation.payload.as_ref())
605 };
606 let operation_kind = if operation.op == "upsert" {
607 if previous_row.is_some() {
608 "update"
609 } else {
610 "insert"
611 }
612 } else {
613 operation.op.as_str()
614 };
615 let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
616 let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
617 Some(SyncChangedRow {
618 table: operation.table.clone(),
619 row_id: Some(operation.row_id.clone()),
620 operation: operation_kind.to_string(),
621 crdt_fields,
622 crdt_field_changes,
623 changed_fields,
624 commit_id,
625 commit_seq: None,
626 subscription_id: None,
627 server_version: operation.base_version,
628 })
629}
630
631pub fn sync_changed_row_for_change(
632 app_schema: AppSchema,
633 change: &SyncChange,
634 previous_row: Option<&Value>,
635 commit_seq: i64,
636 subscription_id: &str,
637) -> Option<SyncChangedRow> {
638 let metadata = app_schema.table_metadata(&change.table)?;
639 let changed_fields = changed_fields_from_remote_change(metadata, change, previous_row);
640 let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
641 let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
642 Some(SyncChangedRow {
643 table: change.table.clone(),
644 row_id: Some(change.row_id.clone()),
645 operation: if change.op == "delete" {
646 "delete".to_string()
647 } else if previous_row.is_some() {
648 "update".to_string()
649 } else {
650 "insert".to_string()
651 },
652 crdt_fields,
653 crdt_field_changes,
654 changed_fields,
655 commit_id: Some(commit_seq.to_string()),
656 commit_seq: Some(commit_seq),
657 subscription_id: Some(subscription_id.to_string()),
658 server_version: change.row_version,
659 })
660}
661
662pub fn sync_changed_row_for_snapshot(
663 app_schema: AppSchema,
664 table: &str,
665 row: &Value,
666 previous_row: Option<&Value>,
667 subscription_id: &str,
668) -> Option<SyncChangedRow> {
669 let metadata = app_schema.table_metadata(table)?;
670 let row_id = row
671 .get(metadata.primary_key_column)
672 .and_then(Value::as_str)
673 .map(str::to_string);
674 let changed_fields = changed_fields_from_row_diff(metadata, previous_row, Some(row));
675 let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
676 let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
677 Some(SyncChangedRow {
678 table: table.to_string(),
679 row_id,
680 operation: if previous_row.is_some() {
681 "update".to_string()
682 } else {
683 "insert".to_string()
684 },
685 crdt_fields,
686 crdt_field_changes,
687 changed_fields,
688 commit_id: None,
689 commit_seq: None,
690 subscription_id: Some(subscription_id.to_string()),
691 server_version: row
692 .get(metadata.server_version_column)
693 .and_then(Value::as_i64),
694 })
695}
696
697pub(crate) fn sync_changed_rows_for_cleared_snapshot_chunk(
698 app_schema: AppSchema,
699 table: &str,
700 rows: &SnapshotChunkRows,
701 subscription_id: &str,
702) -> Vec<SyncChangedRow> {
703 sync_changed_rows_for_cleared_snapshot_chunk_limited(
704 app_schema,
705 table,
706 rows,
707 subscription_id,
708 usize::MAX,
709 )
710 .0
711}
712
713pub(crate) fn sync_changed_rows_for_cleared_snapshot_chunk_limited(
714 app_schema: AppSchema,
715 table: &str,
716 rows: &SnapshotChunkRows,
717 subscription_id: &str,
718 limit: usize,
719) -> (Vec<SyncChangedRow>, bool) {
720 match rows {
721 SnapshotChunkRows::Json(rows) => (
722 rows.iter()
723 .take(limit)
724 .filter_map(|row| {
725 sync_changed_row_for_snapshot(app_schema, table, row, None, subscription_id)
726 })
727 .collect(),
728 rows.len() > limit,
729 ),
730 SnapshotChunkRows::Binary(rows) => sync_changed_rows_for_cleared_binary_snapshot_chunk(
731 app_schema,
732 table,
733 rows,
734 subscription_id,
735 limit,
736 ),
737 SnapshotChunkRows::BinaryPayload(rows) => {
738 sync_changed_rows_for_cleared_binary_snapshot_payload(
739 app_schema,
740 table,
741 rows,
742 subscription_id,
743 limit,
744 )
745 }
746 }
747}
748
749fn sync_changed_rows_for_cleared_binary_snapshot_chunk(
750 app_schema: AppSchema,
751 table: &str,
752 rows: &DecodedBinarySnapshotRows,
753 subscription_id: &str,
754 limit: usize,
755) -> (Vec<SyncChangedRow>, bool) {
756 sync_changed_rows_for_cleared_binary_snapshot_chunk_limited(
757 app_schema,
758 table,
759 rows,
760 subscription_id,
761 limit,
762 )
763 .unwrap_or_default()
764}
765
766fn sync_changed_rows_for_cleared_binary_snapshot_chunk_limited(
767 app_schema: AppSchema,
768 table: &str,
769 rows: &DecodedBinarySnapshotRows,
770 subscription_id: &str,
771 limit: usize,
772) -> Result<(Vec<SyncChangedRow>, bool)> {
773 let Some(metadata) = app_schema.table_metadata(table) else {
774 return Ok((Vec::new(), false));
775 };
776 let Some(primary_key_index) = rows
777 .columns
778 .iter()
779 .position(|column| column.name == metadata.primary_key_column)
780 else {
781 return Ok((Vec::new(), false));
782 };
783 let server_version_index = rows
784 .columns
785 .iter()
786 .position(|column| column.name == metadata.server_version_column);
787 let present_columns = rows
788 .columns
789 .iter()
790 .map(|column| column.name.as_str())
791 .collect::<HashSet<_>>();
792 let changed_fields = metadata
793 .columns
794 .iter()
795 .filter_map(|column| {
796 if column.name == metadata.primary_key_column || !present_columns.contains(column.name)
797 {
798 return None;
799 }
800 Some(column.name.to_string())
801 })
802 .collect::<Vec<_>>();
803 let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
804 let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
805 Ok((
806 rows.rows
807 .iter()
808 .take(limit)
809 .map(|row| SyncChangedRow {
810 table: table.to_string(),
811 row_id: row
812 .get(primary_key_index)
813 .and_then(binary_snapshot_cell_row_id),
814 operation: "insert".to_string(),
815 crdt_fields: crdt_fields.clone(),
816 crdt_field_changes: crdt_field_changes.clone(),
817 changed_fields: changed_fields.clone(),
818 commit_id: None,
819 commit_seq: None,
820 subscription_id: Some(subscription_id.to_string()),
821 server_version: server_version_index
822 .and_then(|index| row.get(index))
823 .and_then(binary_snapshot_cell_i64),
824 })
825 .collect(),
826 rows.rows.len() > limit,
827 ))
828}
829
830fn binary_snapshot_cell_row_id(cell: &BinarySnapshotCell) -> Option<String> {
831 match cell {
832 BinarySnapshotCell::String(value) => Some(value.clone()),
833 BinarySnapshotCell::Integer(value) => Some(value.to_string()),
834 _ => None,
835 }
836}
837
838fn binary_snapshot_cell_i64(cell: &BinarySnapshotCell) -> Option<i64> {
839 match cell {
840 BinarySnapshotCell::Integer(value) => Some(*value),
841 BinarySnapshotCell::String(value) => value.parse().ok(),
842 _ => None,
843 }
844}
845
846fn sync_changed_rows_for_cleared_binary_snapshot_payload(
847 app_schema: AppSchema,
848 table: &str,
849 payload: &BinarySnapshotPayload,
850 subscription_id: &str,
851 limit: usize,
852) -> (Vec<SyncChangedRow>, bool) {
853 sync_changed_rows_for_cleared_binary_snapshot_payload_limited(
854 app_schema,
855 table,
856 payload,
857 subscription_id,
858 limit,
859 )
860 .unwrap_or_default()
861}
862
863fn sync_changed_rows_for_cleared_binary_snapshot_payload_limited(
864 app_schema: AppSchema,
865 table: &str,
866 payload: &BinarySnapshotPayload,
867 subscription_id: &str,
868 limit: usize,
869) -> Result<(Vec<SyncChangedRow>, bool)> {
870 let truncated = payload.row_count() > limit;
871 if limit == 0 {
872 return Ok((Vec::new(), truncated));
873 }
874 let Some(metadata) = app_schema.table_metadata(table) else {
875 return Ok((Vec::new(), false));
876 };
877 let Some(primary_key_index) = payload
878 .columns
879 .iter()
880 .position(|column| column.name == metadata.primary_key_column)
881 else {
882 return Ok((Vec::new(), false));
883 };
884 let server_version_index = payload
885 .columns
886 .iter()
887 .position(|column| column.name == metadata.server_version_column);
888 let present_columns = payload
889 .columns
890 .iter()
891 .map(|column| column.name.as_str())
892 .collect::<HashSet<_>>();
893 let changed_fields = metadata
894 .columns
895 .iter()
896 .filter_map(|column| {
897 if column.name == metadata.primary_key_column || !present_columns.contains(column.name)
898 {
899 return None;
900 }
901 Some(column.name.to_string())
902 })
903 .collect::<Vec<_>>();
904 let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
905 let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
906 let mut cursor = payload.row_cursor();
907 let row_limit = payload.row_count().min(limit);
908 let mut rows = Vec::with_capacity(row_limit);
909 for _ in 0..row_limit {
910 let mut row_id = None;
911 let mut server_version = None;
912 let read = cursor.read_next_row(|column_index, _column, cell| {
913 if column_index == primary_key_index {
914 row_id = borrowed_binary_snapshot_cell_row_id(cell);
915 }
916 if Some(column_index) == server_version_index {
917 server_version = borrowed_binary_snapshot_cell_i64(cell);
918 }
919 Ok(())
920 })?;
921 if !read {
922 break;
923 }
924 rows.push(SyncChangedRow {
925 table: table.to_string(),
926 row_id,
927 operation: "insert".to_string(),
928 crdt_fields: crdt_fields.clone(),
929 crdt_field_changes: crdt_field_changes.clone(),
930 changed_fields: changed_fields.clone(),
931 commit_id: None,
932 commit_seq: None,
933 subscription_id: Some(subscription_id.to_string()),
934 server_version,
935 });
936 }
937 if !truncated {
938 cursor.assert_done()?;
939 }
940 Ok((rows, truncated))
941}
942
943fn borrowed_binary_snapshot_cell_row_id(cell: BorrowedBinarySnapshotCell<'_>) -> Option<String> {
944 match cell {
945 BorrowedBinarySnapshotCell::String(value) => Some(value.to_string()),
946 BorrowedBinarySnapshotCell::Integer(value) => Some(value.to_string()),
947 _ => None,
948 }
949}
950
951fn borrowed_binary_snapshot_cell_i64(cell: BorrowedBinarySnapshotCell<'_>) -> Option<i64> {
952 match cell {
953 BorrowedBinarySnapshotCell::Integer(value) => Some(value),
954 BorrowedBinarySnapshotCell::String(value) => value.parse().ok(),
955 _ => None,
956 }
957}
958
/// Tests for the changed-row builders, driven by a hand-assembled binary snapshot.
#[cfg(test)]
mod changed_rows_tests {
    use super::*;
    use crate::app_schema::{ColumnMetadata, EmbeddedMigration};
    use crate::binary_snapshot::decode_binary_snapshot_payload;

    // Fixture table `tasks`: text primary key `id`, optional text `title`, and an
    // integer `server_version`.
    static TEST_COLUMNS: [ColumnMetadata; 3] = [
        ColumnMetadata {
            name: "id",
            type_family: "text",
            notnull_required: true,
            primary_key: true,
        },
        ColumnMetadata {
            name: "title",
            type_family: "text",
            notnull_required: false,
            primary_key: false,
        },
        ColumnMetadata {
            name: "server_version",
            type_family: "integer",
            notnull_required: true,
            primary_key: false,
        },
    ];

    static TEST_TABLES: [&str; 1] = ["tasks"];
    static TEST_TABLE_METADATA: [AppTableMetadata; 1] = [AppTableMetadata {
        name: "tasks",
        primary_key_column: "id",
        server_version_column: "server_version",
        soft_delete_column: None,
        subscription_id: "tasks",
        columns: &TEST_COLUMNS,
        blob_columns: &[],
        crdt_yjs_fields: &[],
        encrypted_fields: &[],
        scopes: &[],
    }];
    static TEST_MIGRATIONS: [EmbeddedMigration; 0] = [];

    /// The fixture schema declares no default subscriptions.
    fn default_subscriptions(_: &SyncularClientConfig) -> Vec<SubscriptionSpec> {
        Vec::new()
    }

    /// The fixture schema has no Diesel adapter; every lookup fails with a config error.
    #[cfg(feature = "native")]
    fn adapter_for(_: &str) -> Result<&'static dyn crate::app_schema::DieselTableAdapter> {
        Err(SyncularError::config("test schema has no diesel adapter"))
    }

    /// Schema containing only the fixture `tasks` table.
    fn test_schema() -> AppSchema {
        AppSchema {
            app_tables: &TEST_TABLES,
            app_table_metadata: &TEST_TABLE_METADATA,
            migrations: &TEST_MIGRATIONS,
            schema_version: Some(1),
            default_subscriptions,
            #[cfg(feature = "native")]
            adapter_for,
        }
    }

    #[test]
    fn builds_changed_rows_from_binary_snapshot_payload() {
        // A fresh payload is decoded for each call under test.
        let fixture_chunk = || {
            SnapshotChunkRows::BinaryPayload(
                decode_binary_snapshot_payload(binary_snapshot_bytes()).unwrap(),
            )
        };

        // Without a limit both fixture rows are reported, in payload order.
        let rows = sync_changed_rows_for_cleared_snapshot_chunk(
            test_schema(),
            "tasks",
            &fixture_chunk(),
            "sub-tasks",
        );
        assert_eq!(rows.len(), 2);
        let (first, second) = (&rows[0], &rows[1]);
        assert_eq!(first.row_id.as_deref(), Some("task-1"));
        assert_eq!(first.operation, "insert");
        assert_eq!(first.changed_fields, vec!["title", "server_version"]);
        assert_eq!(first.server_version, Some(41));
        assert_eq!(first.subscription_id.as_deref(), Some("sub-tasks"));
        assert_eq!(second.row_id.as_deref(), Some("task-2"));
        assert_eq!(second.server_version, Some(42));

        // With a limit of one only the first row is reported, flagged as truncated.
        let (limited, truncated) = sync_changed_rows_for_cleared_snapshot_chunk_limited(
            test_schema(),
            "tasks",
            &fixture_chunk(),
            "sub-tasks",
            1,
        );
        assert!(truncated);
        assert_eq!(limited.len(), 1);
        assert_eq!(limited[0].row_id.as_deref(), Some("task-1"));
    }

    /// Assembles a two-row binary snapshot for the fixture `tasks` table.
    // NOTE(review): the meaning of the two u16 header values after the magic and of the
    // leading zero byte of each row is defined by the binary snapshot decoder, which is
    // not visible here.
    fn binary_snapshot_bytes() -> Vec<u8> {
        let columns: [(&str, u8, u8); 3] = [
            ("id", 1u8, 0u8),
            ("title", 1u8, 0u8),
            ("server_version", 2u8, 0u8),
        ];
        let rows: [(&str, &str, i64); 2] = [("task-1", "First", 41), ("task-2", "Second", 42)];

        let mut bytes = b"SBT1".to_vec();
        push_u16(&mut bytes, 1);
        push_u16(&mut bytes, 0);
        push_string16(&mut bytes, "tasks");

        // Column table: count, then name / tag / flags per column.
        push_u16(&mut bytes, 3);
        for (name, tag, flags) in columns {
            push_string16(&mut bytes, name);
            bytes.extend_from_slice(&[tag, flags]);
        }

        // Row section: count, then one record per row.
        push_u32(&mut bytes, 2);
        for (id, title, server_version) in rows {
            bytes.push(0);
            push_string32(&mut bytes, id);
            push_string32(&mut bytes, title);
            push_i64(&mut bytes, server_version);
        }
        bytes
    }

    /// Appends `value` as little-endian bytes.
    fn push_u16(bytes: &mut Vec<u8>, value: u16) {
        bytes.extend(value.to_le_bytes());
    }

    /// Appends `value` as little-endian bytes.
    fn push_u32(bytes: &mut Vec<u8>, value: u32) {
        bytes.extend(value.to_le_bytes());
    }

    /// Appends `value` as little-endian bytes.
    fn push_i64(bytes: &mut Vec<u8>, value: i64) {
        bytes.extend(value.to_le_bytes());
    }

    /// Appends `value` as a u16 byte length followed by its UTF-8 bytes.
    fn push_string16(bytes: &mut Vec<u8>, value: &str) {
        let length = value.len() as u16;
        push_u16(bytes, length);
        bytes.extend_from_slice(value.as_bytes());
    }

    /// Appends `value` as a u32 byte length followed by its UTF-8 bytes.
    fn push_string32(bytes: &mut Vec<u8>, value: &str) {
        let length = value.len() as u32;
        push_u32(bytes, length);
        bytes.extend_from_slice(value.as_bytes());
    }
}
1106
1107fn changed_fields_from_remote_change(
1108 metadata: &AppTableMetadata,
1109 change: &SyncChange,
1110 previous_row: Option<&Value>,
1111) -> Vec<String> {
1112 if change.op == "delete" {
1113 return Vec::new();
1114 }
1115 let Some(row) = change.row_json.as_ref() else {
1116 return Vec::new();
1117 };
1118 if row
1119 .as_object()
1120 .is_some_and(|object| object.contains_key(YJS_PAYLOAD_KEY))
1121 {
1122 return changed_fields_from_yjs_envelope(metadata, row);
1123 }
1124 changed_fields_from_row_diff(metadata, previous_row, Some(row))
1125}
1126
1127fn changed_fields_from_row_diff(
1128 metadata: &AppTableMetadata,
1129 previous_row: Option<&Value>,
1130 next_row: Option<&Value>,
1131) -> Vec<String> {
1132 let Some(next_row) = next_row.and_then(Value::as_object) else {
1133 return Vec::new();
1134 };
1135 let previous_row = previous_row.and_then(Value::as_object);
1136 metadata
1137 .columns
1138 .iter()
1139 .filter_map(|column| {
1140 if column.name == metadata.primary_key_column || !next_row.contains_key(column.name) {
1141 return None;
1142 }
1143 match previous_row.and_then(|row| row.get(column.name)) {
1144 Some(previous) if Some(previous) == next_row.get(column.name) => None,
1145 _ => Some(column.name.to_string()),
1146 }
1147 })
1148 .collect()
1149}
1150
1151fn changed_fields_from_payload(
1152 metadata: &AppTableMetadata,
1153 payload: Option<&Value>,
1154) -> Vec<String> {
1155 let Some(payload) = payload else {
1156 return Vec::new();
1157 };
1158 if payload
1159 .as_object()
1160 .is_some_and(|object| object.contains_key(YJS_PAYLOAD_KEY))
1161 {
1162 return changed_fields_from_yjs_envelope(metadata, payload);
1163 }
1164 let Some(payload) = payload.as_object() else {
1165 return Vec::new();
1166 };
1167 metadata
1168 .columns
1169 .iter()
1170 .filter_map(|column| {
1171 if column.name == metadata.primary_key_column || !payload.contains_key(column.name) {
1172 return None;
1173 }
1174 Some(column.name.to_string())
1175 })
1176 .collect()
1177}
1178
1179fn changed_fields_from_yjs_envelope(metadata: &AppTableMetadata, payload: &Value) -> Vec<String> {
1180 let Some(envelope) = payload.get(YJS_PAYLOAD_KEY).and_then(Value::as_object) else {
1181 return Vec::new();
1182 };
1183 let mut fields = Vec::new();
1184 for field_name in envelope.keys() {
1185 if let Some(field) = metadata
1186 .crdt_yjs_fields
1187 .iter()
1188 .find(|candidate| candidate.field == field_name.as_str())
1189 {
1190 push_unique(&mut fields, field.field);
1191 push_unique(&mut fields, field.state_column);
1192 }
1193 }
1194 fields
1195}
1196
1197fn crdt_field_changes_for_fields(
1198 metadata: &AppTableMetadata,
1199 changed_fields: &[String],
1200) -> Vec<SyncChangedCrdtField> {
1201 let changed = changed_fields
1202 .iter()
1203 .map(String::as_str)
1204 .collect::<HashSet<_>>();
1205 metadata
1206 .crdt_yjs_fields
1207 .iter()
1208 .filter_map(|field| {
1209 if changed.contains(field.field) || changed.contains(field.state_column) {
1210 Some(sync_changed_crdt_field_from_metadata(field))
1211 } else {
1212 None
1213 }
1214 })
1215 .collect()
1216}
1217
1218pub fn sync_changed_crdt_field_from_metadata(
1219 field: &crate::app_schema::CrdtYjsFieldMetadata,
1220) -> SyncChangedCrdtField {
1221 SyncChangedCrdtField {
1222 field: field.field.to_string(),
1223 state_column: field.state_column.to_string(),
1224 container_key: field.container_key.to_string(),
1225 row_id_field: field.row_id_field.to_string(),
1226 kind: field.kind.to_string(),
1227 sync_mode: field.sync_mode.to_string(),
1228 }
1229}
1230
1231fn crdt_state_columns_for_changes(changes: &[SyncChangedCrdtField]) -> Vec<String> {
1232 changes
1233 .iter()
1234 .map(|field| field.state_column.clone())
1235 .collect()
1236}
1237
/// Appends `value` to `values` unless an equal string is already present.
fn push_unique(values: &mut Vec<String>, value: &str) {
    let already_present = values.iter().any(|existing| existing.as_str() == value);
    if already_present {
        return;
    }
    values.push(value.to_owned());
}
1243
1244fn row_id_for_metadata(app_schema: AppSchema, table: &str, row: &Value) -> Option<String> {
1245 let metadata = app_schema.table_metadata(table)?;
1246 row.get(metadata.primary_key_column)
1247 .and_then(Value::as_str)
1248 .map(str::to_string)
1249}
1250
1251fn snapshot_clear_removes_all_rows(app_schema: AppSchema, table: &str) -> bool {
1252 app_schema.table_metadata(table).is_some_and(|metadata| {
1253 !metadata
1254 .crdt_yjs_fields
1255 .iter()
1256 .any(|field| field.sync_mode == "encrypted-update-log")
1257 })
1258}
1259
/// Clamps a bootstrap phase to be non-negative; negative phases become `0`.
fn normalize_bootstrap_phase(phase: i64) -> i64 {
    if phase < 0 {
        0
    } else {
        phase
    }
}
1263
1264fn native_subscription_ready(state: Option<&SubscriptionState>) -> bool {
1265 state.is_some_and(|state| {
1266 state.status == "active" && state.bootstrap_state_json.is_none() && state.cursor >= 0
1267 })
1268}
1269
1270fn native_subscription_bootstrapping(state: Option<&SubscriptionState>) -> bool {
1271 state.is_some_and(|state| state.status == "active" && state.bootstrap_state_json.is_some())
1272}
1273
1274fn resolve_active_bootstrap_phase_for_native(
1275 entries: &[(SubscriptionSpec, Option<SubscriptionState>)],
1276) -> Option<i64> {
1277 entries
1278 .iter()
1279 .filter(|(_, state)| !native_subscription_ready(state.as_ref()))
1280 .map(|(spec, _)| normalize_bootstrap_phase(spec.bootstrap_phase))
1281 .min()
1282}
1283
1284fn should_include_pull_subscription(
1285 spec: &SubscriptionSpec,
1286 state: Option<&SubscriptionState>,
1287 active_phase: Option<i64>,
1288) -> bool {
1289 let Some(active_phase) = active_phase else {
1290 return true;
1291 };
1292 let phase = normalize_bootstrap_phase(spec.bootstrap_phase);
1293 phase <= active_phase
1294 || native_subscription_ready(state)
1295 || native_subscription_bootstrapping(state)
1296}
1297
/// Derives the phase label reported for a subscription.
///
/// Checked in priority order: `"error"` for a revoked subscription,
/// `"bootstrapping"` while a bootstrap checkpoint exists, `"live"` for an active
/// subscription with a non-negative cursor, and `"pending"` otherwise.
#[cfg(feature = "native")]
fn native_subscription_phase(
    status: Option<&str>,
    cursor: Option<i64>,
    bootstrap_state: Option<&BootstrapState>,
) -> String {
    let revoked = status == Some("revoked");
    let live = status == Some("active") && matches!(cursor, Some(cursor) if cursor >= 0);
    let label = if revoked {
        "error"
    } else if bootstrap_state.is_some() {
        "bootstrapping"
    } else if live {
        "live"
    } else {
        "pending"
    };
    label.to_string()
}
1314
/// Estimates bootstrap progress as a percentage in `0..=100`.
///
/// A ready subscription is always 100. Without a checkpoint, or with a
/// checkpoint that lists no tables, progress is 0. Otherwise it is the share of
/// tables already passed according to the checkpoint's table index.
#[cfg(feature = "native")]
fn bootstrap_progress_percent(ready: bool, bootstrap_state: Option<&BootstrapState>) -> i64 {
    if ready {
        return 100;
    }
    match bootstrap_state {
        None => 0,
        Some(state) => {
            let table_count = state.tables.len() as i64;
            if table_count <= 0 {
                0
            } else {
                // Clamp the index so an out-of-range checkpoint cannot leave 0..=100.
                let tables_done = state.table_index.clamp(0, table_count);
                ((tables_done * 100) / table_count).clamp(0, 100)
            }
        }
    }
}
1330
/// Parses an optional bootstrap checkpoint from JSON.
///
/// `None` stays `None`. Malformed JSON is returned as an error.
#[cfg(feature = "native")]
fn parse_bootstrap_state_json(value: Option<&str>) -> Result<Option<BootstrapState>> {
    match value {
        None => Ok(None),
        Some(json) => {
            let checkpoint = serde_json::from_str(json)?;
            Ok(Some(checkpoint))
        }
    }
}
1338
/// Builds the aggregate bootstrap status for a set of subscriptions.
///
/// `subscriptions` and `states` are paired positionally. Each subscription gets
/// a status entry; the entries are then summarised into id lists, readiness
/// flags for the critical and interactive phases, a channel phase label, an
/// overall progress percentage, and a per-phase breakdown ordered by phase.
///
/// Both phase thresholds are normalized to be non-negative, and the interactive
/// threshold is raised to at least the critical one.
///
/// # Errors
///
/// Returns an error when a stored bootstrap checkpoint is not valid JSON.
#[cfg(feature = "native")]
fn build_bootstrap_status(
    subscriptions: &[SubscriptionSpec],
    states: Vec<Option<SubscriptionState>>,
    critical_phase: i64,
    interactive_phase: i64,
) -> Result<BootstrapStatus> {
    // Rounded mean of `total` spread over `count` entries.
    let average_percent =
        |total: i64, count: usize| -> i64 { ((total as f64) / (count as f64)).round() as i64 };

    let critical_phase = normalize_bootstrap_phase(critical_phase);
    let interactive_phase = normalize_bootstrap_phase(interactive_phase).max(critical_phase);

    // Per-subscription status entries.
    let mut subscription_statuses = Vec::with_capacity(subscriptions.len());
    for (spec, stored) in subscriptions.iter().zip(states) {
        let state = stored.as_ref();
        let bootstrap_phase = normalize_bootstrap_phase(spec.bootstrap_phase);
        let bootstrap_state = parse_bootstrap_state_json(
            state.and_then(|state| state.bootstrap_state_json.as_deref()),
        )?;
        let ready = native_subscription_ready(state);
        let status = state.map(|state| state.status.clone());
        let cursor = state.map(|state| state.cursor);
        let phase = native_subscription_phase(status.as_deref(), cursor, bootstrap_state.as_ref());
        let progress_percent = bootstrap_progress_percent(ready, bootstrap_state.as_ref());

        subscription_statuses.push(BootstrapSubscriptionStatus {
            id: spec.id.clone(),
            table: spec.table.clone(),
            expected: true,
            ready,
            status,
            phase,
            progress_percent,
            cursor,
            bootstrap_state,
            bootstrap_phase,
        });
    }

    // Id lists: everything expected, split into ready and pending.
    let mut expected_subscription_ids = Vec::with_capacity(subscription_statuses.len());
    let mut ready_subscription_ids = Vec::new();
    let mut pending_subscription_ids = Vec::new();
    for subscription in &subscription_statuses {
        expected_subscription_ids.push(subscription.id.clone());
        if subscription.ready {
            ready_subscription_ids.push(subscription.id.clone());
        } else {
            pending_subscription_ids.push(subscription.id.clone());
        }
    }

    // A threshold is ready when no pending subscription sits at or below it.
    let has_pending_up_to = |limit: i64| {
        subscription_statuses
            .iter()
            .any(|subscription| subscription.bootstrap_phase <= limit && !subscription.ready)
    };
    let complete = pending_subscription_ids.is_empty();
    let critical_ready = !has_pending_up_to(critical_phase);
    let interactive_ready = !has_pending_up_to(interactive_phase);

    let active_phase = subscription_statuses
        .iter()
        .filter(|subscription| !subscription.ready)
        .map(|subscription| subscription.bootstrap_phase)
        .min();
    let has_error = subscription_statuses
        .iter()
        .any(|subscription| subscription.phase == "error");
    let is_bootstrapping = subscription_statuses
        .iter()
        .any(|subscription| !subscription.ready && subscription.phase != "error");

    let channel_phase = match (has_error, is_bootstrapping) {
        (true, _) => "error",
        (false, true) => "bootstrapping",
        (false, false) if complete && !expected_subscription_ids.is_empty() => "live",
        (false, false) => "idle",
    }
    .to_string();

    let progress_percent = match subscription_statuses.len() {
        0 => 100,
        count => {
            let total = subscription_statuses
                .iter()
                .map(|subscription| subscription.progress_percent)
                .sum::<i64>();
            average_percent(total, count)
        }
    };

    // Per-phase buckets: (expected ids, ready ids, pending ids, progress sum).
    let mut phase_map = BTreeMap::<i64, (Vec<String>, Vec<String>, Vec<String>, i64)>::new();
    for subscription in &subscription_statuses {
        let (expected_ids, ready_ids, pending_ids, progress_total) =
            phase_map.entry(subscription.bootstrap_phase).or_default();
        expected_ids.push(subscription.id.clone());
        let bucket = if subscription.ready {
            ready_ids
        } else {
            pending_ids
        };
        bucket.push(subscription.id.clone());
        *progress_total += subscription.progress_percent;
    }
    let phases = phase_map
        .into_iter()
        .map(
            |(phase, (expected_ids, ready_ids, pending_ids, progress_total))| {
                let progress_percent = match expected_ids.len() {
                    0 => 100,
                    count => average_percent(progress_total, count),
                };
                let is_ready = pending_ids.is_empty();
                BootstrapPhaseStatus {
                    phase,
                    expected_subscription_ids: expected_ids,
                    ready_subscription_ids: ready_ids,
                    is_ready,
                    pending_subscription_ids: pending_ids,
                    progress_percent,
                }
            },
        )
        .collect();

    Ok(BootstrapStatus {
        channel_phase,
        progress_percent,
        is_bootstrapping,
        critical_ready,
        interactive_ready,
        complete,
        active_phase,
        expected_subscription_ids,
        ready_subscription_ids,
        pending_subscription_ids,
        subscriptions: subscription_statuses,
        phases,
    })
}
1478
/// Tests for staged bootstrap: which subscriptions are pulled per phase and how
/// the aggregate bootstrap status is reported.
#[cfg(all(test, feature = "native"))]
mod bootstrap_phase_tests {
    use super::*;

    /// Builds a `tasks` subscription spec with empty scopes and params.
    fn subscription(id: &str, bootstrap_phase: i64) -> SubscriptionSpec {
        let id = id.to_string();
        let table = "tasks".to_string();
        SubscriptionSpec {
            id,
            table,
            scopes: Map::new(),
            params: Map::new(),
            bootstrap_phase,
        }
    }

    /// Builds an `active` subscription state for the `tasks` table with the
    /// given cursor and optional bootstrap checkpoint JSON.
    fn subscription_state(
        subscription_id: &str,
        cursor: i64,
        bootstrap_state_json: Option<&str>,
    ) -> SubscriptionState {
        let bootstrap_state_json = bootstrap_state_json.map(str::to_string);
        SubscriptionState {
            state_id: DEFAULT_STATE_ID.to_string(),
            subscription_id: subscription_id.to_string(),
            table: "tasks".to_string(),
            scopes_json: "{}".to_string(),
            params_json: "{}".to_string(),
            cursor,
            bootstrap_state_json,
            status: "active".to_string(),
        }
    }

    #[test]
    fn staged_pull_selection_matches_subscription_readiness() {
        // Nothing synced yet: phase 0 is active and only its subscription is pulled.
        let initial = vec![
            (subscription("critical", 0), None),
            (subscription("later", 1), None),
        ];
        let active = resolve_active_bootstrap_phase_for_native(&initial);
        assert_eq!(active, Some(0));
        let (critical_spec, critical_state) = &initial[0];
        assert!(should_include_pull_subscription(
            critical_spec,
            critical_state.as_ref(),
            active
        ));
        let (later_spec, later_state) = &initial[1];
        assert!(!should_include_pull_subscription(
            later_spec,
            later_state.as_ref(),
            active
        ));

        // Phase 0 is ready: phase 1 becomes active and everything is pulled.
        let critical_ready = vec![
            (
                subscription("critical", 0),
                Some(subscription_state("critical", 1, None)),
            ),
            (subscription("later", 1), None),
        ];
        let active = resolve_active_bootstrap_phase_for_native(&critical_ready);
        assert_eq!(active, Some(1));
        let all_included = critical_ready
            .iter()
            .all(|(spec, state)| should_include_pull_subscription(spec, state.as_ref(), active));
        assert!(all_included);

        // A subscription already bootstrapping stays included even though its
        // phase (5) is above the active phase (2).
        let later_bootstrapping = vec![
            (
                subscription("critical", 0),
                Some(subscription_state("critical", 1, None)),
            ),
            (
                subscription("later", 5),
                Some(subscription_state("later", -1, Some("{}"))),
            ),
            (subscription("other", 2), None),
        ];
        let active = resolve_active_bootstrap_phase_for_native(&later_bootstrapping);
        assert_eq!(active, Some(2));
        let (bootstrapping_spec, bootstrapping_state) = &later_bootstrapping[1];
        assert!(should_include_pull_subscription(
            bootstrapping_spec,
            bootstrapping_state.as_ref(),
            active
        ));
    }

    #[test]
    fn bootstrap_status_reports_staged_readiness() {
        let subscriptions = vec![subscription("critical", 0), subscription("interactive", 1)];
        // The interactive checkpoint is one table into a two-table bootstrap.
        let interactive_checkpoint =
            r#"{"asOfCommitSeq":10,"tables":["tasks","projects"],"tableIndex":1,"rowCursor":"tasks:10"}"#;
        let states = vec![
            Some(subscription_state("critical", 1, None)),
            Some(subscription_state(
                "interactive",
                -1,
                Some(interactive_checkpoint),
            )),
        ];

        let status = build_bootstrap_status(&subscriptions, states, 0, 1)
            .expect("bootstrap status should parse");

        // Aggregate view.
        assert_eq!(status.channel_phase, "bootstrapping");
        assert_eq!(status.progress_percent, 75);
        assert!(status.critical_ready);
        assert!(!status.interactive_ready);
        assert!(!status.complete);
        assert_eq!(status.active_phase, Some(1));
        assert_eq!(status.ready_subscription_ids, vec!["critical".to_string()]);
        assert_eq!(
            status.pending_subscription_ids,
            vec!["interactive".to_string()]
        );

        // Per-subscription view of the bootstrapping entry.
        let interactive = &status.subscriptions[1];
        assert_eq!(interactive.phase, "bootstrapping");
        assert_eq!(interactive.progress_percent, 50);
        let row_cursor = interactive
            .bootstrap_state
            .as_ref()
            .and_then(|state| state.row_cursor.as_deref());
        assert_eq!(row_cursor, Some("tasks:10"));

        // Per-phase view.
        assert_eq!(status.phases.len(), 2);
        assert!(status.phases[0].is_ready);
        assert!(!status.phases[1].is_ready);
    }

    #[test]
    fn bootstrap_status_rejects_invalid_checkpoint_json() {
        let subscriptions = vec![subscription("critical", 0)];
        let broken_state = subscription_state("critical", -1, Some("{not valid json"));
        let states = vec![Some(broken_state)];

        let outcome = build_bootstrap_status(&subscriptions, states, 0, 1);
        assert!(outcome.is_err());
    }
}
1616
/// Choice of how to resolve a sync conflict.
///
/// Variants serialize in kebab-case, except `AcceptServer`, which carries an
/// explicit rename to `"keep-server"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ConflictResolution {
    /// Serialized as `"keep-local"`.
    KeepLocal,
    /// Serialized as `"keep-server"` (explicit rename, not the kebab-case default).
    #[serde(rename = "keep-server")]
    AcceptServer,
    /// Serialized as `"dismiss"`.
    Dismiss,
}
1625
1626impl ConflictResolution {
1627 pub fn as_str(self) -> &'static str {
1628 match self {
1629 Self::KeepLocal => "keep-local",
1630 Self::AcceptServer => "keep-server",
1631 Self::Dismiss => "dismiss",
1632 }
1633 }
1634}
1635
1636impl fmt::Display for ConflictResolution {
1637 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1638 f.write_str(self.as_str())
1639 }
1640}
1641
/// Serializable record of a resolved conflict.
///
/// NOTE(review): field meanings below are inferred from their names; confirm
/// against the code that produces receipts.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConflictResolutionReceipt {
    /// Identifier of the conflict this receipt refers to.
    pub conflict_id: String,
    /// The resolution recorded for the conflict.
    pub resolution: ConflictResolution,
    /// Optional client commit id, presumably of a retried commit (TODO confirm).
    pub retry_client_commit_id: Option<String>,
}
1648
/// Handle that mutably borrows a [`SyncularClient`] for the lifetime `'a`.
///
/// NOTE(review): presumably the entry point for conflict operations; its
/// methods are defined elsewhere in the file.
pub struct SyncularConflicts<'a, S, T> {
    client: &'a mut SyncularClient<S, T>,
}
1652
/// A query paired with the tables it depends on and its most recently loaded
/// rows.
///
/// `build_query` is invoked on every refresh to produce a fresh query.
/// `revision` counts successful refreshes.
#[cfg(feature = "native")]
#[derive(Debug)]
pub struct SyncularLiveQuery<QF, Row> {
    // Table names used to decide whether a sync report affects this query.
    tables: Vec<String>,
    // Factory that produces the query to run on each refresh.
    build_query: QF,
    // Rows loaded by the latest successful refresh; empty until the first one.
    rows: Vec<Row>,
    // Number of successful refreshes (saturating).
    revision: u64,
}
1661
#[cfg(feature = "native")]
impl<QF, Row> SyncularLiveQuery<QF, Row> {
    /// Creates a live query over `tables` with no rows loaded and revision 0.
    ///
    /// `build_query` is stored and invoked on each refresh.
    pub fn new<I, Table>(tables: I, build_query: QF) -> Self
    where
        I: IntoIterator<Item = Table>,
        Table: Into<String>,
    {
        let tables: Vec<String> = tables.into_iter().map(Into::into).collect();
        Self {
            tables,
            build_query,
            rows: Vec::new(),
            revision: 0,
        }
    }

    /// Tables this query depends on.
    pub fn tables(&self) -> &[String] {
        self.tables.as_slice()
    }

    /// Rows loaded by the most recent successful refresh.
    pub fn rows(&self) -> &[Row] {
        self.rows.as_slice()
    }

    /// Number of successful refreshes so far.
    pub fn revision(&self) -> u64 {
        self.revision
    }

    /// Consumes the live query and returns its cached rows.
    pub fn into_rows(self) -> Vec<Row> {
        let Self { rows, .. } = self;
        rows
    }

    /// Reports whether `report` changes any table this query depends on.
    pub fn is_affected_by(&self, report: &SyncReport) -> bool {
        let watched_tables = self.tables.iter().map(String::as_str);
        report.changes_any_table(watched_tables)
    }

    /// Re-runs the query through `client`, replaces the cached rows, and bumps
    /// the revision.
    ///
    /// # Errors
    ///
    /// Propagates the read error; the cached rows and revision stay untouched.
    pub fn refresh<T, Q>(
        &mut self,
        client: &mut SyncularClient<DieselSqliteStore, T>,
    ) -> Result<&[Row]>
    where
        T: SyncTransport,
        QF: FnMut() -> Q,
        for<'query> Q: diesel::query_dsl::LoadQuery<'query, diesel::sqlite::SqliteConnection, Row>,
    {
        let query = (self.build_query)();
        let fresh_rows: Vec<Row> = client.read(query)?;
        self.rows = fresh_rows;
        self.revision = self.revision.saturating_add(1);
        Ok(self.rows.as_slice())
    }

    /// Refreshes only when `report` affects one of this query's tables.
    ///
    /// Returns `Ok(true)` when a refresh ran and `Ok(false)` when it was skipped.
    pub fn refresh_if_changed<T, Q>(
        &mut self,
        client: &mut SyncularClient<DieselSqliteStore, T>,
        report: &SyncReport,
    ) -> Result<bool>
    where
        T: SyncTransport,
        QF: FnMut() -> Q,
        for<'query> Q: diesel::query_dsl::LoadQuery<'query, diesel::sqlite::SqliteConnection, Row>,
    {
        if self.is_affected_by(report) {
            self.refresh(client)?;
            Ok(true)
        } else {
            Ok(false)
        }
    }
}
1728
/// Sync client combining a local store `S` with a transport `T`.
///
/// With the `native` feature enabled the type parameters default to
/// `DieselSqliteStore` and `HttpSyncTransport`; without it both must be named
/// explicitly.
pub struct SyncularClient<
    #[cfg(feature = "native")] S = DieselSqliteStore,
    #[cfg(not(feature = "native"))] S,
    #[cfg(feature = "native")] T = HttpSyncTransport,
    #[cfg(not(feature = "native"))] T,
> {
    config: SyncularClientConfig,
    store: S,
    transport: T,
    // Subscriptions currently configured on this client.
    subscriptions: Vec<SubscriptionSpec>,
    app_schema: AppSchema,
    schema_version: i32,
    // Key used to acquire the sync lock; initialised from the config's db path.
    sync_lock_key: String,
    // Optional encryption layers; each starts as `None` and is set via its setter.
    field_encryption: Option<FieldEncryption>,
    encrypted_crdt: Option<EncryptedCrdt>,
    blob_encryption: Option<BlobEncryption>,
}
1746
#[cfg(feature = "native")]
impl SyncularClient<DieselSqliteStore, HttpSyncTransport> {
    /// Opens a client with the default app schema.
    pub fn open(config: SyncularClientConfig) -> Result<Self> {
        let app_schema = default_app_schema();
        Self::open_with_schema(config, app_schema)
    }

    /// Opens a client for `app_schema`.
    ///
    /// Validates the schema's runtime features, opens the SQLite store at the
    /// configured database path, and builds an HTTP transport tagged with the
    /// schema's current version.
    ///
    /// # Errors
    ///
    /// Fails when the schema validation or opening the store fails.
    pub fn open_with_schema(config: SyncularClientConfig, app_schema: AppSchema) -> Result<Self> {
        validate_app_schema_runtime_features(&app_schema)?;
        let store = DieselSqliteStore::open_with_schema(&config.db_path, app_schema)?;

        let transport_config = SyncTransportConfig::new(
            config.base_url.clone(),
            config.client_id.clone(),
            config.actor_id.clone(),
        );
        let transport = HttpSyncTransport::new(transport_config)
            .with_schema_version(app_schema.current_schema_version());

        let client = Self::with_app_schema_parts(config, store, transport, app_schema);
        Ok(client)
    }
}
1767
#[cfg(feature = "native")]
impl<S> SyncularClient<S, HttpSyncTransport>
where
    S: SyncStore,
{
    /// Builds a client over a caller-supplied store.
    ///
    /// The HTTP transport is configured from `config` and tagged with the
    /// default app schema's current version.
    pub fn with_store(config: SyncularClientConfig, store: S) -> Self {
        let schema_version = default_app_schema().current_schema_version();
        let transport_config = SyncTransportConfig::new(
            config.base_url.clone(),
            config.client_id.clone(),
            config.actor_id.clone(),
        );
        let transport =
            HttpSyncTransport::new(transport_config).with_schema_version(schema_version);
        Self::with_parts(config, store, transport)
    }

    /// Requests an auth lease over the transport and persists it.
    ///
    /// The request and the response are both validated against the app schema,
    /// the response is converted into a lease record stamped with the current
    /// time, and the record is upserted into the store.
    ///
    /// # Errors
    ///
    /// Fails on validation, transport, conversion, or store errors.
    pub fn issue_auth_lease(
        &mut self,
        request: &AuthLeaseIssueRequest,
    ) -> Result<crate::store::AuthLeaseRecord> {
        validate_auth_lease_issue_request_against_app_schema(self.app_schema, request)?;

        let issued = self.transport.issue_auth_lease(request)?;
        validate_auth_lease_issue_response_against_app_schema(
            self.app_schema,
            &issued,
            request.schema_version,
        )?;

        let lease = auth_lease_record_from_issue_response(issued, now_ms())?;
        self.store.transaction(|tx| tx.upsert_auth_lease(&lease))?;
        Ok(lease)
    }

    /// JSON wrapper around [`Self::issue_auth_lease`]: parses the request from
    /// JSON and returns the stored lease record serialized as JSON.
    pub fn issue_auth_lease_json(&mut self, request_json: &str) -> Result<String> {
        let request: AuthLeaseIssueRequest = serde_json::from_str(request_json)?;
        let lease = self.issue_auth_lease(&request)?;
        let lease_json = serde_json::to_string(&lease)?;
        Ok(lease_json)
    }
}
1805
/// Converts an auth-lease issue response into a storable lease record.
///
/// Checks run in this order: the `ok` flag, a non-empty token, the protected
/// header (algorithm, type, non-empty key id), and the payload (version,
/// protocol version, non-empty lease and actor ids, schema version of at least
/// 1, non-empty scopes). The record is created `active`, with `created_at_ms`
/// taken from the payload's issue time and `updated_at_ms` from the argument.
///
/// # Errors
///
/// Returns a transport error for `ok=false`, a protocol error for any other
/// failed check, and a serialization error if the payload cannot be re-encoded.
#[cfg(feature = "native")]
fn auth_lease_record_from_issue_response(
    response: AuthLeaseIssueResponse,
    updated_at_ms: i64,
) -> Result<crate::store::AuthLeaseRecord> {
    if !response.ok {
        return Err(SyncularError::message(
            ErrorKind::Transport,
            "auth lease issue returned ok=false",
        ));
    }
    if response.token.is_empty() {
        return Err(SyncularError::protocol_message(
            "auth lease issue returned an empty token",
        ));
    }

    let header = &response.protected_header;
    let header_is_valid = header.alg == AUTH_LEASE_ALG_ES256
        && header.typ == AUTH_LEASE_TYP
        && !header.kid.is_empty();
    if !header_is_valid {
        return Err(SyncularError::protocol_message(
            "auth lease issue returned an invalid protected header",
        ));
    }

    let payload = response.payload;
    let payload_is_valid = payload.version == AUTH_LEASE_VERSION
        && payload.protocol_version == AUTH_LEASE_PROTOCOL_VERSION
        && !payload.lease_id.is_empty()
        && !payload.actor_id.is_empty()
        && payload.schema_version >= 1
        && !payload.scopes.is_empty();
    if !payload_is_valid {
        return Err(SyncularError::protocol_message(
            "auth lease issue returned an invalid payload",
        ));
    }

    // Serialize before the payload's fields are moved into the record.
    let payload_json = serde_json::to_string(&payload)?;
    let record = crate::store::AuthLeaseRecord {
        lease_id: payload.lease_id,
        kid: response.protected_header.kid,
        actor_id: payload.actor_id,
        issued_at_ms: payload.issued_at_ms,
        not_before_ms: payload.not_before_ms,
        expires_at_ms: payload.expires_at_ms,
        schema_version: payload.schema_version,
        payload_json,
        token: response.token,
        status: "active".to_string(),
        last_validation_error: None,
        created_at_ms: payload.issued_at_ms,
        updated_at_ms,
    };
    Ok(record)
}
1859
1860fn validate_auth_lease_record_against_app_schema(
1861 app_schema: AppSchema,
1862 lease: &crate::store::AuthLeaseRecord,
1863) -> Result<()> {
1864 let payload: AuthLeasePayload = serde_json::from_str(&lease.payload_json)?;
1865 if lease.schema_version != payload.schema_version {
1866 return Err(SyncularError::protocol_message(format!(
1867 "auth lease record schemaVersion {} does not match payload schemaVersion {}",
1868 lease.schema_version, payload.schema_version
1869 )));
1870 }
1871 if lease.lease_id != payload.lease_id {
1872 return Err(SyncularError::protocol_message(
1873 "auth lease record leaseId does not match payload leaseId",
1874 ));
1875 }
1876 if lease.actor_id != payload.actor_id {
1877 return Err(SyncularError::protocol_message(
1878 "auth lease record actorId does not match payload actorId",
1879 ));
1880 }
1881 validate_auth_lease_payload_against_app_schema(app_schema, &payload)
1882}
1883
1884impl<S, T> SyncularClient<S, T>
1885where
1886 S: SyncStateStore,
1887{
1888 pub fn next_outbox_retry_at_ms(&mut self) -> Result<Option<i64>> {
1889 self.store.next_outbox_retry_at()
1890 }
1891
1892 pub fn next_blob_upload_retry_at_ms(&mut self) -> Result<Option<i64>> {
1893 self.store.next_blob_upload_retry_at()
1894 }
1895}
1896
1897impl<S, T> SyncularClient<S, T>
1898where
1899 S: SyncStore,
1900 T: SyncTransport,
1901{
1902 fn ensure_remote_sync_enabled(&self, operation: &str) -> Result<()> {
1903 if !self.config.is_local_sync_compatible() {
1904 return Ok(());
1905 }
1906 Err(SyncularError::config(format!(
1907 "Syncular {operation} requires remote mode; client was opened with local-sync-compatible config"
1908 )))
1909 }
1910
1911 pub fn with_parts(config: SyncularClientConfig, store: S, transport: T) -> Self {
1912 Self::with_app_schema_parts(config, store, transport, default_app_schema())
1913 }
1914
1915 pub fn with_app_schema_parts(
1916 config: SyncularClientConfig,
1917 store: S,
1918 transport: T,
1919 app_schema: AppSchema,
1920 ) -> Self {
1921 let subscriptions = app_schema.default_subscriptions(&config);
1922 Self::with_subscriptions_and_schema(
1923 config,
1924 store,
1925 transport,
1926 subscriptions,
1927 app_schema,
1928 app_schema.current_schema_version(),
1929 )
1930 }
1931
1932 pub fn with_subscriptions(
1933 config: SyncularClientConfig,
1934 store: S,
1935 transport: T,
1936 subscriptions: Vec<SubscriptionSpec>,
1937 schema_version: i32,
1938 ) -> Self {
1939 Self::with_subscriptions_and_schema(
1940 config,
1941 store,
1942 transport,
1943 subscriptions,
1944 default_app_schema(),
1945 schema_version,
1946 )
1947 }
1948
1949 fn with_subscriptions_and_schema(
1950 config: SyncularClientConfig,
1951 store: S,
1952 transport: T,
1953 subscriptions: Vec<SubscriptionSpec>,
1954 app_schema: AppSchema,
1955 schema_version: i32,
1956 ) -> Self {
1957 Self {
1958 sync_lock_key: config.db_path.clone(),
1959 config,
1960 store,
1961 transport,
1962 subscriptions,
1963 app_schema,
1964 schema_version,
1965 field_encryption: None,
1966 encrypted_crdt: None,
1967 blob_encryption: None,
1968 }
1969 }
1970
1971 pub fn set_field_encryption(&mut self, encryption: Option<FieldEncryption>) -> Result<()> {
1972 if let Some(encryption) = &encryption {
1973 validate_field_encryption_rules_against_app_schema(
1974 self.app_schema,
1975 encryption.rules(),
1976 )?;
1977 }
1978 self.field_encryption = encryption;
1979 Ok(())
1980 }
1981
1982 pub fn set_field_encryption_json(&mut self, config_json: &str) -> Result<()> {
1983 self.set_field_encryption(FieldEncryption::from_static_config_json(config_json)?)
1984 }
1985
1986 pub fn set_encrypted_crdt(&mut self, encryption: Option<EncryptedCrdt>) -> Result<()> {
1987 if encryption.is_some() {
1988 validate_encrypted_crdt_against_app_schema(self.app_schema)?;
1989 }
1990 self.encrypted_crdt = encryption;
1991 Ok(())
1992 }
1993
1994 pub fn set_encrypted_crdt_json(&mut self, config_json: &str) -> Result<()> {
1995 self.set_encrypted_crdt(EncryptedCrdt::from_static_config_json(config_json)?)
1996 }
1997
1998 pub fn set_blob_encryption(&mut self, encryption: Option<BlobEncryption>) -> Result<()> {
1999 if encryption.is_some() {
2000 validate_blob_encryption_against_app_schema(self.app_schema)?;
2001 }
2002 self.blob_encryption = encryption;
2003 Ok(())
2004 }
2005
2006 pub fn set_blob_encryption_json(&mut self, config_json: &str) -> Result<()> {
2007 self.set_blob_encryption(BlobEncryption::from_static_config_json(config_json)?)
2008 }
2009
2010 pub fn table_metadata(&self, table: &str) -> Option<&'static AppTableMetadata> {
2011 self.app_schema.table_metadata(table)
2012 }
2013
2014 pub fn app_schema(&self) -> AppSchema {
2015 self.app_schema
2016 }
2017
2018 pub fn current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
2019 self.store
2020 .transaction(|tx| tx.current_row_json(table, row_id))
2021 }
2022
2023 pub fn subscriptions(&self) -> &[SubscriptionSpec] {
2024 &self.subscriptions
2025 }
2026
2027 pub fn set_subscriptions(&mut self, subscriptions: Vec<SubscriptionSpec>) -> Result<()> {
2028 validate_subscription_limits(&subscriptions)?;
2029 self.subscriptions = subscriptions;
2030 Ok(())
2031 }
2032
2033 pub fn set_subscriptions_json(&mut self, subscriptions_json: &str) -> Result<()> {
2034 let subscriptions: Vec<SubscriptionSpec> = serde_json::from_str(subscriptions_json)?;
2035 self.set_subscriptions(subscriptions)
2036 }
2037
2038 pub fn force_subscriptions_bootstrap(&mut self, subscription_ids: &[String]) -> Result<usize> {
2039 let ids = if subscription_ids.is_empty() {
2040 self.subscriptions
2041 .iter()
2042 .map(|subscription| subscription.id.clone())
2043 .collect::<Vec<_>>()
2044 } else {
2045 subscription_ids.to_vec()
2046 };
2047 self.store.transaction(|tx| {
2048 for subscription_id in &ids {
2049 tx.delete_verified_root(DEFAULT_STATE_ID, subscription_id)?;
2050 tx.delete_subscription_state(DEFAULT_STATE_ID, subscription_id)?;
2051 }
2052 Ok(ids.len())
2053 })
2054 }
2055
2056 pub fn force_subscriptions_bootstrap_json(
2057 &mut self,
2058 subscription_ids_json: &str,
2059 ) -> Result<String> {
2060 let subscription_ids: Vec<String> = serde_json::from_str(subscription_ids_json)?;
2061 Ok(serde_json::to_string(
2062 &self.force_subscriptions_bootstrap(&subscription_ids)?,
2063 )?)
2064 }
2065
2066 #[cfg(feature = "native")]
2067 pub fn bootstrap_status(&mut self) -> Result<BootstrapStatus> {
2068 self.bootstrap_status_for_phases(0, 1)
2069 }
2070
2071 #[cfg(feature = "native")]
2072 pub fn bootstrap_status_for_phases(
2073 &mut self,
2074 critical_phase: i64,
2075 interactive_phase: i64,
2076 ) -> Result<BootstrapStatus> {
2077 let states = self.store.transaction(|tx| {
2078 self.subscriptions
2079 .iter()
2080 .map(|subscription| tx.subscription_state(DEFAULT_STATE_ID, &subscription.id))
2081 .collect::<Result<Vec<_>>>()
2082 })?;
2083 build_bootstrap_status(
2084 &self.subscriptions,
2085 states,
2086 critical_phase,
2087 interactive_phase,
2088 )
2089 }
2090
2091 #[cfg(feature = "native")]
2092 pub fn bootstrap_status_json(&mut self) -> Result<String> {
2093 Ok(serde_json::to_string(&self.bootstrap_status()?)?)
2094 }
2095
2096 pub fn sync_http(&mut self) -> Result<SyncReport> {
2097 let _guard = SyncLockGuard::acquire(&self.sync_lock_key)?;
2098 self.ensure_remote_sync_enabled("sync_http")?;
2099 self.sync_http_unlocked()
2100 }
2101
2102 fn sync_http_unlocked(&mut self) -> Result<SyncReport> {
2103 let pending = self.prepare_sync()?;
2104 let request = CombinedRequest {
2105 client_id: self.config.client_id.clone(),
2106 push: self.build_push_request(&pending)?,
2107 pull: Some(self.build_pull_request()?),
2108 };
2109
2110 let response = match self.transport.post_sync(&request) {
2111 Ok(response) => response,
2112 Err(error) => {
2113 self.schedule_outbox_retry(&pending, &error)?;
2114 return Err(error);
2115 }
2116 };
2117 self.apply_combined_response(&pending, response)
2118 }
2119
2120 pub fn sync_ws(&mut self) -> Result<SyncReport> {
2121 let _guard = SyncLockGuard::acquire(&self.sync_lock_key)?;
2122 self.ensure_remote_sync_enabled("sync_ws")?;
2123 self.sync_ws_unlocked()
2124 }
2125
2126 fn sync_ws_unlocked(&mut self) -> Result<SyncReport> {
2127 let pending = self.prepare_sync()?;
2128 let mut report = SyncReport::default();
2129 if !pending.is_empty() {
2130 let mut socket = match self.transport.connect_realtime() {
2131 Ok(socket) => socket,
2132 Err(error) => {
2133 self.schedule_outbox_retry(&pending, &error)?;
2134 return Err(error);
2135 }
2136 };
2137
2138 for commit in &pending {
2139 let operations = self.operations_for_push(commit)?;
2140 let response = match socket.push_commit(PushCommitRequest {
2141 client_commit_id: commit.client_commit_id.clone(),
2142 operations,
2143 schema_version: commit.schema_version,
2144 auth_lease: commit.auth_lease.clone(),
2145 }) {
2146 Ok(response) => response,
2147 Err(error) => {
2148 self.schedule_outbox_retry(std::slice::from_ref(commit), &error)?;
2149 return Err(error);
2150 }
2151 };
2152 report.merge(self.apply_single_push_response(commit, response)?);
2153 }
2154
2155 socket.close();
2156 }
2157
2158 let request = CombinedRequest {
2159 client_id: self.config.client_id.clone(),
2160 push: None,
2161 pull: Some(self.build_pull_request()?),
2162 };
2163 let response = match self.transport.post_sync(&request) {
2164 Ok(response) => response,
2165 Err(error) => {
2166 self.schedule_outbox_retry(&[], &error)?;
2167 return Err(error);
2168 }
2169 };
2170 report.merge(self.apply_combined_response(&[], response)?);
2171 Ok(report)
2172 }
2173
2174 pub fn watch<F>(&mut self, seconds: u64, mut on_event: F) -> Result<()>
2175 where
2176 F: FnMut(&RealtimeEvent),
2177 {
2178 let mut socket = self.transport.connect_realtime()?;
2179 let deadline = SystemTime::now()
2180 .checked_add(Duration::from_secs(seconds))
2181 .unwrap_or_else(SystemTime::now);
2182
2183 while SystemTime::now() < deadline {
2184 let Some(event) = socket.read_event()? else {
2185 continue;
2186 };
2187 on_event(&event);
2188 if matches!(event, RealtimeEvent::Sync) {
2189 let _ = self.sync_http_unlocked()?;
2190 }
2191 }
2192
2193 socket.close();
2194 Ok(())
2195 }
2196
2197 pub fn process_realtime_events<F>(
2198 &mut self,
2199 max_events: usize,
2200 mut on_event: F,
2201 ) -> Result<usize>
2202 where
2203 F: FnMut(&RealtimeEvent),
2204 {
2205 let mut socket = self.transport.connect_realtime()?;
2206 let mut processed = 0usize;
2207
2208 for _ in 0..max_events {
2209 let Some(event) = socket.read_event()? else {
2210 break;
2211 };
2212 on_event(&event);
2213 processed += 1;
2214 if matches!(event, RealtimeEvent::Sync) {
2215 let _ = self.sync_http_unlocked()?;
2216 }
2217 }
2218
2219 socket.close();
2220 Ok(processed)
2221 }
2222}
2223
2224impl<S, T> SyncularClient<S, T>
2225where
2226 S: SyncStore,
2227 T: SyncTransport + SyncAuthHeaderStore,
2228{
2229 pub fn set_auth_headers(&mut self, headers: SyncAuthHeaders) {
2230 self.transport.set_auth_headers(headers);
2231 }
2232}
2233
/// Auth-signer configuration, available when the transport can store a signer.
#[cfg(feature = "native")]
impl<S, T> SyncularClient<S, T>
where
    S: SyncStore,
    T: SyncTransport + SyncAuthSignerStore,
{
    /// Forwards `signer` (or `None`) to the transport's `set_auth_signer`.
    pub fn set_auth_signer(&mut self, signer: Option<SyncAuthSigner>) {
        let transport = &mut self.transport;
        transport.set_auth_signer(signer);
    }
}
2244
2245#[cfg(feature = "native")]
2246impl<T> SyncularClient<DieselSqliteStore, T>
2247where
2248 T: SyncTransport + BlobTransport,
2249{
2250 pub fn store_blob_bytes(
2251 &mut self,
2252 data: &[u8],
2253 mime_type: &str,
2254 immediate: bool,
2255 ) -> Result<BlobRef> {
2256 self.ensure_blob_runtime_declared()?;
2257 let (blob, body) = if let Some(encryption) = &self.blob_encryption {
2258 let encrypted = encryption.encrypt_blob(data, mime_type)?;
2259 self.store
2260 .store_blob_body(&encrypted.blob, &encrypted.body, !immediate)?;
2261 (encrypted.blob, encrypted.body)
2262 } else {
2263 let blob = self.store.store_blob_bytes(data, mime_type, !immediate)?;
2264 (blob, data.to_vec())
2265 };
2266 if immediate {
2267 self.transport.upload_blob(&blob, &body)?;
2268 }
2269 Ok(blob)
2270 }
2271
2272 pub fn store_blob_file(
2273 &mut self,
2274 path: &Path,
2275 mime_type: &str,
2276 immediate: bool,
2277 cache_local: bool,
2278 ) -> Result<BlobRef> {
2279 self.ensure_blob_runtime_declared()?;
2280 if cache_local {
2281 let metadata = fs::metadata(path).map_err(|err| {
2282 SyncularError::storage(err).context(format!("stat blob file {path:?}"))
2283 })?;
2284 validate_blob_size_bytes(i64::try_from(metadata.len()).unwrap_or(i64::MAX))?;
2285 let data = fs::read(path).map_err(|err| {
2286 SyncularError::storage(err).context(format!("read blob file {path:?}"))
2287 })?;
2288 let (blob, body) = if let Some(encryption) = &self.blob_encryption {
2289 let encrypted = encryption.encrypt_blob(&data, mime_type)?;
2290 self.store
2291 .store_blob_body(&encrypted.blob, &encrypted.body, !immediate)?;
2292 (encrypted.blob, encrypted.body)
2293 } else {
2294 let blob = self.store.store_blob_bytes(&data, mime_type, !immediate)?;
2295 (blob, data)
2296 };
2297 if immediate {
2298 self.transport.upload_blob(&blob, &body)?;
2299 }
2300 return Ok(blob);
2301 }
2302
2303 if self.blob_encryption.is_some() {
2304 return Err(SyncularError::config(
2305 "encrypted native blob file storage requires cacheLocal=true until streaming encryption is implemented",
2306 ));
2307 }
2308
2309 if !immediate {
2310 return Err(SyncularError::config(
2311 "native blob file storage with cacheLocal=false requires immediate=true",
2312 ));
2313 }
2314
2315 let file = File::open(path).map_err(|err| {
2316 SyncularError::storage(err).context(format!("open blob file {path:?}"))
2317 })?;
2318 let (hash, size) = blob_hash_reader(file)?;
2319 let blob = BlobRef {
2320 hash,
2321 size,
2322 mime_type: normalize_blob_mime_type(mime_type),
2323 encrypted: false,
2324 key_id: None,
2325 };
2326 validate_blob_ref_size(&blob)?;
2327 self.transport.upload_blob_file(&blob, path)?;
2328 Ok(blob)
2329 }
2330
2331 pub fn retrieve_blob_bytes(&mut self, blob: &BlobRef) -> Result<Vec<u8>> {
2332 self.ensure_blob_runtime_declared()?;
2333 self.ensure_blob_decryption_available(blob)?;
2334 if let Some(bytes) = self.store.read_cached_blob(&blob.hash)? {
2335 return self.decode_blob_body(blob, bytes);
2336 }
2337 let bytes = self.transport.download_blob(blob)?;
2338 let plaintext = self.decode_blob_body(blob, bytes.clone())?;
2339 self.store.cache_blob_bytes(blob, &bytes)?;
2340 Ok(plaintext)
2341 }
2342
2343 pub fn retrieve_blob_file(
2344 &mut self,
2345 blob: &BlobRef,
2346 path: &Path,
2347 cache_local: bool,
2348 ) -> Result<()> {
2349 self.ensure_blob_runtime_declared()?;
2350 self.ensure_blob_decryption_available(blob)?;
2351 if let Some(bytes) = self.store.read_cached_blob(&blob.hash)? {
2352 let plaintext = self.decode_blob_body(blob, bytes)?;
2353 fs::write(path, plaintext).map_err(|err| {
2354 SyncularError::storage(err).context(format!("write blob file {path:?}"))
2355 })?;
2356 return Ok(());
2357 }
2358
2359 if cache_local {
2360 let bytes = self.transport.download_blob(blob)?;
2361 let plaintext = self.decode_blob_body(blob, bytes.clone())?;
2362 self.store.cache_blob_bytes(blob, &bytes)?;
2363 fs::write(path, plaintext).map_err(|err| {
2364 SyncularError::storage(err).context(format!("write blob file {path:?}"))
2365 })?;
2366 } else if blob.encrypted {
2367 let bytes = self.transport.download_blob(blob)?;
2368 let plaintext = self.decode_blob_body(blob, bytes)?;
2369 fs::write(path, plaintext).map_err(|err| {
2370 SyncularError::storage(err).context(format!("write blob file {path:?}"))
2371 })?;
2372 } else {
2373 self.transport.download_blob_to_file(blob, path)?;
2374 }
2375 Ok(())
2376 }
2377
2378 pub fn process_blob_upload_queue(
2379 &mut self,
2380 ) -> Result<crate::diesel_sqlite::BlobUploadQueueResult> {
2381 self.process_blob_upload_queue_with_options(false)
2382 }
2383
2384 pub fn process_blob_upload_queue_with_options(
2385 &mut self,
2386 retry_now: bool,
2387 ) -> Result<crate::diesel_sqlite::BlobUploadQueueResult> {
2388 self.ensure_blob_runtime_declared()?;
2389 self.store.requeue_stale_blob_uploads()?;
2390 let pending = self
2391 .store
2392 .pending_blob_uploads(DEFAULT_BLOB_UPLOAD_BATCH_LIMIT, retry_now)?;
2393 let mut result = crate::diesel_sqlite::BlobUploadQueueResult {
2394 uploaded: 0,
2395 failed: 0,
2396 };
2397 for item in pending {
2398 let next_attempt_count = item.attempt_count + 1;
2399 self.store
2400 .mark_blob_uploading(&item.hash, next_attempt_count)?;
2401 let blob = BlobRef {
2402 hash: item.hash.clone(),
2403 size: item.size,
2404 mime_type: item.mime_type.clone(),
2405 encrypted: item.encrypted != 0,
2406 key_id: item.key_id.clone(),
2407 };
2408 match self.transport.upload_blob(&blob, &item.body) {
2409 Ok(()) => {
2410 self.store.delete_blob_upload(&item.hash)?;
2411 result.uploaded += 1;
2412 }
2413 Err(error) => {
2414 let failed = next_attempt_count >= MAX_BLOB_UPLOAD_RETRIES;
2415 let now = now_ms();
2416 self.store.mark_blob_upload_error(
2417 &item.hash,
2418 if failed { "failed" } else { "pending" },
2419 &error.to_string(),
2420 if failed {
2421 0
2422 } else {
2423 next_blob_upload_retry_at(now, next_attempt_count)
2424 },
2425 )?;
2426 if failed {
2427 result.failed += 1;
2428 }
2429 }
2430 }
2431 }
2432 Ok(result)
2433 }
2434}
2435
/// Blob helpers that do not need any transport capability.
#[cfg(feature = "native")]
impl<T> SyncularClient<DieselSqliteStore, T> {
    /// Validates the blob runtime against this client's app schema.
    fn ensure_blob_runtime_declared(&self) -> Result<()> {
        validate_blob_runtime_against_app_schema(self.app_schema)
    }

    /// Checks that `blob` can be decoded with the current configuration.
    ///
    /// Unencrypted blobs always pass. Encrypted blobs require blob encryption
    /// to be configured, and the configured encryption must report that it
    /// can decrypt this blob.
    fn ensure_blob_decryption_available(&self, blob: &BlobRef) -> Result<()> {
        if !blob.encrypted {
            return Ok(());
        }
        match &self.blob_encryption {
            Some(encryption) => encryption.ensure_can_decrypt(blob),
            None => Err(SyncularError::config(
                "encrypted blob retrieval requires set_blob_encryption(...)",
            )),
        }
    }

    /// Turns a stored or downloaded blob body into its decoded bytes.
    ///
    /// Encrypted blobs are decrypted with the configured blob encryption.
    /// Unencrypted bodies are validated against `blob` and returned unchanged.
    fn decode_blob_body(&self, blob: &BlobRef, body: Vec<u8>) -> Result<Vec<u8>> {
        self.ensure_blob_decryption_available(blob)?;
        if !blob.encrypted {
            validate_blob_bytes(blob, &body)?;
            return Ok(body);
        }
        // The availability check above already rejected encrypted blobs when
        // no encryption is configured.
        let encryption = self
            .blob_encryption
            .as_ref()
            .expect("checked encrypted blob decryption");
        encryption.decrypt_blob(blob, &body)
    }
}
2467
/// Local-only blob, cache and query operations on the Diesel SQLite store.
#[cfg(feature = "native")]
impl<T> SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Stores the file at `path` in the local blob store without uploading it.
    ///
    /// The file size is validated from metadata before the file is read. With
    /// blob encryption configured the encrypted body is stored; otherwise the
    /// raw bytes are. `enqueue_upload` is passed through to the store.
    ///
    /// Returns the resulting `BlobRef` serialized as JSON.
    pub fn store_blob_file_local_json(
        &mut self,
        path: &Path,
        mime_type: &str,
        enqueue_upload: bool,
    ) -> Result<String> {
        self.ensure_blob_runtime_declared()?;
        let metadata = fs::metadata(path).map_err(|err| {
            SyncularError::storage(err).context(format!("stat blob file {path:?}"))
        })?;
        let file_len = i64::try_from(metadata.len()).unwrap_or(i64::MAX);
        validate_blob_size_bytes(file_len)?;
        let data = fs::read(path).map_err(|err| {
            SyncularError::storage(err).context(format!("read blob file {path:?}"))
        })?;

        let blob = match &self.blob_encryption {
            Some(encryption) => {
                let encrypted = encryption.encrypt_blob(&data, mime_type)?;
                self.store
                    .store_blob_body(&encrypted.blob, &encrypted.body, enqueue_upload)?;
                encrypted.blob
            }
            None => self
                .store
                .store_blob_bytes(&data, mime_type, enqueue_upload)?,
        };
        let encoded = serde_json::to_string(&blob)?;
        Ok(encoded)
    }

    /// Writes a locally cached blob to `path`; never touches the network.
    ///
    /// Returns the JSON string for `{ "ok": true }` on success and a config
    /// error when the blob is not in the local cache.
    pub fn retrieve_cached_blob_file_json(
        &mut self,
        blob: &BlobRef,
        path: &Path,
    ) -> Result<String> {
        self.ensure_blob_runtime_declared()?;
        let cached = match self.store.read_cached_blob(&blob.hash)? {
            Some(bytes) => bytes,
            None => {
                return Err(SyncularError::config(
                    "blob is not present in the local cache",
                ));
            }
        };
        let plaintext = self.decode_blob_body(blob, cached)?;
        fs::write(path, plaintext).map_err(|err| {
            SyncularError::storage(err).context(format!("write blob file {path:?}"))
        })?;
        let encoded = serde_json::to_string(&json!({ "ok": true }))?;
        Ok(encoded)
    }

    /// Asks the store whether the blob with `hash` is available locally.
    pub fn is_blob_local(&mut self, hash: &str) -> Result<bool> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.is_blob_local(hash)
    }

    /// Returns the store's blob upload queue statistics.
    pub fn blob_upload_queue_stats(
        &mut self,
    ) -> Result<crate::diesel_sqlite::BlobUploadQueueStats> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.blob_upload_queue_stats()
    }

    /// Returns the store's blob cache statistics.
    pub fn blob_cache_stats(&mut self) -> Result<crate::diesel_sqlite::BlobCacheStats> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.blob_cache_stats()
    }

    /// Asks the store to prune the blob cache, passing `max_bytes` through.
    ///
    /// Returns whatever count the store reports.
    pub fn prune_blob_cache(&mut self, max_bytes: i64) -> Result<i64> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.prune_blob_cache(max_bytes)
    }

    /// Asks the store to clear the blob cache.
    pub fn clear_blob_cache(&mut self) -> Result<()> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.clear_blob_cache()
    }

    /// Forwards a storage compaction request (optional JSON options) to the
    /// store and returns its JSON result. No blob runtime check is performed.
    pub fn compact_storage_json(&mut self, options_json: Option<&str>) -> Result<String> {
        let store = &mut self.store;
        store.compact_storage_json(options_json)
    }

    /// Executes a read-only query, described by `request_json`, against the
    /// database at the configured `db_path` using this client's app schema.
    pub fn readonly_query_json(&self, request_json: &str) -> Result<String> {
        let db_path = &self.config.db_path;
        let schema = self.app_schema;
        crate::sqlite_query::execute_readonly_query_json_with_schema(db_path, request_json, schema)
    }
}
2556
2557impl<S, T> SyncularClient<S, T>
2558where
2559 S: SyncStore,
2560 T: SyncTransport,
2561{
2562 fn prepare_sync(&mut self) -> Result<Vec<OutboxCommit>> {
2563 self.store.transaction(|tx| {
2564 tx.requeue_stale_outbox()?;
2565 let pending = tx.pending_outbox(DEFAULT_OUTBOX_PUSH_BATCH_LIMIT)?;
2566 for commit in &pending {
2567 validate_outbox_schema_version(commit, self.schema_version)?;
2568 }
2569 for commit in &pending {
2570 tx.mark_outbox_sending(&commit.id)?;
2571 }
2572 Ok(pending)
2573 })
2574 }
2575
2576 fn build_pull_request(&mut self) -> Result<PullRequest> {
2577 let specs = self.subscriptions.clone();
2578 let snapshot_artifacts =
2579 self.store
2580 .supports_sqlite_snapshot_artifacts()
2581 .then(|| SnapshotArtifactsRequest {
2582 artifact_kinds: vec![SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1.to_string()],
2583 compressions: vec![SNAPSHOT_CHUNK_COMPRESSION_GZIP.to_string()],
2584 feature_set: Vec::new(),
2585 });
2586 self.store.transaction(|tx| {
2587 let mut entries = Vec::new();
2588 for spec in specs {
2589 let state = tx.subscription_state(DEFAULT_STATE_ID, &spec.id)?;
2590 entries.push((spec, state));
2591 }
2592 let active_phase = resolve_active_bootstrap_phase_for_native(&entries);
2593
2594 let mut subscriptions = Vec::new();
2595 for (spec, state) in entries {
2596 if !should_include_pull_subscription(&spec, state.as_ref(), active_phase) {
2597 continue;
2598 }
2599 let scopes_changed = state
2600 .as_ref()
2601 .map(|row| {
2602 let scopes: ScopeValues = serde_json::from_str(&row.scopes_json)?;
2603 Ok::<bool, SyncularError>(scopes != spec.scopes)
2604 })
2605 .transpose()?
2606 .unwrap_or(false);
2607 let verified_root = if scopes_changed {
2608 None
2609 } else {
2610 tx.verified_root(DEFAULT_STATE_ID, &spec.id)?
2611 .map(|root| root.root)
2612 };
2613 let crdt_state_vectors = if is_encrypted_crdt_system_table(&spec.table) {
2614 Vec::new()
2615 } else {
2616 tx.crdt_state_vector_hints(
2617 &spec.table,
2618 &spec.scopes,
2619 DEFAULT_CRDT_STATE_VECTOR_HINT_LIMIT,
2620 )?
2621 };
2622 subscriptions.push(SubscriptionRequest {
2623 id: spec.id,
2624 table: spec.table,
2625 scopes: spec.scopes,
2626 params: spec.params,
2627 cursor: if scopes_changed {
2628 -1
2629 } else {
2630 state.as_ref().map(|row| row.cursor).unwrap_or(-1)
2631 },
2632 bootstrap_state: if scopes_changed {
2633 None
2634 } else {
2635 state
2636 .and_then(|row| row.bootstrap_state_json)
2637 .map(|json| serde_json::from_str(&json))
2638 .transpose()?
2639 },
2640 verified_root,
2641 crdt_state_vectors,
2642 });
2643 }
2644
2645 Ok(PullRequest {
2646 schema_version: self.schema_version,
2647 limit_commits: DEFAULT_PULL_LIMIT_COMMITS,
2648 limit_snapshot_rows: DEFAULT_PULL_LIMIT_SNAPSHOT_ROWS,
2649 max_snapshot_pages: DEFAULT_PULL_MAX_SNAPSHOT_PAGES,
2650 dedupe_rows: None,
2651 snapshot_artifacts,
2652 subscriptions,
2653 })
2654 })
2655 }
2656
2657 fn build_push_request(&self, pending: &[OutboxCommit]) -> Result<Option<PushBatchRequest>> {
2658 if pending.is_empty() {
2659 return Ok(None);
2660 }
2661
2662 let ctx = self.encryption_context();
2663 Ok(Some(PushBatchRequest {
2664 commits: pending
2665 .iter()
2666 .map(|commit| {
2667 let operations: Vec<SyncOperation> =
2668 serde_json::from_str(&commit.operations_json)?;
2669 let operations = if let Some(encryption) = &self.field_encryption {
2670 encryption.transform_operations_for_push(&ctx, operations)?
2671 } else {
2672 operations
2673 };
2674 Ok(PushCommitRequest {
2675 client_commit_id: commit.client_commit_id.clone(),
2676 operations,
2677 schema_version: commit.schema_version,
2678 auth_lease: commit.auth_lease.clone(),
2679 })
2680 })
2681 .collect::<Result<Vec<_>>>()?,
2682 }))
2683 }
2684
2685 fn operations_for_push(&self, commit: &OutboxCommit) -> Result<Vec<SyncOperation>> {
2686 let operations: Vec<SyncOperation> = serde_json::from_str(&commit.operations_json)?;
2687 if let Some(encryption) = &self.field_encryption {
2688 encryption.transform_operations_for_push(&self.encryption_context(), operations)
2689 } else {
2690 Ok(operations)
2691 }
2692 }
2693
2694 fn transform_push_response(
2695 &self,
2696 outbox: &OutboxCommit,
2697 response: PushCommitResponse,
2698 ) -> Result<PushCommitResponse> {
2699 let Some(encryption) = &self.field_encryption else {
2700 return Ok(response);
2701 };
2702 let operations: Vec<SyncOperation> = serde_json::from_str(&outbox.operations_json)?;
2703 encryption.transform_push_response(&self.encryption_context(), &operations, response)
2704 }
2705
2706 fn transform_pull_response(&self, response: PullResponse) -> Result<PullResponse> {
2707 let response = if let Some(encryption) = &self.field_encryption {
2708 encryption.transform_pull_response(&self.encryption_context(), response)?
2709 } else {
2710 response
2711 };
2712 if let Some(encryption) = &self.encrypted_crdt {
2713 encryption.transform_pull_response(response)
2714 } else {
2715 Ok(response)
2716 }
2717 }
2718
2719 fn transform_snapshot_row(&self, snapshot_table: &str, row: Value) -> Result<Value> {
2720 if let Some(encryption) = &self.field_encryption {
2721 encryption.transform_snapshot_row(&self.encryption_context(), snapshot_table, row)
2722 } else {
2723 Ok(row)
2724 }
2725 }
2726
2727 fn transform_snapshot_chunk_rows(
2728 &self,
2729 snapshot_table: &str,
2730 rows: SnapshotChunkRows,
2731 ) -> Result<SnapshotChunkRows> {
2732 if self.field_encryption.is_none() {
2733 return Ok(rows);
2734 }
2735 rows.try_into_value_rows()?
2736 .into_iter()
2737 .map(|row| self.transform_snapshot_row(snapshot_table, row))
2738 .collect::<Result<Vec<_>>>()
2739 .map(SnapshotChunkRows::Json)
2740 }
2741
2742 fn encryption_context(&self) -> FieldEncryptionContext {
2743 FieldEncryptionContext {
2744 actor_id: self.config.actor_id.clone(),
2745 client_id: self.config.client_id.clone(),
2746 }
2747 }
2748
2749 fn apply_single_push_response(
2750 &mut self,
2751 outbox: &OutboxCommit,
2752 response: PushCommitResponse,
2753 ) -> Result<SyncReport> {
2754 let response = self.transform_push_response(outbox, response)?;
2755 self.store.transaction(|tx| {
2756 let conflicts_changed = apply_push_commit_response(tx, outbox, &response)?;
2757 Ok(SyncReport {
2758 changed_tables: Vec::new(),
2759 changed_rows: Vec::new(),
2760 conflicts_changed,
2761 })
2762 })
2763 }
2764
2765 fn apply_combined_response(
2766 &mut self,
2767 pending: &[OutboxCommit],
2768 response: CombinedResponse,
2769 ) -> Result<SyncReport> {
2770 if let Err(error) = validate_server_schema_version(&response, self.schema_version) {
2771 self.schedule_outbox_retry(pending, &error)?;
2772 return Err(error);
2773 }
2774
2775 if !response.ok {
2776 let error = SyncularError::protocol_message("combined sync response was not ok");
2777 self.schedule_outbox_retry(pending, &error)?;
2778 return Err(error);
2779 }
2780
2781 let mut report = SyncReport::default();
2782 if let Some(push) = response.push {
2783 if !push.ok {
2784 let error = SyncularError::protocol_message("push response was not ok");
2785 self.schedule_outbox_retry(pending, &error)?;
2786 return Err(error);
2787 }
2788
2789 let mut transformed_commits = Vec::new();
2790 for commit_response in push.commits {
2791 let Some(index) = pending
2792 .iter()
2793 .position(|row| row.client_commit_id == commit_response.client_commit_id)
2794 else {
2795 continue;
2796 };
2797 let commit_response =
2798 self.transform_push_response(&pending[index], commit_response)?;
2799 transformed_commits.push((index, commit_response));
2800 }
2801
2802 report.conflicts_changed |= self.store.transaction(|tx| {
2803 let mut conflicts_changed = false;
2804 for (index, commit_response) in transformed_commits {
2805 let outbox = &pending[index];
2806 conflicts_changed |= apply_push_commit_response(tx, outbox, &commit_response)?;
2807 }
2808 Ok(conflicts_changed)
2809 })?;
2810 }
2811
2812 if let Some(pull) = response.pull {
2813 report.merge(self.apply_pull_until_settled(pull)?);
2814 }
2815
2816 Ok(report)
2817 }
2818
2819 fn apply_pull_until_settled(&mut self, mut response: PullResponse) -> Result<SyncReport> {
2820 let mut report = SyncReport::default();
2821 for round in 0..MAX_PULL_ROUNDS {
2822 if !response.ok {
2823 return Err(SyncularError::protocol_message("pull response was not ok"));
2824 }
2825
2826 let verified_roots = self.verify_pull_response_integrity(&response)?;
2827 let transformed_response = self.transform_pull_response(response)?;
2828 let needs_more = pull_response_needs_another_round(&transformed_response, 50);
2829 report.merge(self.apply_pull_response(transformed_response, verified_roots)?);
2830
2831 if !needs_more {
2832 return Ok(report);
2833 }
2834
2835 if round + 1 == MAX_PULL_ROUNDS {
2836 return Ok(report);
2837 }
2838
2839 let request = CombinedRequest {
2840 client_id: self.config.client_id.clone(),
2841 push: None,
2842 pull: Some(self.build_pull_request()?),
2843 };
2844 let combined = self.transport.post_sync(&request)?;
2845 validate_server_schema_version(&combined, self.schema_version)?;
2846 response = combined.pull.unwrap_or(PullResponse {
2847 ok: true,
2848 subscriptions: Vec::new(),
2849 });
2850 }
2851
2852 Ok(report)
2853 }
2854
2855 fn verify_pull_response_integrity(
2856 &mut self,
2857 response: &PullResponse,
2858 ) -> Result<HashMap<String, Option<VerifiedCommitRoot>>> {
2859 validate_pull_commit_integrity_metadata(response)?;
2860 validate_pull_snapshot_manifests(response)?;
2861 self.store.transaction(|tx| {
2862 let mut verified_roots = HashMap::new();
2863 for sub in &response.subscriptions {
2864 if sub.status == "revoked" {
2865 verified_roots.insert(sub.id.clone(), None);
2866 continue;
2867 }
2868
2869 let previous_state = tx.subscription_state(DEFAULT_STATE_ID, &sub.id)?;
2870 let stored_root = if previous_state
2871 .as_ref()
2872 .map(|prev| {
2873 let previous_scopes: ScopeValues = serde_json::from_str(&prev.scopes_json)?;
2874 Ok::<bool, SyncularError>(previous_scopes != sub.scopes)
2875 })
2876 .transpose()?
2877 .unwrap_or(false)
2878 {
2879 None
2880 } else {
2881 tx.verified_root(DEFAULT_STATE_ID, &sub.id)?
2882 };
2883 let verified_root = verify_subscription_commit_integrity(
2884 &sub.id,
2885 stored_root.as_ref().map(|root| root.root.as_str()),
2886 sub.integrity.as_ref(),
2887 &sub.commits,
2888 )?;
2889 verified_roots.insert(sub.id.clone(), verified_root);
2890 }
2891 Ok(verified_roots)
2892 })
2893 }
2894
    /// Applies an already verified and transformed pull response to the store.
    ///
    /// Subscriptions are processed one at a time, each in its own store
    /// transaction:
    ///
    /// * A `"revoked"` subscription has its previously stored scopes cleared
    ///   from its table and its verified root and subscription state deleted.
    /// * Any other subscription first has its snapshot artifacts and chunks
    ///   fetched and transformed outside the transaction, then, inside the
    ///   transaction: rows for outdated scopes are cleared, snapshot rows are
    ///   upserted, commit changes are applied, and the new subscription state
    ///   (plus the verified root from `verified_roots`, if any) is persisted.
    ///
    /// The returned report collects the changed tables and rows noted along
    /// the way.
    ///
    /// # Errors
    ///
    /// Fails on transport, store, decoding or JSON errors, and with a protocol
    /// error when snapshot artifacts are received but the store does not
    /// support them. Subscriptions processed before the failure stay applied.
    fn apply_pull_response(
        &mut self,
        response: PullResponse,
        mut verified_roots: HashMap<String, Option<VerifiedCommitRoot>>,
    ) -> Result<SyncReport> {
        let mut report = SyncReport::default();
        for sub in response.subscriptions {
            if sub.status == "revoked" {
                // Revocation: drop the rows of the previously stored scopes and
                // forget everything recorded about the subscription.
                self.store.transaction(|tx| {
                    if let Some(prev) = tx.subscription_state(DEFAULT_STATE_ID, &sub.id)? {
                        let scopes: ScopeValues = serde_json::from_str(&prev.scopes_json)?;
                        tx.clear_table_for_scopes(&prev.table, &scopes)?;
                        report.add_changed_table(&prev.table);
                    }
                    tx.delete_verified_root(DEFAULT_STATE_ID, &sub.id)?;
                    tx.delete_subscription_state(DEFAULT_STATE_ID, &sub.id)
                })?;
                continue;
            }
            let spec = self
                .subscriptions
                .iter()
                .find(|candidate| candidate.id == sub.id);
            // NOTE(review): the table falls back to the hard-coded name "tasks"
            // when no configured subscription matches the response id — confirm
            // this fallback is intended.
            let table = spec
                .map(|spec| spec.table.clone())
                .unwrap_or_else(|| "tasks".to_string());
            let params_json = spec
                .map(|spec| serde_json::to_string(&spec.params))
                .transpose()?
                .unwrap_or_else(|| "{}".to_string());

            // Fetch and transform all snapshot payloads before opening the
            // transaction below.
            let mut prepared_snapshots = Vec::new();
            if let Some(snapshots) = sub.snapshots.as_ref() {
                for snapshot in snapshots {
                    let mut artifact_rows = Vec::new();
                    if let Some(artifacts) = &snapshot.artifacts {
                        if !artifacts.is_empty() && !self.store.supports_sqlite_snapshot_artifacts()
                        {
                            return Err(SyncularError::protocol_message(
                                "snapshot artifacts are not supported by this store",
                            ));
                        }
                        for artifact in artifacts {
                            validate_sqlite_snapshot_artifact_for_apply(
                                artifact,
                                &sub.id,
                                &snapshot.table,
                            )?;
                            let bytes = self
                                .transport
                                .fetch_snapshot_artifact_bytes(artifact, &sub.scopes)?;
                            let rows = self
                                .store
                                .decode_sqlite_snapshot_artifact_rows(&snapshot.table, &bytes)?;
                            for row in rows {
                                artifact_rows
                                    .push(self.transform_snapshot_row(&snapshot.table, row)?);
                            }
                        }
                    }
                    let mut chunk_batches = Vec::new();
                    if let Some(chunks) = &snapshot.chunks {
                        for chunk in chunks {
                            let rows = self
                                .transport
                                .fetch_snapshot_chunk_rows(chunk, &sub.scopes)?;
                            chunk_batches
                                .push(self.transform_snapshot_chunk_rows(&snapshot.table, rows)?);
                        }
                    }
                    // A first page counts as a table change even when it carries
                    // no rows (it triggers a clear further down).
                    if snapshot.is_first_page
                        || !snapshot.rows.is_empty()
                        || !artifact_rows.is_empty()
                        || chunk_batches.iter().any(|rows| !rows.is_empty())
                    {
                        report.add_changed_table(&snapshot.table);
                    }
                    prepared_snapshots.push(PreparedSnapshot {
                        snapshot: snapshot.clone(),
                        chunk_batches,
                        artifact_rows,
                    });
                }
            }

            self.store.transaction(|tx| {
                let previous_state = tx.subscription_state(DEFAULT_STATE_ID, &sub.id)?;
                let mut previous_scopes_match = false;
                if let Some(prev) = &previous_state {
                    let previous_scopes: ScopeValues = serde_json::from_str(&prev.scopes_json)?;
                    if previous_scopes != sub.scopes {
                        // Scopes changed: rows and root stored for the old
                        // scopes no longer apply.
                        tx.clear_table_for_scopes(&prev.table, &previous_scopes)?;
                        tx.delete_verified_root(DEFAULT_STATE_ID, &sub.id)?;
                        report.add_changed_table(&prev.table);
                    } else {
                        previous_scopes_match = true;
                    }
                }

                let verified_root = verified_roots.remove(&sub.id).flatten();

                // Tables in this set take the bulk upsert path below, which
                // reports rows without looking up their previous content.
                let mut snapshot_cleared_tables = HashSet::new();
                if let Some(prev) = previous_state.as_ref() {
                    // A stored bootstrap state with unchanged scopes puts the
                    // previous table on the bulk path from the start.
                    if prev.bootstrap_state_json.is_some()
                        && previous_scopes_match
                        && snapshot_clear_removes_all_rows(self.app_schema, &prev.table)
                    {
                        snapshot_cleared_tables.insert(prev.table.clone());
                    }
                }
                for prepared in &prepared_snapshots {
                    let snapshot = &prepared.snapshot;
                    if snapshot.is_first_page {
                        tx.clear_table_for_scopes_preserving_local_crdt(
                            &snapshot.table,
                            &sub.scopes,
                        )?;
                        if snapshot_clear_removes_all_rows(self.app_schema, &snapshot.table) {
                            snapshot_cleared_tables.insert(snapshot.table.clone());
                        }
                    }
                    if snapshot_cleared_tables.contains(&snapshot.table) {
                        // Bulk path: upsert inline rows, artifact rows and chunk
                        // rows, reporting each with no previous row.
                        tx.upsert_rows(&snapshot.table, &snapshot.rows, None)?;
                        for row in &snapshot.rows {
                            if let Some(changed_row) = sync_changed_row_for_snapshot(
                                self.app_schema,
                                &snapshot.table,
                                row,
                                None,
                                &sub.id,
                            ) {
                                report.add_changed_row(changed_row);
                            }
                        }

                        tx.upsert_rows(&snapshot.table, &prepared.artifact_rows, None)?;
                        for row in &prepared.artifact_rows {
                            if let Some(changed_row) = sync_changed_row_for_snapshot(
                                self.app_schema,
                                &snapshot.table,
                                row,
                                None,
                                &sub.id,
                            ) {
                                report.add_changed_row(changed_row);
                            }
                        }

                        for chunk_rows in &prepared.chunk_batches {
                            tx.upsert_snapshot_chunk_rows(&snapshot.table, chunk_rows, None)?;
                            for changed_row in sync_changed_rows_for_cleared_snapshot_chunk(
                                self.app_schema,
                                &snapshot.table,
                                chunk_rows,
                                &sub.id,
                            ) {
                                report.add_changed_row(changed_row);
                            }
                        }
                    } else {
                        // Row-by-row path: read the current row first so the
                        // change report can be built against it.
                        for row in snapshot.rows.iter().chain(prepared.artifact_rows.iter()) {
                            let previous_row =
                                row_id_for_metadata(self.app_schema, &snapshot.table, row)
                                    .map(|row_id| tx.current_row_json(&snapshot.table, &row_id))
                                    .transpose()?
                                    .flatten();
                            tx.upsert_row(&snapshot.table, row, None)?;
                            if let Some(changed_row) = sync_changed_row_for_snapshot(
                                self.app_schema,
                                &snapshot.table,
                                row,
                                previous_row.as_ref(),
                                &sub.id,
                            ) {
                                report.add_changed_row(changed_row);
                            }
                        }
                        for chunk_rows in &prepared.chunk_batches {
                            let chunk_rows = chunk_rows.clone().try_into_value_rows()?;
                            for row in &chunk_rows {
                                let previous_row =
                                    row_id_for_metadata(self.app_schema, &snapshot.table, row)
                                        .map(|row_id| tx.current_row_json(&snapshot.table, &row_id))
                                        .transpose()?
                                        .flatten();
                                tx.upsert_row(&snapshot.table, row, None)?;
                                if let Some(changed_row) = sync_changed_row_for_snapshot(
                                    self.app_schema,
                                    &snapshot.table,
                                    row,
                                    previous_row.as_ref(),
                                    &sub.id,
                                ) {
                                    report.add_changed_row(changed_row);
                                }
                            }
                        }
                    }
                }

                // Incremental commits are applied after all snapshot pages.
                for commit in &sub.commits {
                    for change in &commit.changes {
                        let previous_row = tx.current_row_json(&change.table, &change.row_id)?;
                        tx.apply_change(change)?;
                        if let Some(changed_row) = sync_changed_row_for_change(
                            self.app_schema,
                            change,
                            previous_row.as_ref(),
                            commit.commit_seq,
                            &sub.id,
                        ) {
                            report.add_changed_row(changed_row);
                        } else {
                            // No row-level detail available: report the table only.
                            report.add_changed_table(&change.table);
                        }
                    }
                }

                // Persist the new cursor, scopes and bootstrap state in the same
                // transaction as the data they describe.
                tx.upsert_subscription_state(&SubscriptionState {
                    state_id: DEFAULT_STATE_ID.to_string(),
                    subscription_id: sub.id.clone(),
                    table: table.clone(),
                    scopes_json: serde_json::to_string(&sub.scopes)?,
                    params_json,
                    cursor: sub.next_cursor,
                    bootstrap_state_json: sub
                        .bootstrap_state
                        .as_ref()
                        .map(serde_json::to_string)
                        .transpose()?,
                    status: sub.status.clone(),
                })?;
                if let Some(root) = verified_root {
                    tx.upsert_verified_root(&VerifiedRoot {
                        state_id: DEFAULT_STATE_ID.to_string(),
                        subscription_id: sub.id.clone(),
                        partition_id: root.partition_id,
                        commit_seq: root.commit_seq,
                        root: root.root,
                    })?;
                }

                Ok(())
            })?;
        }

        Ok(report)
    }
3143
3144 fn schedule_outbox_retry(
3145 &mut self,
3146 pending: &[OutboxCommit],
3147 error: &SyncularError,
3148 ) -> Result<()> {
3149 if pending.is_empty() {
3150 return Ok(());
3151 }
3152
3153 let now = now_ms();
3154 let message = error.to_string();
3155 let auth_error = is_auth_transport_error(error);
3156 self.store.transaction(|tx| {
3157 for commit in pending {
3158 let attempt_count = commit.attempt_count.saturating_add(1);
3159 let failed = attempt_count >= MAX_SYNC_RETRIES;
3160 let next_attempt_at = if failed || auth_error {
3161 0
3162 } else {
3163 next_retry_at(now, attempt_count)
3164 };
3165 tx.mark_outbox_retry(&commit.id, &message, next_attempt_at, failed)?;
3166 }
3167 Ok(())
3168 })
3169 }
3170}
3171
#[cfg(feature = "demo-todo-native-fixture")]
impl<T> SyncularClient<RusqliteStore, T>
where
    T: SyncTransport,
{
    /// Returns the store's listing of `table`, serialized as a JSON string.
    pub fn list_table_json(&mut self, table: &str) -> Result<String> {
        let rows = self.store.list_table_json(table)?;
        let json = serde_json::to_string(&rows)?;
        Ok(json)
    }

    /// Parses a JSON-encoded sync operation (and optional local row) and
    /// applies it through the store as a local operation.
    ///
    /// The input size is validated before any parsing happens.
    pub fn apply_mutation_json(
        &mut self,
        mutation_json: &str,
        local_row_json: Option<&str>,
    ) -> Result<String> {
        validate_mutation_json_input_size(mutation_json, local_row_json)?;
        let operation: SyncOperation = serde_json::from_str(mutation_json)?;
        let local_row = match local_row_json {
            Some(raw) => Some(serde_json::from_str(raw)?),
            None => None,
        };
        self.store.apply_local_operation(operation, local_row)
    }

    /// Always fails for this store: the request is parsed (so malformed JSON
    /// is reported first) and then rejected with a configuration error.
    pub fn apply_encrypted_crdt_update_json(
        &mut self,
        request_json: &str,
    ) -> Result<MutationReceipt> {
        serde_json::from_str::<EncryptedCrdtUpdateJsonRequest>(request_json)?;
        Err(SyncularError::config(
            "encrypted CRDT update JSON is not supported by RusqliteStore",
        ))
    }

    /// Always fails for this store: the request is parsed (so malformed JSON
    /// is reported first) and then rejected with a configuration error.
    pub fn apply_encrypted_crdt_checkpoint_json(
        &mut self,
        request_json: &str,
    ) -> Result<Option<MutationReceipt>> {
        serde_json::from_str::<EncryptedCrdtCheckpointJsonRequest>(request_json)?;
        Err(SyncularError::config(
            "encrypted CRDT checkpoint JSON is not supported by RusqliteStore",
        ))
    }
}
3212
3213#[cfg(feature = "native")]
3214impl<T> SyncularClient<DieselSqliteStore, T>
3215where
3216 T: SyncTransport,
3217{
3218 pub fn read<'query, Q, Row>(&mut self, query: Q) -> Result<Vec<Row>>
3219 where
3220 Q: diesel::query_dsl::LoadQuery<'query, diesel::sqlite::SqliteConnection, Row>,
3221 {
3222 self.store.read(query)
3223 }
3224
3225 pub fn live_query<QF, Q, Row, I, Table>(
3226 &mut self,
3227 tables: I,
3228 build_query: QF,
3229 ) -> Result<SyncularLiveQuery<QF, Row>>
3230 where
3231 QF: FnMut() -> Q,
3232 for<'query> Q: diesel::query_dsl::LoadQuery<'query, diesel::sqlite::SqliteConnection, Row>,
3233 I: IntoIterator<Item = Table>,
3234 Table: Into<String>,
3235 {
3236 let mut live_query = SyncularLiveQuery::new(tables, build_query);
3237 live_query.refresh(self)?;
3238 Ok(live_query)
3239 }
3240
3241 pub fn apply<M>(&mut self, mutation: M) -> Result<MutationReceipt>
3242 where
3243 M: IntoSyncularMutation,
3244 {
3245 self.apply_mutation(mutation)
3246 }
3247
3248 pub fn apply_mutation_batch(
3249 &mut self,
3250 batch: SyncularMutationBatch,
3251 ) -> Result<MutationReceipt> {
3252 <Self as SyncularMutationExecutor>::apply_mutation_batch(self, batch)
3253 }
3254
3255 pub fn commit_mutations<R>(
3256 &mut self,
3257 f: impl FnOnce(&mut SyncularMutationBatch) -> Result<R>,
3258 ) -> Result<MutationCommit<R>> {
3259 let mut batch = SyncularMutationBatch::new();
3260 let result = f(&mut batch)?;
3261 let commit = self.apply_mutation_batch(batch)?;
3262 Ok(MutationCommit { result, commit })
3263 }
3264
3265 pub fn list_table_json(&mut self, table: &str) -> Result<String> {
3266 Ok(serde_json::to_string(&self.store.list_table_json(table)?)?)
3267 }
3268
3269 pub fn apply_mutation_json(
3270 &mut self,
3271 mutation_json: &str,
3272 local_row_json: Option<&str>,
3273 ) -> Result<String> {
3274 validate_mutation_json_input_size(mutation_json, local_row_json)?;
3275 let operation: SyncOperation = serde_json::from_str(mutation_json)?;
3276 let local_row = local_row_json.map(serde_json::from_str).transpose()?;
3277 self.store.apply_local_operation(operation, local_row)
3278 }
3279
3280 pub fn apply_leased_mutation_json(
3281 &mut self,
3282 mutation_json: &str,
3283 local_row_json: Option<&str>,
3284 ) -> Result<String> {
3285 validate_mutation_json_input_size(mutation_json, local_row_json)?;
3286 let operation: SyncOperation = serde_json::from_str(mutation_json)?;
3287 let local_row = local_row_json.map(serde_json::from_str).transpose()?;
3288 let actor_id = self.config.actor_id.clone();
3289 self.store.apply_local_operation_with_active_auth_lease(
3290 Some(&actor_id),
3291 now_ms(),
3292 operation,
3293 local_row,
3294 )
3295 }
3296
3297 pub fn apply_encrypted_crdt_update_json(
3298 &mut self,
3299 request_json: &str,
3300 ) -> Result<MutationReceipt> {
3301 validate_crdt_request_json_size(request_json)?;
3302 let request: EncryptedCrdtUpdateJsonRequest = serde_json::from_str(request_json)?;
3303 let (metadata, field) =
3304 self.encrypted_crdt_metadata_field(&request.table, &request.field)?;
3305 match (request.next_text, request.update) {
3306 (Some(next_text), None) => {
3307 validate_yjs_text_input_size(&next_text)?;
3308 self.apply_encrypted_crdt_text_update(metadata, field, &request.row_id, &next_text)
3309 }
3310 (None, Some(update)) => {
3311 validate_yjs_update_envelope_size(&update)?;
3312 self.apply_encrypted_crdt_yjs_update(metadata, field, &request.row_id, update)
3313 }
3314 (Some(_), Some(_)) => Err(SyncularError::config(
3315 "encrypted CRDT update JSON must provide either nextText or update, not both",
3316 )),
3317 (None, None) => Err(SyncularError::config(
3318 "encrypted CRDT update JSON requires nextText or update",
3319 )),
3320 }
3321 }
3322
3323 pub fn apply_encrypted_crdt_checkpoint_json(
3324 &mut self,
3325 request_json: &str,
3326 ) -> Result<Option<MutationReceipt>> {
3327 validate_crdt_request_json_size(request_json)?;
3328 let request: EncryptedCrdtCheckpointJsonRequest = serde_json::from_str(request_json)?;
3329 let (metadata, field) =
3330 self.encrypted_crdt_metadata_field(&request.table, &request.field)?;
3331 self.apply_encrypted_crdt_checkpoint(
3332 metadata,
3333 field,
3334 &request.row_id,
3335 request.min_uncheckpointed_updates.unwrap_or(1),
3336 )
3337 }
3338
3339 pub fn open_crdt_field(&self, id: CrdtFieldId) -> Result<CrdtField> {
3340 let field = validate_crdt_field(self.app_schema, &id)?;
3341 if field.sync_mode() == CrdtFieldSyncMode::EncryptedUpdateLog
3342 && self.encrypted_crdt.is_none()
3343 {
3344 return Err(SyncularError::config(
3345 "encrypted CRDT fields require set_encrypted_crdt(...)",
3346 ));
3347 }
3348 Ok(field)
3349 }
3350
3351 pub fn apply_crdt_field_yjs_update(
3352 &mut self,
3353 field: &CrdtField,
3354 update: YjsUpdateEnvelope,
3355 ) -> Result<CrdtFieldWriteReceipt> {
3356 validate_yjs_update_envelope_size(&update)?;
3357 match field.sync_mode() {
3358 CrdtFieldSyncMode::ServerMerge => {
3359 let client_commit_id = self.store.apply_crdt_field_yjs_update(
3360 field,
3361 update,
3362 DEFAULT_CRDT_UPDATE_QUEUE_CAPACITY,
3363 )?;
3364 Ok(CrdtFieldWriteReceipt {
3365 client_commit_id,
3366 sync_mode: field.sync_mode(),
3367 })
3368 }
3369 CrdtFieldSyncMode::EncryptedUpdateLog => {
3370 let receipt = self.apply_encrypted_crdt_yjs_update(
3371 field.metadata(),
3372 field.field(),
3373 field.row_id(),
3374 update,
3375 )?;
3376 Ok(CrdtFieldWriteReceipt {
3377 client_commit_id: receipt.client_commit_id,
3378 sync_mode: field.sync_mode(),
3379 })
3380 }
3381 }
3382 }
3383
3384 pub fn apply_crdt_field_yjs_update_with_queue_capacity(
3385 &mut self,
3386 field: &CrdtField,
3387 update: YjsUpdateEnvelope,
3388 max_pending_updates: i64,
3389 ) -> Result<CrdtFieldWriteReceipt> {
3390 validate_yjs_update_envelope_size(&update)?;
3391 match field.sync_mode() {
3392 CrdtFieldSyncMode::ServerMerge => {
3393 let client_commit_id =
3394 self.store
3395 .apply_crdt_field_yjs_update(field, update, max_pending_updates)?;
3396 Ok(CrdtFieldWriteReceipt {
3397 client_commit_id,
3398 sync_mode: field.sync_mode(),
3399 })
3400 }
3401 CrdtFieldSyncMode::EncryptedUpdateLog => {
3402 self.apply_crdt_field_yjs_update(field, update)
3403 }
3404 }
3405 }
3406
3407 pub fn apply_crdt_field_text(
3408 &mut self,
3409 field: &CrdtField,
3410 next_text: &str,
3411 ) -> Result<CrdtFieldWriteReceipt> {
3412 validate_yjs_text_input_size(next_text)?;
3413 if field.field_metadata().kind != "text" {
3414 return Err(SyncularError::config(format!(
3415 "apply_crdt_field_text requires a text CRDT field, got {}",
3416 field.field_metadata().kind
3417 )));
3418 }
3419 match field.sync_mode() {
3420 CrdtFieldSyncMode::ServerMerge => {
3421 let current_row = self.store.read_row_json(field.table(), field.row_id())?;
3422 let previous_state_base64 = current_row.as_ref().and_then(|row| {
3423 row.get(field.state_column())
3424 .and_then(Value::as_str)
3425 .filter(|value| !value.is_empty())
3426 .map(str::to_string)
3427 });
3428 if previous_state_base64.is_none() {
3429 if let Some(existing_text) = current_row
3430 .as_ref()
3431 .and_then(|row| row.get(field.field()))
3432 .and_then(Value::as_str)
3433 .filter(|value| !value.is_empty() && *value != next_text)
3434 {
3435 return Err(SyncularError::config(format!(
3436 "cannot replace non-empty CRDT text field {}.{} row {} without existing Yjs state; migrate or initialize {} first (current value: {existing_text:?})",
3437 field.table(),
3438 field.field(),
3439 field.row_id(),
3440 field.state_column()
3441 )));
3442 }
3443 }
3444 let update = build_yjs_text_update(BuildYjsTextUpdateArgs {
3445 previous_state_base64,
3446 next_text: next_text.to_string(),
3447 container_key: Some(field.container_key().to_string()),
3448 update_id: None,
3449 })?;
3450 self.apply_crdt_field_yjs_update(field, update.update)
3451 }
3452 CrdtFieldSyncMode::EncryptedUpdateLog => {
3453 let receipt = self.apply_encrypted_crdt_text_update(
3454 field.metadata(),
3455 field.field(),
3456 field.row_id(),
3457 next_text,
3458 )?;
3459 Ok(CrdtFieldWriteReceipt {
3460 client_commit_id: receipt.client_commit_id,
3461 sync_mode: field.sync_mode(),
3462 })
3463 }
3464 }
3465 }
3466
3467 pub fn materialize_crdt_field(
3468 &mut self,
3469 field: &CrdtField,
3470 ) -> Result<CrdtFieldMaterialization> {
3471 let row = self
3472 .store
3473 .read_row_json(field.table(), field.row_id())?
3474 .ok_or_else(|| {
3475 SyncularError::protocol_message(format!(
3476 "cannot materialize CRDT field {}.{} for missing row {}",
3477 field.table(),
3478 field.field(),
3479 field.row_id()
3480 ))
3481 })?;
3482 let state_base64 = row
3483 .get(field.state_column())
3484 .and_then(Value::as_str)
3485 .filter(|value| !value.is_empty())
3486 .map(str::to_string);
3487 let value = match state_base64.as_deref() {
3488 Some(state_base64) => materialize_yjs_state(state_base64, &field.yjs_rule()?)?,
3489 None => row.get(field.field()).cloned().unwrap_or(Value::Null),
3490 };
3491 let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
3492 Ok(CrdtFieldMaterialization {
3493 value,
3494 state_base64,
3495 state_vector_base64,
3496 })
3497 }
3498
3499 pub fn materialize_crdt_field_json(&mut self, field: &CrdtField) -> Result<String> {
3500 Ok(serde_json::to_string(&self.materialize_crdt_field(field)?)?)
3501 }
3502
3503 pub fn crdt_document_snapshot(&mut self, field: &CrdtField) -> Result<CrdtDocumentSnapshot> {
3504 self.store.crdt_document_snapshot(field)
3505 }
3506
3507 pub fn crdt_document_snapshot_json(&mut self, field: &CrdtField) -> Result<String> {
3508 Ok(serde_json::to_string(&self.crdt_document_snapshot(field)?)?)
3509 }
3510
3511 pub fn crdt_update_log(
3512 &mut self,
3513 field: &CrdtField,
3514 limit: i64,
3515 ) -> Result<Vec<CrdtUpdateLogEntry>> {
3516 self.store.crdt_update_log(field, limit)
3517 }
3518
3519 pub fn crdt_update_log_json(&mut self, field: &CrdtField, limit: i64) -> Result<String> {
3520 Ok(serde_json::to_string(&self.crdt_update_log(field, limit)?)?)
3521 }
3522
3523 pub fn snapshot_crdt_field_state_vector_base64(&mut self, field: &CrdtField) -> Result<String> {
3524 let row = self.store.read_row_json(field.table(), field.row_id())?;
3525 let state_base64 = row.as_ref().and_then(|row| {
3526 row.get(field.state_column())
3527 .and_then(Value::as_str)
3528 .filter(|value| !value.is_empty())
3529 });
3530 yjs_state_vector_base64(state_base64)
3531 }
3532
3533 pub fn compact_crdt_field(
3534 &mut self,
3535 field: &CrdtField,
3536 min_uncheckpointed_updates: i64,
3537 ) -> Result<CrdtFieldCompactionReceipt> {
3538 let before = self.store.crdt_document_snapshot(field)?;
3539 let encrypted_stream_before = self.encrypted_crdt_stream_stats_for_field(field)?;
3540 match field.sync_mode() {
3541 CrdtFieldSyncMode::ServerMerge => {
3542 let after = self.store.compact_crdt_document(field)?;
3543 Ok(CrdtFieldCompactionReceipt {
3544 checkpoint_created: false,
3545 client_commit_id: None,
3546 before: CrdtFieldCompactionStats::from(&before),
3547 after: CrdtFieldCompactionStats::from(&after),
3548 encrypted_stream_before,
3549 encrypted_stream_after: self.encrypted_crdt_stream_stats_for_field(field)?,
3550 })
3551 }
3552 CrdtFieldSyncMode::EncryptedUpdateLog => {
3553 let receipt = self.apply_encrypted_crdt_checkpoint(
3554 field.metadata(),
3555 field.field(),
3556 field.row_id(),
3557 min_uncheckpointed_updates,
3558 )?;
3559 let after = self.store.crdt_document_snapshot(field)?;
3560 Ok(CrdtFieldCompactionReceipt {
3561 checkpoint_created: receipt.is_some(),
3562 client_commit_id: receipt.map(|receipt| receipt.client_commit_id),
3563 before: CrdtFieldCompactionStats::from(&before),
3564 after: CrdtFieldCompactionStats::from(&after),
3565 encrypted_stream_before,
3566 encrypted_stream_after: self.encrypted_crdt_stream_stats_for_field(field)?,
3567 })
3568 }
3569 }
3570 }
3571
3572 fn encrypted_crdt_stream_stats_for_field(
3573 &mut self,
3574 field: &CrdtField,
3575 ) -> Result<Option<EncryptedCrdtStreamStats>> {
3576 if field.sync_mode() != CrdtFieldSyncMode::EncryptedUpdateLog {
3577 return Ok(None);
3578 }
3579 let Some(encryption) = &self.encrypted_crdt else {
3580 return Ok(None);
3581 };
3582 let stream_id = encrypted_crdt_stream_id(field.table(), field.row_id(), field.field());
3583 self.store
3584 .encrypted_crdt_stream_stats(encryption.partition_id(), &stream_id)
3585 .map(Some)
3586 }
3587
3588 fn encrypted_crdt_metadata_field(
3589 &self,
3590 table: &str,
3591 field: &str,
3592 ) -> Result<(&'static AppTableMetadata, &'static str)> {
3593 let metadata = self.table_metadata(table).ok_or_else(|| {
3594 SyncularError::config(format!("unknown generated app table: {table}"))
3595 })?;
3596 let field = encrypted_field_metadata(metadata, field)?.field;
3597 Ok((metadata, field))
3598 }
3599}
3600
#[cfg(feature = "native")]
impl<T> SyncularMutationExecutor for SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Converts `mutation` and applies it through the store as a
    /// single-element mutation list.
    fn apply_mutation<M>(&mut self, mutation: M) -> Result<MutationReceipt>
    where
        M: IntoSyncularMutation,
    {
        let mutations = vec![mutation.into_syncular_mutation()];
        self.store.apply_syncular_mutations(mutations)
    }

    /// Applies every mutation collected in `batch` through the store.
    fn apply_mutation_batch(&mut self, batch: SyncularMutationBatch) -> Result<MutationReceipt> {
        let mutations = batch.into_mutations();
        self.store.apply_syncular_mutations(mutations)
    }
}
3618
#[cfg(feature = "native")]
impl<T> SyncularLeasedMutationExecutor for SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Applies a single mutation through the store's active-auth-lease path,
    /// passing the configured actor id and the current time.
    fn apply_leased_mutation<M>(&mut self, mutation: M) -> Result<MutationReceipt>
    where
        M: IntoSyncularMutation,
    {
        let actor_id = self.config.actor_id.clone();
        let now = now_ms();
        let mutations = vec![mutation.into_syncular_mutation()];
        self.store
            .apply_syncular_mutations_with_active_auth_lease(Some(&actor_id), now, mutations)
    }

    /// Applies a mutation batch through the store's active-auth-lease path,
    /// passing the configured actor id and the current time.
    fn apply_leased_mutation_batch(
        &mut self,
        batch: SyncularMutationBatch,
    ) -> Result<MutationReceipt> {
        let actor_id = self.config.actor_id.clone();
        let now = now_ms();
        let mutations = batch.into_mutations();
        self.store
            .apply_syncular_mutations_with_active_auth_lease(Some(&actor_id), now, mutations)
    }
}
3648
#[cfg(feature = "native")]
impl<T> SyncularCommandHistoryExecutor for SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Returns the current JSON value of a row, via the client's
    /// `current_row_json`.
    fn command_history_current_row_json(
        &mut self,
        table: &str,
        row_id: &str,
    ) -> Result<Option<Value>> {
        let row = self.current_row_json(table, row_id)?;
        Ok(row)
    }

    /// Records a command-history entry set and its receipt in the store.
    fn command_history_record(
        &mut self,
        mutation_scope: &str,
        entries: &[CommandHistoryEntry],
        receipt: &MutationReceipt,
    ) -> Result<CommandHistoryRecord> {
        let record = self
            .store
            .record_command_history(mutation_scope, entries, receipt)?;
        Ok(record)
    }

    /// Returns the store's latest command-history record for `state`, if any.
    fn command_history_latest(
        &mut self,
        state: CommandHistoryState,
    ) -> Result<Option<CommandHistoryRecord>> {
        let latest = self.store.latest_command_history(state)?;
        Ok(latest)
    }

    /// Marks the command-history record `id` with `state` and `receipt` in
    /// the store.
    fn command_history_mark(
        &mut self,
        id: &str,
        state: CommandHistoryState,
        receipt: &MutationReceipt,
    ) -> Result<()> {
        self.store.mark_command_history(id, state, receipt)?;
        Ok(())
    }

    /// Applies `batch` using the executor selected by `mutation_scope`.
    ///
    /// `"mutations"` uses the plain batch path and `"leasedMutations"` the
    /// leased batch path; any other scope is a configuration error.
    fn apply_command_history_batch(
        &mut self,
        mutation_scope: &str,
        batch: SyncularMutationBatch,
    ) -> Result<MutationReceipt> {
        if mutation_scope == "mutations" {
            return self.apply_mutation_batch(batch);
        }
        if mutation_scope == "leasedMutations" {
            return self.apply_leased_mutation_batch(batch);
        }
        Err(SyncularError::config(format!(
            "sync.command_history_scope_unsupported: {mutation_scope}"
        )))
    }

    /// Applies `batch` through the store's command-history-aware path, passing
    /// the scope, the configured actor id and the current time.
    fn apply_command_history_tracked_batch(
        &mut self,
        mutation_scope: &str,
        batch: SyncularMutationBatch,
    ) -> Result<MutationReceipt> {
        let actor_id = self.config.actor_id.clone();
        let now = now_ms();
        let mutations = batch.into_mutations();
        self.store.apply_syncular_mutations_with_command_history(
            mutation_scope,
            Some(&actor_id),
            now,
            mutations,
        )
    }
}
3716
3717#[cfg(feature = "native")]
3718impl<T> SyncularEncryptedCrdtMutationExecutor for SyncularClient<DieselSqliteStore, T>
3719where
3720 T: SyncTransport,
3721{
3722 fn apply_encrypted_crdt_text_update(
3723 &mut self,
3724 metadata: &'static AppTableMetadata,
3725 field: &'static str,
3726 row_id: &str,
3727 next_text: &str,
3728 ) -> Result<MutationReceipt> {
3729 validate_yjs_text_input_size(next_text)?;
3730 let Some(encryption) = &self.encrypted_crdt else {
3731 return Err(SyncularError::config(
3732 "encrypted CRDT updates require set_encrypted_crdt(...)",
3733 ));
3734 };
3735 let existing_row = self
3736 .store
3737 .read_row_json(metadata.name, row_id)?
3738 .ok_or_else(|| {
3739 SyncularError::protocol_message(format!(
3740 "cannot update encrypted CRDT field {}.{} before local row {row_id} exists",
3741 metadata.name, field
3742 ))
3743 })?;
3744 let mutation = encryption.build_text_update_mutation(BuildEncryptedCrdtTextUpdateArgs {
3745 ctx: self.encryption_context(),
3746 metadata,
3747 field,
3748 row_id,
3749 existing_row: &existing_row,
3750 next_text,
3751 })?;
3752 self.store.apply_syncular_mutations(vec![mutation])
3753 }
3754
3755 fn apply_encrypted_crdt_yjs_update(
3756 &mut self,
3757 metadata: &'static AppTableMetadata,
3758 field: &'static str,
3759 row_id: &str,
3760 update: YjsUpdateEnvelope,
3761 ) -> Result<MutationReceipt> {
3762 validate_yjs_update_envelope_size(&update)?;
3763 let Some(encryption) = &self.encrypted_crdt else {
3764 return Err(SyncularError::config(
3765 "encrypted CRDT updates require set_encrypted_crdt(...)",
3766 ));
3767 };
3768 let existing_row = self
3769 .store
3770 .read_row_json(metadata.name, row_id)?
3771 .ok_or_else(|| {
3772 SyncularError::protocol_message(format!(
3773 "cannot update encrypted CRDT field {}.{} before local row {row_id} exists",
3774 metadata.name, field
3775 ))
3776 })?;
3777 let mutation = encryption.build_yjs_update_mutation(BuildEncryptedCrdtYjsUpdateArgs {
3778 ctx: self.encryption_context(),
3779 metadata,
3780 field,
3781 row_id,
3782 existing_row: &existing_row,
3783 update,
3784 })?;
3785 self.store.apply_syncular_mutations(vec![mutation])
3786 }
3787
3788 fn apply_encrypted_crdt_checkpoint(
3789 &mut self,
3790 metadata: &'static AppTableMetadata,
3791 field: &'static str,
3792 row_id: &str,
3793 min_uncheckpointed_updates: i64,
3794 ) -> Result<Option<MutationReceipt>> {
3795 if min_uncheckpointed_updates < 1 {
3796 return Err(SyncularError::config(
3797 "encrypted CRDT checkpoint threshold must be at least 1",
3798 ));
3799 }
3800 let Some(encryption) = &self.encrypted_crdt else {
3801 return Err(SyncularError::config(
3802 "encrypted CRDT checkpoints require set_encrypted_crdt(...)",
3803 ));
3804 };
3805 let stream_id = encrypted_crdt_stream_id(metadata.name, row_id, field);
3806 let stats = self
3807 .store
3808 .encrypted_crdt_stream_stats(encryption.partition_id(), &stream_id)?;
3809 if stats.checkpointable_update_count < min_uncheckpointed_updates {
3810 return Ok(None);
3811 }
3812 let Some(covers_seq) = stats.max_server_seq else {
3813 return Ok(None);
3814 };
3815 if stats
3816 .latest_checkpoint_covers_seq
3817 .is_some_and(|latest| latest >= covers_seq)
3818 {
3819 return Ok(None);
3820 }
3821 let existing_row = self
3822 .store
3823 .read_row_json(metadata.name, row_id)?
3824 .ok_or_else(|| {
3825 SyncularError::protocol_message(format!(
3826 "cannot checkpoint encrypted CRDT field {}.{} before local row {row_id} exists",
3827 metadata.name, field
3828 ))
3829 })?;
3830 let mutation = encryption.build_checkpoint_mutation(BuildEncryptedCrdtCheckpointArgs {
3831 ctx: self.encryption_context(),
3832 metadata,
3833 field,
3834 row_id,
3835 existing_row: &existing_row,
3836 covers_seq,
3837 })?;
3838 Ok(Some(self.store.apply_syncular_mutations(vec![mutation])?))
3839 }
3840}
3841
3842struct SyncLockGuard {
3843 key: String,
3844}
3845
3846impl SyncLockGuard {
3847 fn acquire(key: &str) -> Result<Self> {
3848 let locks = ACTIVE_SYNC_KEYS.get_or_init(|| Mutex::new(HashSet::new()));
3849 let mut active = locks
3850 .lock()
3851 .map_err(|_| SyncularError::busy("sync lock is poisoned"))?;
3852 if !active.insert(key.to_string()) {
3853 return Err(SyncularError::busy(format!(
3854 "sync already active for local database {key}"
3855 )));
3856 }
3857 Ok(Self {
3858 key: key.to_string(),
3859 })
3860 }
3861}
3862
3863impl Drop for SyncLockGuard {
3864 fn drop(&mut self) {
3865 if let Some(locks) = ACTIVE_SYNC_KEYS.get() {
3866 if let Ok(mut active) = locks.lock() {
3867 active.remove(&self.key);
3868 }
3869 }
3870 }
3871}
3872
3873fn pull_response_needs_another_round(response: &PullResponse, limit_commits: i64) -> bool {
3874 let mut total_commits = 0usize;
3875 for sub in &response.subscriptions {
3876 if sub.status != "active" {
3877 continue;
3878 }
3879 if sub.bootstrap_state.is_some() {
3880 return true;
3881 }
3882 total_commits += sub.commits.len();
3883 }
3884 total_commits >= limit_commits as usize
3885}
3886
#[cfg(feature = "demo-todo-fixture")]
impl<S, T> SyncularClient<S, T>
where
    S: SyncStore + DemoTaskStore,
    T: SyncTransport,
{
    /// Adds a task through the store and returns its id.
    ///
    /// When `id` is `None`, a fresh v4 UUID string is generated. The
    /// configured actor id and optional project id are passed to the store.
    pub fn add_task(&mut self, title: String, id: Option<String>) -> Result<String> {
        let task_id = match id {
            Some(explicit) => explicit,
            None => Uuid::new_v4().to_string(),
        };
        let actor_id = &self.config.actor_id;
        let project_id = self.config.project_id.as_deref();
        self.store
            .add_task(actor_id, project_id, task_id.clone(), title)?;
        Ok(task_id)
    }

    /// Changes the title of task `id` through the store, passing the
    /// configured optional project id.
    pub fn patch_task_title(&mut self, id: String, title: String) -> Result<()> {
        let project_id = self.config.project_id.as_deref();
        self.store.patch_task_title(project_id, id, title)
    }

    /// Returns the tasks reported by the store.
    pub fn list_tasks(&mut self) -> Result<Vec<Task>> {
        let tasks = self.store.list_tasks()?;
        Ok(tasks)
    }
}
3913
3914impl<S, T> SyncularClient<S, T>
3915where
3916 S: SyncStore + SyncStateStore,
3917 T: SyncTransport,
3918{
3919 pub fn applied_migrations(&mut self) -> Result<Vec<crate::store::AppliedMigration>> {
3920 self.store.applied_migrations()
3921 }
3922
3923 pub fn app_schema_state(&mut self) -> Result<crate::store::AppSchemaState> {
3924 self.store
3925 .app_schema_state(self.app_schema.current_schema_version())
3926 }
3927
3928 pub fn app_schema_state_json(&mut self) -> Result<String> {
3929 Ok(serde_json::to_string(&self.app_schema_state()?)?)
3930 }
3931
3932 pub fn outbox_summaries(&mut self) -> Result<Vec<crate::store::OutboxSummary>> {
3933 self.store.outbox_summaries()
3934 }
3935
3936 pub fn upsert_auth_lease(&mut self, lease: &crate::store::AuthLeaseRecord) -> Result<()> {
3937 validate_auth_lease_record_against_app_schema(self.app_schema, lease)?;
3938 self.store.transaction(|tx| tx.upsert_auth_lease(lease))
3939 }
3940
3941 pub fn auth_lease(&mut self, lease_id: &str) -> Result<Option<crate::store::AuthLeaseRecord>> {
3942 self.store.transaction(|tx| tx.auth_lease(lease_id))
3943 }
3944
3945 pub fn active_auth_leases(
3946 &mut self,
3947 actor_id: Option<&str>,
3948 now_ms: i64,
3949 ) -> Result<Vec<crate::store::AuthLeaseRecord>> {
3950 self.store
3951 .transaction(|tx| tx.active_auth_leases(actor_id, now_ms))
3952 }
3953
3954 pub fn set_outbox_auth_lease(
3955 &mut self,
3956 client_commit_id: &str,
3957 provenance: Option<&crate::protocol::AuthLeaseProvenance>,
3958 ) -> Result<()> {
3959 self.store
3960 .transaction(|tx| tx.set_outbox_auth_lease(client_commit_id, provenance))
3961 }
3962
3963 pub fn conflict_summaries(&mut self) -> Result<Vec<crate::store::ConflictSummary>> {
3964 self.store.conflict_summaries()
3965 }
3966
3967 pub fn local_health_check(&mut self) -> Result<crate::health::LocalHealthReport> {
3968 let mut report = crate::health::check_local_health(
3969 &mut self.store,
3970 DEFAULT_STATE_ID,
3971 &self.subscriptions,
3972 )?;
3973 let app_schema_state = self.app_schema_state()?;
3974 let outbox = self.outbox_summaries()?;
3975 let conflicts = self.conflict_summaries()?;
3976 let scoped_rows = self.store.scoped_rows_health_summary(&self.subscriptions)?;
3977 let blob_health = self.store.blob_health_summary()?;
3978 let crdt_health = self.store.crdt_health_summary()?;
3979 crate::health::check_local_sync_state_health(
3980 &mut report,
3981 self.app_schema.current_schema_version(),
3982 &app_schema_state,
3983 &outbox,
3984 &conflicts,
3985 scoped_rows.as_ref(),
3986 blob_health.as_ref(),
3987 crdt_health.as_ref(),
3988 );
3989 Ok(report)
3990 }
3991
3992 pub fn local_health_check_json(&mut self) -> Result<String> {
3993 Ok(serde_json::to_string(&self.local_health_check()?)?)
3994 }
3995
3996 pub fn export_local_support_bundle(&mut self) -> Result<crate::health::LocalSupportBundle> {
3997 let mut states = Vec::new();
3998 let mut roots = Vec::new();
3999 self.store.transaction(|tx| {
4000 states = tx.subscription_states(DEFAULT_STATE_ID)?;
4001 roots = tx.verified_roots(DEFAULT_STATE_ID)?;
4002 Ok(())
4003 })?;
4004 let mut health = crate::health::check_local_health_records(
4005 DEFAULT_STATE_ID,
4006 &self.subscriptions,
4007 &states,
4008 &roots,
4009 );
4010 let app_schema_state = self.app_schema_state()?;
4011 let outbox = self.outbox_summaries()?;
4012 let conflicts = self.conflict_summaries()?;
4013 let scoped_rows = self.store.scoped_rows_health_summary(&self.subscriptions)?;
4014 let blob_health = self.store.blob_health_summary()?;
4015 let crdt_health = self.store.crdt_health_summary()?;
4016 crate::health::check_local_sync_state_health(
4017 &mut health,
4018 self.app_schema.current_schema_version(),
4019 &app_schema_state,
4020 &outbox,
4021 &conflicts,
4022 scoped_rows.as_ref(),
4023 blob_health.as_ref(),
4024 crdt_health.as_ref(),
4025 );
4026 Ok(crate::health::local_support_bundle_from_records(
4027 "rust",
4028 health,
4029 &self.subscriptions,
4030 &states,
4031 &roots,
4032 app_schema_state,
4033 &outbox,
4034 &conflicts,
4035 blob_health,
4036 crdt_health,
4037 ))
4038 }
4039
4040 pub fn export_local_support_bundle_json(&mut self) -> Result<String> {
4041 Ok(serde_json::to_string(&self.export_local_support_bundle()?)?)
4042 }
4043
4044 pub fn import_local_support_bundle_json(&mut self, bundle_json: &str) -> Result<String> {
4045 Ok(serde_json::to_string(
4046 &crate::health::import_local_support_bundle_json(bundle_json)?,
4047 )?)
4048 }
4049
4050 pub fn repair_local_health(
4051 &mut self,
4052 request: crate::health::LocalHealthRepairRequest,
4053 ) -> Result<crate::health::LocalHealthRepairReport> {
4054 match request.action {
4055 crate::health::LocalHealthRepairAction::ForceRebootstrap => {
4056 self.repair_force_rebootstrap(&request.subscription_ids)
4057 }
4058 crate::health::LocalHealthRepairAction::ClearOrphanedState => {
4059 self.repair_clear_orphaned_state(&request.subscription_ids)
4060 }
4061 crate::health::LocalHealthRepairAction::ClearOrphanedSyncedRows => {
4062 self.repair_clear_orphaned_synced_rows(&request)
4063 }
4064 crate::health::LocalHealthRepairAction::ManualInspection => Err(SyncularError::config(
4065 "manualInspection health findings cannot be repaired automatically",
4066 )),
4067 }
4068 }
4069
4070 pub fn repair_local_health_json(&mut self, request_json: &str) -> Result<String> {
4071 let request: crate::health::LocalHealthRepairRequest = serde_json::from_str(request_json)?;
4072 Ok(serde_json::to_string(&self.repair_local_health(request)?)?)
4073 }
4074
4075 pub fn reset_local_sync_state(
4076 &mut self,
4077 request: crate::health::LocalSyncResetRequest,
4078 ) -> Result<crate::health::LocalSyncResetReport> {
4079 let selected = self.selected_reset_subscriptions(&request.subscription_ids)?;
4080 if request.clear_synced_rows {
4081 let unresolved_outbox = self
4082 .outbox_summaries()?
4083 .iter()
4084 .filter(|commit| commit.status != "acked")
4085 .count();
4086 if unresolved_outbox > 0 {
4087 return Err(SyncularError::config(format!(
4088 "resetLocalSyncState clearSyncedRows requires an empty local outbox; found {unresolved_outbox} unresolved commits"
4089 )));
4090 }
4091 }
4092 let selected_ids = selected
4093 .iter()
4094 .map(|subscription| subscription.id.as_str())
4095 .collect::<HashSet<_>>();
4096
4097 self.store.transaction(|tx| {
4098 let deleted_subscription_states = tx
4099 .subscription_states(DEFAULT_STATE_ID)?
4100 .iter()
4101 .filter(|state| selected_ids.contains(state.subscription_id.as_str()))
4102 .count();
4103 let deleted_verified_roots = tx
4104 .verified_roots(DEFAULT_STATE_ID)?
4105 .iter()
4106 .filter(|root| selected_ids.contains(root.subscription_id.as_str()))
4107 .count();
4108
4109 let mut cleared_synced_rows = 0i64;
4110 let mut cleared_tables = Vec::new();
4111 if request.clear_synced_rows {
4112 for subscription in &selected {
4113 let deleted =
4114 tx.clear_synced_rows_for_scopes(&subscription.table, &subscription.scopes)?;
4115 if deleted > 0 {
4116 cleared_synced_rows += deleted;
4117 if !cleared_tables
4118 .iter()
4119 .any(|table| table == &subscription.table)
4120 {
4121 cleared_tables.push(subscription.table.clone());
4122 }
4123 }
4124 }
4125 }
4126
4127 for subscription in &selected {
4128 tx.delete_verified_root(DEFAULT_STATE_ID, &subscription.id)?;
4129 tx.delete_subscription_state(DEFAULT_STATE_ID, &subscription.id)?;
4130 }
4131
4132 Ok(crate::health::LocalSyncResetReport {
4133 reset_subscriptions: selected.len(),
4134 deleted_subscription_states,
4135 deleted_verified_roots,
4136 cleared_synced_rows,
4137 cleared_tables,
4138 })
4139 })
4140 }
4141
4142 pub fn reset_local_sync_state_json(&mut self, request_json: &str) -> Result<String> {
4143 let request: crate::health::LocalSyncResetRequest = serde_json::from_str(request_json)?;
4144 Ok(serde_json::to_string(
4145 &self.reset_local_sync_state(request)?,
4146 )?)
4147 }
4148
4149 fn selected_reset_subscriptions(
4150 &self,
4151 subscription_ids: &[String],
4152 ) -> Result<Vec<SubscriptionSpec>> {
4153 if subscription_ids.is_empty() {
4154 return Ok(self.subscriptions.clone());
4155 }
4156 let requested = subscription_ids
4157 .iter()
4158 .map(String::as_str)
4159 .collect::<HashSet<_>>();
4160 let selected = self
4161 .subscriptions
4162 .iter()
4163 .filter(|subscription| requested.contains(subscription.id.as_str()))
4164 .cloned()
4165 .collect::<Vec<_>>();
4166 if selected.len() != requested.len() {
4167 let configured = self
4168 .subscriptions
4169 .iter()
4170 .map(|subscription| subscription.id.as_str())
4171 .collect::<HashSet<_>>();
4172 let missing = subscription_ids
4173 .iter()
4174 .find(|id| !configured.contains(id.as_str()))
4175 .map(String::as_str)
4176 .unwrap_or("unknown");
4177 return Err(SyncularError::config(format!(
4178 "cannot reset unconfigured subscription {missing}"
4179 )));
4180 }
4181 Ok(selected)
4182 }
4183
4184 fn repair_force_rebootstrap(
4185 &mut self,
4186 subscription_ids: &[String],
4187 ) -> Result<crate::health::LocalHealthRepairReport> {
4188 if subscription_ids.is_empty() {
4189 return Err(SyncularError::config(
4190 "forceRebootstrap repair requires explicit subscriptionIds",
4191 ));
4192 }
4193 let configured = self
4194 .subscriptions
4195 .iter()
4196 .map(|subscription| subscription.id.as_str())
4197 .collect::<HashSet<_>>();
4198 for subscription_id in subscription_ids {
4199 if !configured.contains(subscription_id.as_str()) {
4200 return Err(SyncularError::config(format!(
4201 "cannot force rebootstrap for unconfigured subscription {subscription_id}"
4202 )));
4203 }
4204 }
4205
4206 self.store.transaction(|tx| {
4207 let requested = subscription_ids
4208 .iter()
4209 .map(String::as_str)
4210 .collect::<HashSet<_>>();
4211 let deleted_subscription_states = tx
4212 .subscription_states(DEFAULT_STATE_ID)?
4213 .iter()
4214 .filter(|state| requested.contains(state.subscription_id.as_str()))
4215 .count();
4216 let deleted_verified_roots = tx
4217 .verified_roots(DEFAULT_STATE_ID)?
4218 .iter()
4219 .filter(|root| requested.contains(root.subscription_id.as_str()))
4220 .count();
4221 for subscription_id in subscription_ids {
4222 tx.delete_verified_root(DEFAULT_STATE_ID, subscription_id)?;
4223 tx.delete_subscription_state(DEFAULT_STATE_ID, subscription_id)?;
4224 }
4225 Ok(crate::health::LocalHealthRepairReport {
4226 action: crate::health::LocalHealthRepairAction::ForceRebootstrap,
4227 deleted_subscription_states,
4228 deleted_verified_roots,
4229 forced_rebootstrap_subscriptions: subscription_ids.len(),
4230 cleared_orphaned_synced_rows: 0,
4231 cleared_tables: Vec::new(),
4232 })
4233 })
4234 }
4235
4236 fn repair_clear_orphaned_state(
4237 &mut self,
4238 subscription_ids: &[String],
4239 ) -> Result<crate::health::LocalHealthRepairReport> {
4240 let configured = self
4241 .subscriptions
4242 .iter()
4243 .map(|subscription| subscription.id.as_str())
4244 .collect::<HashSet<_>>();
4245 for subscription_id in subscription_ids {
4246 if configured.contains(subscription_id.as_str()) {
4247 return Err(SyncularError::config(format!(
4248 "clearOrphanedState refuses configured subscription {subscription_id}"
4249 )));
4250 }
4251 }
4252 let requested = subscription_ids
4253 .iter()
4254 .map(String::as_str)
4255 .collect::<HashSet<_>>();
4256
4257 self.store.transaction(|tx| {
4258 let states = tx.subscription_states(DEFAULT_STATE_ID)?;
4259 let roots = tx.verified_roots(DEFAULT_STATE_ID)?;
4260 let state_ids = states
4261 .iter()
4262 .map(|state| state.subscription_id.as_str())
4263 .filter(|id| !configured.contains(id))
4264 .filter(|id| requested.is_empty() || requested.contains(id))
4265 .collect::<HashSet<_>>();
4266 let root_ids = roots
4267 .iter()
4268 .map(|root| root.subscription_id.as_str())
4269 .filter(|id| !configured.contains(id))
4270 .filter(|id| requested.is_empty() || requested.contains(id))
4271 .collect::<HashSet<_>>();
4272 let mut all_ids = state_ids.iter().copied().collect::<HashSet<_>>();
4273 all_ids.extend(root_ids.iter().copied());
4274 for subscription_id in all_ids {
4275 tx.delete_subscription_state(DEFAULT_STATE_ID, subscription_id)?;
4276 tx.delete_verified_root(DEFAULT_STATE_ID, subscription_id)?;
4277 }
4278 Ok(crate::health::LocalHealthRepairReport {
4279 action: crate::health::LocalHealthRepairAction::ClearOrphanedState,
4280 deleted_subscription_states: state_ids.len(),
4281 deleted_verified_roots: root_ids.len(),
4282 forced_rebootstrap_subscriptions: 0,
4283 cleared_orphaned_synced_rows: 0,
4284 cleared_tables: Vec::new(),
4285 })
4286 })
4287 }
4288
4289 fn repair_clear_orphaned_synced_rows(
4290 &mut self,
4291 request: &crate::health::LocalHealthRepairRequest,
4292 ) -> Result<crate::health::LocalHealthRepairReport> {
4293 if !request.subscription_ids.is_empty() {
4294 return Err(SyncularError::config(
4295 "clearOrphanedSyncedRows uses tables, not subscriptionIds",
4296 ));
4297 }
4298 let unresolved_outbox = self
4299 .outbox_summaries()?
4300 .iter()
4301 .filter(|commit| commit.status != "acked")
4302 .count();
4303 if unresolved_outbox > 0 {
4304 return Err(SyncularError::config(format!(
4305 "clearOrphanedSyncedRows requires an empty local outbox; found {unresolved_outbox} unresolved commits"
4306 )));
4307 }
4308 let summary = self
4309 .store
4310 .clear_orphaned_synced_rows(&self.subscriptions, &request.tables)?;
4311 let cleared_tables = summary
4312 .tables
4313 .into_iter()
4314 .filter(|table| table.orphaned_synced_rows > 0)
4315 .map(|table| table.table)
4316 .collect::<Vec<_>>();
4317 Ok(crate::health::LocalHealthRepairReport {
4318 action: crate::health::LocalHealthRepairAction::ClearOrphanedSyncedRows,
4319 deleted_subscription_states: 0,
4320 deleted_verified_roots: 0,
4321 forced_rebootstrap_subscriptions: 0,
4322 cleared_orphaned_synced_rows: summary.orphaned_synced_rows,
4323 cleared_tables,
4324 })
4325 }
4326
/// Returns a `SyncularConflicts` handle wrapping this client.
///
/// The handle stores the `&mut self` borrow in its `client` field, so the
/// client cannot be used directly while the handle is alive.
pub fn conflicts(&mut self) -> SyncularConflicts<'_, S, T> {
    SyncularConflicts { client: self }
}
4330
/// Returns the conflict summaries produced by `self.conflict_summaries()`.
///
/// This is a direct delegation; any error from that call is returned
/// unchanged.
pub fn pending_conflicts(&mut self) -> Result<Vec<crate::store::ConflictSummary>> {
    self.conflict_summaries()
}
4334
4335 pub fn has_pending_conflicts(&mut self) -> Result<bool> {
4336 Ok(!self.pending_conflicts()?.is_empty())
4337 }
4338
/// Forwards `id` and `resolution` to the store's `resolve_conflict`.
///
/// `resolution` is passed through as-is, with no validation at this layer.
/// NOTE(review): the accepted values are presumably the strings produced by
/// `ConflictResolution::as_str()`, which `SyncularConflicts::resolve` passes
/// to the same store call — confirm against the store implementation.
pub fn resolve_conflict(&mut self, id: &str, resolution: &str) -> Result<()> {
    self.store.resolve_conflict(id, resolution)
}
4342
/// Forwards `id` to the store's `retry_conflict_keep_local` and returns its
/// result unchanged.
///
/// `SyncularConflicts::keep_local` surfaces the string from the same store
/// call as `retry_client_commit_id`, so it is presumably the id of a new
/// client commit that retries the local change — confirm with the store.
pub fn retry_conflict_keep_local(&mut self, id: &str) -> Result<String> {
    self.store.retry_conflict_keep_local(id)
}
4346}
4347
4348impl<'a, S, T> SyncularConflicts<'a, S, T>
4349where
4350 S: SyncStore + SyncStateStore,
4351 T: SyncTransport,
4352{
4353 pub fn pending(&mut self) -> Result<Vec<crate::store::ConflictSummary>> {
4354 self.client.store.conflict_summaries()
4355 }
4356
4357 pub fn is_empty(&mut self) -> Result<bool> {
4358 Ok(self.pending()?.is_empty())
4359 }
4360
4361 pub fn keep_local(&mut self, conflict_id: &str) -> Result<ConflictResolutionReceipt> {
4362 let retry_client_commit_id = self.client.store.retry_conflict_keep_local(conflict_id)?;
4363 Ok(ConflictResolutionReceipt {
4364 conflict_id: conflict_id.to_string(),
4365 resolution: ConflictResolution::KeepLocal,
4366 retry_client_commit_id: Some(retry_client_commit_id),
4367 })
4368 }
4369
4370 pub fn accept_server(&mut self, conflict_id: &str) -> Result<ConflictResolutionReceipt> {
4371 self.resolve(conflict_id, ConflictResolution::AcceptServer)
4372 }
4373
4374 pub fn dismiss(&mut self, conflict_id: &str) -> Result<ConflictResolutionReceipt> {
4375 self.resolve(conflict_id, ConflictResolution::Dismiss)
4376 }
4377
4378 pub fn resolve(
4379 &mut self,
4380 conflict_id: &str,
4381 resolution: ConflictResolution,
4382 ) -> Result<ConflictResolutionReceipt> {
4383 if resolution == ConflictResolution::KeepLocal {
4384 return self.keep_local(conflict_id);
4385 }
4386
4387 self.client
4388 .store
4389 .resolve_conflict(conflict_id, resolution.as_str())?;
4390 Ok(ConflictResolutionReceipt {
4391 conflict_id: conflict_id.to_string(),
4392 resolution,
4393 retry_client_commit_id: None,
4394 })
4395 }
4396}
4397
4398fn validate_outbox_schema_version(commit: &OutboxCommit, current: i32) -> Result<()> {
4399 if commit.schema_version < 1 {
4400 return Err(SyncularError::schema(format!(
4401 "outbox commit {} has invalid schema version {}",
4402 commit.client_commit_id, commit.schema_version
4403 )));
4404 }
4405
4406 if commit.schema_version > current {
4407 return Err(SyncularError::schema(format!(
4408 "outbox commit {} was created with schema version {}, but this client supports {}",
4409 commit.client_commit_id, commit.schema_version, current
4410 )));
4411 }
4412
4413 Ok(())
4414}
4415
4416fn validate_server_schema_version(response: &CombinedResponse, current: i32) -> Result<()> {
4417 if let Some(required) = response.required_schema_version {
4418 if required < 1 {
4419 return Err(SyncularError::schema(format!(
4420 "server reported invalid required schema version {required}"
4421 )));
4422 }
4423
4424 if required > current {
4425 return Err(SyncularError::schema(format!(
4426 "server requires schema version {required}, but this client supports {current}"
4427 )));
4428 }
4429 }
4430
4431 if let Some(latest) = response.latest_schema_version {
4432 if latest < 1 {
4433 return Err(SyncularError::schema(format!(
4434 "server reported invalid latest schema version {latest}"
4435 )));
4436 }
4437 }
4438
4439 Ok(())
4440}
4441
4442fn apply_push_commit_response(
4443 tx: &mut impl SyncStoreTx,
4444 outbox: &OutboxCommit,
4445 response: &PushCommitResponse,
4446) -> Result<bool> {
4447 let mut conflicts_changed = false;
4448 match response.status.as_str() {
4449 "applied" | "cached" => {
4450 tx.mark_pushed_operation_server_versions(outbox, response)?;
4451 tx.mark_outbox_acked(&outbox.id, response)?;
4452 }
4453 _ => {
4454 for result in &response.results {
4455 if result.status == "conflict" || result.status == "error" {
4456 tx.insert_conflict(outbox, result)?;
4457 conflicts_changed = true;
4458 }
4459 }
4460 tx.mark_outbox_failed(&outbox.id, "REJECTED", response)?;
4461 }
4462 }
4463 Ok(conflicts_changed)
4464}
4465
4466fn is_auth_transport_error(error: &SyncularError) -> bool {
4467 if error.kind() != ErrorKind::Transport {
4468 return false;
4469 }
4470 let message = error.message_text();
4471 message.contains("HTTP 401") || message.contains("HTTP 403")
4472}