Skip to main content

syncular_runtime/core/
client.rs

1#[cfg(feature = "native")]
2use crate::app_schema::validate_app_schema_runtime_features;
3use crate::app_schema::{
4    default_app_schema, validate_auth_lease_payload_against_app_schema,
5    validate_blob_encryption_against_app_schema, validate_encrypted_crdt_against_app_schema,
6    validate_field_encryption_rules_against_app_schema, AppSchema, AppTableMetadata,
7};
8#[cfg(feature = "native")]
9use crate::app_schema::{
10    validate_auth_lease_issue_request_against_app_schema,
11    validate_auth_lease_issue_response_against_app_schema,
12    validate_blob_runtime_against_app_schema,
13};
14use crate::binary_snapshot::{
15    BinarySnapshotCell, BinarySnapshotPayload, BorrowedBinarySnapshotCell,
16    DecodedBinarySnapshotRows, SnapshotChunkRows,
17};
18use crate::command_history::{CommandHistoryEntry, CommandHistoryRecord, CommandHistoryState};
19#[cfg(feature = "native")]
20use crate::crdt_field::{
21    validate_crdt_field, CrdtDocumentSnapshot, CrdtField, CrdtFieldCompactionStats, CrdtFieldId,
22    CrdtFieldSyncMode, CrdtUpdateLogEntry,
23};
24#[cfg(feature = "native")]
25use crate::crdt_yjs::{
26    build_yjs_text_update, materialize_yjs_state, validate_crdt_request_json_size,
27    validate_yjs_text_input_size, validate_yjs_update_envelope_size, yjs_state_vector_base64,
28    BuildYjsTextUpdateArgs,
29};
30use crate::crdt_yjs::{YjsUpdateEnvelope, YJS_PAYLOAD_KEY};
31#[cfg(feature = "native")]
32use crate::diesel_sqlite::DieselSqliteStore;
33#[cfg(feature = "native")]
34use crate::encrypted_crdt::{
35    encrypted_crdt_stream_id, encrypted_field_metadata, BuildEncryptedCrdtCheckpointArgs,
36    BuildEncryptedCrdtTextUpdateArgs, BuildEncryptedCrdtYjsUpdateArgs, EncryptedCrdtStreamStats,
37};
38use crate::encrypted_crdt::{is_encrypted_crdt_system_table, EncryptedCrdt};
39use crate::encryption::{BlobEncryption, FieldEncryption, FieldEncryptionContext};
40use crate::error::{ErrorKind, Result, SyncularError};
41#[cfg(feature = "demo-todo-native-fixture")]
42use crate::fixtures::todo::rusqlite_sqlite::RusqliteStore;
43#[cfg(feature = "native")]
44use crate::limits::{DEFAULT_BLOB_UPLOAD_BATCH_LIMIT, DEFAULT_CRDT_UPDATE_QUEUE_CAPACITY};
45use crate::limits::{
46    DEFAULT_CRDT_STATE_VECTOR_HINT_LIMIT, DEFAULT_OUTBOX_PUSH_BATCH_LIMIT,
47    DEFAULT_PULL_LIMIT_COMMITS, DEFAULT_PULL_LIMIT_SNAPSHOT_ROWS, DEFAULT_PULL_MAX_SNAPSHOT_PAGES,
48    MAX_SCOPE_KEYS_PER_SUBSCRIPTION, MAX_SCOPE_VALUES_PER_CLIENT,
49    MAX_SCOPE_VALUES_PER_SUBSCRIPTION, MAX_SUBSCRIPTIONS_PER_CLIENT,
50    MAX_SUBSCRIPTION_PARAMS_PER_SUBSCRIPTION,
51};
52use crate::protocol::*;
53#[cfg(feature = "native")]
54use crate::store::next_blob_upload_retry_at;
55#[cfg(feature = "native")]
56use crate::store::MAX_BLOB_UPLOAD_RETRIES;
57use crate::store::{
58    next_retry_at, now_ms, OutboxCommit, SubscriptionState, SyncStateStore, SyncStore, SyncStoreTx,
59    VerifiedRoot, MAX_SYNC_RETRIES,
60};
61#[cfg(feature = "demo-todo-fixture")]
62use crate::store::{DemoTaskStore, Task};
63#[cfg(feature = "native")]
64use crate::transport::BlobTransport;
65#[cfg(feature = "native")]
66use crate::transport::{HttpSyncTransport, SyncTransportConfig};
67use crate::transport::{
68    RealtimeEvent, RealtimeTransport, SyncAuthHeaderStore, SyncAuthHeaders, SyncTransport,
69};
70#[cfg(feature = "native")]
71use crate::transport::{SyncAuthSigner, SyncAuthSignerStore};
72use serde::{Deserialize, Serialize};
73#[cfg(feature = "native")]
74use serde_json::json;
75use serde_json::{Map, Value};
76#[cfg(feature = "native")]
77use std::collections::BTreeMap;
78use std::collections::{HashMap, HashSet};
79use std::fmt;
80#[cfg(feature = "native")]
81use std::fs;
82#[cfg(feature = "native")]
83use std::fs::File;
84#[cfg(feature = "native")]
85use std::path::Path;
86use std::sync::{Mutex, OnceLock};
87use std::time::{Duration, SystemTime};
88#[cfg(feature = "demo-todo-fixture")]
89use uuid::Uuid;
90
/// State identifier string; its consumers are outside this part of the file.
const DEFAULT_STATE_ID: &str = "default";
/// Sentinel base URL stored by [`SyncularClientConfig::local_sync_compatible`]
/// and recognised by [`SyncularClientConfig::is_local_sync_compatible`].
pub const LOCAL_SYNC_DISABLED_BASE_URL: &str = "syncular-local://disabled";
/// Upper bound on pull rounds; the loop that uses it is not in this part of the file.
const MAX_PULL_ROUNDS: usize = 20;
/// Process-wide, lazily initialised set of string keys behind a mutex.
/// NOTE(review): presumably tracks syncs currently in flight — confirm at the use site.
static ACTIVE_SYNC_KEYS: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();

/// Connection and identity settings for a Syncular client.
#[derive(Debug, Clone)]
pub struct SyncularClientConfig {
    pub db_path: String,
    pub base_url: String,
    pub client_id: String,
    pub actor_id: String,
    pub project_id: Option<String>,
}

impl SyncularClientConfig {
    /// Builds a configuration whose `base_url` is [`LOCAL_SYNC_DISABLED_BASE_URL`].
    ///
    /// All other fields are taken from the arguments unchanged.
    pub fn local_sync_compatible(
        db_path: impl Into<String>,
        client_id: impl Into<String>,
        actor_id: impl Into<String>,
        project_id: Option<String>,
    ) -> Self {
        let base_url = String::from(LOCAL_SYNC_DISABLED_BASE_URL);
        Self {
            db_path: db_path.into(),
            base_url,
            client_id: client_id.into(),
            actor_id: actor_id.into(),
            project_id,
        }
    }

    /// Returns `true` when `base_url` equals [`LOCAL_SYNC_DISABLED_BASE_URL`].
    pub fn is_local_sync_compatible(&self) -> bool {
        self.base_url.as_str() == LOCAL_SYNC_DISABLED_BASE_URL
    }
}
125
126pub trait SyncularMutationExecutor {
127    fn apply_mutation<M>(&mut self, mutation: M) -> Result<MutationReceipt>
128    where
129        M: IntoSyncularMutation;
130
131    fn apply_mutation_batch(&mut self, batch: SyncularMutationBatch) -> Result<MutationReceipt>;
132
133    fn commit_mutations<R>(
134        &mut self,
135        f: impl FnOnce(&mut SyncularMutationBatch) -> Result<R>,
136    ) -> Result<MutationCommit<R>> {
137        let mut batch = SyncularMutationBatch::new();
138        let result = f(&mut batch)?;
139        let commit = self.apply_mutation_batch(batch)?;
140        Ok(MutationCommit { result, commit })
141    }
142}
143
144pub trait SyncularLeasedMutationExecutor: SyncularMutationExecutor {
145    fn apply_leased_mutation<M>(&mut self, mutation: M) -> Result<MutationReceipt>
146    where
147        M: IntoSyncularMutation;
148
149    fn apply_leased_mutation_batch(
150        &mut self,
151        batch: SyncularMutationBatch,
152    ) -> Result<MutationReceipt>;
153
154    fn commit_leased_mutations<R>(
155        &mut self,
156        f: impl FnOnce(&mut SyncularMutationBatch) -> Result<R>,
157    ) -> Result<MutationCommit<R>> {
158        let mut batch = SyncularMutationBatch::new();
159        let result = f(&mut batch)?;
160        let commit = self.apply_leased_mutation_batch(batch)?;
161        Ok(MutationCommit { result, commit })
162    }
163}
164
165pub trait SyncularCommandHistoryExecutor: SyncularMutationExecutor {
166    fn command_history_current_row_json(
167        &mut self,
168        table: &str,
169        row_id: &str,
170    ) -> Result<Option<Value>>;
171
172    fn command_history_record(
173        &mut self,
174        mutation_scope: &str,
175        entries: &[CommandHistoryEntry],
176        receipt: &MutationReceipt,
177    ) -> Result<CommandHistoryRecord>;
178
179    fn command_history_latest(
180        &mut self,
181        state: CommandHistoryState,
182    ) -> Result<Option<CommandHistoryRecord>>;
183
184    fn command_history_mark(
185        &mut self,
186        id: &str,
187        state: CommandHistoryState,
188        receipt: &MutationReceipt,
189    ) -> Result<()>;
190
191    fn apply_command_history_batch(
192        &mut self,
193        mutation_scope: &str,
194        batch: SyncularMutationBatch,
195    ) -> Result<MutationReceipt>;
196
197    fn apply_command_history_tracked_batch(
198        &mut self,
199        mutation_scope: &str,
200        batch: SyncularMutationBatch,
201    ) -> Result<MutationReceipt> {
202        let pending = command_history_pending_entries(self, batch.mutations())?;
203        let receipt = self.apply_command_history_batch(mutation_scope, batch)?;
204        let entries = command_history_committed_entries(self, pending)?;
205        if !entries.is_empty() {
206            self.command_history_record(mutation_scope, &entries, &receipt)?;
207        }
208        Ok(receipt)
209    }
210}
211
/// Identity of a row touched by a mutation batch, plus the row JSON captured
/// before the batch is applied.
212#[derive(Debug, Clone)]
213struct CommandHistoryPendingEntry {
214    table: String,
215    row_id: String,
    /// Row JSON read before the batch ran; `None` when the lookup returned no row.
216    before: Option<Value>,
217}
218
219fn command_history_pending_entries<C>(
220    client: &mut C,
221    mutations: &[PendingSyncularMutation],
222) -> Result<Vec<CommandHistoryPendingEntry>>
223where
224    C: SyncularCommandHistoryExecutor + ?Sized,
225{
226    let mut entries = Vec::new();
227    for mutation in mutations {
228        if entries.iter().any(|entry: &CommandHistoryPendingEntry| {
229            entry.table == mutation.table && entry.row_id == mutation.row_id
230        }) {
231            continue;
232        }
233        let before = client.command_history_current_row_json(&mutation.table, &mutation.row_id)?;
234        entries.push(CommandHistoryPendingEntry {
235            table: mutation.table.clone(),
236            row_id: mutation.row_id.clone(),
237            before,
238        });
239    }
240    Ok(entries)
241}
242
243fn command_history_committed_entries<C>(
244    client: &mut C,
245    pending: Vec<CommandHistoryPendingEntry>,
246) -> Result<Vec<CommandHistoryEntry>>
247where
248    C: SyncularCommandHistoryExecutor + ?Sized,
249{
250    let mut entries = Vec::new();
251    for entry in pending {
252        let after = client.command_history_current_row_json(&entry.table, &entry.row_id)?;
253        if entry.before == after {
254            continue;
255        }
256        entries.push(CommandHistoryEntry {
257            table: entry.table,
258            row_id: entry.row_id,
259            before: entry.before,
260            after,
261        });
262    }
263    Ok(entries)
264}
265
/// Executor interface for mutations on encrypted CRDT fields.
///
/// Every method addresses a field by table `metadata`, `field` name and
/// `row_id`. No implementation is visible in this part of the file.
266pub trait SyncularEncryptedCrdtMutationExecutor {
    /// Applies a text update to the field, given the desired `next_text`.
267    fn apply_encrypted_crdt_text_update(
268        &mut self,
269        metadata: &'static AppTableMetadata,
270        field: &'static str,
271        row_id: &str,
272        next_text: &str,
273    ) -> Result<MutationReceipt>;
274
    /// Applies an already-built Yjs update envelope to the field.
275    fn apply_encrypted_crdt_yjs_update(
276        &mut self,
277        metadata: &'static AppTableMetadata,
278        field: &'static str,
279        row_id: &str,
280        update: YjsUpdateEnvelope,
281    ) -> Result<MutationReceipt>;
282
    /// Requests a checkpoint for the field.
    ///
    /// NOTE(review): presumably returns `Ok(None)` when fewer than
    /// `min_uncheckpointed_updates` updates are outstanding — confirm against
    /// an implementation.
283    fn apply_encrypted_crdt_checkpoint(
284        &mut self,
285        metadata: &'static AppTableMetadata,
286        field: &'static str,
287        row_id: &str,
288        min_uncheckpointed_updates: i64,
289    ) -> Result<Option<MutationReceipt>>;
290}
291
/// Receipt for a CRDT field write.
///
/// Compiled only with the `native` feature; (de)serialized with camelCase keys.
292#[cfg(feature = "native")]
293#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
294#[serde(rename_all = "camelCase")]
295pub struct CrdtFieldWriteReceipt {
296    pub client_commit_id: String,
297    pub sync_mode: CrdtFieldSyncMode,
298}
299
/// Materialized view of a CRDT field: a JSON value, an optional base64 state
/// and a base64 state vector.
///
/// Compiled only with the `native` feature; (de)serialized with camelCase keys.
300#[cfg(feature = "native")]
301#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
302#[serde(rename_all = "camelCase")]
303pub struct CrdtFieldMaterialization {
304    pub value: Value,
305    pub state_base64: Option<String>,
306    pub state_vector_base64: String,
307}
308
/// Outcome of a CRDT field compaction, with statistics from before and after.
///
/// Compiled only with the `native` feature; (de)serialized with camelCase keys.
309#[cfg(feature = "native")]
310#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
311#[serde(rename_all = "camelCase")]
312pub struct CrdtFieldCompactionReceipt {
313    pub checkpoint_created: bool,
314    pub client_commit_id: Option<String>,
315    pub before: CrdtFieldCompactionStats,
316    pub after: CrdtFieldCompactionStats,
    // NOTE(review): the encrypted stream stats are presumably `None` for
    // fields without an encrypted stream — confirm at the construction site.
317    pub encrypted_stream_before: Option<EncryptedCrdtStreamStats>,
318    pub encrypted_stream_after: Option<EncryptedCrdtStreamStats>,
319}
320
/// JSON request body for an encrypted CRDT update, deserialized from
/// camelCase keys.
///
/// `nextText` and `update` both default to `None` when absent. How the two
/// are combined or validated is decided by the consumer, which is outside
/// this part of the file.
321#[cfg(feature = "native")]
322#[derive(Debug, Clone, Deserialize)]
323#[serde(rename_all = "camelCase")]
324struct EncryptedCrdtUpdateJsonRequest {
325    table: String,
326    field: String,
327    row_id: String,
328    #[serde(default)]
329    next_text: Option<String>,
330    #[serde(default)]
331    update: Option<YjsUpdateEnvelope>,
332}
333
/// JSON request body for an encrypted CRDT checkpoint, deserialized from
/// camelCase keys.
///
/// `minUncheckpointedUpdates` defaults to `None` when absent.
334#[cfg(feature = "native")]
335#[derive(Debug, Clone, Deserialize)]
336#[serde(rename_all = "camelCase")]
337struct EncryptedCrdtCheckpointJsonRequest {
338    table: String,
339    field: String,
340    row_id: String,
341    #[serde(default)]
342    min_uncheckpointed_updates: Option<i64>,
343}
344
/// One subscription: an id, the table it targets, its scope values and
/// free-form params.
///
/// Sizes are bounded by `validate_subscription_limits`.
345#[derive(Debug, Clone, Serialize, Deserialize)]
346pub struct SubscriptionSpec {
347    pub id: String,
348    pub table: String,
349    pub scopes: ScopeValues,
350    pub params: Map<String, Value>,
    /// Serialized as `bootstrapPhase`; defaults to `0` when absent on input.
351    #[serde(default, rename = "bootstrapPhase")]
352    pub bootstrap_phase: i64,
353}
354
355pub fn validate_subscription_limits(subscriptions: &[SubscriptionSpec]) -> Result<()> {
356    if subscriptions.len() > MAX_SUBSCRIPTIONS_PER_CLIENT {
357        return Err(subscription_limit_error(
358            "maxSubscriptionsPerClient",
359            subscriptions.len(),
360            MAX_SUBSCRIPTIONS_PER_CLIENT,
361            "too many Syncular subscriptions configured",
362        ));
363    }
364
365    let mut total_scope_values = 0usize;
366    for subscription in subscriptions {
367        if subscription.scopes.len() > MAX_SCOPE_KEYS_PER_SUBSCRIPTION {
368            return Err(subscription_limit_error(
369                "maxScopeKeysPerSubscription",
370                subscription.scopes.len(),
371                MAX_SCOPE_KEYS_PER_SUBSCRIPTION,
372                format!(
373                    "too many scope keys configured for subscription {}",
374                    subscription.id
375                ),
376            ));
377        }
378        if subscription.params.len() > MAX_SUBSCRIPTION_PARAMS_PER_SUBSCRIPTION {
379            return Err(subscription_limit_error(
380                "maxSubscriptionParamsPerSubscription",
381                subscription.params.len(),
382                MAX_SUBSCRIPTION_PARAMS_PER_SUBSCRIPTION,
383                format!(
384                    "too many params configured for subscription {}",
385                    subscription.id
386                ),
387            ));
388        }
389
390        let subscription_scope_values = count_scope_values(&subscription.scopes);
391        if subscription_scope_values > MAX_SCOPE_VALUES_PER_SUBSCRIPTION {
392            return Err(subscription_limit_error(
393                "maxScopeValuesPerSubscription",
394                subscription_scope_values,
395                MAX_SCOPE_VALUES_PER_SUBSCRIPTION,
396                format!(
397                    "too many scope values configured for subscription {}",
398                    subscription.id
399                ),
400            ));
401        }
402        total_scope_values = total_scope_values.saturating_add(subscription_scope_values);
403        if total_scope_values > MAX_SCOPE_VALUES_PER_CLIENT {
404            return Err(subscription_limit_error(
405                "maxScopeValuesPerClient",
406                total_scope_values,
407                MAX_SCOPE_VALUES_PER_CLIENT,
408                "too many total scope values configured for Syncular client",
409            ));
410        }
411    }
412
413    Ok(())
414}
415
416fn count_scope_values(scopes: &ScopeValues) -> usize {
417    scopes
418        .values()
419        .map(|value| value.as_array().map_or(1, Vec::len))
420        .sum()
421}
422
/// Builds the error returned when a subscription limit is exceeded.
///
/// Forwards all four arguments unchanged to `SyncularError::limit_exceeded`:
/// the limit name, the observed value, the allowed maximum, and a message.
423fn subscription_limit_error(
424    limit: &'static str,
425    observed: usize,
426    max: usize,
427    message: impl fmt::Display,
428) -> SyncularError {
429    SyncularError::limit_exceeded(limit, observed, max, message)
430}
431
/// Describes one CRDT field affected by a row change.
///
/// All fields are plain strings; (de)serialized with camelCase keys.
432#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
433#[serde(rename_all = "camelCase")]
434pub struct SyncChangedCrdtField {
435    pub field: String,
436    pub state_column: String,
437    pub container_key: String,
438    pub row_id_field: String,
439    pub kind: String,
440    pub sync_mode: String,
441}
442
/// One row-level change reported by a sync, (de)serialized with camelCase keys.
///
/// `Option` fields are left out of the serialized form when `None`, and
/// `crdtFieldChanges` is left out when empty.
443#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
444#[serde(rename_all = "camelCase")]
445pub struct SyncChangedRow {
446    pub table: String,
447    #[serde(skip_serializing_if = "Option::is_none")]
448    pub row_id: Option<String>,
    /// The constructors in this file set `"insert"`, `"update"`, `"delete"`,
    /// or pass through the operation's own `op` string.
449    pub operation: String,
450    #[serde(default)]
451    pub changed_fields: Vec<String>,
452    #[serde(default)]
453    pub crdt_fields: Vec<String>,
454    #[serde(default, skip_serializing_if = "Vec::is_empty")]
455    pub crdt_field_changes: Vec<SyncChangedCrdtField>,
456    #[serde(skip_serializing_if = "Option::is_none")]
457    pub commit_id: Option<String>,
458    #[serde(skip_serializing_if = "Option::is_none")]
459    pub commit_seq: Option<i64>,
460    #[serde(skip_serializing_if = "Option::is_none")]
461    pub subscription_id: Option<String>,
462    #[serde(skip_serializing_if = "Option::is_none")]
463    pub server_version: Option<i64>,
464}
465
/// Summary of what a sync changed: affected tables, row-level details, and
/// whether conflicts changed.
466#[derive(Debug, Clone, Default, PartialEq, Eq)]
467pub struct SyncReport {
    /// Table names; the helper methods on this type never add a duplicate.
468    pub changed_tables: Vec<String>,
469    pub changed_rows: Vec<SyncChangedRow>,
470    pub conflicts_changed: bool,
471}
472
/// Overall bootstrap status, with per-subscription and per-phase breakdowns.
///
/// Compiled only with the `native` feature; (de)serialized with camelCase keys.
473#[cfg(feature = "native")]
474#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
475#[serde(rename_all = "camelCase")]
476pub struct BootstrapStatus {
477    pub channel_phase: String,
478    pub progress_percent: i64,
479    pub is_bootstrapping: bool,
480    pub critical_ready: bool,
481    pub interactive_ready: bool,
482    pub complete: bool,
483    pub active_phase: Option<i64>,
484    pub expected_subscription_ids: Vec<String>,
485    pub ready_subscription_ids: Vec<String>,
486    pub pending_subscription_ids: Vec<String>,
487    pub subscriptions: Vec<BootstrapSubscriptionStatus>,
488    pub phases: Vec<BootstrapPhaseStatus>,
489}
490
/// Bootstrap status of a single subscription.
///
/// Compiled only with the `native` feature; (de)serialized with camelCase keys.
491#[cfg(feature = "native")]
492#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
493#[serde(rename_all = "camelCase")]
494pub struct BootstrapSubscriptionStatus {
495    pub id: String,
496    pub table: String,
497    pub expected: bool,
498    pub ready: bool,
499    pub status: Option<String>,
500    pub phase: String,
501    pub progress_percent: i64,
502    pub cursor: Option<i64>,
503    pub bootstrap_state: Option<BootstrapState>,
504    pub bootstrap_phase: i64,
505}
506
/// Bootstrap status of one phase, with the subscription ids it covers.
///
/// Compiled only with the `native` feature; (de)serialized with camelCase keys.
507#[cfg(feature = "native")]
508#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
509#[serde(rename_all = "camelCase")]
510pub struct BootstrapPhaseStatus {
511    pub phase: i64,
512    pub expected_subscription_ids: Vec<String>,
513    pub ready_subscription_ids: Vec<String>,
514    pub pending_subscription_ids: Vec<String>,
515    pub is_ready: bool,
516    pub progress_percent: i64,
517}
518
/// A snapshot bundled with its chunk row batches and artifact rows.
// NOTE(review): presumably built before the snapshot is applied to the local
// store — the producer and consumer are outside this part of the file.
519#[derive(Debug, Clone)]
520struct PreparedSnapshot {
521    snapshot: SyncSnapshot,
522    chunk_batches: Vec<SnapshotChunkRows>,
523    artifact_rows: Vec<Value>,
524}
525
526impl SyncReport {
527    pub fn table_changed(table: impl Into<String>) -> Self {
528        Self {
529            changed_tables: vec![table.into()],
530            changed_rows: Vec::new(),
531            conflicts_changed: false,
532        }
533    }
534
535    pub fn tables_changed<I, Table>(tables: I) -> Self
536    where
537        I: IntoIterator<Item = Table>,
538        Table: Into<String>,
539    {
540        let mut report = Self::default();
541        for table in tables {
542            report.add_changed_table(&table.into());
543        }
544        report
545    }
546
547    pub fn conflicts_changed() -> Self {
548        Self {
549            changed_tables: Vec::new(),
550            changed_rows: Vec::new(),
551            conflicts_changed: true,
552        }
553    }
554
555    fn add_changed_table(&mut self, table: &str) {
556        if !self.changed_tables.iter().any(|existing| existing == table) {
557            self.changed_tables.push(table.to_string());
558        }
559    }
560
561    pub fn changes_table(&self, table: &str) -> bool {
562        self.changed_tables.iter().any(|existing| existing == table)
563    }
564
565    pub fn changes_any_table<'a>(&self, tables: impl IntoIterator<Item = &'a str>) -> bool {
566        tables.into_iter().any(|table| self.changes_table(table))
567    }
568
569    fn add_changed_row(&mut self, row: SyncChangedRow) {
570        self.add_changed_table(&row.table);
571        self.changed_rows.push(row);
572    }
573
574    fn merge(&mut self, other: SyncReport) {
575        self.conflicts_changed |= other.conflicts_changed;
576        for table in other.changed_tables {
577            self.add_changed_table(&table);
578        }
579        self.changed_rows.extend(other.changed_rows);
580    }
581}
582
/// Builds the changed-row description for `operation` when no local row
/// state is available.
///
/// Forwards to `sync_changed_row_for_local_operation` with `None` for both
/// the previous and the local row.
583pub fn sync_changed_row_for_operation(
584    app_schema: AppSchema,
585    operation: &SyncOperation,
586    commit_id: Option<String>,
587) -> Option<SyncChangedRow> {
588    sync_changed_row_for_local_operation(app_schema, operation, None, None, commit_id)
589}
590
591pub fn sync_changed_row_for_local_operation(
592    app_schema: AppSchema,
593    operation: &SyncOperation,
594    previous_row: Option<&Value>,
595    local_row: Option<&Value>,
596    commit_id: Option<String>,
597) -> Option<SyncChangedRow> {
598    let metadata = app_schema.table_metadata(&operation.table)?;
599    let changed_fields = if operation.op == "delete" {
600        Vec::new()
601    } else if let Some(local_row) = local_row {
602        changed_fields_from_row_diff(metadata, previous_row, Some(local_row))
603    } else {
604        changed_fields_from_payload(metadata, operation.payload.as_ref())
605    };
606    let operation_kind = if operation.op == "upsert" {
607        if previous_row.is_some() {
608            "update"
609        } else {
610            "insert"
611        }
612    } else {
613        operation.op.as_str()
614    };
615    let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
616    let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
617    Some(SyncChangedRow {
618        table: operation.table.clone(),
619        row_id: Some(operation.row_id.clone()),
620        operation: operation_kind.to_string(),
621        crdt_fields,
622        crdt_field_changes,
623        changed_fields,
624        commit_id,
625        commit_seq: None,
626        subscription_id: None,
627        server_version: operation.base_version,
628    })
629}
630
631pub fn sync_changed_row_for_change(
632    app_schema: AppSchema,
633    change: &SyncChange,
634    previous_row: Option<&Value>,
635    commit_seq: i64,
636    subscription_id: &str,
637) -> Option<SyncChangedRow> {
638    let metadata = app_schema.table_metadata(&change.table)?;
639    let changed_fields = changed_fields_from_remote_change(metadata, change, previous_row);
640    let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
641    let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
642    Some(SyncChangedRow {
643        table: change.table.clone(),
644        row_id: Some(change.row_id.clone()),
645        operation: if change.op == "delete" {
646            "delete".to_string()
647        } else if previous_row.is_some() {
648            "update".to_string()
649        } else {
650            "insert".to_string()
651        },
652        crdt_fields,
653        crdt_field_changes,
654        changed_fields,
655        commit_id: Some(commit_seq.to_string()),
656        commit_seq: Some(commit_seq),
657        subscription_id: Some(subscription_id.to_string()),
658        server_version: change.row_version,
659    })
660}
661
662pub fn sync_changed_row_for_snapshot(
663    app_schema: AppSchema,
664    table: &str,
665    row: &Value,
666    previous_row: Option<&Value>,
667    subscription_id: &str,
668) -> Option<SyncChangedRow> {
669    let metadata = app_schema.table_metadata(table)?;
670    let row_id = row
671        .get(metadata.primary_key_column)
672        .and_then(Value::as_str)
673        .map(str::to_string);
674    let changed_fields = changed_fields_from_row_diff(metadata, previous_row, Some(row));
675    let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
676    let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
677    Some(SyncChangedRow {
678        table: table.to_string(),
679        row_id,
680        operation: if previous_row.is_some() {
681            "update".to_string()
682        } else {
683            "insert".to_string()
684        },
685        crdt_fields,
686        crdt_field_changes,
687        changed_fields,
688        commit_id: None,
689        commit_seq: None,
690        subscription_id: Some(subscription_id.to_string()),
691        server_version: row
692            .get(metadata.server_version_column)
693            .and_then(Value::as_i64),
694    })
695}
696
697pub(crate) fn sync_changed_rows_for_cleared_snapshot_chunk(
698    app_schema: AppSchema,
699    table: &str,
700    rows: &SnapshotChunkRows,
701    subscription_id: &str,
702) -> Vec<SyncChangedRow> {
703    sync_changed_rows_for_cleared_snapshot_chunk_limited(
704        app_schema,
705        table,
706        rows,
707        subscription_id,
708        usize::MAX,
709    )
710    .0
711}
712
713pub(crate) fn sync_changed_rows_for_cleared_snapshot_chunk_limited(
714    app_schema: AppSchema,
715    table: &str,
716    rows: &SnapshotChunkRows,
717    subscription_id: &str,
718    limit: usize,
719) -> (Vec<SyncChangedRow>, bool) {
720    match rows {
721        SnapshotChunkRows::Json(rows) => (
722            rows.iter()
723                .take(limit)
724                .filter_map(|row| {
725                    sync_changed_row_for_snapshot(app_schema, table, row, None, subscription_id)
726                })
727                .collect(),
728            rows.len() > limit,
729        ),
730        SnapshotChunkRows::Binary(rows) => sync_changed_rows_for_cleared_binary_snapshot_chunk(
731            app_schema,
732            table,
733            rows,
734            subscription_id,
735            limit,
736        ),
737        SnapshotChunkRows::BinaryPayload(rows) => {
738            sync_changed_rows_for_cleared_binary_snapshot_payload(
739                app_schema,
740                table,
741                rows,
742                subscription_id,
743                limit,
744            )
745        }
746    }
747}
748
749fn sync_changed_rows_for_cleared_binary_snapshot_chunk(
750    app_schema: AppSchema,
751    table: &str,
752    rows: &DecodedBinarySnapshotRows,
753    subscription_id: &str,
754    limit: usize,
755) -> (Vec<SyncChangedRow>, bool) {
756    sync_changed_rows_for_cleared_binary_snapshot_chunk_limited(
757        app_schema,
758        table,
759        rows,
760        subscription_id,
761        limit,
762    )
763    .unwrap_or_default()
764}
765
766fn sync_changed_rows_for_cleared_binary_snapshot_chunk_limited(
767    app_schema: AppSchema,
768    table: &str,
769    rows: &DecodedBinarySnapshotRows,
770    subscription_id: &str,
771    limit: usize,
772) -> Result<(Vec<SyncChangedRow>, bool)> {
773    let Some(metadata) = app_schema.table_metadata(table) else {
774        return Ok((Vec::new(), false));
775    };
776    let Some(primary_key_index) = rows
777        .columns
778        .iter()
779        .position(|column| column.name == metadata.primary_key_column)
780    else {
781        return Ok((Vec::new(), false));
782    };
783    let server_version_index = rows
784        .columns
785        .iter()
786        .position(|column| column.name == metadata.server_version_column);
787    let present_columns = rows
788        .columns
789        .iter()
790        .map(|column| column.name.as_str())
791        .collect::<HashSet<_>>();
792    let changed_fields = metadata
793        .columns
794        .iter()
795        .filter_map(|column| {
796            if column.name == metadata.primary_key_column || !present_columns.contains(column.name)
797            {
798                return None;
799            }
800            Some(column.name.to_string())
801        })
802        .collect::<Vec<_>>();
803    let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
804    let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
805    Ok((
806        rows.rows
807            .iter()
808            .take(limit)
809            .map(|row| SyncChangedRow {
810                table: table.to_string(),
811                row_id: row
812                    .get(primary_key_index)
813                    .and_then(binary_snapshot_cell_row_id),
814                operation: "insert".to_string(),
815                crdt_fields: crdt_fields.clone(),
816                crdt_field_changes: crdt_field_changes.clone(),
817                changed_fields: changed_fields.clone(),
818                commit_id: None,
819                commit_seq: None,
820                subscription_id: Some(subscription_id.to_string()),
821                server_version: server_version_index
822                    .and_then(|index| row.get(index))
823                    .and_then(binary_snapshot_cell_i64),
824            })
825            .collect(),
826        rows.rows.len() > limit,
827    ))
828}
829
830fn binary_snapshot_cell_row_id(cell: &BinarySnapshotCell) -> Option<String> {
831    match cell {
832        BinarySnapshotCell::String(value) => Some(value.clone()),
833        BinarySnapshotCell::Integer(value) => Some(value.to_string()),
834        _ => None,
835    }
836}
837
838fn binary_snapshot_cell_i64(cell: &BinarySnapshotCell) -> Option<i64> {
839    match cell {
840        BinarySnapshotCell::Integer(value) => Some(*value),
841        BinarySnapshotCell::String(value) => value.parse().ok(),
842        _ => None,
843    }
844}
845
846fn sync_changed_rows_for_cleared_binary_snapshot_payload(
847    app_schema: AppSchema,
848    table: &str,
849    payload: &BinarySnapshotPayload,
850    subscription_id: &str,
851    limit: usize,
852) -> (Vec<SyncChangedRow>, bool) {
853    sync_changed_rows_for_cleared_binary_snapshot_payload_limited(
854        app_schema,
855        table,
856        payload,
857        subscription_id,
858        limit,
859    )
860    .unwrap_or_default()
861}
862
863fn sync_changed_rows_for_cleared_binary_snapshot_payload_limited(
864    app_schema: AppSchema,
865    table: &str,
866    payload: &BinarySnapshotPayload,
867    subscription_id: &str,
868    limit: usize,
869) -> Result<(Vec<SyncChangedRow>, bool)> {
870    let truncated = payload.row_count() > limit;
871    if limit == 0 {
872        return Ok((Vec::new(), truncated));
873    }
874    let Some(metadata) = app_schema.table_metadata(table) else {
875        return Ok((Vec::new(), false));
876    };
877    let Some(primary_key_index) = payload
878        .columns
879        .iter()
880        .position(|column| column.name == metadata.primary_key_column)
881    else {
882        return Ok((Vec::new(), false));
883    };
884    let server_version_index = payload
885        .columns
886        .iter()
887        .position(|column| column.name == metadata.server_version_column);
888    let present_columns = payload
889        .columns
890        .iter()
891        .map(|column| column.name.as_str())
892        .collect::<HashSet<_>>();
893    let changed_fields = metadata
894        .columns
895        .iter()
896        .filter_map(|column| {
897            if column.name == metadata.primary_key_column || !present_columns.contains(column.name)
898            {
899                return None;
900            }
901            Some(column.name.to_string())
902        })
903        .collect::<Vec<_>>();
904    let crdt_field_changes = crdt_field_changes_for_fields(metadata, &changed_fields);
905    let crdt_fields = crdt_state_columns_for_changes(&crdt_field_changes);
906    let mut cursor = payload.row_cursor();
907    let row_limit = payload.row_count().min(limit);
908    let mut rows = Vec::with_capacity(row_limit);
909    for _ in 0..row_limit {
910        let mut row_id = None;
911        let mut server_version = None;
912        let read = cursor.read_next_row(|column_index, _column, cell| {
913            if column_index == primary_key_index {
914                row_id = borrowed_binary_snapshot_cell_row_id(cell);
915            }
916            if Some(column_index) == server_version_index {
917                server_version = borrowed_binary_snapshot_cell_i64(cell);
918            }
919            Ok(())
920        })?;
921        if !read {
922            break;
923        }
924        rows.push(SyncChangedRow {
925            table: table.to_string(),
926            row_id,
927            operation: "insert".to_string(),
928            crdt_fields: crdt_fields.clone(),
929            crdt_field_changes: crdt_field_changes.clone(),
930            changed_fields: changed_fields.clone(),
931            commit_id: None,
932            commit_seq: None,
933            subscription_id: Some(subscription_id.to_string()),
934            server_version,
935        });
936    }
937    if !truncated {
938        cursor.assert_done()?;
939    }
940    Ok((rows, truncated))
941}
942
943fn borrowed_binary_snapshot_cell_row_id(cell: BorrowedBinarySnapshotCell<'_>) -> Option<String> {
944    match cell {
945        BorrowedBinarySnapshotCell::String(value) => Some(value.to_string()),
946        BorrowedBinarySnapshotCell::Integer(value) => Some(value.to_string()),
947        _ => None,
948    }
949}
950
951fn borrowed_binary_snapshot_cell_i64(cell: BorrowedBinarySnapshotCell<'_>) -> Option<i64> {
952    match cell {
953        BorrowedBinarySnapshotCell::Integer(value) => Some(value),
954        BorrowedBinarySnapshotCell::String(value) => value.parse().ok(),
955        _ => None,
956    }
957}
958
#[cfg(test)]
mod changed_rows_tests {
    use super::*;
    use crate::app_schema::{ColumnMetadata, EmbeddedMigration};
    use crate::binary_snapshot::decode_binary_snapshot_payload;

    // Column layout of the single `tasks` table used by the fixture schema.
    static TEST_COLUMNS: [ColumnMetadata; 3] = [
        ColumnMetadata {
            name: "id",
            type_family: "text",
            notnull_required: true,
            primary_key: true,
        },
        ColumnMetadata {
            name: "title",
            type_family: "text",
            notnull_required: false,
            primary_key: false,
        },
        ColumnMetadata {
            name: "server_version",
            type_family: "integer",
            notnull_required: true,
            primary_key: false,
        },
    ];

    static TEST_TABLES: [&str; 1] = ["tasks"];
    static TEST_TABLE_METADATA: [AppTableMetadata; 1] = [AppTableMetadata {
        name: "tasks",
        primary_key_column: "id",
        server_version_column: "server_version",
        soft_delete_column: None,
        subscription_id: "tasks",
        columns: &TEST_COLUMNS,
        blob_columns: &[],
        crdt_yjs_fields: &[],
        encrypted_fields: &[],
        scopes: &[],
    }];
    static TEST_MIGRATIONS: [EmbeddedMigration; 0] = [];

    // The fixture schema declares no default subscriptions.
    fn default_subscriptions(_: &SyncularClientConfig) -> Vec<SubscriptionSpec> {
        Vec::new()
    }

    // The fixture schema has no diesel adapters; every lookup fails.
    #[cfg(feature = "native")]
    fn adapter_for(_: &str) -> Result<&'static dyn crate::app_schema::DieselTableAdapter> {
        Err(SyncularError::config("test schema has no diesel adapter"))
    }

    fn test_schema() -> AppSchema {
        AppSchema {
            app_tables: &TEST_TABLES,
            app_table_metadata: &TEST_TABLE_METADATA,
            migrations: &TEST_MIGRATIONS,
            schema_version: Some(1),
            default_subscriptions,
            #[cfg(feature = "native")]
            adapter_for,
        }
    }

    // Decodes a fresh copy of the fixture payload and wraps it as a snapshot chunk.
    fn snapshot_chunk() -> SnapshotChunkRows {
        let decoded = decode_binary_snapshot_payload(binary_snapshot_bytes()).unwrap();
        SnapshotChunkRows::BinaryPayload(decoded)
    }

    #[test]
    fn builds_changed_rows_from_binary_snapshot_payload() {
        let unlimited_chunk = snapshot_chunk();
        let rows = sync_changed_rows_for_cleared_snapshot_chunk(
            test_schema(),
            "tasks",
            &unlimited_chunk,
            "sub-tasks",
        );

        assert_eq!(rows.len(), 2);
        let first = &rows[0];
        assert_eq!(first.row_id.as_deref(), Some("task-1"));
        assert_eq!(first.operation, "insert");
        assert_eq!(first.changed_fields, vec!["title", "server_version"]);
        assert_eq!(first.server_version, Some(41));
        assert_eq!(first.subscription_id.as_deref(), Some("sub-tasks"));
        let second = &rows[1];
        assert_eq!(second.row_id.as_deref(), Some("task-2"));
        assert_eq!(second.server_version, Some(42));

        // With a limit of one row the result is cut short and flagged as truncated.
        let limited_chunk = snapshot_chunk();
        let (limited, truncated) = sync_changed_rows_for_cleared_snapshot_chunk_limited(
            test_schema(),
            "tasks",
            &limited_chunk,
            "sub-tasks",
            1,
        );
        assert!(truncated);
        assert_eq!(limited.len(), 1);
        assert_eq!(limited[0].row_id.as_deref(), Some("task-1"));
    }

    // Hand-encodes a two-row `tasks` payload: a header, three column descriptors,
    // a row count, and then each row as a leading zero byte followed by its cells.
    // NOTE(review): the meaning of the two header u16 values and the per-row zero
    // byte is defined by the binary snapshot decoder, not visible here.
    fn binary_snapshot_bytes() -> Vec<u8> {
        let column_descriptors = [
            ("id", 1u8, 0u8),
            ("title", 1u8, 0u8),
            ("server_version", 2u8, 0u8),
        ];
        let row_values = [("task-1", "First", 41i64), ("task-2", "Second", 42i64)];

        let mut buffer = Vec::new();
        buffer.extend_from_slice(b"SBT1");
        push_u16(&mut buffer, 1);
        push_u16(&mut buffer, 0);
        push_string16(&mut buffer, "tasks");

        push_u16(&mut buffer, 3);
        for (name, tag, flags) in column_descriptors {
            push_string16(&mut buffer, name);
            buffer.extend_from_slice(&[tag, flags]);
        }

        push_u32(&mut buffer, 2);
        for (id, title, server_version) in row_values {
            buffer.push(0);
            push_string32(&mut buffer, id);
            push_string32(&mut buffer, title);
            push_i64(&mut buffer, server_version);
        }
        buffer
    }

    // Little-endian primitive writers.
    fn push_u16(bytes: &mut Vec<u8>, value: u16) {
        bytes.extend(value.to_le_bytes());
    }

    fn push_u32(bytes: &mut Vec<u8>, value: u32) {
        bytes.extend(value.to_le_bytes());
    }

    fn push_i64(bytes: &mut Vec<u8>, value: i64) {
        bytes.extend(value.to_le_bytes());
    }

    // Length-prefixed string writers (u16 and u32 byte-length prefixes).
    fn push_string16(bytes: &mut Vec<u8>, value: &str) {
        let length = value.len() as u16;
        push_u16(bytes, length);
        bytes.extend_from_slice(value.as_bytes());
    }

    fn push_string32(bytes: &mut Vec<u8>, value: &str) {
        let length = value.len() as u32;
        push_u32(bytes, length);
        bytes.extend_from_slice(value.as_bytes());
    }
}
1106
1107fn changed_fields_from_remote_change(
1108    metadata: &AppTableMetadata,
1109    change: &SyncChange,
1110    previous_row: Option<&Value>,
1111) -> Vec<String> {
1112    if change.op == "delete" {
1113        return Vec::new();
1114    }
1115    let Some(row) = change.row_json.as_ref() else {
1116        return Vec::new();
1117    };
1118    if row
1119        .as_object()
1120        .is_some_and(|object| object.contains_key(YJS_PAYLOAD_KEY))
1121    {
1122        return changed_fields_from_yjs_envelope(metadata, row);
1123    }
1124    changed_fields_from_row_diff(metadata, previous_row, Some(row))
1125}
1126
1127fn changed_fields_from_row_diff(
1128    metadata: &AppTableMetadata,
1129    previous_row: Option<&Value>,
1130    next_row: Option<&Value>,
1131) -> Vec<String> {
1132    let Some(next_row) = next_row.and_then(Value::as_object) else {
1133        return Vec::new();
1134    };
1135    let previous_row = previous_row.and_then(Value::as_object);
1136    metadata
1137        .columns
1138        .iter()
1139        .filter_map(|column| {
1140            if column.name == metadata.primary_key_column || !next_row.contains_key(column.name) {
1141                return None;
1142            }
1143            match previous_row.and_then(|row| row.get(column.name)) {
1144                Some(previous) if Some(previous) == next_row.get(column.name) => None,
1145                _ => Some(column.name.to_string()),
1146            }
1147        })
1148        .collect()
1149}
1150
1151fn changed_fields_from_payload(
1152    metadata: &AppTableMetadata,
1153    payload: Option<&Value>,
1154) -> Vec<String> {
1155    let Some(payload) = payload else {
1156        return Vec::new();
1157    };
1158    if payload
1159        .as_object()
1160        .is_some_and(|object| object.contains_key(YJS_PAYLOAD_KEY))
1161    {
1162        return changed_fields_from_yjs_envelope(metadata, payload);
1163    }
1164    let Some(payload) = payload.as_object() else {
1165        return Vec::new();
1166    };
1167    metadata
1168        .columns
1169        .iter()
1170        .filter_map(|column| {
1171            if column.name == metadata.primary_key_column || !payload.contains_key(column.name) {
1172                return None;
1173            }
1174            Some(column.name.to_string())
1175        })
1176        .collect()
1177}
1178
1179fn changed_fields_from_yjs_envelope(metadata: &AppTableMetadata, payload: &Value) -> Vec<String> {
1180    let Some(envelope) = payload.get(YJS_PAYLOAD_KEY).and_then(Value::as_object) else {
1181        return Vec::new();
1182    };
1183    let mut fields = Vec::new();
1184    for field_name in envelope.keys() {
1185        if let Some(field) = metadata
1186            .crdt_yjs_fields
1187            .iter()
1188            .find(|candidate| candidate.field == field_name.as_str())
1189        {
1190            push_unique(&mut fields, field.field);
1191            push_unique(&mut fields, field.state_column);
1192        }
1193    }
1194    fields
1195}
1196
1197fn crdt_field_changes_for_fields(
1198    metadata: &AppTableMetadata,
1199    changed_fields: &[String],
1200) -> Vec<SyncChangedCrdtField> {
1201    let changed = changed_fields
1202        .iter()
1203        .map(String::as_str)
1204        .collect::<HashSet<_>>();
1205    metadata
1206        .crdt_yjs_fields
1207        .iter()
1208        .filter_map(|field| {
1209            if changed.contains(field.field) || changed.contains(field.state_column) {
1210                Some(sync_changed_crdt_field_from_metadata(field))
1211            } else {
1212                None
1213            }
1214        })
1215        .collect()
1216}
1217
1218pub fn sync_changed_crdt_field_from_metadata(
1219    field: &crate::app_schema::CrdtYjsFieldMetadata,
1220) -> SyncChangedCrdtField {
1221    SyncChangedCrdtField {
1222        field: field.field.to_string(),
1223        state_column: field.state_column.to_string(),
1224        container_key: field.container_key.to_string(),
1225        row_id_field: field.row_id_field.to_string(),
1226        kind: field.kind.to_string(),
1227        sync_mode: field.sync_mode.to_string(),
1228    }
1229}
1230
1231fn crdt_state_columns_for_changes(changes: &[SyncChangedCrdtField]) -> Vec<String> {
1232    changes
1233        .iter()
1234        .map(|field| field.state_column.clone())
1235        .collect()
1236}
1237
/// Appends `value` to `values` unless an equal string is already present.
fn push_unique(values: &mut Vec<String>, value: &str) {
    let already_present = values
        .iter()
        .map(String::as_str)
        .any(|existing| existing == value);
    if already_present {
        return;
    }
    values.push(value.to_owned());
}
1243
1244fn row_id_for_metadata(app_schema: AppSchema, table: &str, row: &Value) -> Option<String> {
1245    let metadata = app_schema.table_metadata(table)?;
1246    row.get(metadata.primary_key_column)
1247        .and_then(Value::as_str)
1248        .map(str::to_string)
1249}
1250
1251fn snapshot_clear_removes_all_rows(app_schema: AppSchema, table: &str) -> bool {
1252    app_schema.table_metadata(table).is_some_and(|metadata| {
1253        !metadata
1254            .crdt_yjs_fields
1255            .iter()
1256            .any(|field| field.sync_mode == "encrypted-update-log")
1257    })
1258}
1259
/// Clamps a bootstrap phase to be non-negative; negative phases become `0`.
fn normalize_bootstrap_phase(phase: i64) -> i64 {
    if phase < 0 {
        0
    } else {
        phase
    }
}
1263
1264fn native_subscription_ready(state: Option<&SubscriptionState>) -> bool {
1265    state.is_some_and(|state| {
1266        state.status == "active" && state.bootstrap_state_json.is_none() && state.cursor >= 0
1267    })
1268}
1269
1270fn native_subscription_bootstrapping(state: Option<&SubscriptionState>) -> bool {
1271    state.is_some_and(|state| state.status == "active" && state.bootstrap_state_json.is_some())
1272}
1273
1274fn resolve_active_bootstrap_phase_for_native(
1275    entries: &[(SubscriptionSpec, Option<SubscriptionState>)],
1276) -> Option<i64> {
1277    entries
1278        .iter()
1279        .filter(|(_, state)| !native_subscription_ready(state.as_ref()))
1280        .map(|(spec, _)| normalize_bootstrap_phase(spec.bootstrap_phase))
1281        .min()
1282}
1283
1284fn should_include_pull_subscription(
1285    spec: &SubscriptionSpec,
1286    state: Option<&SubscriptionState>,
1287    active_phase: Option<i64>,
1288) -> bool {
1289    let Some(active_phase) = active_phase else {
1290        return true;
1291    };
1292    let phase = normalize_bootstrap_phase(spec.bootstrap_phase);
1293    phase <= active_phase
1294        || native_subscription_ready(state)
1295        || native_subscription_bootstrapping(state)
1296}
1297
/// Classifies a subscription as `"error"`, `"bootstrapping"`, `"live"`, or `"pending"`.
///
/// Precedence: a `"revoked"` status is an error; otherwise any bootstrap state means
/// bootstrapping; otherwise an `"active"` status with a non-negative cursor is live;
/// everything else is pending.
#[cfg(feature = "native")]
fn native_subscription_phase(
    status: Option<&str>,
    cursor: Option<i64>,
    bootstrap_state: Option<&BootstrapState>,
) -> String {
    let label = match (status, bootstrap_state) {
        (Some("revoked"), _) => "error",
        (_, Some(_)) => "bootstrapping",
        (Some("active"), None) if cursor.is_some_and(|cursor| cursor >= 0) => "live",
        _ => "pending",
    };
    label.to_string()
}
1314
/// Computes a 0–100 progress percentage for a subscription.
///
/// Ready subscriptions report 100. Without a bootstrap state, or with an empty
/// table list, progress is 0. Otherwise progress is the share of tables already
/// processed (`table_index` clamped to the table count), rounded down.
#[cfg(feature = "native")]
fn bootstrap_progress_percent(ready: bool, bootstrap_state: Option<&BootstrapState>) -> i64 {
    match bootstrap_state {
        _ if ready => 100,
        None => 0,
        Some(state) => {
            let total = state.tables.len() as i64;
            if total <= 0 {
                0
            } else {
                let processed = state.table_index.clamp(0, total);
                (processed * 100 / total).clamp(0, 100)
            }
        }
    }
}
1330
/// Parses an optional bootstrap checkpoint from JSON.
///
/// `None` input yields `Ok(None)`.
///
/// # Errors
///
/// Returns the converted deserialization error when the JSON is invalid.
#[cfg(feature = "native")]
fn parse_bootstrap_state_json(value: Option<&str>) -> Result<Option<BootstrapState>> {
    match value {
        Some(raw) => Ok(Some(serde_json::from_str(raw)?)),
        None => Ok(None),
    }
}
1338
/// Aggregates per-subscription sync state into an overall [`BootstrapStatus`].
///
/// `states` is matched positionally against `subscriptions`. When `states` is
/// shorter than `subscriptions`, the missing entries are treated as `None` (no
/// state yet, hence pending) instead of being silently dropped from the report.
/// Extra trailing states are ignored.
///
/// `critical_phase` and `interactive_phase` are normalized to be non-negative, and
/// the interactive phase is raised to at least the critical phase. A phase
/// threshold is "ready" when every subscription at or below it is ready.
///
/// # Errors
///
/// Fails when a subscription's stored bootstrap checkpoint is not valid JSON.
#[cfg(feature = "native")]
fn build_bootstrap_status(
    subscriptions: &[SubscriptionSpec],
    states: Vec<Option<SubscriptionState>>,
    critical_phase: i64,
    interactive_phase: i64,
) -> Result<BootstrapStatus> {
    let critical_phase = normalize_bootstrap_phase(critical_phase);
    let interactive_phase = normalize_bootstrap_phase(interactive_phase).max(critical_phase);
    let mut subscription_statuses = Vec::with_capacity(subscriptions.len());

    // Pad with `None` so a short `states` list cannot hide subscriptions and make
    // the aggregate look complete.
    let padded_states = states.into_iter().chain(std::iter::repeat_with(|| None));
    for (spec, state) in subscriptions.iter().zip(padded_states) {
        let bootstrap_phase = normalize_bootstrap_phase(spec.bootstrap_phase);
        let bootstrap_state = parse_bootstrap_state_json(
            state
                .as_ref()
                .and_then(|state| state.bootstrap_state_json.as_deref()),
        )?;
        let ready = native_subscription_ready(state.as_ref());
        let status = state.as_ref().map(|state| state.status.clone());
        let cursor = state.as_ref().map(|state| state.cursor);
        let phase = native_subscription_phase(status.as_deref(), cursor, bootstrap_state.as_ref());
        let progress_percent = bootstrap_progress_percent(ready, bootstrap_state.as_ref());

        subscription_statuses.push(BootstrapSubscriptionStatus {
            id: spec.id.clone(),
            table: spec.table.clone(),
            expected: true,
            ready,
            status,
            phase,
            progress_percent,
            cursor,
            bootstrap_state,
            bootstrap_phase,
        });
    }

    let expected_subscription_ids = subscription_statuses
        .iter()
        .map(|subscription| subscription.id.clone())
        .collect::<Vec<_>>();
    let ready_subscription_ids = subscription_statuses
        .iter()
        .filter(|subscription| subscription.ready)
        .map(|subscription| subscription.id.clone())
        .collect::<Vec<_>>();
    let pending_subscription_ids = subscription_statuses
        .iter()
        .filter(|subscription| !subscription.ready)
        .map(|subscription| subscription.id.clone())
        .collect::<Vec<_>>();
    let complete = pending_subscription_ids.is_empty();
    // Subscriptions in later phases than the threshold do not block readiness.
    let critical_ready = subscription_statuses
        .iter()
        .all(|subscription| subscription.bootstrap_phase > critical_phase || subscription.ready);
    let interactive_ready = subscription_statuses
        .iter()
        .all(|subscription| subscription.bootstrap_phase > interactive_phase || subscription.ready);
    // Lowest phase that still has a pending subscription, if any.
    let active_phase = subscription_statuses
        .iter()
        .filter(|subscription| !subscription.ready)
        .map(|subscription| subscription.bootstrap_phase)
        .min();
    let has_error = subscription_statuses
        .iter()
        .any(|subscription| subscription.phase == "error");
    let is_bootstrapping = subscription_statuses
        .iter()
        .any(|subscription| !subscription.ready && subscription.phase != "error");
    let channel_phase = if has_error {
        "error"
    } else if is_bootstrapping {
        "bootstrapping"
    } else if complete && !expected_subscription_ids.is_empty() {
        "live"
    } else {
        "idle"
    }
    .to_string();
    // Overall progress is the rounded mean of per-subscription progress; with no
    // subscriptions there is nothing left to do, so it is reported as 100.
    let progress_percent = if subscription_statuses.is_empty() {
        100
    } else {
        let total = subscription_statuses
            .iter()
            .map(|subscription| subscription.progress_percent)
            .sum::<i64>();
        ((total as f64) / (subscription_statuses.len() as f64)).round() as i64
    };

    // Per phase: (expected ids, ready ids, pending ids, summed progress). The
    // BTreeMap keeps phases in ascending order.
    let mut phase_map = BTreeMap::<i64, (Vec<String>, Vec<String>, Vec<String>, i64)>::new();
    for subscription in &subscription_statuses {
        let entry = phase_map
            .entry(subscription.bootstrap_phase)
            .or_insert_with(|| (Vec::new(), Vec::new(), Vec::new(), 0));
        entry.0.push(subscription.id.clone());
        if subscription.ready {
            entry.1.push(subscription.id.clone());
        } else {
            entry.2.push(subscription.id.clone());
        }
        entry.3 += subscription.progress_percent;
    }
    let phases = phase_map
        .into_iter()
        .map(
            |(phase, (expected_ids, ready_ids, pending_ids, progress_total))| {
                let progress_percent = if expected_ids.is_empty() {
                    100
                } else {
                    ((progress_total as f64) / (expected_ids.len() as f64)).round() as i64
                };
                BootstrapPhaseStatus {
                    phase,
                    expected_subscription_ids: expected_ids,
                    ready_subscription_ids: ready_ids,
                    is_ready: pending_ids.is_empty(),
                    pending_subscription_ids: pending_ids,
                    progress_percent,
                }
            },
        )
        .collect();

    Ok(BootstrapStatus {
        channel_phase,
        progress_percent,
        is_bootstrapping,
        critical_ready,
        interactive_ready,
        complete,
        active_phase,
        expected_subscription_ids,
        ready_subscription_ids,
        pending_subscription_ids,
        subscriptions: subscription_statuses,
        phases,
    })
}
1478
#[cfg(all(test, feature = "native"))]
mod bootstrap_phase_tests {
    use super::*;

    #[test]
    fn staged_pull_selection_matches_subscription_readiness() {
        // Nothing has synced yet: only the lowest phase is pulled.
        let untouched = vec![
            (subscription("critical", 0), None),
            (subscription("later", 1), None),
        ];
        let active = resolve_active_bootstrap_phase_for_native(&untouched);
        assert_eq!(active, Some(0));
        let (critical_spec, critical_state) = &untouched[0];
        assert!(should_include_pull_subscription(
            critical_spec,
            critical_state.as_ref(),
            active
        ));
        let (later_spec, later_state) = &untouched[1];
        assert!(!should_include_pull_subscription(
            later_spec,
            later_state.as_ref(),
            active
        ));

        // Once the critical subscription is ready, the next phase opens up and
        // every subscription is pulled.
        let critical_ready = vec![
            (
                subscription("critical", 0),
                Some(subscription_state("critical", 1, None)),
            ),
            (subscription("later", 1), None),
        ];
        let active = resolve_active_bootstrap_phase_for_native(&critical_ready);
        assert_eq!(active, Some(1));
        for (spec, state) in &critical_ready {
            assert!(should_include_pull_subscription(
                spec,
                state.as_ref(),
                active
            ));
        }

        // A subscription that is already bootstrapping keeps being pulled even
        // when its phase is beyond the active one.
        let later_bootstrapping = vec![
            (
                subscription("critical", 0),
                Some(subscription_state("critical", 1, None)),
            ),
            (
                subscription("later", 5),
                Some(subscription_state("later", -1, Some("{}"))),
            ),
            (subscription("other", 2), None),
        ];
        let active = resolve_active_bootstrap_phase_for_native(&later_bootstrapping);
        assert_eq!(active, Some(2));
        let (bootstrapping_spec, bootstrapping_state) = &later_bootstrapping[1];
        assert!(should_include_pull_subscription(
            bootstrapping_spec,
            bootstrapping_state.as_ref(),
            active
        ));
    }

    #[test]
    fn bootstrap_status_reports_staged_readiness() {
        let checkpoint = r#"{"asOfCommitSeq":10,"tables":["tasks","projects"],"tableIndex":1,"rowCursor":"tasks:10"}"#;
        let subscriptions = vec![subscription("critical", 0), subscription("interactive", 1)];
        let states = vec![
            Some(subscription_state("critical", 1, None)),
            Some(subscription_state("interactive", -1, Some(checkpoint))),
        ];

        let status = build_bootstrap_status(&subscriptions, states, 0, 1)
            .expect("bootstrap status should parse");

        assert_eq!(status.channel_phase, "bootstrapping");
        assert_eq!(status.progress_percent, 75);
        assert!(status.critical_ready);
        assert!(!status.interactive_ready);
        assert!(!status.complete);
        assert_eq!(status.active_phase, Some(1));
        assert_eq!(status.ready_subscription_ids, vec!["critical".to_string()]);
        assert_eq!(
            status.pending_subscription_ids,
            vec!["interactive".to_string()]
        );

        let interactive = &status.subscriptions[1];
        assert_eq!(interactive.phase, "bootstrapping");
        assert_eq!(interactive.progress_percent, 50);
        let row_cursor = interactive
            .bootstrap_state
            .as_ref()
            .and_then(|state| state.row_cursor.as_deref());
        assert_eq!(row_cursor, Some("tasks:10"));

        assert_eq!(status.phases.len(), 2);
        assert!(status.phases[0].is_ready);
        assert!(!status.phases[1].is_ready);
    }

    #[test]
    fn bootstrap_status_rejects_invalid_checkpoint_json() {
        let subscriptions = vec![subscription("critical", 0)];
        let broken_state = subscription_state("critical", -1, Some("{not valid json"));
        let states = vec![Some(broken_state)];

        let outcome = build_bootstrap_status(&subscriptions, states, 0, 1);
        assert!(outcome.is_err());
    }

    // Builds a `tasks` subscription spec with empty scopes and params.
    fn subscription(id: &str, bootstrap_phase: i64) -> SubscriptionSpec {
        let table = "tasks".to_string();
        SubscriptionSpec {
            id: id.to_string(),
            table,
            scopes: Map::new(),
            params: Map::new(),
            bootstrap_phase,
        }
    }

    // Builds an "active" `tasks` subscription state with the given cursor and
    // optional bootstrap checkpoint JSON.
    fn subscription_state(
        subscription_id: &str,
        cursor: i64,
        bootstrap_state_json: Option<&str>,
    ) -> SubscriptionState {
        let empty_json = || "{}".to_string();
        SubscriptionState {
            state_id: DEFAULT_STATE_ID.to_string(),
            subscription_id: subscription_id.to_string(),
            table: "tasks".to_string(),
            scopes_json: empty_json(),
            params_json: empty_json(),
            cursor,
            bootstrap_state_json: bootstrap_state_json.map(|json| json.to_string()),
            status: "active".to_string(),
        }
    }
}
1616
/// How a sync conflict is resolved.
///
/// Variants serialize in kebab-case; `AcceptServer` is explicitly renamed so that
/// it serializes as `"keep-server"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ConflictResolution {
    /// Serialized as `"keep-local"`.
    KeepLocal,
    /// Serialized as `"keep-server"` rather than the default `"accept-server"`.
    #[serde(rename = "keep-server")]
    AcceptServer,
    /// Serialized as `"dismiss"`.
    Dismiss,
}
1625
1626impl ConflictResolution {
1627    pub fn as_str(self) -> &'static str {
1628        match self {
1629            Self::KeepLocal => "keep-local",
1630            Self::AcceptServer => "keep-server",
1631            Self::Dismiss => "dismiss",
1632        }
1633    }
1634}
1635
1636impl fmt::Display for ConflictResolution {
1637    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1638        f.write_str(self.as_str())
1639    }
1640}
1641
/// Serializable record of a resolved conflict.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConflictResolutionReceipt {
    /// Identifier of the conflict this receipt refers to.
    pub conflict_id: String,
    /// The resolution that was chosen.
    pub resolution: ConflictResolution,
    /// Optional client commit id associated with a retry.
    /// NOTE(review): presumably set only when the resolution re-queues the local
    /// change — confirm against the code that builds receipts.
    pub retry_client_commit_id: Option<String>,
}
1648
/// Handle that holds an exclusive borrow of a [`SyncularClient`] for the lifetime `'a`.
/// NOTE(review): by its name this is the entry point for conflict operations; its
/// methods are defined elsewhere in the file.
pub struct SyncularConflicts<'a, S, T> {
    // Exclusive borrow of the client this handle operates on.
    client: &'a mut SyncularClient<S, T>,
}
1652
/// A cached query result that can be re-run when a sync touches its tables.
///
/// `QF` is a factory that builds a fresh query on every refresh and `Row` is the
/// loaded row type.
#[cfg(feature = "native")]
#[derive(Debug)]
pub struct SyncularLiveQuery<QF, Row> {
    // Tables this query is checked against when a sync report arrives.
    tables: Vec<String>,
    // Factory invoked on each refresh to build the query to run.
    build_query: QF,
    // Rows loaded by the most recent refresh (empty until the first refresh).
    rows: Vec<Row>,
    // Incremented (saturating) after every successful refresh.
    revision: u64,
}
1661
#[cfg(feature = "native")]
impl<QF, Row> SyncularLiveQuery<QF, Row> {
    /// Creates a live query over `tables` with no rows loaded and revision `0`.
    ///
    /// `build_query` is called on every refresh to produce the query to execute.
    pub fn new<I, Table>(tables: I, build_query: QF) -> Self
    where
        I: IntoIterator<Item = Table>,
        Table: Into<String>,
    {
        let tables: Vec<String> = tables.into_iter().map(|table| table.into()).collect();
        Self {
            tables,
            build_query,
            rows: Vec::new(),
            revision: 0,
        }
    }

    /// Tables this query is checked against.
    pub fn tables(&self) -> &[String] {
        self.tables.as_slice()
    }

    /// Rows loaded by the most recent refresh.
    pub fn rows(&self) -> &[Row] {
        self.rows.as_slice()
    }

    /// Number of successful refreshes so far (saturating).
    pub fn revision(&self) -> u64 {
        self.revision
    }

    /// Consumes the live query and returns its cached rows.
    pub fn into_rows(self) -> Vec<Row> {
        let Self { rows, .. } = self;
        rows
    }

    /// Reports whether `report` changed any of this query's tables.
    pub fn is_affected_by(&self, report: &SyncReport) -> bool {
        let table_names = self.tables.iter().map(|table| table.as_str());
        report.changes_any_table(table_names)
    }

    /// Re-runs the query through `client`, replaces the cached rows, and bumps the
    /// revision.
    ///
    /// # Errors
    ///
    /// Propagates the read failure; the cached rows and revision are left untouched
    /// in that case.
    pub fn refresh<T, Q>(
        &mut self,
        client: &mut SyncularClient<DieselSqliteStore, T>,
    ) -> Result<&[Row]>
    where
        T: SyncTransport,
        QF: FnMut() -> Q,
        for<'query> Q: diesel::query_dsl::LoadQuery<'query, diesel::sqlite::SqliteConnection, Row>,
    {
        let query = (self.build_query)();
        let fresh_rows = client.read(query)?;
        self.rows = fresh_rows;
        self.revision = self.revision.saturating_add(1);
        Ok(self.rows.as_slice())
    }

    /// Refreshes only when `report` touched one of this query's tables.
    ///
    /// Returns `Ok(true)` when a refresh ran and `Ok(false)` when it was skipped.
    ///
    /// # Errors
    ///
    /// Propagates the failure of the underlying refresh.
    pub fn refresh_if_changed<T, Q>(
        &mut self,
        client: &mut SyncularClient<DieselSqliteStore, T>,
        report: &SyncReport,
    ) -> Result<bool>
    where
        T: SyncTransport,
        QF: FnMut() -> Q,
        for<'query> Q: diesel::query_dsl::LoadQuery<'query, diesel::sqlite::SqliteConnection, Row>,
    {
        let affected = self.is_affected_by(report);
        if affected {
            self.refresh(client)?;
        }
        Ok(affected)
    }
}
1728
/// Sync client parameterized over a store `S` and a transport `T`.
///
/// With the `native` feature enabled the type parameters default to
/// `DieselSqliteStore` and `HttpSyncTransport`; without it both must be supplied.
pub struct SyncularClient<
    #[cfg(feature = "native")] S = DieselSqliteStore,
    #[cfg(not(feature = "native"))] S,
    #[cfg(feature = "native")] T = HttpSyncTransport,
    #[cfg(not(feature = "native"))] T,
> {
    // Client configuration supplied at construction.
    config: SyncularClientConfig,
    // Local store backend.
    store: S,
    // Transport used to talk to the sync server.
    transport: T,
    // Subscriptions tracked by this client.
    subscriptions: Vec<SubscriptionSpec>,
    // Application schema this client was built with.
    app_schema: AppSchema,
    // NOTE(review): presumably derived from `app_schema` — confirm in the constructor.
    schema_version: i32,
    // NOTE(review): looks like the key used to serialize concurrent syncs — confirm.
    sync_lock_key: String,
    // Optional encryption helpers; `None` when the feature is not configured.
    field_encryption: Option<FieldEncryption>,
    encrypted_crdt: Option<EncryptedCrdt>,
    blob_encryption: Option<BlobEncryption>,
}
1746
#[cfg(feature = "native")]
impl SyncularClient<DieselSqliteStore, HttpSyncTransport> {
    /// Opens a client using the default application schema.
    ///
    /// # Errors
    ///
    /// See [`SyncularClient::open_with_schema`].
    pub fn open(config: SyncularClientConfig) -> Result<Self> {
        let app_schema = default_app_schema();
        Self::open_with_schema(config, app_schema)
    }

    /// Opens a client backed by a SQLite store at `config.db_path` and an HTTP
    /// transport built from the config's base URL, client id, and actor id.
    ///
    /// # Errors
    ///
    /// Fails when the schema does not pass runtime feature validation or when the
    /// store cannot be opened.
    pub fn open_with_schema(config: SyncularClientConfig, app_schema: AppSchema) -> Result<Self> {
        validate_app_schema_runtime_features(&app_schema)?;
        let store = DieselSqliteStore::open_with_schema(&config.db_path, app_schema)?;

        let transport_config = SyncTransportConfig::new(
            config.base_url.clone(),
            config.client_id.clone(),
            config.actor_id.clone(),
        );
        let transport = HttpSyncTransport::new(transport_config)
            .with_schema_version(app_schema.current_schema_version());

        let client = Self::with_app_schema_parts(config, store, transport, app_schema);
        Ok(client)
    }
}
1767
#[cfg(feature = "native")]
impl<S> SyncularClient<S, HttpSyncTransport>
where
    S: SyncStore,
{
    /// Builds a client around an existing `store`, with an HTTP transport derived
    /// from `config` and tagged with the default schema's current version.
    pub fn with_store(config: SyncularClientConfig, store: S) -> Self {
        let app_schema = default_app_schema();
        let transport_config = SyncTransportConfig::new(
            config.base_url.clone(),
            config.client_id.clone(),
            config.actor_id.clone(),
        );
        let transport = HttpSyncTransport::new(transport_config)
            .with_schema_version(app_schema.current_schema_version());
        Self::with_parts(config, store, transport)
    }

    /// Requests an auth lease from the server and persists it in the store.
    ///
    /// The request and the response are both validated against the app schema
    /// before the lease record is upserted inside a store transaction.
    ///
    /// # Errors
    ///
    /// Fails on validation errors, transport errors, an unusable response, or a
    /// failed store transaction.
    pub fn issue_auth_lease(
        &mut self,
        request: &AuthLeaseIssueRequest,
    ) -> Result<crate::store::AuthLeaseRecord> {
        validate_auth_lease_issue_request_against_app_schema(self.app_schema, request)?;

        let response = self.transport.issue_auth_lease(request)?;
        validate_auth_lease_issue_response_against_app_schema(
            self.app_schema,
            &response,
            request.schema_version,
        )?;

        let updated_at_ms = now_ms();
        let record = auth_lease_record_from_issue_response(response, updated_at_ms)?;
        self.store.transaction(|tx| tx.upsert_auth_lease(&record))?;
        Ok(record)
    }

    /// JSON wrapper around [`Self::issue_auth_lease`]: parses the request from
    /// `request_json` and returns the stored lease record serialized as JSON.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be parsed, the lease cannot be issued, or the
    /// record cannot be serialized.
    pub fn issue_auth_lease_json(&mut self, request_json: &str) -> Result<String> {
        let request = serde_json::from_str::<AuthLeaseIssueRequest>(request_json)?;
        let record = self.issue_auth_lease(&request)?;
        let encoded = serde_json::to_string(&record)?;
        Ok(encoded)
    }
}
1805
/// Converts a lease-issue response into a storable auth lease record.
///
/// The response is rejected when `ok` is false, when the token is empty, when the
/// protected header does not carry the expected algorithm and type or has an empty
/// `kid`, or when the payload has an unexpected version or protocol version, an
/// empty lease or actor id, a schema version below 1, or no scopes.
///
/// The accepted payload is re-serialized to JSON for storage. The record's
/// `created_at_ms` is taken from the payload's `issued_at_ms`; `updated_at_ms`
/// is the value passed in by the caller. The record status is set to `"active"`.
#[cfg(feature = "native")]
fn auth_lease_record_from_issue_response(
    response: AuthLeaseIssueResponse,
    updated_at_ms: i64,
) -> Result<crate::store::AuthLeaseRecord> {
    if !response.ok {
        return Err(SyncularError::message(
            ErrorKind::Transport,
            "auth lease issue returned ok=false",
        ));
    }
    if response.token.is_empty() {
        return Err(SyncularError::protocol_message(
            "auth lease issue returned an empty token",
        ));
    }

    let header = &response.protected_header;
    let header_is_valid = header.alg == AUTH_LEASE_ALG_ES256
        && header.typ == AUTH_LEASE_TYP
        && !header.kid.is_empty();
    if !header_is_valid {
        return Err(SyncularError::protocol_message(
            "auth lease issue returned an invalid protected header",
        ));
    }

    let payload = response.payload;
    let payload_is_valid = payload.version == AUTH_LEASE_VERSION
        && payload.protocol_version == AUTH_LEASE_PROTOCOL_VERSION
        && !payload.lease_id.is_empty()
        && !payload.actor_id.is_empty()
        && payload.schema_version >= 1
        && !payload.scopes.is_empty();
    if !payload_is_valid {
        return Err(SyncularError::protocol_message(
            "auth lease issue returned an invalid payload",
        ));
    }

    // Serialize before the payload's owned fields are moved into the record.
    let payload_json = serde_json::to_string(&payload)?;
    let record = crate::store::AuthLeaseRecord {
        lease_id: payload.lease_id,
        kid: response.protected_header.kid,
        actor_id: payload.actor_id,
        issued_at_ms: payload.issued_at_ms,
        not_before_ms: payload.not_before_ms,
        expires_at_ms: payload.expires_at_ms,
        schema_version: payload.schema_version,
        payload_json,
        token: response.token,
        status: "active".to_string(),
        last_validation_error: None,
        created_at_ms: payload.issued_at_ms,
        updated_at_ms,
    };
    Ok(record)
}
1859
1860fn validate_auth_lease_record_against_app_schema(
1861    app_schema: AppSchema,
1862    lease: &crate::store::AuthLeaseRecord,
1863) -> Result<()> {
1864    let payload: AuthLeasePayload = serde_json::from_str(&lease.payload_json)?;
1865    if lease.schema_version != payload.schema_version {
1866        return Err(SyncularError::protocol_message(format!(
1867            "auth lease record schemaVersion {} does not match payload schemaVersion {}",
1868            lease.schema_version, payload.schema_version
1869        )));
1870    }
1871    if lease.lease_id != payload.lease_id {
1872        return Err(SyncularError::protocol_message(
1873            "auth lease record leaseId does not match payload leaseId",
1874        ));
1875    }
1876    if lease.actor_id != payload.actor_id {
1877        return Err(SyncularError::protocol_message(
1878            "auth lease record actorId does not match payload actorId",
1879        ));
1880    }
1881    validate_auth_lease_payload_against_app_schema(app_schema, &payload)
1882}
1883
1884impl<S, T> SyncularClient<S, T>
1885where
1886    S: SyncStateStore,
1887{
1888    pub fn next_outbox_retry_at_ms(&mut self) -> Result<Option<i64>> {
1889        self.store.next_outbox_retry_at()
1890    }
1891
1892    pub fn next_blob_upload_retry_at_ms(&mut self) -> Result<Option<i64>> {
1893        self.store.next_blob_upload_retry_at()
1894    }
1895}
1896
1897impl<S, T> SyncularClient<S, T>
1898where
1899    S: SyncStore,
1900    T: SyncTransport,
1901{
1902    fn ensure_remote_sync_enabled(&self, operation: &str) -> Result<()> {
1903        if !self.config.is_local_sync_compatible() {
1904            return Ok(());
1905        }
1906        Err(SyncularError::config(format!(
1907            "Syncular {operation} requires remote mode; client was opened with local-sync-compatible config"
1908        )))
1909    }
1910
1911    pub fn with_parts(config: SyncularClientConfig, store: S, transport: T) -> Self {
1912        Self::with_app_schema_parts(config, store, transport, default_app_schema())
1913    }
1914
1915    pub fn with_app_schema_parts(
1916        config: SyncularClientConfig,
1917        store: S,
1918        transport: T,
1919        app_schema: AppSchema,
1920    ) -> Self {
1921        let subscriptions = app_schema.default_subscriptions(&config);
1922        Self::with_subscriptions_and_schema(
1923            config,
1924            store,
1925            transport,
1926            subscriptions,
1927            app_schema,
1928            app_schema.current_schema_version(),
1929        )
1930    }
1931
1932    pub fn with_subscriptions(
1933        config: SyncularClientConfig,
1934        store: S,
1935        transport: T,
1936        subscriptions: Vec<SubscriptionSpec>,
1937        schema_version: i32,
1938    ) -> Self {
1939        Self::with_subscriptions_and_schema(
1940            config,
1941            store,
1942            transport,
1943            subscriptions,
1944            default_app_schema(),
1945            schema_version,
1946        )
1947    }
1948
1949    fn with_subscriptions_and_schema(
1950        config: SyncularClientConfig,
1951        store: S,
1952        transport: T,
1953        subscriptions: Vec<SubscriptionSpec>,
1954        app_schema: AppSchema,
1955        schema_version: i32,
1956    ) -> Self {
1957        Self {
1958            sync_lock_key: config.db_path.clone(),
1959            config,
1960            store,
1961            transport,
1962            subscriptions,
1963            app_schema,
1964            schema_version,
1965            field_encryption: None,
1966            encrypted_crdt: None,
1967            blob_encryption: None,
1968        }
1969    }
1970
1971    pub fn set_field_encryption(&mut self, encryption: Option<FieldEncryption>) -> Result<()> {
1972        if let Some(encryption) = &encryption {
1973            validate_field_encryption_rules_against_app_schema(
1974                self.app_schema,
1975                encryption.rules(),
1976            )?;
1977        }
1978        self.field_encryption = encryption;
1979        Ok(())
1980    }
1981
1982    pub fn set_field_encryption_json(&mut self, config_json: &str) -> Result<()> {
1983        self.set_field_encryption(FieldEncryption::from_static_config_json(config_json)?)
1984    }
1985
1986    pub fn set_encrypted_crdt(&mut self, encryption: Option<EncryptedCrdt>) -> Result<()> {
1987        if encryption.is_some() {
1988            validate_encrypted_crdt_against_app_schema(self.app_schema)?;
1989        }
1990        self.encrypted_crdt = encryption;
1991        Ok(())
1992    }
1993
1994    pub fn set_encrypted_crdt_json(&mut self, config_json: &str) -> Result<()> {
1995        self.set_encrypted_crdt(EncryptedCrdt::from_static_config_json(config_json)?)
1996    }
1997
1998    pub fn set_blob_encryption(&mut self, encryption: Option<BlobEncryption>) -> Result<()> {
1999        if encryption.is_some() {
2000            validate_blob_encryption_against_app_schema(self.app_schema)?;
2001        }
2002        self.blob_encryption = encryption;
2003        Ok(())
2004    }
2005
2006    pub fn set_blob_encryption_json(&mut self, config_json: &str) -> Result<()> {
2007        self.set_blob_encryption(BlobEncryption::from_static_config_json(config_json)?)
2008    }
2009
2010    pub fn table_metadata(&self, table: &str) -> Option<&'static AppTableMetadata> {
2011        self.app_schema.table_metadata(table)
2012    }
2013
2014    pub fn app_schema(&self) -> AppSchema {
2015        self.app_schema
2016    }
2017
2018    pub fn current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
2019        self.store
2020            .transaction(|tx| tx.current_row_json(table, row_id))
2021    }
2022
2023    pub fn subscriptions(&self) -> &[SubscriptionSpec] {
2024        &self.subscriptions
2025    }
2026
2027    pub fn set_subscriptions(&mut self, subscriptions: Vec<SubscriptionSpec>) -> Result<()> {
2028        validate_subscription_limits(&subscriptions)?;
2029        self.subscriptions = subscriptions;
2030        Ok(())
2031    }
2032
2033    pub fn set_subscriptions_json(&mut self, subscriptions_json: &str) -> Result<()> {
2034        let subscriptions: Vec<SubscriptionSpec> = serde_json::from_str(subscriptions_json)?;
2035        self.set_subscriptions(subscriptions)
2036    }
2037
2038    pub fn force_subscriptions_bootstrap(&mut self, subscription_ids: &[String]) -> Result<usize> {
2039        let ids = if subscription_ids.is_empty() {
2040            self.subscriptions
2041                .iter()
2042                .map(|subscription| subscription.id.clone())
2043                .collect::<Vec<_>>()
2044        } else {
2045            subscription_ids.to_vec()
2046        };
2047        self.store.transaction(|tx| {
2048            for subscription_id in &ids {
2049                tx.delete_verified_root(DEFAULT_STATE_ID, subscription_id)?;
2050                tx.delete_subscription_state(DEFAULT_STATE_ID, subscription_id)?;
2051            }
2052            Ok(ids.len())
2053        })
2054    }
2055
2056    pub fn force_subscriptions_bootstrap_json(
2057        &mut self,
2058        subscription_ids_json: &str,
2059    ) -> Result<String> {
2060        let subscription_ids: Vec<String> = serde_json::from_str(subscription_ids_json)?;
2061        Ok(serde_json::to_string(
2062            &self.force_subscriptions_bootstrap(&subscription_ids)?,
2063        )?)
2064    }
2065
2066    #[cfg(feature = "native")]
2067    pub fn bootstrap_status(&mut self) -> Result<BootstrapStatus> {
2068        self.bootstrap_status_for_phases(0, 1)
2069    }
2070
2071    #[cfg(feature = "native")]
2072    pub fn bootstrap_status_for_phases(
2073        &mut self,
2074        critical_phase: i64,
2075        interactive_phase: i64,
2076    ) -> Result<BootstrapStatus> {
2077        let states = self.store.transaction(|tx| {
2078            self.subscriptions
2079                .iter()
2080                .map(|subscription| tx.subscription_state(DEFAULT_STATE_ID, &subscription.id))
2081                .collect::<Result<Vec<_>>>()
2082        })?;
2083        build_bootstrap_status(
2084            &self.subscriptions,
2085            states,
2086            critical_phase,
2087            interactive_phase,
2088        )
2089    }
2090
2091    #[cfg(feature = "native")]
2092    pub fn bootstrap_status_json(&mut self) -> Result<String> {
2093        Ok(serde_json::to_string(&self.bootstrap_status()?)?)
2094    }
2095
2096    pub fn sync_http(&mut self) -> Result<SyncReport> {
2097        let _guard = SyncLockGuard::acquire(&self.sync_lock_key)?;
2098        self.ensure_remote_sync_enabled("sync_http")?;
2099        self.sync_http_unlocked()
2100    }
2101
2102    fn sync_http_unlocked(&mut self) -> Result<SyncReport> {
2103        let pending = self.prepare_sync()?;
2104        let request = CombinedRequest {
2105            client_id: self.config.client_id.clone(),
2106            push: self.build_push_request(&pending)?,
2107            pull: Some(self.build_pull_request()?),
2108        };
2109
2110        let response = match self.transport.post_sync(&request) {
2111            Ok(response) => response,
2112            Err(error) => {
2113                self.schedule_outbox_retry(&pending, &error)?;
2114                return Err(error);
2115            }
2116        };
2117        self.apply_combined_response(&pending, response)
2118    }
2119
2120    pub fn sync_ws(&mut self) -> Result<SyncReport> {
2121        let _guard = SyncLockGuard::acquire(&self.sync_lock_key)?;
2122        self.ensure_remote_sync_enabled("sync_ws")?;
2123        self.sync_ws_unlocked()
2124    }
2125
2126    fn sync_ws_unlocked(&mut self) -> Result<SyncReport> {
2127        let pending = self.prepare_sync()?;
2128        let mut report = SyncReport::default();
2129        if !pending.is_empty() {
2130            let mut socket = match self.transport.connect_realtime() {
2131                Ok(socket) => socket,
2132                Err(error) => {
2133                    self.schedule_outbox_retry(&pending, &error)?;
2134                    return Err(error);
2135                }
2136            };
2137
2138            for commit in &pending {
2139                let operations = self.operations_for_push(commit)?;
2140                let response = match socket.push_commit(PushCommitRequest {
2141                    client_commit_id: commit.client_commit_id.clone(),
2142                    operations,
2143                    schema_version: commit.schema_version,
2144                    auth_lease: commit.auth_lease.clone(),
2145                }) {
2146                    Ok(response) => response,
2147                    Err(error) => {
2148                        self.schedule_outbox_retry(std::slice::from_ref(commit), &error)?;
2149                        return Err(error);
2150                    }
2151                };
2152                report.merge(self.apply_single_push_response(commit, response)?);
2153            }
2154
2155            socket.close();
2156        }
2157
2158        let request = CombinedRequest {
2159            client_id: self.config.client_id.clone(),
2160            push: None,
2161            pull: Some(self.build_pull_request()?),
2162        };
2163        let response = match self.transport.post_sync(&request) {
2164            Ok(response) => response,
2165            Err(error) => {
2166                self.schedule_outbox_retry(&[], &error)?;
2167                return Err(error);
2168            }
2169        };
2170        report.merge(self.apply_combined_response(&[], response)?);
2171        Ok(report)
2172    }
2173
2174    pub fn watch<F>(&mut self, seconds: u64, mut on_event: F) -> Result<()>
2175    where
2176        F: FnMut(&RealtimeEvent),
2177    {
2178        let mut socket = self.transport.connect_realtime()?;
2179        let deadline = SystemTime::now()
2180            .checked_add(Duration::from_secs(seconds))
2181            .unwrap_or_else(SystemTime::now);
2182
2183        while SystemTime::now() < deadline {
2184            let Some(event) = socket.read_event()? else {
2185                continue;
2186            };
2187            on_event(&event);
2188            if matches!(event, RealtimeEvent::Sync) {
2189                let _ = self.sync_http_unlocked()?;
2190            }
2191        }
2192
2193        socket.close();
2194        Ok(())
2195    }
2196
2197    pub fn process_realtime_events<F>(
2198        &mut self,
2199        max_events: usize,
2200        mut on_event: F,
2201    ) -> Result<usize>
2202    where
2203        F: FnMut(&RealtimeEvent),
2204    {
2205        let mut socket = self.transport.connect_realtime()?;
2206        let mut processed = 0usize;
2207
2208        for _ in 0..max_events {
2209            let Some(event) = socket.read_event()? else {
2210                break;
2211            };
2212            on_event(&event);
2213            processed += 1;
2214            if matches!(event, RealtimeEvent::Sync) {
2215                let _ = self.sync_http_unlocked()?;
2216            }
2217        }
2218
2219        socket.close();
2220        Ok(processed)
2221    }
2222}
2223
2224impl<S, T> SyncularClient<S, T>
2225where
2226    S: SyncStore,
2227    T: SyncTransport + SyncAuthHeaderStore,
2228{
2229    pub fn set_auth_headers(&mut self, headers: SyncAuthHeaders) {
2230        self.transport.set_auth_headers(headers);
2231    }
2232}
2233
#[cfg(feature = "native")]
impl<S, T> SyncularClient<S, T>
where
    S: SyncStore,
    T: SyncTransport + SyncAuthSignerStore,
{
    /// Forwards `signer` (or `None`) to the transport's auth-signer store.
    pub fn set_auth_signer(&mut self, signer: Option<SyncAuthSigner>) {
        let transport = &mut self.transport;
        transport.set_auth_signer(signer);
    }
}
2244
2245#[cfg(feature = "native")]
2246impl<T> SyncularClient<DieselSqliteStore, T>
2247where
2248    T: SyncTransport + BlobTransport,
2249{
2250    pub fn store_blob_bytes(
2251        &mut self,
2252        data: &[u8],
2253        mime_type: &str,
2254        immediate: bool,
2255    ) -> Result<BlobRef> {
2256        self.ensure_blob_runtime_declared()?;
2257        let (blob, body) = if let Some(encryption) = &self.blob_encryption {
2258            let encrypted = encryption.encrypt_blob(data, mime_type)?;
2259            self.store
2260                .store_blob_body(&encrypted.blob, &encrypted.body, !immediate)?;
2261            (encrypted.blob, encrypted.body)
2262        } else {
2263            let blob = self.store.store_blob_bytes(data, mime_type, !immediate)?;
2264            (blob, data.to_vec())
2265        };
2266        if immediate {
2267            self.transport.upload_blob(&blob, &body)?;
2268        }
2269        Ok(blob)
2270    }
2271
2272    pub fn store_blob_file(
2273        &mut self,
2274        path: &Path,
2275        mime_type: &str,
2276        immediate: bool,
2277        cache_local: bool,
2278    ) -> Result<BlobRef> {
2279        self.ensure_blob_runtime_declared()?;
2280        if cache_local {
2281            let metadata = fs::metadata(path).map_err(|err| {
2282                SyncularError::storage(err).context(format!("stat blob file {path:?}"))
2283            })?;
2284            validate_blob_size_bytes(i64::try_from(metadata.len()).unwrap_or(i64::MAX))?;
2285            let data = fs::read(path).map_err(|err| {
2286                SyncularError::storage(err).context(format!("read blob file {path:?}"))
2287            })?;
2288            let (blob, body) = if let Some(encryption) = &self.blob_encryption {
2289                let encrypted = encryption.encrypt_blob(&data, mime_type)?;
2290                self.store
2291                    .store_blob_body(&encrypted.blob, &encrypted.body, !immediate)?;
2292                (encrypted.blob, encrypted.body)
2293            } else {
2294                let blob = self.store.store_blob_bytes(&data, mime_type, !immediate)?;
2295                (blob, data)
2296            };
2297            if immediate {
2298                self.transport.upload_blob(&blob, &body)?;
2299            }
2300            return Ok(blob);
2301        }
2302
2303        if self.blob_encryption.is_some() {
2304            return Err(SyncularError::config(
2305                "encrypted native blob file storage requires cacheLocal=true until streaming encryption is implemented",
2306            ));
2307        }
2308
2309        if !immediate {
2310            return Err(SyncularError::config(
2311                "native blob file storage with cacheLocal=false requires immediate=true",
2312            ));
2313        }
2314
2315        let file = File::open(path).map_err(|err| {
2316            SyncularError::storage(err).context(format!("open blob file {path:?}"))
2317        })?;
2318        let (hash, size) = blob_hash_reader(file)?;
2319        let blob = BlobRef {
2320            hash,
2321            size,
2322            mime_type: normalize_blob_mime_type(mime_type),
2323            encrypted: false,
2324            key_id: None,
2325        };
2326        validate_blob_ref_size(&blob)?;
2327        self.transport.upload_blob_file(&blob, path)?;
2328        Ok(blob)
2329    }
2330
2331    pub fn retrieve_blob_bytes(&mut self, blob: &BlobRef) -> Result<Vec<u8>> {
2332        self.ensure_blob_runtime_declared()?;
2333        self.ensure_blob_decryption_available(blob)?;
2334        if let Some(bytes) = self.store.read_cached_blob(&blob.hash)? {
2335            return self.decode_blob_body(blob, bytes);
2336        }
2337        let bytes = self.transport.download_blob(blob)?;
2338        let plaintext = self.decode_blob_body(blob, bytes.clone())?;
2339        self.store.cache_blob_bytes(blob, &bytes)?;
2340        Ok(plaintext)
2341    }
2342
2343    pub fn retrieve_blob_file(
2344        &mut self,
2345        blob: &BlobRef,
2346        path: &Path,
2347        cache_local: bool,
2348    ) -> Result<()> {
2349        self.ensure_blob_runtime_declared()?;
2350        self.ensure_blob_decryption_available(blob)?;
2351        if let Some(bytes) = self.store.read_cached_blob(&blob.hash)? {
2352            let plaintext = self.decode_blob_body(blob, bytes)?;
2353            fs::write(path, plaintext).map_err(|err| {
2354                SyncularError::storage(err).context(format!("write blob file {path:?}"))
2355            })?;
2356            return Ok(());
2357        }
2358
2359        if cache_local {
2360            let bytes = self.transport.download_blob(blob)?;
2361            let plaintext = self.decode_blob_body(blob, bytes.clone())?;
2362            self.store.cache_blob_bytes(blob, &bytes)?;
2363            fs::write(path, plaintext).map_err(|err| {
2364                SyncularError::storage(err).context(format!("write blob file {path:?}"))
2365            })?;
2366        } else if blob.encrypted {
2367            let bytes = self.transport.download_blob(blob)?;
2368            let plaintext = self.decode_blob_body(blob, bytes)?;
2369            fs::write(path, plaintext).map_err(|err| {
2370                SyncularError::storage(err).context(format!("write blob file {path:?}"))
2371            })?;
2372        } else {
2373            self.transport.download_blob_to_file(blob, path)?;
2374        }
2375        Ok(())
2376    }
2377
2378    pub fn process_blob_upload_queue(
2379        &mut self,
2380    ) -> Result<crate::diesel_sqlite::BlobUploadQueueResult> {
2381        self.process_blob_upload_queue_with_options(false)
2382    }
2383
2384    pub fn process_blob_upload_queue_with_options(
2385        &mut self,
2386        retry_now: bool,
2387    ) -> Result<crate::diesel_sqlite::BlobUploadQueueResult> {
2388        self.ensure_blob_runtime_declared()?;
2389        self.store.requeue_stale_blob_uploads()?;
2390        let pending = self
2391            .store
2392            .pending_blob_uploads(DEFAULT_BLOB_UPLOAD_BATCH_LIMIT, retry_now)?;
2393        let mut result = crate::diesel_sqlite::BlobUploadQueueResult {
2394            uploaded: 0,
2395            failed: 0,
2396        };
2397        for item in pending {
2398            let next_attempt_count = item.attempt_count + 1;
2399            self.store
2400                .mark_blob_uploading(&item.hash, next_attempt_count)?;
2401            let blob = BlobRef {
2402                hash: item.hash.clone(),
2403                size: item.size,
2404                mime_type: item.mime_type.clone(),
2405                encrypted: item.encrypted != 0,
2406                key_id: item.key_id.clone(),
2407            };
2408            match self.transport.upload_blob(&blob, &item.body) {
2409                Ok(()) => {
2410                    self.store.delete_blob_upload(&item.hash)?;
2411                    result.uploaded += 1;
2412                }
2413                Err(error) => {
2414                    let failed = next_attempt_count >= MAX_BLOB_UPLOAD_RETRIES;
2415                    let now = now_ms();
2416                    self.store.mark_blob_upload_error(
2417                        &item.hash,
2418                        if failed { "failed" } else { "pending" },
2419                        &error.to_string(),
2420                        if failed {
2421                            0
2422                        } else {
2423                            next_blob_upload_retry_at(now, next_attempt_count)
2424                        },
2425                    )?;
2426                    if failed {
2427                        result.failed += 1;
2428                    }
2429                }
2430            }
2431        }
2432        Ok(result)
2433    }
2434}
2435
#[cfg(feature = "native")]
impl<T> SyncularClient<DieselSqliteStore, T> {
    /// Validates the blob runtime against the client's app schema.
    fn ensure_blob_runtime_declared(&self) -> Result<()> {
        let schema = self.app_schema;
        validate_blob_runtime_against_app_schema(schema)
    }

    /// Confirms that `blob` can be decoded with the current configuration.
    ///
    /// Unencrypted blobs always pass. Encrypted blobs require blob encryption to
    /// be configured and that configuration to report it can decrypt `blob`.
    fn ensure_blob_decryption_available(&self, blob: &BlobRef) -> Result<()> {
        if !blob.encrypted {
            return Ok(());
        }
        match &self.blob_encryption {
            Some(encryption) => encryption.ensure_can_decrypt(blob),
            None => Err(SyncularError::config(
                "encrypted blob retrieval requires set_blob_encryption(...)",
            )),
        }
    }

    /// Turns a stored or downloaded blob body into the bytes handed to callers.
    ///
    /// Encrypted blobs are decrypted with the configured blob encryption;
    /// unencrypted bodies are validated against `blob` and returned as-is.
    fn decode_blob_body(&self, blob: &BlobRef, body: Vec<u8>) -> Result<Vec<u8>> {
        self.ensure_blob_decryption_available(blob)?;
        if !blob.encrypted {
            validate_blob_bytes(blob, &body)?;
            return Ok(body);
        }
        // The availability check above already rejected encrypted blobs when no
        // encryption is configured, so the option is populated here.
        let encryption = self
            .blob_encryption
            .as_ref()
            .expect("checked encrypted blob decryption");
        encryption.decrypt_blob(blob, &body)
    }
}
2467
#[cfg(feature = "native")]
impl<T> SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Reads the file at `path`, stores it as a local blob and returns the blob
    /// reference as JSON.
    ///
    /// The file size is validated before the file is read. When blob encryption
    /// is configured the encrypted body is stored; `enqueue_upload` is passed
    /// through to the store either way. Nothing is uploaded here.
    pub fn store_blob_file_local_json(
        &mut self,
        path: &Path,
        mime_type: &str,
        enqueue_upload: bool,
    ) -> Result<String> {
        self.ensure_blob_runtime_declared()?;

        let file_len = fs::metadata(path)
            .map_err(|err| {
                SyncularError::storage(err).context(format!("stat blob file {path:?}"))
            })?
            .len();
        validate_blob_size_bytes(i64::try_from(file_len).unwrap_or(i64::MAX))?;
        let data = fs::read(path).map_err(|err| {
            SyncularError::storage(err).context(format!("read blob file {path:?}"))
        })?;

        let blob = match &self.blob_encryption {
            Some(encryption) => {
                let sealed = encryption.encrypt_blob(&data, mime_type)?;
                self.store
                    .store_blob_body(&sealed.blob, &sealed.body, enqueue_upload)?;
                sealed.blob
            }
            None => self
                .store
                .store_blob_bytes(&data, mime_type, enqueue_upload)?,
        };
        let encoded = serde_json::to_string(&blob)?;
        Ok(encoded)
    }

    /// Writes a locally cached blob to `path` and returns `{"ok":true}` as JSON.
    ///
    /// Fails with a config error when the blob is not in the local cache; the
    /// transport is never consulted.
    pub fn retrieve_cached_blob_file_json(
        &mut self,
        blob: &BlobRef,
        path: &Path,
    ) -> Result<String> {
        self.ensure_blob_runtime_declared()?;

        let cached = self.store.read_cached_blob(&blob.hash)?;
        let body = cached
            .ok_or_else(|| SyncularError::config("blob is not present in the local cache"))?;
        let plaintext = self.decode_blob_body(blob, body)?;
        fs::write(path, plaintext).map_err(|err| {
            SyncularError::storage(err).context(format!("write blob file {path:?}"))
        })?;

        let encoded = serde_json::to_string(&json!({ "ok": true }))?;
        Ok(encoded)
    }

    /// Asks the store whether the blob with `hash` is available locally.
    pub fn is_blob_local(&mut self, hash: &str) -> Result<bool> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.is_blob_local(hash)
    }

    /// Returns the store's blob upload queue statistics.
    pub fn blob_upload_queue_stats(
        &mut self,
    ) -> Result<crate::diesel_sqlite::BlobUploadQueueStats> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.blob_upload_queue_stats()
    }

    /// Returns the store's blob cache statistics.
    pub fn blob_cache_stats(&mut self) -> Result<crate::diesel_sqlite::BlobCacheStats> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.blob_cache_stats()
    }

    /// Asks the store to prune the blob cache with the given `max_bytes` budget
    /// and returns the store's result.
    pub fn prune_blob_cache(&mut self, max_bytes: i64) -> Result<i64> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.prune_blob_cache(max_bytes)
    }

    /// Asks the store to clear the blob cache.
    pub fn clear_blob_cache(&mut self) -> Result<()> {
        self.ensure_blob_runtime_declared()?;
        let store = &mut self.store;
        store.clear_blob_cache()
    }

    /// Forwards a storage compaction request (optional JSON options) to the
    /// store and returns its JSON result.
    pub fn compact_storage_json(&mut self, options_json: Option<&str>) -> Result<String> {
        let store = &mut self.store;
        store.compact_storage_json(options_json)
    }

    /// Runs a read-only query described by `request_json` against the configured
    /// database path, using the client's app schema, and returns the JSON result.
    pub fn readonly_query_json(&self, request_json: &str) -> Result<String> {
        let db_path = &self.config.db_path;
        let schema = self.app_schema;
        crate::sqlite_query::execute_readonly_query_json_with_schema(db_path, request_json, schema)
    }
}
2556
2557impl<S, T> SyncularClient<S, T>
2558where
2559    S: SyncStore,
2560    T: SyncTransport,
2561{
2562    fn prepare_sync(&mut self) -> Result<Vec<OutboxCommit>> {
2563        self.store.transaction(|tx| {
2564            tx.requeue_stale_outbox()?;
2565            let pending = tx.pending_outbox(DEFAULT_OUTBOX_PUSH_BATCH_LIMIT)?;
2566            for commit in &pending {
2567                validate_outbox_schema_version(commit, self.schema_version)?;
2568            }
2569            for commit in &pending {
2570                tx.mark_outbox_sending(&commit.id)?;
2571            }
2572            Ok(pending)
2573        })
2574    }
2575
    /// Builds the pull request for the configured subscriptions.
    ///
    /// Inside one store transaction, the stored state of every configured
    /// subscription is loaded, the active bootstrap phase is resolved, and one
    /// `SubscriptionRequest` is emitted per subscription that
    /// `should_include_pull_subscription` accepts.
    ///
    /// When a subscription's stored scopes differ from the configured ones, it is
    /// requested from scratch: cursor `-1`, no bootstrap state, no verified root.
    /// A subscription with no stored state also gets cursor `-1`.
    fn build_pull_request(&mut self) -> Result<PullRequest> {
        let specs = self.subscriptions.clone();
        // Snapshot artifacts are only requested when the store reports support
        // for SQLite snapshot artifacts.
        let snapshot_artifacts =
            self.store
                .supports_sqlite_snapshot_artifacts()
                .then(|| SnapshotArtifactsRequest {
                    artifact_kinds: vec![SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1.to_string()],
                    compressions: vec![SNAPSHOT_CHUNK_COMPRESSION_GZIP.to_string()],
                    feature_set: Vec::new(),
                });
        self.store.transaction(|tx| {
            // Load every subscription's stored state first; the active phase is
            // resolved from the full set of (spec, state) pairs.
            let mut entries = Vec::new();
            for spec in specs {
                let state = tx.subscription_state(DEFAULT_STATE_ID, &spec.id)?;
                entries.push((spec, state));
            }
            let active_phase = resolve_active_bootstrap_phase_for_native(&entries);

            let mut subscriptions = Vec::new();
            for (spec, state) in entries {
                if !should_include_pull_subscription(&spec, state.as_ref(), active_phase) {
                    continue;
                }
                // `false` when there is no stored state to compare against.
                let scopes_changed = state
                    .as_ref()
                    .map(|row| {
                        let scopes: ScopeValues = serde_json::from_str(&row.scopes_json)?;
                        Ok::<bool, SyncularError>(scopes != spec.scopes)
                    })
                    .transpose()?
                    .unwrap_or(false);
                let verified_root = if scopes_changed {
                    None
                } else {
                    tx.verified_root(DEFAULT_STATE_ID, &spec.id)?
                        .map(|root| root.root)
                };
                // No CRDT state vector hints are sent for encrypted-CRDT system tables.
                let crdt_state_vectors = if is_encrypted_crdt_system_table(&spec.table) {
                    Vec::new()
                } else {
                    tx.crdt_state_vector_hints(
                        &spec.table,
                        &spec.scopes,
                        DEFAULT_CRDT_STATE_VECTOR_HINT_LIMIT,
                    )?
                };
                subscriptions.push(SubscriptionRequest {
                    id: spec.id,
                    table: spec.table,
                    scopes: spec.scopes,
                    params: spec.params,
                    cursor: if scopes_changed {
                        -1
                    } else {
                        state.as_ref().map(|row| row.cursor).unwrap_or(-1)
                    },
                    bootstrap_state: if scopes_changed {
                        None
                    } else {
                        // Stored bootstrap state is kept as JSON text; parse it back.
                        state
                            .and_then(|row| row.bootstrap_state_json)
                            .map(|json| serde_json::from_str(&json))
                            .transpose()?
                    },
                    verified_root,
                    crdt_state_vectors,
                });
            }

            Ok(PullRequest {
                schema_version: self.schema_version,
                limit_commits: DEFAULT_PULL_LIMIT_COMMITS,
                limit_snapshot_rows: DEFAULT_PULL_LIMIT_SNAPSHOT_ROWS,
                max_snapshot_pages: DEFAULT_PULL_MAX_SNAPSHOT_PAGES,
                dedupe_rows: None,
                snapshot_artifacts,
                subscriptions,
            })
        })
    }
2656
2657    fn build_push_request(&self, pending: &[OutboxCommit]) -> Result<Option<PushBatchRequest>> {
2658        if pending.is_empty() {
2659            return Ok(None);
2660        }
2661
2662        let ctx = self.encryption_context();
2663        Ok(Some(PushBatchRequest {
2664            commits: pending
2665                .iter()
2666                .map(|commit| {
2667                    let operations: Vec<SyncOperation> =
2668                        serde_json::from_str(&commit.operations_json)?;
2669                    let operations = if let Some(encryption) = &self.field_encryption {
2670                        encryption.transform_operations_for_push(&ctx, operations)?
2671                    } else {
2672                        operations
2673                    };
2674                    Ok(PushCommitRequest {
2675                        client_commit_id: commit.client_commit_id.clone(),
2676                        operations,
2677                        schema_version: commit.schema_version,
2678                        auth_lease: commit.auth_lease.clone(),
2679                    })
2680                })
2681                .collect::<Result<Vec<_>>>()?,
2682        }))
2683    }
2684
2685    fn operations_for_push(&self, commit: &OutboxCommit) -> Result<Vec<SyncOperation>> {
2686        let operations: Vec<SyncOperation> = serde_json::from_str(&commit.operations_json)?;
2687        if let Some(encryption) = &self.field_encryption {
2688            encryption.transform_operations_for_push(&self.encryption_context(), operations)
2689        } else {
2690            Ok(operations)
2691        }
2692    }
2693
2694    fn transform_push_response(
2695        &self,
2696        outbox: &OutboxCommit,
2697        response: PushCommitResponse,
2698    ) -> Result<PushCommitResponse> {
2699        let Some(encryption) = &self.field_encryption else {
2700            return Ok(response);
2701        };
2702        let operations: Vec<SyncOperation> = serde_json::from_str(&outbox.operations_json)?;
2703        encryption.transform_push_response(&self.encryption_context(), &operations, response)
2704    }
2705
2706    fn transform_pull_response(&self, response: PullResponse) -> Result<PullResponse> {
2707        let response = if let Some(encryption) = &self.field_encryption {
2708            encryption.transform_pull_response(&self.encryption_context(), response)?
2709        } else {
2710            response
2711        };
2712        if let Some(encryption) = &self.encrypted_crdt {
2713            encryption.transform_pull_response(response)
2714        } else {
2715            Ok(response)
2716        }
2717    }
2718
2719    fn transform_snapshot_row(&self, snapshot_table: &str, row: Value) -> Result<Value> {
2720        if let Some(encryption) = &self.field_encryption {
2721            encryption.transform_snapshot_row(&self.encryption_context(), snapshot_table, row)
2722        } else {
2723            Ok(row)
2724        }
2725    }
2726
2727    fn transform_snapshot_chunk_rows(
2728        &self,
2729        snapshot_table: &str,
2730        rows: SnapshotChunkRows,
2731    ) -> Result<SnapshotChunkRows> {
2732        if self.field_encryption.is_none() {
2733            return Ok(rows);
2734        }
2735        rows.try_into_value_rows()?
2736            .into_iter()
2737            .map(|row| self.transform_snapshot_row(snapshot_table, row))
2738            .collect::<Result<Vec<_>>>()
2739            .map(SnapshotChunkRows::Json)
2740    }
2741
2742    fn encryption_context(&self) -> FieldEncryptionContext {
2743        FieldEncryptionContext {
2744            actor_id: self.config.actor_id.clone(),
2745            client_id: self.config.client_id.clone(),
2746        }
2747    }
2748
2749    fn apply_single_push_response(
2750        &mut self,
2751        outbox: &OutboxCommit,
2752        response: PushCommitResponse,
2753    ) -> Result<SyncReport> {
2754        let response = self.transform_push_response(outbox, response)?;
2755        self.store.transaction(|tx| {
2756            let conflicts_changed = apply_push_commit_response(tx, outbox, &response)?;
2757            Ok(SyncReport {
2758                changed_tables: Vec::new(),
2759                changed_rows: Vec::new(),
2760                conflicts_changed,
2761            })
2762        })
2763    }
2764
2765    fn apply_combined_response(
2766        &mut self,
2767        pending: &[OutboxCommit],
2768        response: CombinedResponse,
2769    ) -> Result<SyncReport> {
2770        if let Err(error) = validate_server_schema_version(&response, self.schema_version) {
2771            self.schedule_outbox_retry(pending, &error)?;
2772            return Err(error);
2773        }
2774
2775        if !response.ok {
2776            let error = SyncularError::protocol_message("combined sync response was not ok");
2777            self.schedule_outbox_retry(pending, &error)?;
2778            return Err(error);
2779        }
2780
2781        let mut report = SyncReport::default();
2782        if let Some(push) = response.push {
2783            if !push.ok {
2784                let error = SyncularError::protocol_message("push response was not ok");
2785                self.schedule_outbox_retry(pending, &error)?;
2786                return Err(error);
2787            }
2788
2789            let mut transformed_commits = Vec::new();
2790            for commit_response in push.commits {
2791                let Some(index) = pending
2792                    .iter()
2793                    .position(|row| row.client_commit_id == commit_response.client_commit_id)
2794                else {
2795                    continue;
2796                };
2797                let commit_response =
2798                    self.transform_push_response(&pending[index], commit_response)?;
2799                transformed_commits.push((index, commit_response));
2800            }
2801
2802            report.conflicts_changed |= self.store.transaction(|tx| {
2803                let mut conflicts_changed = false;
2804                for (index, commit_response) in transformed_commits {
2805                    let outbox = &pending[index];
2806                    conflicts_changed |= apply_push_commit_response(tx, outbox, &commit_response)?;
2807                }
2808                Ok(conflicts_changed)
2809            })?;
2810        }
2811
2812        if let Some(pull) = response.pull {
2813            report.merge(self.apply_pull_until_settled(pull)?);
2814        }
2815
2816        Ok(report)
2817    }
2818
2819    fn apply_pull_until_settled(&mut self, mut response: PullResponse) -> Result<SyncReport> {
2820        let mut report = SyncReport::default();
2821        for round in 0..MAX_PULL_ROUNDS {
2822            if !response.ok {
2823                return Err(SyncularError::protocol_message("pull response was not ok"));
2824            }
2825
2826            let verified_roots = self.verify_pull_response_integrity(&response)?;
2827            let transformed_response = self.transform_pull_response(response)?;
2828            let needs_more = pull_response_needs_another_round(&transformed_response, 50);
2829            report.merge(self.apply_pull_response(transformed_response, verified_roots)?);
2830
2831            if !needs_more {
2832                return Ok(report);
2833            }
2834
2835            if round + 1 == MAX_PULL_ROUNDS {
2836                return Ok(report);
2837            }
2838
2839            let request = CombinedRequest {
2840                client_id: self.config.client_id.clone(),
2841                push: None,
2842                pull: Some(self.build_pull_request()?),
2843            };
2844            let combined = self.transport.post_sync(&request)?;
2845            validate_server_schema_version(&combined, self.schema_version)?;
2846            response = combined.pull.unwrap_or(PullResponse {
2847                ok: true,
2848                subscriptions: Vec::new(),
2849            });
2850        }
2851
2852        Ok(report)
2853    }
2854
2855    fn verify_pull_response_integrity(
2856        &mut self,
2857        response: &PullResponse,
2858    ) -> Result<HashMap<String, Option<VerifiedCommitRoot>>> {
2859        validate_pull_commit_integrity_metadata(response)?;
2860        validate_pull_snapshot_manifests(response)?;
2861        self.store.transaction(|tx| {
2862            let mut verified_roots = HashMap::new();
2863            for sub in &response.subscriptions {
2864                if sub.status == "revoked" {
2865                    verified_roots.insert(sub.id.clone(), None);
2866                    continue;
2867                }
2868
2869                let previous_state = tx.subscription_state(DEFAULT_STATE_ID, &sub.id)?;
2870                let stored_root = if previous_state
2871                    .as_ref()
2872                    .map(|prev| {
2873                        let previous_scopes: ScopeValues = serde_json::from_str(&prev.scopes_json)?;
2874                        Ok::<bool, SyncularError>(previous_scopes != sub.scopes)
2875                    })
2876                    .transpose()?
2877                    .unwrap_or(false)
2878                {
2879                    None
2880                } else {
2881                    tx.verified_root(DEFAULT_STATE_ID, &sub.id)?
2882                };
2883                let verified_root = verify_subscription_commit_integrity(
2884                    &sub.id,
2885                    stored_root.as_ref().map(|root| root.root.as_str()),
2886                    sub.integrity.as_ref(),
2887                    &sub.commits,
2888                )?;
2889                verified_roots.insert(sub.id.clone(), verified_root);
2890            }
2891            Ok(verified_roots)
2892        })
2893    }
2894
2895    fn apply_pull_response(
2896        &mut self,
2897        response: PullResponse,
2898        mut verified_roots: HashMap<String, Option<VerifiedCommitRoot>>,
2899    ) -> Result<SyncReport> {
2900        let mut report = SyncReport::default();
2901        for sub in response.subscriptions {
2902            if sub.status == "revoked" {
2903                self.store.transaction(|tx| {
2904                    if let Some(prev) = tx.subscription_state(DEFAULT_STATE_ID, &sub.id)? {
2905                        let scopes: ScopeValues = serde_json::from_str(&prev.scopes_json)?;
2906                        tx.clear_table_for_scopes(&prev.table, &scopes)?;
2907                        report.add_changed_table(&prev.table);
2908                    }
2909                    tx.delete_verified_root(DEFAULT_STATE_ID, &sub.id)?;
2910                    tx.delete_subscription_state(DEFAULT_STATE_ID, &sub.id)
2911                })?;
2912                continue;
2913            }
2914            let spec = self
2915                .subscriptions
2916                .iter()
2917                .find(|candidate| candidate.id == sub.id);
2918            let table = spec
2919                .map(|spec| spec.table.clone())
2920                .unwrap_or_else(|| "tasks".to_string());
2921            let params_json = spec
2922                .map(|spec| serde_json::to_string(&spec.params))
2923                .transpose()?
2924                .unwrap_or_else(|| "{}".to_string());
2925
2926            let mut prepared_snapshots = Vec::new();
2927            if let Some(snapshots) = sub.snapshots.as_ref() {
2928                for snapshot in snapshots {
2929                    let mut artifact_rows = Vec::new();
2930                    if let Some(artifacts) = &snapshot.artifacts {
2931                        if !artifacts.is_empty() && !self.store.supports_sqlite_snapshot_artifacts()
2932                        {
2933                            return Err(SyncularError::protocol_message(
2934                                "snapshot artifacts are not supported by this store",
2935                            ));
2936                        }
2937                        for artifact in artifacts {
2938                            validate_sqlite_snapshot_artifact_for_apply(
2939                                artifact,
2940                                &sub.id,
2941                                &snapshot.table,
2942                            )?;
2943                            let bytes = self
2944                                .transport
2945                                .fetch_snapshot_artifact_bytes(artifact, &sub.scopes)?;
2946                            let rows = self
2947                                .store
2948                                .decode_sqlite_snapshot_artifact_rows(&snapshot.table, &bytes)?;
2949                            for row in rows {
2950                                artifact_rows
2951                                    .push(self.transform_snapshot_row(&snapshot.table, row)?);
2952                            }
2953                        }
2954                    }
2955                    let mut chunk_batches = Vec::new();
2956                    if let Some(chunks) = &snapshot.chunks {
2957                        for chunk in chunks {
2958                            let rows = self
2959                                .transport
2960                                .fetch_snapshot_chunk_rows(chunk, &sub.scopes)?;
2961                            chunk_batches
2962                                .push(self.transform_snapshot_chunk_rows(&snapshot.table, rows)?);
2963                        }
2964                    }
2965                    if snapshot.is_first_page
2966                        || !snapshot.rows.is_empty()
2967                        || !artifact_rows.is_empty()
2968                        || chunk_batches.iter().any(|rows| !rows.is_empty())
2969                    {
2970                        report.add_changed_table(&snapshot.table);
2971                    }
2972                    prepared_snapshots.push(PreparedSnapshot {
2973                        snapshot: snapshot.clone(),
2974                        chunk_batches,
2975                        artifact_rows,
2976                    });
2977                }
2978            }
2979
2980            self.store.transaction(|tx| {
2981                let previous_state = tx.subscription_state(DEFAULT_STATE_ID, &sub.id)?;
2982                let mut previous_scopes_match = false;
2983                if let Some(prev) = &previous_state {
2984                    let previous_scopes: ScopeValues = serde_json::from_str(&prev.scopes_json)?;
2985                    if previous_scopes != sub.scopes {
2986                        tx.clear_table_for_scopes(&prev.table, &previous_scopes)?;
2987                        tx.delete_verified_root(DEFAULT_STATE_ID, &sub.id)?;
2988                        report.add_changed_table(&prev.table);
2989                    } else {
2990                        previous_scopes_match = true;
2991                    }
2992                }
2993
2994                let verified_root = verified_roots.remove(&sub.id).flatten();
2995
2996                let mut snapshot_cleared_tables = HashSet::new();
2997                if let Some(prev) = previous_state.as_ref() {
2998                    if prev.bootstrap_state_json.is_some()
2999                        && previous_scopes_match
3000                        && snapshot_clear_removes_all_rows(self.app_schema, &prev.table)
3001                    {
3002                        snapshot_cleared_tables.insert(prev.table.clone());
3003                    }
3004                }
3005                for prepared in &prepared_snapshots {
3006                    let snapshot = &prepared.snapshot;
3007                    if snapshot.is_first_page {
3008                        tx.clear_table_for_scopes_preserving_local_crdt(
3009                            &snapshot.table,
3010                            &sub.scopes,
3011                        )?;
3012                        if snapshot_clear_removes_all_rows(self.app_schema, &snapshot.table) {
3013                            snapshot_cleared_tables.insert(snapshot.table.clone());
3014                        }
3015                    }
3016                    if snapshot_cleared_tables.contains(&snapshot.table) {
3017                        tx.upsert_rows(&snapshot.table, &snapshot.rows, None)?;
3018                        for row in &snapshot.rows {
3019                            if let Some(changed_row) = sync_changed_row_for_snapshot(
3020                                self.app_schema,
3021                                &snapshot.table,
3022                                row,
3023                                None,
3024                                &sub.id,
3025                            ) {
3026                                report.add_changed_row(changed_row);
3027                            }
3028                        }
3029
3030                        tx.upsert_rows(&snapshot.table, &prepared.artifact_rows, None)?;
3031                        for row in &prepared.artifact_rows {
3032                            if let Some(changed_row) = sync_changed_row_for_snapshot(
3033                                self.app_schema,
3034                                &snapshot.table,
3035                                row,
3036                                None,
3037                                &sub.id,
3038                            ) {
3039                                report.add_changed_row(changed_row);
3040                            }
3041                        }
3042
3043                        for chunk_rows in &prepared.chunk_batches {
3044                            tx.upsert_snapshot_chunk_rows(&snapshot.table, chunk_rows, None)?;
3045                            for changed_row in sync_changed_rows_for_cleared_snapshot_chunk(
3046                                self.app_schema,
3047                                &snapshot.table,
3048                                chunk_rows,
3049                                &sub.id,
3050                            ) {
3051                                report.add_changed_row(changed_row);
3052                            }
3053                        }
3054                    } else {
3055                        for row in snapshot.rows.iter().chain(prepared.artifact_rows.iter()) {
3056                            let previous_row =
3057                                row_id_for_metadata(self.app_schema, &snapshot.table, row)
3058                                    .map(|row_id| tx.current_row_json(&snapshot.table, &row_id))
3059                                    .transpose()?
3060                                    .flatten();
3061                            tx.upsert_row(&snapshot.table, row, None)?;
3062                            if let Some(changed_row) = sync_changed_row_for_snapshot(
3063                                self.app_schema,
3064                                &snapshot.table,
3065                                row,
3066                                previous_row.as_ref(),
3067                                &sub.id,
3068                            ) {
3069                                report.add_changed_row(changed_row);
3070                            }
3071                        }
3072                        for chunk_rows in &prepared.chunk_batches {
3073                            let chunk_rows = chunk_rows.clone().try_into_value_rows()?;
3074                            for row in &chunk_rows {
3075                                let previous_row =
3076                                    row_id_for_metadata(self.app_schema, &snapshot.table, row)
3077                                        .map(|row_id| tx.current_row_json(&snapshot.table, &row_id))
3078                                        .transpose()?
3079                                        .flatten();
3080                                tx.upsert_row(&snapshot.table, row, None)?;
3081                                if let Some(changed_row) = sync_changed_row_for_snapshot(
3082                                    self.app_schema,
3083                                    &snapshot.table,
3084                                    row,
3085                                    previous_row.as_ref(),
3086                                    &sub.id,
3087                                ) {
3088                                    report.add_changed_row(changed_row);
3089                                }
3090                            }
3091                        }
3092                    }
3093                }
3094
3095                for commit in &sub.commits {
3096                    for change in &commit.changes {
3097                        let previous_row = tx.current_row_json(&change.table, &change.row_id)?;
3098                        tx.apply_change(change)?;
3099                        if let Some(changed_row) = sync_changed_row_for_change(
3100                            self.app_schema,
3101                            change,
3102                            previous_row.as_ref(),
3103                            commit.commit_seq,
3104                            &sub.id,
3105                        ) {
3106                            report.add_changed_row(changed_row);
3107                        } else {
3108                            report.add_changed_table(&change.table);
3109                        }
3110                    }
3111                }
3112
3113                tx.upsert_subscription_state(&SubscriptionState {
3114                    state_id: DEFAULT_STATE_ID.to_string(),
3115                    subscription_id: sub.id.clone(),
3116                    table: table.clone(),
3117                    scopes_json: serde_json::to_string(&sub.scopes)?,
3118                    params_json,
3119                    cursor: sub.next_cursor,
3120                    bootstrap_state_json: sub
3121                        .bootstrap_state
3122                        .as_ref()
3123                        .map(serde_json::to_string)
3124                        .transpose()?,
3125                    status: sub.status.clone(),
3126                })?;
3127                if let Some(root) = verified_root {
3128                    tx.upsert_verified_root(&VerifiedRoot {
3129                        state_id: DEFAULT_STATE_ID.to_string(),
3130                        subscription_id: sub.id.clone(),
3131                        partition_id: root.partition_id,
3132                        commit_seq: root.commit_seq,
3133                        root: root.root,
3134                    })?;
3135                }
3136
3137                Ok(())
3138            })?;
3139        }
3140
3141        Ok(report)
3142    }
3143
3144    fn schedule_outbox_retry(
3145        &mut self,
3146        pending: &[OutboxCommit],
3147        error: &SyncularError,
3148    ) -> Result<()> {
3149        if pending.is_empty() {
3150            return Ok(());
3151        }
3152
3153        let now = now_ms();
3154        let message = error.to_string();
3155        let auth_error = is_auth_transport_error(error);
3156        self.store.transaction(|tx| {
3157            for commit in pending {
3158                let attempt_count = commit.attempt_count.saturating_add(1);
3159                let failed = attempt_count >= MAX_SYNC_RETRIES;
3160                let next_attempt_at = if failed || auth_error {
3161                    0
3162                } else {
3163                    next_retry_at(now, attempt_count)
3164                };
3165                tx.mark_outbox_retry(&commit.id, &message, next_attempt_at, failed)?;
3166            }
3167            Ok(())
3168        })
3169    }
3170}
3171
#[cfg(feature = "demo-todo-native-fixture")]
impl<T> SyncularClient<RusqliteStore, T>
where
    T: SyncTransport,
{
    /// Lists the rows of `table` from the store and returns them serialized as a
    /// JSON string.
    pub fn list_table_json(&mut self, table: &str) -> Result<String> {
        let rows = self.store.list_table_json(table)?;
        let encoded = serde_json::to_string(&rows)?;
        Ok(encoded)
    }

    /// Parses a JSON-encoded sync operation (and an optional JSON-encoded local row)
    /// and applies it to the store as a local operation.
    ///
    /// Input sizes are validated before any parsing happens.
    pub fn apply_mutation_json(
        &mut self,
        mutation_json: &str,
        local_row_json: Option<&str>,
    ) -> Result<String> {
        validate_mutation_json_input_size(mutation_json, local_row_json)?;
        let operation: SyncOperation = serde_json::from_str(mutation_json)?;
        let local_row = match local_row_json {
            Some(json) => Some(serde_json::from_str(json)?),
            None => None,
        };
        self.store.apply_local_operation(operation, local_row)
    }

    /// Always fails for this store: the request JSON is parsed (so malformed input
    /// reports a parse error first), then a configuration error is returned.
    pub fn apply_encrypted_crdt_update_json(
        &mut self,
        request_json: &str,
    ) -> Result<MutationReceipt> {
        serde_json::from_str::<EncryptedCrdtUpdateJsonRequest>(request_json)?;
        Err(SyncularError::config(
            "encrypted CRDT update JSON is not supported by RusqliteStore",
        ))
    }

    /// Always fails for this store: the request JSON is parsed (so malformed input
    /// reports a parse error first), then a configuration error is returned.
    pub fn apply_encrypted_crdt_checkpoint_json(
        &mut self,
        request_json: &str,
    ) -> Result<Option<MutationReceipt>> {
        serde_json::from_str::<EncryptedCrdtCheckpointJsonRequest>(request_json)?;
        Err(SyncularError::config(
            "encrypted CRDT checkpoint JSON is not supported by RusqliteStore",
        ))
    }
}
3212
3213#[cfg(feature = "native")]
3214impl<T> SyncularClient<DieselSqliteStore, T>
3215where
3216    T: SyncTransport,
3217{
    /// Runs a Diesel load query against the SQLite-backed store.
    ///
    /// `query` is forwarded unchanged to the store's `read` method, and that call's
    /// result is returned as-is.
    pub fn read<'query, Q, Row>(&mut self, query: Q) -> Result<Vec<Row>>
    where
        Q: diesel::query_dsl::LoadQuery<'query, diesel::sqlite::SqliteConnection, Row>,
    {
        self.store.read(query)
    }
3224
    /// Creates a `SyncularLiveQuery` over `tables` and refreshes it once before
    /// returning it.
    ///
    /// `build_query` is a factory that produces the Diesel query; `tables` is any
    /// iterable of values convertible into table-name strings.
    ///
    /// # Errors
    ///
    /// Fails if the initial refresh against this client fails.
    pub fn live_query<QF, Q, Row, I, Table>(
        &mut self,
        tables: I,
        build_query: QF,
    ) -> Result<SyncularLiveQuery<QF, Row>>
    where
        QF: FnMut() -> Q,
        for<'query> Q: diesel::query_dsl::LoadQuery<'query, diesel::sqlite::SqliteConnection, Row>,
        I: IntoIterator<Item = Table>,
        Table: Into<String>,
    {
        let mut live_query = SyncularLiveQuery::new(tables, build_query);
        // Refresh once up front so the refresh error, if any, surfaces here.
        live_query.refresh(self)?;
        Ok(live_query)
    }
3240
    /// Applies a single mutation by forwarding it to `apply_mutation`.
    pub fn apply<M>(&mut self, mutation: M) -> Result<MutationReceipt>
    where
        M: IntoSyncularMutation,
    {
        self.apply_mutation(mutation)
    }
3247
    /// Applies a batch of mutations through this client's
    /// `SyncularMutationExecutor` implementation.
    pub fn apply_mutation_batch(
        &mut self,
        batch: SyncularMutationBatch,
    ) -> Result<MutationReceipt> {
        // Fully qualified so the call resolves to the trait method rather than
        // recursing into this inherent method of the same name.
        <Self as SyncularMutationExecutor>::apply_mutation_batch(self, batch)
    }
3254
3255    pub fn commit_mutations<R>(
3256        &mut self,
3257        f: impl FnOnce(&mut SyncularMutationBatch) -> Result<R>,
3258    ) -> Result<MutationCommit<R>> {
3259        let mut batch = SyncularMutationBatch::new();
3260        let result = f(&mut batch)?;
3261        let commit = self.apply_mutation_batch(batch)?;
3262        Ok(MutationCommit { result, commit })
3263    }
3264
3265    pub fn list_table_json(&mut self, table: &str) -> Result<String> {
3266        Ok(serde_json::to_string(&self.store.list_table_json(table)?)?)
3267    }
3268
3269    pub fn apply_mutation_json(
3270        &mut self,
3271        mutation_json: &str,
3272        local_row_json: Option<&str>,
3273    ) -> Result<String> {
3274        validate_mutation_json_input_size(mutation_json, local_row_json)?;
3275        let operation: SyncOperation = serde_json::from_str(mutation_json)?;
3276        let local_row = local_row_json.map(serde_json::from_str).transpose()?;
3277        self.store.apply_local_operation(operation, local_row)
3278    }
3279
3280    pub fn apply_leased_mutation_json(
3281        &mut self,
3282        mutation_json: &str,
3283        local_row_json: Option<&str>,
3284    ) -> Result<String> {
3285        validate_mutation_json_input_size(mutation_json, local_row_json)?;
3286        let operation: SyncOperation = serde_json::from_str(mutation_json)?;
3287        let local_row = local_row_json.map(serde_json::from_str).transpose()?;
3288        let actor_id = self.config.actor_id.clone();
3289        self.store.apply_local_operation_with_active_auth_lease(
3290            Some(&actor_id),
3291            now_ms(),
3292            operation,
3293            local_row,
3294        )
3295    }
3296
3297    pub fn apply_encrypted_crdt_update_json(
3298        &mut self,
3299        request_json: &str,
3300    ) -> Result<MutationReceipt> {
3301        validate_crdt_request_json_size(request_json)?;
3302        let request: EncryptedCrdtUpdateJsonRequest = serde_json::from_str(request_json)?;
3303        let (metadata, field) =
3304            self.encrypted_crdt_metadata_field(&request.table, &request.field)?;
3305        match (request.next_text, request.update) {
3306            (Some(next_text), None) => {
3307                validate_yjs_text_input_size(&next_text)?;
3308                self.apply_encrypted_crdt_text_update(metadata, field, &request.row_id, &next_text)
3309            }
3310            (None, Some(update)) => {
3311                validate_yjs_update_envelope_size(&update)?;
3312                self.apply_encrypted_crdt_yjs_update(metadata, field, &request.row_id, update)
3313            }
3314            (Some(_), Some(_)) => Err(SyncularError::config(
3315                "encrypted CRDT update JSON must provide either nextText or update, not both",
3316            )),
3317            (None, None) => Err(SyncularError::config(
3318                "encrypted CRDT update JSON requires nextText or update",
3319            )),
3320        }
3321    }
3322
3323    pub fn apply_encrypted_crdt_checkpoint_json(
3324        &mut self,
3325        request_json: &str,
3326    ) -> Result<Option<MutationReceipt>> {
3327        validate_crdt_request_json_size(request_json)?;
3328        let request: EncryptedCrdtCheckpointJsonRequest = serde_json::from_str(request_json)?;
3329        let (metadata, field) =
3330            self.encrypted_crdt_metadata_field(&request.table, &request.field)?;
3331        self.apply_encrypted_crdt_checkpoint(
3332            metadata,
3333            field,
3334            &request.row_id,
3335            request.min_uncheckpointed_updates.unwrap_or(1),
3336        )
3337    }
3338
3339    pub fn open_crdt_field(&self, id: CrdtFieldId) -> Result<CrdtField> {
3340        let field = validate_crdt_field(self.app_schema, &id)?;
3341        if field.sync_mode() == CrdtFieldSyncMode::EncryptedUpdateLog
3342            && self.encrypted_crdt.is_none()
3343        {
3344            return Err(SyncularError::config(
3345                "encrypted CRDT fields require set_encrypted_crdt(...)",
3346            ));
3347        }
3348        Ok(field)
3349    }
3350
3351    pub fn apply_crdt_field_yjs_update(
3352        &mut self,
3353        field: &CrdtField,
3354        update: YjsUpdateEnvelope,
3355    ) -> Result<CrdtFieldWriteReceipt> {
3356        validate_yjs_update_envelope_size(&update)?;
3357        match field.sync_mode() {
3358            CrdtFieldSyncMode::ServerMerge => {
3359                let client_commit_id = self.store.apply_crdt_field_yjs_update(
3360                    field,
3361                    update,
3362                    DEFAULT_CRDT_UPDATE_QUEUE_CAPACITY,
3363                )?;
3364                Ok(CrdtFieldWriteReceipt {
3365                    client_commit_id,
3366                    sync_mode: field.sync_mode(),
3367                })
3368            }
3369            CrdtFieldSyncMode::EncryptedUpdateLog => {
3370                let receipt = self.apply_encrypted_crdt_yjs_update(
3371                    field.metadata(),
3372                    field.field(),
3373                    field.row_id(),
3374                    update,
3375                )?;
3376                Ok(CrdtFieldWriteReceipt {
3377                    client_commit_id: receipt.client_commit_id,
3378                    sync_mode: field.sync_mode(),
3379                })
3380            }
3381        }
3382    }
3383
3384    pub fn apply_crdt_field_yjs_update_with_queue_capacity(
3385        &mut self,
3386        field: &CrdtField,
3387        update: YjsUpdateEnvelope,
3388        max_pending_updates: i64,
3389    ) -> Result<CrdtFieldWriteReceipt> {
3390        validate_yjs_update_envelope_size(&update)?;
3391        match field.sync_mode() {
3392            CrdtFieldSyncMode::ServerMerge => {
3393                let client_commit_id =
3394                    self.store
3395                        .apply_crdt_field_yjs_update(field, update, max_pending_updates)?;
3396                Ok(CrdtFieldWriteReceipt {
3397                    client_commit_id,
3398                    sync_mode: field.sync_mode(),
3399                })
3400            }
3401            CrdtFieldSyncMode::EncryptedUpdateLog => {
3402                self.apply_crdt_field_yjs_update(field, update)
3403            }
3404        }
3405    }
3406
3407    pub fn apply_crdt_field_text(
3408        &mut self,
3409        field: &CrdtField,
3410        next_text: &str,
3411    ) -> Result<CrdtFieldWriteReceipt> {
3412        validate_yjs_text_input_size(next_text)?;
3413        if field.field_metadata().kind != "text" {
3414            return Err(SyncularError::config(format!(
3415                "apply_crdt_field_text requires a text CRDT field, got {}",
3416                field.field_metadata().kind
3417            )));
3418        }
3419        match field.sync_mode() {
3420            CrdtFieldSyncMode::ServerMerge => {
3421                let current_row = self.store.read_row_json(field.table(), field.row_id())?;
3422                let previous_state_base64 = current_row.as_ref().and_then(|row| {
3423                    row.get(field.state_column())
3424                        .and_then(Value::as_str)
3425                        .filter(|value| !value.is_empty())
3426                        .map(str::to_string)
3427                });
3428                if previous_state_base64.is_none() {
3429                    if let Some(existing_text) = current_row
3430                        .as_ref()
3431                        .and_then(|row| row.get(field.field()))
3432                        .and_then(Value::as_str)
3433                        .filter(|value| !value.is_empty() && *value != next_text)
3434                    {
3435                        return Err(SyncularError::config(format!(
3436                            "cannot replace non-empty CRDT text field {}.{} row {} without existing Yjs state; migrate or initialize {} first (current value: {existing_text:?})",
3437                            field.table(),
3438                            field.field(),
3439                            field.row_id(),
3440                            field.state_column()
3441                        )));
3442                    }
3443                }
3444                let update = build_yjs_text_update(BuildYjsTextUpdateArgs {
3445                    previous_state_base64,
3446                    next_text: next_text.to_string(),
3447                    container_key: Some(field.container_key().to_string()),
3448                    update_id: None,
3449                })?;
3450                self.apply_crdt_field_yjs_update(field, update.update)
3451            }
3452            CrdtFieldSyncMode::EncryptedUpdateLog => {
3453                let receipt = self.apply_encrypted_crdt_text_update(
3454                    field.metadata(),
3455                    field.field(),
3456                    field.row_id(),
3457                    next_text,
3458                )?;
3459                Ok(CrdtFieldWriteReceipt {
3460                    client_commit_id: receipt.client_commit_id,
3461                    sync_mode: field.sync_mode(),
3462                })
3463            }
3464        }
3465    }
3466
3467    pub fn materialize_crdt_field(
3468        &mut self,
3469        field: &CrdtField,
3470    ) -> Result<CrdtFieldMaterialization> {
3471        let row = self
3472            .store
3473            .read_row_json(field.table(), field.row_id())?
3474            .ok_or_else(|| {
3475                SyncularError::protocol_message(format!(
3476                    "cannot materialize CRDT field {}.{} for missing row {}",
3477                    field.table(),
3478                    field.field(),
3479                    field.row_id()
3480                ))
3481            })?;
3482        let state_base64 = row
3483            .get(field.state_column())
3484            .and_then(Value::as_str)
3485            .filter(|value| !value.is_empty())
3486            .map(str::to_string);
3487        let value = match state_base64.as_deref() {
3488            Some(state_base64) => materialize_yjs_state(state_base64, &field.yjs_rule()?)?,
3489            None => row.get(field.field()).cloned().unwrap_or(Value::Null),
3490        };
3491        let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
3492        Ok(CrdtFieldMaterialization {
3493            value,
3494            state_base64,
3495            state_vector_base64,
3496        })
3497    }
3498
3499    pub fn materialize_crdt_field_json(&mut self, field: &CrdtField) -> Result<String> {
3500        Ok(serde_json::to_string(&self.materialize_crdt_field(field)?)?)
3501    }
3502
3503    pub fn crdt_document_snapshot(&mut self, field: &CrdtField) -> Result<CrdtDocumentSnapshot> {
3504        self.store.crdt_document_snapshot(field)
3505    }
3506
3507    pub fn crdt_document_snapshot_json(&mut self, field: &CrdtField) -> Result<String> {
3508        Ok(serde_json::to_string(&self.crdt_document_snapshot(field)?)?)
3509    }
3510
3511    pub fn crdt_update_log(
3512        &mut self,
3513        field: &CrdtField,
3514        limit: i64,
3515    ) -> Result<Vec<CrdtUpdateLogEntry>> {
3516        self.store.crdt_update_log(field, limit)
3517    }
3518
3519    pub fn crdt_update_log_json(&mut self, field: &CrdtField, limit: i64) -> Result<String> {
3520        Ok(serde_json::to_string(&self.crdt_update_log(field, limit)?)?)
3521    }
3522
3523    pub fn snapshot_crdt_field_state_vector_base64(&mut self, field: &CrdtField) -> Result<String> {
3524        let row = self.store.read_row_json(field.table(), field.row_id())?;
3525        let state_base64 = row.as_ref().and_then(|row| {
3526            row.get(field.state_column())
3527                .and_then(Value::as_str)
3528                .filter(|value| !value.is_empty())
3529        });
3530        yjs_state_vector_base64(state_base64)
3531    }
3532
3533    pub fn compact_crdt_field(
3534        &mut self,
3535        field: &CrdtField,
3536        min_uncheckpointed_updates: i64,
3537    ) -> Result<CrdtFieldCompactionReceipt> {
3538        let before = self.store.crdt_document_snapshot(field)?;
3539        let encrypted_stream_before = self.encrypted_crdt_stream_stats_for_field(field)?;
3540        match field.sync_mode() {
3541            CrdtFieldSyncMode::ServerMerge => {
3542                let after = self.store.compact_crdt_document(field)?;
3543                Ok(CrdtFieldCompactionReceipt {
3544                    checkpoint_created: false,
3545                    client_commit_id: None,
3546                    before: CrdtFieldCompactionStats::from(&before),
3547                    after: CrdtFieldCompactionStats::from(&after),
3548                    encrypted_stream_before,
3549                    encrypted_stream_after: self.encrypted_crdt_stream_stats_for_field(field)?,
3550                })
3551            }
3552            CrdtFieldSyncMode::EncryptedUpdateLog => {
3553                let receipt = self.apply_encrypted_crdt_checkpoint(
3554                    field.metadata(),
3555                    field.field(),
3556                    field.row_id(),
3557                    min_uncheckpointed_updates,
3558                )?;
3559                let after = self.store.crdt_document_snapshot(field)?;
3560                Ok(CrdtFieldCompactionReceipt {
3561                    checkpoint_created: receipt.is_some(),
3562                    client_commit_id: receipt.map(|receipt| receipt.client_commit_id),
3563                    before: CrdtFieldCompactionStats::from(&before),
3564                    after: CrdtFieldCompactionStats::from(&after),
3565                    encrypted_stream_before,
3566                    encrypted_stream_after: self.encrypted_crdt_stream_stats_for_field(field)?,
3567                })
3568            }
3569        }
3570    }
3571
3572    fn encrypted_crdt_stream_stats_for_field(
3573        &mut self,
3574        field: &CrdtField,
3575    ) -> Result<Option<EncryptedCrdtStreamStats>> {
3576        if field.sync_mode() != CrdtFieldSyncMode::EncryptedUpdateLog {
3577            return Ok(None);
3578        }
3579        let Some(encryption) = &self.encrypted_crdt else {
3580            return Ok(None);
3581        };
3582        let stream_id = encrypted_crdt_stream_id(field.table(), field.row_id(), field.field());
3583        self.store
3584            .encrypted_crdt_stream_stats(encryption.partition_id(), &stream_id)
3585            .map(Some)
3586    }
3587
3588    fn encrypted_crdt_metadata_field(
3589        &self,
3590        table: &str,
3591        field: &str,
3592    ) -> Result<(&'static AppTableMetadata, &'static str)> {
3593        let metadata = self.table_metadata(table).ok_or_else(|| {
3594            SyncularError::config(format!("unknown generated app table: {table}"))
3595        })?;
3596        let field = encrypted_field_metadata(metadata, field)?.field;
3597        Ok((metadata, field))
3598    }
3599}
3600
#[cfg(feature = "native")]
impl<T> SyncularMutationExecutor for SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Converts `mutation` and applies it to the store as a one-element batch.
    fn apply_mutation<M>(&mut self, mutation: M) -> Result<MutationReceipt>
    where
        M: IntoSyncularMutation,
    {
        let mutations = vec![mutation.into_syncular_mutation()];
        self.store.apply_syncular_mutations(mutations)
    }

    /// Applies every mutation contained in `batch` through a single store call.
    fn apply_mutation_batch(&mut self, batch: SyncularMutationBatch) -> Result<MutationReceipt> {
        let mutations = batch.into_mutations();
        self.store.apply_syncular_mutations(mutations)
    }
}
3618
#[cfg(feature = "native")]
impl<T> SyncularLeasedMutationExecutor for SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Converts `mutation` and applies it as a one-element batch through the store's
    /// active-auth-lease path.
    ///
    /// The client's configured actor id and the current `now_ms()` timestamp are
    /// passed to the store alongside the mutation.
    fn apply_leased_mutation<M>(&mut self, mutation: M) -> Result<MutationReceipt>
    where
        M: IntoSyncularMutation,
    {
        // `store` and `config` are disjoint fields, so the actor id can be borrowed
        // directly instead of being cloned for every call.
        self.store.apply_syncular_mutations_with_active_auth_lease(
            Some(&self.config.actor_id),
            now_ms(),
            vec![mutation.into_syncular_mutation()],
        )
    }

    /// Applies every mutation in `batch` through the store's active-auth-lease path,
    /// using the client's configured actor id and the current `now_ms()` timestamp.
    fn apply_leased_mutation_batch(
        &mut self,
        batch: SyncularMutationBatch,
    ) -> Result<MutationReceipt> {
        // Borrow the actor id in place; no per-call `String` allocation is needed.
        self.store.apply_syncular_mutations_with_active_auth_lease(
            Some(&self.config.actor_id),
            now_ms(),
            batch.into_mutations(),
        )
    }
}
3648
#[cfg(feature = "native")]
impl<T> SyncularCommandHistoryExecutor for SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Returns the current local row for `table`/`row_id` via `current_row_json`.
    fn command_history_current_row_json(
        &mut self,
        table: &str,
        row_id: &str,
    ) -> Result<Option<Value>> {
        self.current_row_json(table, row_id)
    }

    /// Records `entries` for `mutation_scope` together with `receipt` in the store.
    fn command_history_record(
        &mut self,
        mutation_scope: &str,
        entries: &[CommandHistoryEntry],
        receipt: &MutationReceipt,
    ) -> Result<CommandHistoryRecord> {
        self.store
            .record_command_history(mutation_scope, entries, receipt)
    }

    /// Returns the store's latest command-history record for `state`, if any.
    fn command_history_latest(
        &mut self,
        state: CommandHistoryState,
    ) -> Result<Option<CommandHistoryRecord>> {
        self.store.latest_command_history(state)
    }

    /// Marks the command-history record `id` with `state` and `receipt` in the store.
    fn command_history_mark(
        &mut self,
        id: &str,
        state: CommandHistoryState,
        receipt: &MutationReceipt,
    ) -> Result<()> {
        self.store.mark_command_history(id, state, receipt)
    }

    /// Applies `batch` through the executor that matches `mutation_scope`.
    ///
    /// `"mutations"` uses `apply_mutation_batch` and `"leasedMutations"` uses
    /// `apply_leased_mutation_batch`. Any other scope is rejected with a config error.
    fn apply_command_history_batch(
        &mut self,
        mutation_scope: &str,
        batch: SyncularMutationBatch,
    ) -> Result<MutationReceipt> {
        match mutation_scope {
            "mutations" => self.apply_mutation_batch(batch),
            "leasedMutations" => self.apply_leased_mutation_batch(batch),
            other => Err(SyncularError::config(format!(
                "sync.command_history_scope_unsupported: {other}"
            ))),
        }
    }

    /// Applies `batch` through the store's command-history-aware mutation path.
    ///
    /// `mutation_scope` is forwarded to the store as given; this method does not
    /// check it itself. The client's configured actor id and the current `now_ms()`
    /// timestamp are passed along.
    fn apply_command_history_tracked_batch(
        &mut self,
        mutation_scope: &str,
        batch: SyncularMutationBatch,
    ) -> Result<MutationReceipt> {
        // `store` and `config` are disjoint fields, so the actor id is borrowed in
        // place rather than cloned for every call.
        self.store.apply_syncular_mutations_with_command_history(
            mutation_scope,
            Some(&self.config.actor_id),
            now_ms(),
            batch.into_mutations(),
        )
    }
}
3716
#[cfg(feature = "native")]
impl<T> SyncularEncryptedCrdtMutationExecutor for SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport,
{
    /// Builds and applies an encrypted CRDT mutation that sets a field's text to
    /// `next_text`.
    ///
    /// # Errors
    ///
    /// Fails when the text does not pass size validation, when no encrypted CRDT
    /// support is set on this client, when the local row does not exist, or when
    /// building or applying the mutation fails.
    fn apply_encrypted_crdt_text_update(
        &mut self,
        metadata: &'static AppTableMetadata,
        field: &'static str,
        row_id: &str,
        next_text: &str,
    ) -> Result<MutationReceipt> {
        validate_yjs_text_input_size(next_text)?;
        let encryption = self.encrypted_crdt.as_ref().ok_or_else(|| {
            SyncularError::config("encrypted CRDT updates require set_encrypted_crdt(...)")
        })?;
        let stored_row = self
            .store
            .read_row_json(metadata.name, row_id)?
            .ok_or_else(|| {
                SyncularError::protocol_message(format!(
                    "cannot update encrypted CRDT field {}.{} before local row {row_id} exists",
                    metadata.name, field
                ))
            })?;
        let build_args = BuildEncryptedCrdtTextUpdateArgs {
            ctx: self.encryption_context(),
            metadata,
            field,
            row_id,
            existing_row: &stored_row,
            next_text,
        };
        let mutation = encryption.build_text_update_mutation(build_args)?;
        self.store.apply_syncular_mutations(vec![mutation])
    }

    /// Builds and applies an encrypted CRDT mutation that carries a Yjs update
    /// envelope.
    ///
    /// # Errors
    ///
    /// Fails when the envelope does not pass size validation, when no encrypted CRDT
    /// support is set on this client, when the local row does not exist, or when
    /// building or applying the mutation fails.
    fn apply_encrypted_crdt_yjs_update(
        &mut self,
        metadata: &'static AppTableMetadata,
        field: &'static str,
        row_id: &str,
        update: YjsUpdateEnvelope,
    ) -> Result<MutationReceipt> {
        validate_yjs_update_envelope_size(&update)?;
        let encryption = self.encrypted_crdt.as_ref().ok_or_else(|| {
            SyncularError::config("encrypted CRDT updates require set_encrypted_crdt(...)")
        })?;
        let stored_row = self
            .store
            .read_row_json(metadata.name, row_id)?
            .ok_or_else(|| {
                SyncularError::protocol_message(format!(
                    "cannot update encrypted CRDT field {}.{} before local row {row_id} exists",
                    metadata.name, field
                ))
            })?;
        let build_args = BuildEncryptedCrdtYjsUpdateArgs {
            ctx: self.encryption_context(),
            metadata,
            field,
            row_id,
            existing_row: &stored_row,
            update,
        };
        let mutation = encryption.build_yjs_update_mutation(build_args)?;
        self.store.apply_syncular_mutations(vec![mutation])
    }

    /// Creates an encrypted CRDT checkpoint for a field's stream when one is due.
    ///
    /// Returns `Ok(None)` without writing anything when the stream has fewer
    /// checkpointable updates than `min_uncheckpointed_updates`, when it has no
    /// maximum server sequence, or when the latest checkpoint already covers that
    /// sequence.
    ///
    /// # Errors
    ///
    /// Returns a config error when `min_uncheckpointed_updates` is below 1 or when no
    /// encrypted CRDT support is set on this client, a protocol error when the local
    /// row does not exist, and propagates store and mutation-building errors.
    fn apply_encrypted_crdt_checkpoint(
        &mut self,
        metadata: &'static AppTableMetadata,
        field: &'static str,
        row_id: &str,
        min_uncheckpointed_updates: i64,
    ) -> Result<Option<MutationReceipt>> {
        if min_uncheckpointed_updates < 1 {
            return Err(SyncularError::config(
                "encrypted CRDT checkpoint threshold must be at least 1",
            ));
        }
        let encryption = self.encrypted_crdt.as_ref().ok_or_else(|| {
            SyncularError::config("encrypted CRDT checkpoints require set_encrypted_crdt(...)")
        })?;
        let stream_id = encrypted_crdt_stream_id(metadata.name, row_id, field);
        let stats = self
            .store
            .encrypted_crdt_stream_stats(encryption.partition_id(), &stream_id)?;
        if stats.checkpointable_update_count < min_uncheckpointed_updates {
            return Ok(None);
        }
        let Some(covers_seq) = stats.max_server_seq else {
            return Ok(None);
        };
        // Nothing to do when an existing checkpoint already reaches this sequence.
        if matches!(stats.latest_checkpoint_covers_seq, Some(latest) if latest >= covers_seq) {
            return Ok(None);
        }
        let stored_row = self
            .store
            .read_row_json(metadata.name, row_id)?
            .ok_or_else(|| {
                SyncularError::protocol_message(format!(
                    "cannot checkpoint encrypted CRDT field {}.{} before local row {row_id} exists",
                    metadata.name, field
                ))
            })?;
        let build_args = BuildEncryptedCrdtCheckpointArgs {
            ctx: self.encryption_context(),
            metadata,
            field,
            row_id,
            existing_row: &stored_row,
            covers_seq,
        };
        let mutation = encryption.build_checkpoint_mutation(build_args)?;
        let receipt = self.store.apply_syncular_mutations(vec![mutation])?;
        Ok(Some(receipt))
    }
}
3841
/// Guard for an entry in the process-wide `ACTIVE_SYNC_KEYS` set.
///
/// `SyncLockGuard::acquire` inserts the key into the set, and dropping the guard
/// removes it again.
struct SyncLockGuard {
    /// Key this guard registered in `ACTIVE_SYNC_KEYS`.
    key: String,
}
3845
3846impl SyncLockGuard {
3847    fn acquire(key: &str) -> Result<Self> {
3848        let locks = ACTIVE_SYNC_KEYS.get_or_init(|| Mutex::new(HashSet::new()));
3849        let mut active = locks
3850            .lock()
3851            .map_err(|_| SyncularError::busy("sync lock is poisoned"))?;
3852        if !active.insert(key.to_string()) {
3853            return Err(SyncularError::busy(format!(
3854                "sync already active for local database {key}"
3855            )));
3856        }
3857        Ok(Self {
3858            key: key.to_string(),
3859        })
3860    }
3861}
3862
3863impl Drop for SyncLockGuard {
3864    fn drop(&mut self) {
3865        if let Some(locks) = ACTIVE_SYNC_KEYS.get() {
3866            if let Ok(mut active) = locks.lock() {
3867                active.remove(&self.key);
3868            }
3869        }
3870    }
3871}
3872
3873fn pull_response_needs_another_round(response: &PullResponse, limit_commits: i64) -> bool {
3874    let mut total_commits = 0usize;
3875    for sub in &response.subscriptions {
3876        if sub.status != "active" {
3877            continue;
3878        }
3879        if sub.bootstrap_state.is_some() {
3880            return true;
3881        }
3882        total_commits += sub.commits.len();
3883    }
3884    total_commits >= limit_commits as usize
3885}
3886
#[cfg(feature = "demo-todo-fixture")]
impl<S, T> SyncularClient<S, T>
where
    S: SyncStore + DemoTaskStore,
    T: SyncTransport,
{
    /// Adds a demo task with `title` and returns its id.
    ///
    /// When `id` is `None`, a fresh v4 UUID string is generated for the task.
    pub fn add_task(&mut self, title: String, id: Option<String>) -> Result<String> {
        let task_id = match id {
            Some(provided) => provided,
            None => Uuid::new_v4().to_string(),
        };
        let actor_id = &self.config.actor_id;
        let project_id = self.config.project_id.as_deref();
        self.store
            .add_task(actor_id, project_id, task_id.clone(), title)?;
        Ok(task_id)
    }

    /// Updates the title of demo task `id` through the store.
    pub fn patch_task_title(&mut self, id: String, title: String) -> Result<()> {
        let project_id = self.config.project_id.as_deref();
        self.store.patch_task_title(project_id, id, title)
    }

    /// Returns the demo tasks reported by the store.
    pub fn list_tasks(&mut self) -> Result<Vec<Task>> {
        let tasks = self.store.list_tasks()?;
        Ok(tasks)
    }
}
3913
3914impl<S, T> SyncularClient<S, T>
3915where
3916    S: SyncStore + SyncStateStore,
3917    T: SyncTransport,
3918{
3919    pub fn applied_migrations(&mut self) -> Result<Vec<crate::store::AppliedMigration>> {
3920        self.store.applied_migrations()
3921    }
3922
3923    pub fn app_schema_state(&mut self) -> Result<crate::store::AppSchemaState> {
3924        self.store
3925            .app_schema_state(self.app_schema.current_schema_version())
3926    }
3927
3928    pub fn app_schema_state_json(&mut self) -> Result<String> {
3929        Ok(serde_json::to_string(&self.app_schema_state()?)?)
3930    }
3931
3932    pub fn outbox_summaries(&mut self) -> Result<Vec<crate::store::OutboxSummary>> {
3933        self.store.outbox_summaries()
3934    }
3935
3936    pub fn upsert_auth_lease(&mut self, lease: &crate::store::AuthLeaseRecord) -> Result<()> {
3937        validate_auth_lease_record_against_app_schema(self.app_schema, lease)?;
3938        self.store.transaction(|tx| tx.upsert_auth_lease(lease))
3939    }
3940
3941    pub fn auth_lease(&mut self, lease_id: &str) -> Result<Option<crate::store::AuthLeaseRecord>> {
3942        self.store.transaction(|tx| tx.auth_lease(lease_id))
3943    }
3944
3945    pub fn active_auth_leases(
3946        &mut self,
3947        actor_id: Option<&str>,
3948        now_ms: i64,
3949    ) -> Result<Vec<crate::store::AuthLeaseRecord>> {
3950        self.store
3951            .transaction(|tx| tx.active_auth_leases(actor_id, now_ms))
3952    }
3953
3954    pub fn set_outbox_auth_lease(
3955        &mut self,
3956        client_commit_id: &str,
3957        provenance: Option<&crate::protocol::AuthLeaseProvenance>,
3958    ) -> Result<()> {
3959        self.store
3960            .transaction(|tx| tx.set_outbox_auth_lease(client_commit_id, provenance))
3961    }
3962
3963    pub fn conflict_summaries(&mut self) -> Result<Vec<crate::store::ConflictSummary>> {
3964        self.store.conflict_summaries()
3965    }
3966
3967    pub fn local_health_check(&mut self) -> Result<crate::health::LocalHealthReport> {
3968        let mut report = crate::health::check_local_health(
3969            &mut self.store,
3970            DEFAULT_STATE_ID,
3971            &self.subscriptions,
3972        )?;
3973        let app_schema_state = self.app_schema_state()?;
3974        let outbox = self.outbox_summaries()?;
3975        let conflicts = self.conflict_summaries()?;
3976        let scoped_rows = self.store.scoped_rows_health_summary(&self.subscriptions)?;
3977        let blob_health = self.store.blob_health_summary()?;
3978        let crdt_health = self.store.crdt_health_summary()?;
3979        crate::health::check_local_sync_state_health(
3980            &mut report,
3981            self.app_schema.current_schema_version(),
3982            &app_schema_state,
3983            &outbox,
3984            &conflicts,
3985            scoped_rows.as_ref(),
3986            blob_health.as_ref(),
3987            crdt_health.as_ref(),
3988        );
3989        Ok(report)
3990    }
3991
3992    pub fn local_health_check_json(&mut self) -> Result<String> {
3993        Ok(serde_json::to_string(&self.local_health_check()?)?)
3994    }
3995
3996    pub fn export_local_support_bundle(&mut self) -> Result<crate::health::LocalSupportBundle> {
3997        let mut states = Vec::new();
3998        let mut roots = Vec::new();
3999        self.store.transaction(|tx| {
4000            states = tx.subscription_states(DEFAULT_STATE_ID)?;
4001            roots = tx.verified_roots(DEFAULT_STATE_ID)?;
4002            Ok(())
4003        })?;
4004        let mut health = crate::health::check_local_health_records(
4005            DEFAULT_STATE_ID,
4006            &self.subscriptions,
4007            &states,
4008            &roots,
4009        );
4010        let app_schema_state = self.app_schema_state()?;
4011        let outbox = self.outbox_summaries()?;
4012        let conflicts = self.conflict_summaries()?;
4013        let scoped_rows = self.store.scoped_rows_health_summary(&self.subscriptions)?;
4014        let blob_health = self.store.blob_health_summary()?;
4015        let crdt_health = self.store.crdt_health_summary()?;
4016        crate::health::check_local_sync_state_health(
4017            &mut health,
4018            self.app_schema.current_schema_version(),
4019            &app_schema_state,
4020            &outbox,
4021            &conflicts,
4022            scoped_rows.as_ref(),
4023            blob_health.as_ref(),
4024            crdt_health.as_ref(),
4025        );
4026        Ok(crate::health::local_support_bundle_from_records(
4027            "rust",
4028            health,
4029            &self.subscriptions,
4030            &states,
4031            &roots,
4032            app_schema_state,
4033            &outbox,
4034            &conflicts,
4035            blob_health,
4036            crdt_health,
4037        ))
4038    }
4039
4040    pub fn export_local_support_bundle_json(&mut self) -> Result<String> {
4041        Ok(serde_json::to_string(&self.export_local_support_bundle()?)?)
4042    }
4043
4044    pub fn import_local_support_bundle_json(&mut self, bundle_json: &str) -> Result<String> {
4045        Ok(serde_json::to_string(
4046            &crate::health::import_local_support_bundle_json(bundle_json)?,
4047        )?)
4048    }
4049
4050    pub fn repair_local_health(
4051        &mut self,
4052        request: crate::health::LocalHealthRepairRequest,
4053    ) -> Result<crate::health::LocalHealthRepairReport> {
4054        match request.action {
4055            crate::health::LocalHealthRepairAction::ForceRebootstrap => {
4056                self.repair_force_rebootstrap(&request.subscription_ids)
4057            }
4058            crate::health::LocalHealthRepairAction::ClearOrphanedState => {
4059                self.repair_clear_orphaned_state(&request.subscription_ids)
4060            }
4061            crate::health::LocalHealthRepairAction::ClearOrphanedSyncedRows => {
4062                self.repair_clear_orphaned_synced_rows(&request)
4063            }
4064            crate::health::LocalHealthRepairAction::ManualInspection => Err(SyncularError::config(
4065                "manualInspection health findings cannot be repaired automatically",
4066            )),
4067        }
4068    }
4069
4070    pub fn repair_local_health_json(&mut self, request_json: &str) -> Result<String> {
4071        let request: crate::health::LocalHealthRepairRequest = serde_json::from_str(request_json)?;
4072        Ok(serde_json::to_string(&self.repair_local_health(request)?)?)
4073    }
4074
4075    pub fn reset_local_sync_state(
4076        &mut self,
4077        request: crate::health::LocalSyncResetRequest,
4078    ) -> Result<crate::health::LocalSyncResetReport> {
4079        let selected = self.selected_reset_subscriptions(&request.subscription_ids)?;
4080        if request.clear_synced_rows {
4081            let unresolved_outbox = self
4082                .outbox_summaries()?
4083                .iter()
4084                .filter(|commit| commit.status != "acked")
4085                .count();
4086            if unresolved_outbox > 0 {
4087                return Err(SyncularError::config(format!(
4088                    "resetLocalSyncState clearSyncedRows requires an empty local outbox; found {unresolved_outbox} unresolved commits"
4089                )));
4090            }
4091        }
4092        let selected_ids = selected
4093            .iter()
4094            .map(|subscription| subscription.id.as_str())
4095            .collect::<HashSet<_>>();
4096
4097        self.store.transaction(|tx| {
4098            let deleted_subscription_states = tx
4099                .subscription_states(DEFAULT_STATE_ID)?
4100                .iter()
4101                .filter(|state| selected_ids.contains(state.subscription_id.as_str()))
4102                .count();
4103            let deleted_verified_roots = tx
4104                .verified_roots(DEFAULT_STATE_ID)?
4105                .iter()
4106                .filter(|root| selected_ids.contains(root.subscription_id.as_str()))
4107                .count();
4108
4109            let mut cleared_synced_rows = 0i64;
4110            let mut cleared_tables = Vec::new();
4111            if request.clear_synced_rows {
4112                for subscription in &selected {
4113                    let deleted =
4114                        tx.clear_synced_rows_for_scopes(&subscription.table, &subscription.scopes)?;
4115                    if deleted > 0 {
4116                        cleared_synced_rows += deleted;
4117                        if !cleared_tables
4118                            .iter()
4119                            .any(|table| table == &subscription.table)
4120                        {
4121                            cleared_tables.push(subscription.table.clone());
4122                        }
4123                    }
4124                }
4125            }
4126
4127            for subscription in &selected {
4128                tx.delete_verified_root(DEFAULT_STATE_ID, &subscription.id)?;
4129                tx.delete_subscription_state(DEFAULT_STATE_ID, &subscription.id)?;
4130            }
4131
4132            Ok(crate::health::LocalSyncResetReport {
4133                reset_subscriptions: selected.len(),
4134                deleted_subscription_states,
4135                deleted_verified_roots,
4136                cleared_synced_rows,
4137                cleared_tables,
4138            })
4139        })
4140    }
4141
4142    pub fn reset_local_sync_state_json(&mut self, request_json: &str) -> Result<String> {
4143        let request: crate::health::LocalSyncResetRequest = serde_json::from_str(request_json)?;
4144        Ok(serde_json::to_string(
4145            &self.reset_local_sync_state(request)?,
4146        )?)
4147    }
4148
4149    fn selected_reset_subscriptions(
4150        &self,
4151        subscription_ids: &[String],
4152    ) -> Result<Vec<SubscriptionSpec>> {
4153        if subscription_ids.is_empty() {
4154            return Ok(self.subscriptions.clone());
4155        }
4156        let requested = subscription_ids
4157            .iter()
4158            .map(String::as_str)
4159            .collect::<HashSet<_>>();
4160        let selected = self
4161            .subscriptions
4162            .iter()
4163            .filter(|subscription| requested.contains(subscription.id.as_str()))
4164            .cloned()
4165            .collect::<Vec<_>>();
4166        if selected.len() != requested.len() {
4167            let configured = self
4168                .subscriptions
4169                .iter()
4170                .map(|subscription| subscription.id.as_str())
4171                .collect::<HashSet<_>>();
4172            let missing = subscription_ids
4173                .iter()
4174                .find(|id| !configured.contains(id.as_str()))
4175                .map(String::as_str)
4176                .unwrap_or("unknown");
4177            return Err(SyncularError::config(format!(
4178                "cannot reset unconfigured subscription {missing}"
4179            )));
4180        }
4181        Ok(selected)
4182    }
4183
4184    fn repair_force_rebootstrap(
4185        &mut self,
4186        subscription_ids: &[String],
4187    ) -> Result<crate::health::LocalHealthRepairReport> {
4188        if subscription_ids.is_empty() {
4189            return Err(SyncularError::config(
4190                "forceRebootstrap repair requires explicit subscriptionIds",
4191            ));
4192        }
4193        let configured = self
4194            .subscriptions
4195            .iter()
4196            .map(|subscription| subscription.id.as_str())
4197            .collect::<HashSet<_>>();
4198        for subscription_id in subscription_ids {
4199            if !configured.contains(subscription_id.as_str()) {
4200                return Err(SyncularError::config(format!(
4201                    "cannot force rebootstrap for unconfigured subscription {subscription_id}"
4202                )));
4203            }
4204        }
4205
4206        self.store.transaction(|tx| {
4207            let requested = subscription_ids
4208                .iter()
4209                .map(String::as_str)
4210                .collect::<HashSet<_>>();
4211            let deleted_subscription_states = tx
4212                .subscription_states(DEFAULT_STATE_ID)?
4213                .iter()
4214                .filter(|state| requested.contains(state.subscription_id.as_str()))
4215                .count();
4216            let deleted_verified_roots = tx
4217                .verified_roots(DEFAULT_STATE_ID)?
4218                .iter()
4219                .filter(|root| requested.contains(root.subscription_id.as_str()))
4220                .count();
4221            for subscription_id in subscription_ids {
4222                tx.delete_verified_root(DEFAULT_STATE_ID, subscription_id)?;
4223                tx.delete_subscription_state(DEFAULT_STATE_ID, subscription_id)?;
4224            }
4225            Ok(crate::health::LocalHealthRepairReport {
4226                action: crate::health::LocalHealthRepairAction::ForceRebootstrap,
4227                deleted_subscription_states,
4228                deleted_verified_roots,
4229                forced_rebootstrap_subscriptions: subscription_ids.len(),
4230                cleared_orphaned_synced_rows: 0,
4231                cleared_tables: Vec::new(),
4232            })
4233        })
4234    }
4235
4236    fn repair_clear_orphaned_state(
4237        &mut self,
4238        subscription_ids: &[String],
4239    ) -> Result<crate::health::LocalHealthRepairReport> {
4240        let configured = self
4241            .subscriptions
4242            .iter()
4243            .map(|subscription| subscription.id.as_str())
4244            .collect::<HashSet<_>>();
4245        for subscription_id in subscription_ids {
4246            if configured.contains(subscription_id.as_str()) {
4247                return Err(SyncularError::config(format!(
4248                    "clearOrphanedState refuses configured subscription {subscription_id}"
4249                )));
4250            }
4251        }
4252        let requested = subscription_ids
4253            .iter()
4254            .map(String::as_str)
4255            .collect::<HashSet<_>>();
4256
4257        self.store.transaction(|tx| {
4258            let states = tx.subscription_states(DEFAULT_STATE_ID)?;
4259            let roots = tx.verified_roots(DEFAULT_STATE_ID)?;
4260            let state_ids = states
4261                .iter()
4262                .map(|state| state.subscription_id.as_str())
4263                .filter(|id| !configured.contains(id))
4264                .filter(|id| requested.is_empty() || requested.contains(id))
4265                .collect::<HashSet<_>>();
4266            let root_ids = roots
4267                .iter()
4268                .map(|root| root.subscription_id.as_str())
4269                .filter(|id| !configured.contains(id))
4270                .filter(|id| requested.is_empty() || requested.contains(id))
4271                .collect::<HashSet<_>>();
4272            let mut all_ids = state_ids.iter().copied().collect::<HashSet<_>>();
4273            all_ids.extend(root_ids.iter().copied());
4274            for subscription_id in all_ids {
4275                tx.delete_subscription_state(DEFAULT_STATE_ID, subscription_id)?;
4276                tx.delete_verified_root(DEFAULT_STATE_ID, subscription_id)?;
4277            }
4278            Ok(crate::health::LocalHealthRepairReport {
4279                action: crate::health::LocalHealthRepairAction::ClearOrphanedState,
4280                deleted_subscription_states: state_ids.len(),
4281                deleted_verified_roots: root_ids.len(),
4282                forced_rebootstrap_subscriptions: 0,
4283                cleared_orphaned_synced_rows: 0,
4284                cleared_tables: Vec::new(),
4285            })
4286        })
4287    }
4288
4289    fn repair_clear_orphaned_synced_rows(
4290        &mut self,
4291        request: &crate::health::LocalHealthRepairRequest,
4292    ) -> Result<crate::health::LocalHealthRepairReport> {
4293        if !request.subscription_ids.is_empty() {
4294            return Err(SyncularError::config(
4295                "clearOrphanedSyncedRows uses tables, not subscriptionIds",
4296            ));
4297        }
4298        let unresolved_outbox = self
4299            .outbox_summaries()?
4300            .iter()
4301            .filter(|commit| commit.status != "acked")
4302            .count();
4303        if unresolved_outbox > 0 {
4304            return Err(SyncularError::config(format!(
4305                "clearOrphanedSyncedRows requires an empty local outbox; found {unresolved_outbox} unresolved commits"
4306            )));
4307        }
4308        let summary = self
4309            .store
4310            .clear_orphaned_synced_rows(&self.subscriptions, &request.tables)?;
4311        let cleared_tables = summary
4312            .tables
4313            .into_iter()
4314            .filter(|table| table.orphaned_synced_rows > 0)
4315            .map(|table| table.table)
4316            .collect::<Vec<_>>();
4317        Ok(crate::health::LocalHealthRepairReport {
4318            action: crate::health::LocalHealthRepairAction::ClearOrphanedSyncedRows,
4319            deleted_subscription_states: 0,
4320            deleted_verified_roots: 0,
4321            forced_rebootstrap_subscriptions: 0,
4322            cleared_orphaned_synced_rows: summary.orphaned_synced_rows,
4323            cleared_tables,
4324        })
4325    }
4326
4327    pub fn conflicts(&mut self) -> SyncularConflicts<'_, S, T> {
4328        SyncularConflicts { client: self }
4329    }
4330
4331    pub fn pending_conflicts(&mut self) -> Result<Vec<crate::store::ConflictSummary>> {
4332        self.conflict_summaries()
4333    }
4334
4335    pub fn has_pending_conflicts(&mut self) -> Result<bool> {
4336        Ok(!self.pending_conflicts()?.is_empty())
4337    }
4338
4339    pub fn resolve_conflict(&mut self, id: &str, resolution: &str) -> Result<()> {
4340        self.store.resolve_conflict(id, resolution)
4341    }
4342
4343    pub fn retry_conflict_keep_local(&mut self, id: &str) -> Result<String> {
4344        self.store.retry_conflict_keep_local(id)
4345    }
4346}
4347
4348impl<'a, S, T> SyncularConflicts<'a, S, T>
4349where
4350    S: SyncStore + SyncStateStore,
4351    T: SyncTransport,
4352{
4353    pub fn pending(&mut self) -> Result<Vec<crate::store::ConflictSummary>> {
4354        self.client.store.conflict_summaries()
4355    }
4356
4357    pub fn is_empty(&mut self) -> Result<bool> {
4358        Ok(self.pending()?.is_empty())
4359    }
4360
4361    pub fn keep_local(&mut self, conflict_id: &str) -> Result<ConflictResolutionReceipt> {
4362        let retry_client_commit_id = self.client.store.retry_conflict_keep_local(conflict_id)?;
4363        Ok(ConflictResolutionReceipt {
4364            conflict_id: conflict_id.to_string(),
4365            resolution: ConflictResolution::KeepLocal,
4366            retry_client_commit_id: Some(retry_client_commit_id),
4367        })
4368    }
4369
4370    pub fn accept_server(&mut self, conflict_id: &str) -> Result<ConflictResolutionReceipt> {
4371        self.resolve(conflict_id, ConflictResolution::AcceptServer)
4372    }
4373
4374    pub fn dismiss(&mut self, conflict_id: &str) -> Result<ConflictResolutionReceipt> {
4375        self.resolve(conflict_id, ConflictResolution::Dismiss)
4376    }
4377
4378    pub fn resolve(
4379        &mut self,
4380        conflict_id: &str,
4381        resolution: ConflictResolution,
4382    ) -> Result<ConflictResolutionReceipt> {
4383        if resolution == ConflictResolution::KeepLocal {
4384            return self.keep_local(conflict_id);
4385        }
4386
4387        self.client
4388            .store
4389            .resolve_conflict(conflict_id, resolution.as_str())?;
4390        Ok(ConflictResolutionReceipt {
4391            conflict_id: conflict_id.to_string(),
4392            resolution,
4393            retry_client_commit_id: None,
4394        })
4395    }
4396}
4397
4398fn validate_outbox_schema_version(commit: &OutboxCommit, current: i32) -> Result<()> {
4399    if commit.schema_version < 1 {
4400        return Err(SyncularError::schema(format!(
4401            "outbox commit {} has invalid schema version {}",
4402            commit.client_commit_id, commit.schema_version
4403        )));
4404    }
4405
4406    if commit.schema_version > current {
4407        return Err(SyncularError::schema(format!(
4408            "outbox commit {} was created with schema version {}, but this client supports {}",
4409            commit.client_commit_id, commit.schema_version, current
4410        )));
4411    }
4412
4413    Ok(())
4414}
4415
4416fn validate_server_schema_version(response: &CombinedResponse, current: i32) -> Result<()> {
4417    if let Some(required) = response.required_schema_version {
4418        if required < 1 {
4419            return Err(SyncularError::schema(format!(
4420                "server reported invalid required schema version {required}"
4421            )));
4422        }
4423
4424        if required > current {
4425            return Err(SyncularError::schema(format!(
4426                "server requires schema version {required}, but this client supports {current}"
4427            )));
4428        }
4429    }
4430
4431    if let Some(latest) = response.latest_schema_version {
4432        if latest < 1 {
4433            return Err(SyncularError::schema(format!(
4434                "server reported invalid latest schema version {latest}"
4435            )));
4436        }
4437    }
4438
4439    Ok(())
4440}
4441
4442fn apply_push_commit_response(
4443    tx: &mut impl SyncStoreTx,
4444    outbox: &OutboxCommit,
4445    response: &PushCommitResponse,
4446) -> Result<bool> {
4447    let mut conflicts_changed = false;
4448    match response.status.as_str() {
4449        "applied" | "cached" => {
4450            tx.mark_pushed_operation_server_versions(outbox, response)?;
4451            tx.mark_outbox_acked(&outbox.id, response)?;
4452        }
4453        _ => {
4454            for result in &response.results {
4455                if result.status == "conflict" || result.status == "error" {
4456                    tx.insert_conflict(outbox, result)?;
4457                    conflicts_changed = true;
4458                }
4459            }
4460            tx.mark_outbox_failed(&outbox.id, "REJECTED", response)?;
4461        }
4462    }
4463    Ok(conflicts_changed)
4464}
4465
4466fn is_auth_transport_error(error: &SyncularError) -> bool {
4467    if error.kind() != ErrorKind::Transport {
4468        return false;
4469    }
4470    let message = error.message_text();
4471    message.contains("HTTP 401") || message.contains("HTTP 403")
4472}