Skip to main content

syncular_runtime/storage/
diesel_sqlite.rs

1use crate::app_schema::{
2    checksum, default_app_schema, split_sql_statements, AppSchema, AppTableMetadata,
3};
4use crate::auth_lease_selection::{
5    app_table_operation_scope,
6    select_active_auth_lease_for_operations as select_auth_lease_for_operation_scopes,
7    system_table_operation_scope, ActiveAuthLeasePolicy, MutationOperationScope,
8};
9use crate::binary_snapshot::{BinarySnapshotCell, DecodedBinarySnapshotRows, SnapshotChunkRows};
10use crate::client::SubscriptionSpec;
11use crate::command_history::{CommandHistoryEntry, CommandHistoryRecord, CommandHistoryState};
12use crate::compaction::{
13    required_compaction_cutoff, tombstone_delete_statements, StorageCompactionOptions,
14    StorageCompactionReport,
15};
16use crate::crdt_field::{
17    validate_crdt_field, CrdtDocumentSnapshot, CrdtField, CrdtFieldId, CrdtUpdateLogEntry,
18    CrdtUpdateOrigin, CrdtUpdateStatus,
19};
20use crate::crdt_yjs::{
21    materialize_row_for_metadata, transform_local_row_for_metadata, yjs_state_vector_base64,
22    YjsUpdateEnvelope, YJS_PAYLOAD_KEY,
23};
24use crate::encrypted_crdt::{
25    apply_encrypted_crdt_plaintext_to_row, encrypted_crdt_identity_column,
26    encrypted_crdt_normalize_row, encrypted_crdt_row_matches_scopes, encrypted_crdt_scopes_json,
27    is_encrypted_crdt_system_table, EncryptedCrdtStreamStats, CRDT_CHECKPOINTS_TABLE,
28    CRDT_UPDATES_TABLE,
29};
30use crate::error::{ErrorKind, Result, SyncularError};
31#[cfg(feature = "demo-todo-native-fixture")]
32use crate::fixtures::todo::tasks::{insert_local_task, list_tasks, patch_local_task_title};
33use crate::limits::validate_unresolved_outbox_capacity;
34use crate::protocol::*;
35use crate::protocol::{sync_operations_json_for_outbox, validate_pending_mutation_batch_size};
36use crate::runtime_schema::RUNTIME_SYSTEM_SCHEMA_SQL;
37use crate::schema;
38use crate::store::{
39    now_ms, AppSchemaState, AppliedMigration, AuthLeaseRecord, BlobHealthSummary, ConflictSummary,
40    CrdtHealthSummary, OutboxCommit, OutboxSummary, ScopedRowsHealthSummary, ScopedRowsTableHealth,
41    SubscriptionState, SyncStateStore, SyncStore, SyncStoreTx, VerifiedRoot, APP_SCHEMA_ID,
42    BLOB_UPLOAD_STALE_TIMEOUT_MS, MAX_BLOB_UPLOAD_RETRIES, MAX_SYNC_RETRIES,
43    SQLITE_BUSY_TIMEOUT_MS, SYNC_SENDING_TIMEOUT_MS,
44};
45#[cfg(feature = "demo-todo-fixture")]
46use crate::store::{DemoTaskStore, Task};
47use diesel::connection::SimpleConnection;
48use diesel::prelude::*;
49use diesel::sql_query;
50use diesel::sql_types::{BigInt, Binary, Bool, Integer, Nullable, Text};
51use diesel::sqlite::SqliteConnection;
52use serde::{Deserialize, Serialize};
53use serde_json::{Map, Value};
54use uuid::Uuid;
55
/// Batch size, in rows, for snapshot upserts.
// NOTE(review): the use site is outside this excerpt — presumably this caps how many rows are
// written per batched upsert statement; confirm against the caller before changing it.
const SNAPSHOT_UPSERT_BATCH_ROWS: usize = 128;
57
58#[derive(Debug, Clone, Queryable, Selectable)]
59#[diesel(table_name = schema::sync_outbox_commits)]
60#[allow(dead_code)]
61struct OutboxCommitRow {
62    id: String,
63    client_commit_id: String,
64    status: String,
65    operations_json: String,
66    last_response_json: Option<String>,
67    error: Option<String>,
68    created_at: i64,
69    updated_at: i64,
70    attempt_count: i32,
71    acked_commit_seq: Option<i64>,
72    schema_version: i32,
73    next_attempt_at: i64,
74    lease_id: Option<String>,
75    lease_expires_at_ms: Option<i64>,
76    lease_status_at_enqueue: Option<String>,
77    lease_scope_summary_json: Option<String>,
78    lease_token: Option<String>,
79}
80
81#[derive(Debug, Clone, QueryableByName)]
82struct CommandHistoryRow {
83    #[diesel(sql_type = Text)]
84    id: String,
85    #[diesel(sql_type = Text)]
86    mutation_scope: String,
87    #[diesel(sql_type = Text)]
88    state: String,
89    #[diesel(sql_type = Text)]
90    entries_json: String,
91    #[diesel(sql_type = Text)]
92    client_commit_id: String,
93    #[diesel(sql_type = Nullable<Text>)]
94    undo_client_commit_id: Option<String>,
95    #[diesel(sql_type = Nullable<Text>)]
96    redo_client_commit_id: Option<String>,
97    #[diesel(sql_type = BigInt)]
98    created_at: i64,
99    #[diesel(sql_type = BigInt)]
100    updated_at: i64,
101}
102
103#[derive(Debug, Clone)]
104struct PendingCommandHistoryEntry {
105    table: String,
106    row_id: String,
107    before: Option<Value>,
108}
109
110impl TryFrom<CommandHistoryRow> for CommandHistoryRecord {
111    type Error = SyncularError;
112
113    fn try_from(row: CommandHistoryRow) -> Result<Self> {
114        let entries =
115            serde_json::from_str::<Vec<CommandHistoryEntry>>(&row.entries_json).map_err(|err| {
116                SyncularError::storage(err).context("deserialize sync_command_history.entries_json")
117            })?;
118        Ok(Self {
119            id: row.id,
120            mutation_scope: row.mutation_scope,
121            state: CommandHistoryState::try_from(row.state.as_str())?,
122            entries,
123            client_commit_id: row.client_commit_id,
124            undo_client_commit_id: row.undo_client_commit_id,
125            redo_client_commit_id: row.redo_client_commit_id,
126            created_at: row.created_at,
127            updated_at: row.updated_at,
128        })
129    }
130}
131
132fn insert_command_history_record(
133    conn: &mut SqliteConnection,
134    mutation_scope: &str,
135    entries: &[CommandHistoryEntry],
136    receipt: &MutationReceipt,
137) -> Result<CommandHistoryRecord> {
138    let id = Uuid::new_v4().to_string();
139    let entries_json = serde_json::to_string(entries)?;
140    let created_at = now_ms();
141    sql_query("delete from sync_command_history where state = 'undone'").execute(conn)?;
142    sql_query(
143        r#"
144        insert into sync_command_history (
145            id,
146            mutation_scope,
147            state,
148            entries_json,
149            client_commit_id,
150            undo_client_commit_id,
151            redo_client_commit_id,
152            created_at,
153            updated_at
154        )
155        values (?1, ?2, 'done', ?3, ?4, null, null, ?5, ?5)
156        "#,
157    )
158    .bind::<Text, _>(&id)
159    .bind::<Text, _>(mutation_scope)
160    .bind::<Text, _>(&entries_json)
161    .bind::<Text, _>(&receipt.client_commit_id)
162    .bind::<BigInt, _>(created_at)
163    .execute(conn)?;
164    Ok(CommandHistoryRecord {
165        id,
166        mutation_scope: mutation_scope.to_string(),
167        state: CommandHistoryState::Done,
168        entries: entries.to_vec(),
169        client_commit_id: receipt.client_commit_id.clone(),
170        undo_client_commit_id: None,
171        redo_client_commit_id: None,
172        created_at,
173        updated_at: created_at,
174    })
175}
176
177#[derive(Debug, Clone, QueryableByName)]
178struct CrdtDocumentSnapshotRow {
179    #[diesel(sql_type = Text)]
180    document_key: String,
181    #[diesel(sql_type = Text)]
182    app_table: String,
183    #[diesel(sql_type = Text)]
184    row_id: String,
185    #[diesel(sql_type = Text)]
186    field_name: String,
187    #[diesel(sql_type = Text)]
188    state_column: String,
189    #[diesel(sql_type = Text)]
190    sync_mode: String,
191    #[diesel(sql_type = Nullable<Text>)]
192    state_base64: Option<String>,
193    #[diesel(sql_type = Text)]
194    state_vector_base64: String,
195    #[diesel(sql_type = BigInt)]
196    pending_updates: i64,
197    #[diesel(sql_type = BigInt)]
198    flushed_updates: i64,
199    #[diesel(sql_type = BigInt)]
200    acked_updates: i64,
201    #[diesel(sql_type = BigInt)]
202    log_updates: i64,
203    #[diesel(sql_type = BigInt)]
204    updated_at: i64,
205    #[diesel(sql_type = Nullable<BigInt>)]
206    compacted_at: Option<i64>,
207}
208
209#[derive(Debug, Clone, QueryableByName)]
210struct CrdtUpdateLogRow {
211    #[diesel(sql_type = BigInt)]
212    id: i64,
213    #[diesel(sql_type = Text)]
214    document_key: String,
215    #[diesel(sql_type = Text)]
216    update_id: String,
217    #[diesel(sql_type = Nullable<Text>)]
218    client_commit_id: Option<String>,
219    #[diesel(sql_type = Text)]
220    origin: String,
221    #[diesel(sql_type = Text)]
222    status: String,
223    #[diesel(sql_type = Text)]
224    update_base64: String,
225    #[diesel(sql_type = Text)]
226    state_vector_base64: String,
227    #[diesel(sql_type = BigInt)]
228    created_at: i64,
229    #[diesel(sql_type = Nullable<BigInt>)]
230    flushed_at: Option<i64>,
231    #[diesel(sql_type = Nullable<BigInt>)]
232    acked_at: Option<i64>,
233}
234
235impl From<OutboxCommitRow> for OutboxCommit {
236    fn from(row: OutboxCommitRow) -> Self {
237        Self {
238            id: row.id,
239            client_commit_id: row.client_commit_id,
240            status: row.status,
241            operations_json: row.operations_json,
242            last_response_json: row.last_response_json,
243            error: row.error,
244            created_at: row.created_at,
245            updated_at: row.updated_at,
246            attempt_count: row.attempt_count,
247            acked_commit_seq: row.acked_commit_seq,
248            schema_version: row.schema_version,
249            next_attempt_at: row.next_attempt_at,
250            auth_lease: auth_lease_provenance_from_columns(
251                row.lease_id,
252                row.lease_expires_at_ms,
253                row.lease_status_at_enqueue,
254                row.lease_scope_summary_json,
255                row.lease_token,
256            ),
257        }
258    }
259}
260
261fn auth_lease_provenance_from_columns(
262    lease_id: Option<String>,
263    lease_expires_at_ms: Option<i64>,
264    lease_status_at_enqueue: Option<String>,
265    lease_scope_summary_json: Option<String>,
266    lease_token: Option<String>,
267) -> Option<AuthLeaseProvenance> {
268    Some(AuthLeaseProvenance {
269        lease_id: lease_id?,
270        lease_expires_at_ms: lease_expires_at_ms?,
271        lease_status_at_enqueue: lease_status_at_enqueue?,
272        lease_scope_summary_json,
273        lease_token,
274    })
275}
276
277impl TryFrom<CrdtDocumentSnapshotRow> for CrdtDocumentSnapshot {
278    type Error = SyncularError;
279
280    fn try_from(row: CrdtDocumentSnapshotRow) -> Result<Self> {
281        Ok(Self {
282            document_key: row.document_key,
283            table: row.app_table,
284            row_id: row.row_id,
285            field: row.field_name,
286            state_column: row.state_column,
287            sync_mode: crdt_sync_mode_from_str(&row.sync_mode)?,
288            state_base64: row.state_base64,
289            state_vector_base64: row.state_vector_base64,
290            pending_updates: row.pending_updates,
291            flushed_updates: row.flushed_updates,
292            acked_updates: row.acked_updates,
293            log_updates: row.log_updates,
294            updated_at: row.updated_at,
295            compacted_at: row.compacted_at,
296        })
297    }
298}
299
300impl TryFrom<CrdtUpdateLogRow> for CrdtUpdateLogEntry {
301    type Error = SyncularError;
302
303    fn try_from(row: CrdtUpdateLogRow) -> Result<Self> {
304        Ok(Self {
305            id: row.id,
306            document_key: row.document_key,
307            update_id: row.update_id,
308            client_commit_id: row.client_commit_id,
309            origin: crdt_update_origin_from_str(&row.origin)?,
310            status: crdt_update_status_from_str(&row.status)?,
311            update_base64: row.update_base64,
312            state_vector_base64: row.state_vector_base64,
313            created_at: row.created_at,
314            flushed_at: row.flushed_at,
315            acked_at: row.acked_at,
316        })
317    }
318}
319
320#[derive(Insertable)]
321#[diesel(table_name = schema::sync_outbox_commits)]
322struct NewOutboxCommit {
323    id: String,
324    client_commit_id: String,
325    status: String,
326    operations_json: String,
327    last_response_json: Option<String>,
328    error: Option<String>,
329    created_at: i64,
330    updated_at: i64,
331    attempt_count: i32,
332    acked_commit_seq: Option<i64>,
333    schema_version: i32,
334    next_attempt_at: i64,
335    lease_id: Option<String>,
336    lease_expires_at_ms: Option<i64>,
337    lease_status_at_enqueue: Option<String>,
338    lease_scope_summary_json: Option<String>,
339    lease_token: Option<String>,
340}
341
342#[derive(Debug, Clone, Queryable, Selectable, Insertable, AsChangeset)]
343#[diesel(table_name = schema::sync_auth_leases)]
344struct AuthLeaseRecordRow {
345    lease_id: String,
346    kid: String,
347    actor_id: String,
348    issued_at_ms: i64,
349    not_before_ms: i64,
350    expires_at_ms: i64,
351    schema_version: i32,
352    payload_json: String,
353    token: String,
354    status: String,
355    last_validation_error: Option<String>,
356    created_at_ms: i64,
357    updated_at_ms: i64,
358}
359
360impl From<AuthLeaseRecordRow> for AuthLeaseRecord {
361    fn from(row: AuthLeaseRecordRow) -> Self {
362        Self {
363            lease_id: row.lease_id,
364            kid: row.kid,
365            actor_id: row.actor_id,
366            issued_at_ms: row.issued_at_ms,
367            not_before_ms: row.not_before_ms,
368            expires_at_ms: row.expires_at_ms,
369            schema_version: row.schema_version,
370            payload_json: row.payload_json,
371            token: row.token,
372            status: row.status,
373            last_validation_error: row.last_validation_error,
374            created_at_ms: row.created_at_ms,
375            updated_at_ms: row.updated_at_ms,
376        }
377    }
378}
379
380impl From<&AuthLeaseRecord> for AuthLeaseRecordRow {
381    fn from(record: &AuthLeaseRecord) -> Self {
382        Self {
383            lease_id: record.lease_id.clone(),
384            kid: record.kid.clone(),
385            actor_id: record.actor_id.clone(),
386            issued_at_ms: record.issued_at_ms,
387            not_before_ms: record.not_before_ms,
388            expires_at_ms: record.expires_at_ms,
389            schema_version: record.schema_version,
390            payload_json: record.payload_json.clone(),
391            token: record.token.clone(),
392            status: record.status.clone(),
393            last_validation_error: record.last_validation_error.clone(),
394            created_at_ms: record.created_at_ms,
395            updated_at_ms: record.updated_at_ms,
396        }
397    }
398}
399
400#[derive(Debug, Clone, Queryable, Selectable)]
401#[diesel(table_name = schema::sync_subscription_state)]
402#[allow(dead_code)]
403struct SubscriptionStateRow {
404    state_id: String,
405    subscription_id: String,
406    table_name: String,
407    scopes_json: String,
408    params_json: String,
409    cursor: i64,
410    bootstrap_state_json: Option<String>,
411    status: String,
412    created_at: i64,
413    updated_at: i64,
414}
415
416impl From<SubscriptionStateRow> for SubscriptionState {
417    fn from(row: SubscriptionStateRow) -> Self {
418        Self {
419            state_id: row.state_id,
420            subscription_id: row.subscription_id,
421            table: row.table_name,
422            scopes_json: row.scopes_json,
423            params_json: row.params_json,
424            cursor: row.cursor,
425            bootstrap_state_json: row.bootstrap_state_json,
426            status: row.status,
427        }
428    }
429}
430
431#[derive(Debug, Clone, QueryableByName)]
432struct VerifiedRootRow {
433    #[diesel(sql_type = Text)]
434    state_id: String,
435    #[diesel(sql_type = Text)]
436    subscription_id: String,
437    #[diesel(sql_type = Text)]
438    partition_id: String,
439    #[diesel(sql_type = BigInt)]
440    commit_seq: i64,
441    #[diesel(sql_type = Text)]
442    root: String,
443}
444
445impl From<VerifiedRootRow> for VerifiedRoot {
446    fn from(row: VerifiedRootRow) -> Self {
447        Self {
448            state_id: row.state_id,
449            subscription_id: row.subscription_id,
450            partition_id: row.partition_id,
451            commit_seq: row.commit_seq,
452            root: row.root,
453        }
454    }
455}
456
457#[derive(QueryableByName)]
458struct MigrationVersionRow {
459    #[diesel(sql_type = Text)]
460    version: String,
461    #[diesel(sql_type = Text)]
462    checksum: String,
463}
464
465#[derive(QueryableByName)]
466struct AppliedMigrationRow {
467    #[diesel(sql_type = Text)]
468    version: String,
469    #[diesel(sql_type = Text)]
470    name: String,
471    #[diesel(sql_type = Text)]
472    checksum: String,
473    #[diesel(sql_type = BigInt)]
474    applied_at: i64,
475}
476
477impl From<AppliedMigrationRow> for AppliedMigration {
478    fn from(row: AppliedMigrationRow) -> Self {
479        Self {
480            version: row.version,
481            name: row.name,
482            checksum: row.checksum,
483            applied_at: row.applied_at,
484        }
485    }
486}
487
488#[derive(QueryableByName)]
489struct AppSchemaStateRow {
490    #[diesel(sql_type = Integer)]
491    schema_version: i32,
492    #[diesel(sql_type = BigInt)]
493    updated_at: i64,
494}
495
496#[derive(QueryableByName)]
497struct OutboxSummaryRow {
498    #[diesel(sql_type = Text)]
499    client_commit_id: String,
500    #[diesel(sql_type = Text)]
501    status: String,
502    #[diesel(sql_type = Integer)]
503    schema_version: i32,
504    #[diesel(sql_type = Nullable<Text>)]
505    lease_id: Option<String>,
506    #[diesel(sql_type = Nullable<BigInt>)]
507    lease_expires_at_ms: Option<i64>,
508    #[diesel(sql_type = Nullable<Text>)]
509    lease_status_at_enqueue: Option<String>,
510    #[diesel(sql_type = Nullable<Text>)]
511    lease_scope_summary_json: Option<String>,
512    #[diesel(sql_type = Nullable<Text>)]
513    lease_token: Option<String>,
514}
515
516impl From<OutboxSummaryRow> for OutboxSummary {
517    fn from(row: OutboxSummaryRow) -> Self {
518        Self {
519            client_commit_id: row.client_commit_id,
520            status: row.status,
521            schema_version: row.schema_version,
522            auth_lease: auth_lease_provenance_from_columns(
523                row.lease_id,
524                row.lease_expires_at_ms,
525                row.lease_status_at_enqueue,
526                row.lease_scope_summary_json,
527                row.lease_token,
528            ),
529        }
530    }
531}
532
533#[derive(QueryableByName)]
534struct NextRetryAtRow {
535    #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
536    next_attempt_at: Option<i64>,
537}
538
539#[derive(QueryableByName)]
540struct ConflictSummaryRow {
541    #[diesel(sql_type = Text)]
542    id: String,
543    #[diesel(sql_type = Text)]
544    client_commit_id: String,
545    #[diesel(sql_type = Integer)]
546    op_index: i32,
547    #[diesel(sql_type = Text)]
548    result_status: String,
549    #[diesel(sql_type = Text)]
550    message: String,
551    #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
552    code: Option<String>,
553    #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
554    server_version: Option<i64>,
555    #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
556    resolved_at: Option<i64>,
557    #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
558    resolution: Option<String>,
559}
560
561#[derive(QueryableByName)]
562struct ConflictRetryRow {
563    #[diesel(sql_type = BigInt)]
564    server_version: i64,
565    #[diesel(sql_type = Integer)]
566    op_index: i32,
567    #[diesel(sql_type = Text)]
568    operations_json: String,
569}
570
571#[derive(QueryableByName)]
572struct BlobBodyRow {
573    #[diesel(sql_type = Binary)]
574    body: Vec<u8>,
575}
576
577#[derive(QueryableByName)]
578struct BlobFoundRow {
579    #[diesel(sql_type = Integer)]
580    found: i32,
581}
582
583#[derive(QueryableByName)]
584struct JsonObjectRow {
585    #[diesel(sql_type = Text)]
586    row_json: String,
587}
588
589#[derive(QueryableByName)]
590struct EncryptedCrdtScopeRow {
591    #[diesel(sql_type = Text)]
592    identity: String,
593    #[diesel(sql_type = Text)]
594    scopes: String,
595}
596
597#[derive(QueryableByName)]
598struct EncryptedCrdtStreamStatsRow {
599    #[diesel(sql_type = BigInt)]
600    update_count: i64,
601    #[diesel(sql_type = BigInt)]
602    checkpoint_count: i64,
603    #[diesel(sql_type = BigInt)]
604    checkpointable_update_count: i64,
605    #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
606    max_server_seq: Option<i64>,
607    #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
608    latest_checkpoint_covers_seq: Option<i64>,
609}
610
611impl From<EncryptedCrdtStreamStatsRow> for EncryptedCrdtStreamStats {
612    fn from(row: EncryptedCrdtStreamStatsRow) -> Self {
613        Self {
614            update_count: row.update_count,
615            checkpoint_count: row.checkpoint_count,
616            checkpointable_update_count: row.checkpointable_update_count,
617            max_server_seq: row.max_server_seq,
618            latest_checkpoint_covers_seq: row.latest_checkpoint_covers_seq,
619        }
620    }
621}
622
623#[derive(QueryableByName)]
624pub struct PendingBlobUploadRow {
625    #[diesel(sql_type = Text)]
626    pub hash: String,
627    #[diesel(sql_type = BigInt)]
628    pub size: i64,
629    #[diesel(sql_type = Text)]
630    pub mime_type: String,
631    #[diesel(sql_type = Binary)]
632    pub body: Vec<u8>,
633    #[diesel(sql_type = Integer)]
634    pub encrypted: i32,
635    #[diesel(sql_type = Nullable<Text>)]
636    pub key_id: Option<String>,
637    #[diesel(sql_type = Integer)]
638    pub attempt_count: i32,
639}
640
641#[derive(QueryableByName)]
642struct BlobQueueStatsRow {
643    #[diesel(sql_type = Text)]
644    status: String,
645    #[diesel(sql_type = BigInt)]
646    count: i64,
647}
648
649#[derive(QueryableByName)]
650struct BlobCacheStatsRow {
651    #[diesel(sql_type = BigInt)]
652    count: i64,
653    #[diesel(sql_type = BigInt)]
654    total_bytes: i64,
655}
656
657#[derive(QueryableByName)]
658struct BlobCacheEntryRow {
659    #[diesel(sql_type = Text)]
660    hash: String,
661    #[diesel(sql_type = BigInt)]
662    size: i64,
663}
664
665#[derive(QueryableByName)]
666struct CountRow {
667    #[diesel(sql_type = BigInt)]
668    count: i64,
669}
670
671#[derive(QueryableByName)]
672struct StringValueRow {
673    #[diesel(sql_type = Text)]
674    value: String,
675}
676
677#[derive(QueryableByName)]
678struct ColumnNameRow {
679    #[diesel(sql_type = Text)]
680    name: String,
681}
682
683#[derive(QueryableByName)]
684struct NullableStringValueRow {
685    #[diesel(sql_type = Nullable<Text>)]
686    value: Option<String>,
687}
688
689#[derive(QueryableByName)]
690struct CrdtHealthStatsRow {
691    #[diesel(sql_type = BigInt)]
692    document_count: i64,
693    #[diesel(sql_type = BigInt)]
694    pending_updates: i64,
695    #[diesel(sql_type = BigInt)]
696    flushed_updates: i64,
697    #[diesel(sql_type = BigInt)]
698    acked_updates: i64,
699    #[diesel(sql_type = BigInt)]
700    log_updates: i64,
701}
702
703#[derive(QueryableByName)]
704struct CrdtDocumentIdentityRow {
705    #[diesel(sql_type = Text)]
706    app_table: String,
707    #[diesel(sql_type = Text)]
708    row_id: String,
709}
710
711#[derive(Debug, Clone, Serialize, Deserialize)]
712#[serde(rename_all = "camelCase")]
713pub struct BlobUploadQueueStats {
714    pub pending: i64,
715    pub uploading: i64,
716    pub failed: i64,
717}
718
719#[derive(Debug, Clone, Serialize, Deserialize)]
720#[serde(rename_all = "camelCase")]
721pub struct BlobCacheStats {
722    pub count: i64,
723    pub total_bytes: i64,
724}
725
726#[derive(Debug, Clone, Serialize, Deserialize)]
727#[serde(rename_all = "camelCase")]
728pub struct BlobUploadQueueResult {
729    pub uploaded: i32,
730    pub failed: i32,
731}
732
733impl From<ConflictSummaryRow> for ConflictSummary {
734    fn from(row: ConflictSummaryRow) -> Self {
735        Self {
736            id: row.id,
737            client_commit_id: row.client_commit_id,
738            op_index: row.op_index,
739            result_status: row.result_status,
740            message: row.message,
741            code: row.code,
742            server_version: row.server_version,
743            resolved_at: row.resolved_at,
744            resolution: row.resolution,
745        }
746    }
747}
748
749pub struct DieselSqliteStore {
750    conn: SqliteConnection,
751    app_schema: AppSchema,
752}
753
754pub struct DieselSqliteTx<'a> {
755    conn: &'a mut SqliteConnection,
756    app_schema: AppSchema,
757}
758
759#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
760pub struct SqliteRuntimePragmaReport {
761    pub journal_mode: String,
762    pub foreign_keys: i32,
763    pub busy_timeout: i32,
764    pub synchronous: i32,
765}
766
767#[derive(QueryableByName)]
768struct JournalModePragmaRow {
769    #[diesel(sql_type = Text)]
770    journal_mode: String,
771}
772
773#[derive(QueryableByName)]
774struct ForeignKeysPragmaRow {
775    #[diesel(sql_type = Integer)]
776    foreign_keys: i32,
777}
778
779#[derive(QueryableByName)]
780struct BusyTimeoutPragmaRow {
781    #[diesel(sql_type = Integer)]
782    timeout: i32,
783}
784
785#[derive(QueryableByName)]
786struct SynchronousPragmaRow {
787    #[diesel(sql_type = Integer)]
788    synchronous: i32,
789}
790
791impl DieselSqliteStore {
792    pub fn open(path: &str) -> Result<Self> {
793        Self::open_with_schema(path, default_app_schema())
794    }
795
796    pub fn open_with_schema(path: &str, app_schema: AppSchema) -> Result<Self> {
797        let mut conn = SqliteConnection::establish(path).map_err(|err| {
798            SyncularError::storage(err).context(format!("open sqlite database at {path}"))
799        })?;
800        apply_sqlite_runtime_pragmas(&mut conn)?;
801        let mut store = Self { conn, app_schema };
802        store.ensure_schema()?;
803        Ok(store)
804    }
805
806    pub fn ensure_schema(&mut self) -> Result<()> {
807        sql_query(
808            r#"
809            create table if not exists sync_migrations (
810                version text primary key,
811                name text not null,
812                checksum text not null,
813                applied_at bigint not null
814            )
815            "#,
816        )
817        .execute(&mut self.conn)?;
818        self.ensure_app_schema_state_table()?;
819        self.reject_future_app_schema_state()?;
820        self.ensure_runtime_system_schema()?;
821
822        for migration in self.app_schema.migrations {
823            let applied = sql_query(
824                r#"
825                select version, checksum
826                from sync_migrations
827                where version = ?1
828                limit 1
829                "#,
830            )
831            .bind::<Text, _>(migration.version)
832            .load::<MigrationVersionRow>(&mut self.conn)?
833            .into_iter()
834            .next();
835            let expected_checksum = checksum(migration.up_sql);
836
837            if let Some(applied) = applied {
838                if applied.checksum != expected_checksum {
839                    return Err(SyncularError::schema(format!(
840                        "migration {} checksum mismatch",
841                        applied.version
842                    )));
843                }
844                continue;
845            }
846
847            self.conn.transaction::<(), SyncularError, _>(|conn| {
848                for statement in split_sql_statements(migration.up_sql) {
849                    sql_query(statement).execute(conn)?;
850                }
851                sql_query(
852                    r#"
853                    insert into sync_migrations (version, name, checksum, applied_at)
854                    values (?1, ?2, ?3, ?4)
855                    "#,
856                )
857                .bind::<Text, _>(migration.version)
858                .bind::<Text, _>(migration.name)
859                .bind::<Text, _>(&expected_checksum)
860                .bind::<BigInt, _>(now_ms())
861                .execute(conn)?;
862                Ok(())
863            })?;
864        }
865
866        self.record_app_schema_state()?;
867        Ok(())
868    }
869
870    fn ensure_app_schema_state_table(&mut self) -> Result<()> {
871        sql_query(
872            r#"
873            create table if not exists syncular_app_schema (
874                schema_id text primary key,
875                schema_version integer not null,
876                updated_at bigint not null
877            )
878            "#,
879        )
880        .execute(&mut self.conn)?;
881        Ok(())
882    }
883
884    fn app_schema_state_row(&mut self) -> Result<Option<AppSchemaStateRow>> {
885        Ok(sql_query(
886            r#"
887            select schema_version, updated_at
888            from syncular_app_schema
889            where schema_id = ?1
890            limit 1
891            "#,
892        )
893        .bind::<Text, _>(APP_SCHEMA_ID)
894        .load::<AppSchemaStateRow>(&mut self.conn)?
895        .into_iter()
896        .next())
897    }
898
899    fn reject_future_app_schema_state(&mut self) -> Result<()> {
900        let current = self.app_schema.current_schema_version();
901        if let Some(row) = self.app_schema_state_row()? {
902            if row.schema_version > current {
903                return Err(SyncularError::schema(format!(
904                    "Syncular app schema version mismatch: local {}, generated {}",
905                    row.schema_version, current
906                )));
907            }
908        }
909        Ok(())
910    }
911
912    fn record_app_schema_state(&mut self) -> Result<()> {
913        let current = self.app_schema.current_schema_version();
914        sql_query(
915            r#"
916            insert into syncular_app_schema (schema_id, schema_version, updated_at)
917            values (?1, ?2, ?3)
918            on conflict (schema_id) do update set
919                schema_version = excluded.schema_version,
920                updated_at = excluded.updated_at
921            "#,
922        )
923        .bind::<Text, _>(APP_SCHEMA_ID)
924        .bind::<Integer, _>(current)
925        .bind::<BigInt, _>(now_ms())
926        .execute(&mut self.conn)?;
927        Ok(())
928    }
929
930    pub fn app_schema_state(&mut self) -> Result<AppSchemaState> {
931        self.ensure_app_schema_state_table()?;
932        let row = self.app_schema_state_row()?;
933        Ok(AppSchemaState {
934            schema_id: APP_SCHEMA_ID.to_string(),
935            schema_version: row.as_ref().map(|row| row.schema_version),
936            current_schema_version: self.app_schema.current_schema_version(),
937            updated_at: row.as_ref().map(|row| row.updated_at),
938        })
939    }
940
941    pub fn runtime_pragma_report(&mut self) -> Result<SqliteRuntimePragmaReport> {
942        sqlite_runtime_pragma_report(&mut self.conn)
943    }
944
945    fn ensure_runtime_system_schema(&mut self) -> Result<()> {
946        for statement in split_sql_statements(RUNTIME_SYSTEM_SCHEMA_SQL) {
947            sql_query(statement).execute(&mut self.conn)?;
948        }
949        self.ensure_runtime_system_schema_upgrades()?;
950        Ok(())
951    }
952
953    fn ensure_runtime_system_schema_upgrades(&mut self) -> Result<()> {
954        add_column_if_missing(
955            &mut self.conn,
956            "sync_outbox_commits",
957            "lease_id",
958            "alter table sync_outbox_commits add column lease_id text null",
959        )?;
960        add_column_if_missing(
961            &mut self.conn,
962            "sync_outbox_commits",
963            "lease_expires_at_ms",
964            "alter table sync_outbox_commits add column lease_expires_at_ms bigint null",
965        )?;
966        add_column_if_missing(
967            &mut self.conn,
968            "sync_outbox_commits",
969            "lease_status_at_enqueue",
970            "alter table sync_outbox_commits add column lease_status_at_enqueue text null",
971        )?;
972        add_column_if_missing(
973            &mut self.conn,
974            "sync_outbox_commits",
975            "lease_scope_summary_json",
976            "alter table sync_outbox_commits add column lease_scope_summary_json text null",
977        )?;
978        add_column_if_missing(
979            &mut self.conn,
980            "sync_outbox_commits",
981            "lease_token",
982            "alter table sync_outbox_commits add column lease_token text null",
983        )
984    }
985
986    pub fn list_table_json(&mut self, table: &str) -> Result<Vec<Value>> {
987        if self.app_schema.table_metadata(table).is_none() {
988            return Err(SyncularError::config(format!(
989                "unknown generated app table: {table}"
990            )));
991        }
992
993        list_app_rows_json(&mut self.conn, self.app_schema, table)
994    }
995
996    pub fn read_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
997        current_app_row_json(&mut self.conn, self.app_schema, table, row_id)
998    }
999
1000    pub fn record_command_history(
1001        &mut self,
1002        mutation_scope: &str,
1003        entries: &[CommandHistoryEntry],
1004        receipt: &MutationReceipt,
1005    ) -> Result<CommandHistoryRecord> {
1006        self.conn
1007            .transaction::<CommandHistoryRecord, SyncularError, _>(|conn| {
1008                insert_command_history_record(conn, mutation_scope, entries, receipt)
1009            })
1010    }
1011
1012    pub fn apply_syncular_mutations_with_command_history(
1013        &mut self,
1014        mutation_scope: &str,
1015        actor_id: Option<&str>,
1016        now_ms_value: i64,
1017        mutations: Vec<PendingSyncularMutation>,
1018    ) -> Result<MutationReceipt> {
1019        self.transaction(|tx| {
1020            tx.apply_syncular_mutations_with_command_history(
1021                mutation_scope,
1022                actor_id,
1023                now_ms_value,
1024                mutations,
1025            )
1026        })
1027    }
1028
1029    pub fn latest_command_history(
1030        &mut self,
1031        state: CommandHistoryState,
1032    ) -> Result<Option<CommandHistoryRecord>> {
1033        sql_query(
1034            r#"
1035            select id, mutation_scope, state, entries_json, client_commit_id,
1036                   undo_client_commit_id, redo_client_commit_id, created_at, updated_at
1037            from sync_command_history
1038            where state = ?1
1039            order by updated_at desc, created_at desc, id desc
1040            limit 1
1041            "#,
1042        )
1043        .bind::<Text, _>(state.as_str())
1044        .load::<CommandHistoryRow>(&mut self.conn)?
1045        .into_iter()
1046        .next()
1047        .map(CommandHistoryRecord::try_from)
1048        .transpose()
1049    }
1050
1051    pub fn mark_command_history(
1052        &mut self,
1053        id: &str,
1054        state: CommandHistoryState,
1055        receipt: &MutationReceipt,
1056    ) -> Result<()> {
1057        let replay_column = match state {
1058            CommandHistoryState::Done => "redo_client_commit_id",
1059            CommandHistoryState::Undone => "undo_client_commit_id",
1060        };
1061        let statement = format!(
1062            "update sync_command_history set state = ?1, updated_at = ?2, {replay_column} = ?3 where id = ?4"
1063        );
1064        sql_query(statement)
1065            .bind::<Text, _>(state.as_str())
1066            .bind::<BigInt, _>(now_ms())
1067            .bind::<Text, _>(&receipt.client_commit_id)
1068            .bind::<Text, _>(id)
1069            .execute(&mut self.conn)?;
1070        Ok(())
1071    }
1072
1073    pub fn read<'query, Q, Row>(&mut self, query: Q) -> Result<Vec<Row>>
1074    where
1075        Q: diesel::query_dsl::LoadQuery<'query, SqliteConnection, Row>,
1076    {
1077        query.load(&mut self.conn).map_err(Into::into)
1078    }
1079
1080    pub fn apply_local_operation(
1081        &mut self,
1082        operation: SyncOperation,
1083        local_row: Option<Value>,
1084    ) -> Result<String> {
1085        if self.app_schema.table_metadata(&operation.table).is_none()
1086            && !is_encrypted_crdt_system_table(&operation.table)
1087        {
1088            return Err(SyncularError::config(format!(
1089                "unknown generated app table: {}",
1090                operation.table
1091            )));
1092        }
1093
1094        self.transaction(|tx| tx.apply_local_operation(operation, local_row))
1095    }
1096
1097    pub fn apply_local_operation_with_active_auth_lease(
1098        &mut self,
1099        actor_id: Option<&str>,
1100        now_ms_value: i64,
1101        operation: SyncOperation,
1102        local_row: Option<Value>,
1103    ) -> Result<String> {
1104        self.transaction(|tx| {
1105            tx.apply_local_operation_with_active_auth_lease(
1106                actor_id,
1107                now_ms_value,
1108                operation,
1109                local_row,
1110            )
1111        })
1112    }
1113
1114    pub fn apply_syncular_mutations(
1115        &mut self,
1116        mutations: Vec<PendingSyncularMutation>,
1117    ) -> Result<MutationReceipt> {
1118        self.transaction(|tx| tx.apply_syncular_mutations(mutations))
1119    }
1120
1121    pub fn apply_syncular_mutations_with_active_auth_lease(
1122        &mut self,
1123        actor_id: Option<&str>,
1124        now_ms_value: i64,
1125        mutations: Vec<PendingSyncularMutation>,
1126    ) -> Result<MutationReceipt> {
1127        self.transaction(|tx| {
1128            tx.apply_syncular_mutations_with_active_auth_lease(actor_id, now_ms_value, mutations)
1129        })
1130    }
1131
1132    pub fn upsert_auth_lease(&mut self, lease: &AuthLeaseRecord) -> Result<()> {
1133        self.transaction(|tx| tx.upsert_auth_lease(lease))
1134    }
1135
1136    pub fn auth_lease(&mut self, lease_id: &str) -> Result<Option<AuthLeaseRecord>> {
1137        self.transaction(|tx| tx.auth_lease(lease_id))
1138    }
1139
1140    pub fn active_auth_leases(
1141        &mut self,
1142        actor_id: Option<&str>,
1143        now_ms_value: i64,
1144    ) -> Result<Vec<AuthLeaseRecord>> {
1145        self.transaction(|tx| tx.active_auth_leases(actor_id, now_ms_value))
1146    }
1147
1148    pub fn set_outbox_auth_lease(
1149        &mut self,
1150        client_commit_id: &str,
1151        provenance: Option<&AuthLeaseProvenance>,
1152    ) -> Result<()> {
1153        self.transaction(|tx| tx.set_outbox_auth_lease(client_commit_id, provenance))
1154    }
1155
1156    pub fn store_blob_bytes(
1157        &mut self,
1158        data: &[u8],
1159        mime_type: &str,
1160        enqueue_upload: bool,
1161    ) -> Result<BlobRef> {
1162        let size = i64::try_from(data.len()).map_err(|_| {
1163            SyncularError::protocol_message("blob is too large for SQLite size metadata")
1164        })?;
1165        validate_blob_size_bytes(size)?;
1166        let blob = BlobRef {
1167            hash: blob_hash(data),
1168            size,
1169            mime_type: normalize_blob_mime_type(mime_type),
1170            encrypted: false,
1171            key_id: None,
1172        };
1173
1174        self.store_blob_body(&blob, data, enqueue_upload)?;
1175        Ok(blob)
1176    }
1177
1178    pub fn store_blob_body(
1179        &mut self,
1180        blob: &BlobRef,
1181        data: &[u8],
1182        enqueue_upload: bool,
1183    ) -> Result<()> {
1184        self.conn.transaction::<(), SyncularError, _>(|conn| {
1185            cache_blob(conn, blob, data)?;
1186            if enqueue_upload {
1187                enqueue_blob_upload(conn, blob, data)?;
1188            }
1189            Ok(())
1190        })
1191    }
1192
1193    pub fn cache_blob_bytes(&mut self, blob: &BlobRef, data: &[u8]) -> Result<()> {
1194        cache_blob(&mut self.conn, blob, data)
1195    }
1196
1197    pub fn read_cached_blob(&mut self, hash: &str) -> Result<Option<Vec<u8>>> {
1198        validate_blob_hash(hash)?;
1199        let row = sql_query("select body from sync_blob_cache where hash = ?1 limit 1")
1200            .bind::<Text, _>(hash)
1201            .load::<BlobBodyRow>(&mut self.conn)?
1202            .into_iter()
1203            .next();
1204        let Some(row) = row else {
1205            return Ok(None);
1206        };
1207        sql_query("update sync_blob_cache set last_accessed_at = ?1 where hash = ?2")
1208            .bind::<BigInt, _>(now_ms())
1209            .bind::<Text, _>(hash)
1210            .execute(&mut self.conn)?;
1211        Ok(Some(row.body))
1212    }
1213
1214    pub fn is_blob_local(&mut self, hash: &str) -> Result<bool> {
1215        validate_blob_hash(hash)?;
1216        let row = sql_query("select 1 as found from sync_blob_cache where hash = ?1 limit 1")
1217            .bind::<Text, _>(hash)
1218            .load::<BlobFoundRow>(&mut self.conn)?
1219            .into_iter()
1220            .next();
1221        Ok(row.is_some_and(|row| row.found == 1))
1222    }
1223
1224    pub fn requeue_stale_blob_uploads(&mut self) -> Result<()> {
1225        let now = now_ms();
1226        let stale_before = now - BLOB_UPLOAD_STALE_TIMEOUT_MS;
1227        sql_query(
1228            r#"
1229            update sync_blob_outbox
1230            set status = case
1231                    when attempt_count >= ?1 then 'failed'
1232                    else 'pending'
1233                end,
1234                error = case
1235                    when attempt_count >= ?1 then 'Upload timed out while in uploading state'
1236                    else 'Upload timed out while in uploading state; retrying'
1237                end,
1238                next_attempt_at = case
1239                    when attempt_count >= ?1 then 0
1240                    else ?2
1241                end,
1242                updated_at = ?2
1243            where status = 'uploading' and updated_at < ?3
1244            "#,
1245        )
1246        .bind::<Integer, _>(MAX_BLOB_UPLOAD_RETRIES)
1247        .bind::<BigInt, _>(now)
1248        .bind::<BigInt, _>(stale_before)
1249        .execute(&mut self.conn)?;
1250        Ok(())
1251    }
1252
1253    pub fn pending_blob_uploads(
1254        &mut self,
1255        limit: i64,
1256        retry_now: bool,
1257    ) -> Result<Vec<PendingBlobUploadRow>> {
1258        let now = now_ms();
1259        Ok(sql_query(
1260            r#"
1261            select hash, size, mime_type, body, encrypted, key_id, attempt_count
1262            from sync_blob_outbox
1263            where status = 'pending' and attempt_count < ?1 and (?2 or next_attempt_at <= ?3)
1264            order by created_at asc
1265            limit ?4
1266            "#,
1267        )
1268        .bind::<Integer, _>(MAX_BLOB_UPLOAD_RETRIES)
1269        .bind::<Bool, _>(retry_now)
1270        .bind::<BigInt, _>(now)
1271        .bind::<BigInt, _>(limit)
1272        .load::<PendingBlobUploadRow>(&mut self.conn)?)
1273    }
1274
1275    pub fn mark_blob_uploading(&mut self, hash: &str, attempt_count: i32) -> Result<()> {
1276        sql_query(
1277            r#"
1278            update sync_blob_outbox
1279            set status = 'uploading',
1280                attempt_count = ?1,
1281                error = null,
1282                next_attempt_at = 0,
1283                updated_at = ?2
1284            where hash = ?3 and status = 'pending'
1285            "#,
1286        )
1287        .bind::<Integer, _>(attempt_count)
1288        .bind::<BigInt, _>(now_ms())
1289        .bind::<Text, _>(hash)
1290        .execute(&mut self.conn)?;
1291        Ok(())
1292    }
1293
1294    pub fn mark_blob_upload_error(
1295        &mut self,
1296        hash: &str,
1297        status: &str,
1298        error: &str,
1299        next_attempt_at: i64,
1300    ) -> Result<()> {
1301        sql_query(
1302            r#"
1303            update sync_blob_outbox
1304            set status = ?1, error = ?2, next_attempt_at = ?3, updated_at = ?4
1305            where hash = ?5
1306            "#,
1307        )
1308        .bind::<Text, _>(status)
1309        .bind::<Text, _>(error)
1310        .bind::<BigInt, _>(next_attempt_at)
1311        .bind::<BigInt, _>(now_ms())
1312        .bind::<Text, _>(hash)
1313        .execute(&mut self.conn)?;
1314        Ok(())
1315    }
1316
1317    pub fn delete_blob_upload(&mut self, hash: &str) -> Result<()> {
1318        sql_query("delete from sync_blob_outbox where hash = ?1")
1319            .bind::<Text, _>(hash)
1320            .execute(&mut self.conn)?;
1321        Ok(())
1322    }
1323
1324    pub fn blob_upload_queue_stats(&mut self) -> Result<BlobUploadQueueStats> {
1325        let rows = sql_query(
1326            r#"
1327            select status, count(hash) as count
1328            from sync_blob_outbox
1329            group by status
1330            "#,
1331        )
1332        .load::<BlobQueueStatsRow>(&mut self.conn)?;
1333        let mut stats = BlobUploadQueueStats {
1334            pending: 0,
1335            uploading: 0,
1336            failed: 0,
1337        };
1338        for row in rows {
1339            match row.status.as_str() {
1340                "pending" => stats.pending = row.count,
1341                "uploading" => stats.uploading = row.count,
1342                "failed" => stats.failed = row.count,
1343                _ => {}
1344            }
1345        }
1346        Ok(stats)
1347    }
1348
1349    pub fn blob_cache_stats(&mut self) -> Result<BlobCacheStats> {
1350        let row = sql_query(
1351            r#"
1352            select count(hash) as count, coalesce(sum(size), 0) as total_bytes
1353            from sync_blob_cache
1354            "#,
1355        )
1356        .load::<BlobCacheStatsRow>(&mut self.conn)?
1357        .into_iter()
1358        .next()
1359        .unwrap_or(BlobCacheStatsRow {
1360            count: 0,
1361            total_bytes: 0,
1362        });
1363        Ok(BlobCacheStats {
1364            count: row.count,
1365            total_bytes: row.total_bytes,
1366        })
1367    }
1368
1369    fn blob_reference_health_counts(&mut self) -> Result<(i64, i64)> {
1370        let mut checked = 0i64;
1371        let mut invalid = 0i64;
1372        for metadata in self.app_schema.app_table_metadata {
1373            validate_app_table_metadata(metadata)?;
1374            for column in metadata.blob_columns {
1375                validate_identifier(column)?;
1376                let sql = format!(
1377                    "select {column} as value from {table} where {column} is not null and {column} <> ''",
1378                    table = metadata.name
1379                );
1380                let rows = sql_query(sql).load::<NullableStringValueRow>(&mut self.conn)?;
1381                for row in rows {
1382                    let Some(value) = row.value else {
1383                        continue;
1384                    };
1385                    checked += 1;
1386                    let parsed = serde_json::from_str::<BlobRef>(&value);
1387                    match parsed {
1388                        Ok(blob) if validate_blob_ref_size(&blob).is_ok() => {}
1389                        _ => invalid += 1,
1390                    }
1391                }
1392            }
1393        }
1394        Ok((checked, invalid))
1395    }
1396
1397    fn scoped_rows_health_summary(
1398        &mut self,
1399        subscriptions: &[SubscriptionSpec],
1400    ) -> Result<ScopedRowsHealthSummary> {
1401        scoped_rows_health_summary_for_schema(&mut self.conn, self.app_schema, subscriptions)
1402    }
1403
1404    fn clear_orphaned_synced_rows(
1405        &mut self,
1406        subscriptions: &[SubscriptionSpec],
1407        tables: &[String],
1408    ) -> Result<ScopedRowsHealthSummary> {
1409        self.conn.transaction(|conn| {
1410            clear_orphaned_synced_rows_for_schema(conn, self.app_schema, subscriptions, tables)
1411        })
1412    }
1413
1414    pub fn crdt_health_summary(&mut self) -> Result<CrdtHealthSummary> {
1415        let stats = sql_query(
1416            r#"
1417            select
1418              count(*) as document_count,
1419              coalesce(sum(pending_updates), 0) as pending_updates,
1420              coalesce(sum(flushed_updates), 0) as flushed_updates,
1421              coalesce(sum(acked_updates), 0) as acked_updates,
1422              coalesce(sum(log_updates), 0) as log_updates
1423            from sync_crdt_documents
1424            "#,
1425        )
1426        .load::<CrdtHealthStatsRow>(&mut self.conn)?
1427        .into_iter()
1428        .next()
1429        .unwrap_or(CrdtHealthStatsRow {
1430            document_count: 0,
1431            pending_updates: 0,
1432            flushed_updates: 0,
1433            acked_updates: 0,
1434            log_updates: 0,
1435        });
1436        let orphaned_log_entries = sql_query(
1437            r#"
1438            select count(*) as count
1439            from sync_crdt_update_log log
1440            left join sync_crdt_documents documents
1441              on documents.document_key = log.document_key
1442            where documents.document_key is null
1443            "#,
1444        )
1445        .load::<CountRow>(&mut self.conn)?
1446        .into_iter()
1447        .next()
1448        .map(|row| row.count)
1449        .unwrap_or(0);
1450
1451        Ok(CrdtHealthSummary {
1452            document_count: stats.document_count,
1453            pending_updates: stats.pending_updates,
1454            flushed_updates: stats.flushed_updates,
1455            acked_updates: stats.acked_updates,
1456            log_updates: stats.log_updates,
1457            orphaned_documents: self.orphaned_crdt_document_count()?,
1458            orphaned_log_entries,
1459        })
1460    }
1461
1462    fn orphaned_crdt_document_count(&mut self) -> Result<i64> {
1463        let documents = sql_query(
1464            r#"
1465            select app_table, row_id
1466            from sync_crdt_documents
1467            order by app_table asc, row_id asc
1468            "#,
1469        )
1470        .load::<CrdtDocumentIdentityRow>(&mut self.conn)?;
1471        let mut orphaned = 0i64;
1472        for document in documents {
1473            let Some(metadata) = self.app_schema.table_metadata(&document.app_table) else {
1474                orphaned += 1;
1475                continue;
1476            };
1477            if get_app_row_json_generic(&mut self.conn, metadata, &document.row_id)?.is_none() {
1478                orphaned += 1;
1479            }
1480        }
1481        Ok(orphaned)
1482    }
1483
1484    pub fn apply_crdt_field_yjs_update(
1485        &mut self,
1486        field: &CrdtField,
1487        update: YjsUpdateEnvelope,
1488        max_pending_updates: i64,
1489    ) -> Result<String> {
1490        self.transaction(|tx| tx.apply_crdt_field_yjs_update(field, update, max_pending_updates))
1491    }
1492
1493    pub fn crdt_document_snapshot(&mut self, field: &CrdtField) -> Result<CrdtDocumentSnapshot> {
1494        let row = current_app_row_json(
1495            &mut self.conn,
1496            self.app_schema,
1497            field.table(),
1498            field.row_id(),
1499        )?;
1500        let state_base64 = crdt_field_state_base64(field, row.as_ref());
1501        let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
1502        upsert_crdt_document_snapshot(
1503            &mut self.conn,
1504            field,
1505            state_base64.as_deref(),
1506            &state_vector_base64,
1507            None,
1508        )?;
1509        select_crdt_document_snapshot(&mut self.conn, &field.document_key())
1510    }
1511
1512    pub fn crdt_update_log(
1513        &mut self,
1514        field: &CrdtField,
1515        limit: i64,
1516    ) -> Result<Vec<CrdtUpdateLogEntry>> {
1517        sql_query(
1518            r#"
1519            select id, document_key, update_id, client_commit_id, origin, status, update_base64,
1520                   state_vector_base64, created_at, flushed_at, acked_at
1521            from sync_crdt_update_log
1522            where document_key = ?1
1523            order by id asc
1524            limit ?2
1525            "#,
1526        )
1527        .bind::<Text, _>(field.document_key())
1528        .bind::<BigInt, _>(limit.max(0))
1529        .load::<CrdtUpdateLogRow>(&mut self.conn)?
1530        .into_iter()
1531        .map(CrdtUpdateLogEntry::try_from)
1532        .collect()
1533    }
1534
1535    pub fn compact_crdt_document(&mut self, field: &CrdtField) -> Result<CrdtDocumentSnapshot> {
1536        let row = current_app_row_json(
1537            &mut self.conn,
1538            self.app_schema,
1539            field.table(),
1540            field.row_id(),
1541        )?;
1542        let state_base64 = crdt_field_state_base64(field, row.as_ref());
1543        let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
1544        upsert_crdt_document_snapshot(
1545            &mut self.conn,
1546            field,
1547            state_base64.as_deref(),
1548            &state_vector_base64,
1549            Some(now_ms()),
1550        )?;
1551        select_crdt_document_snapshot(&mut self.conn, &field.document_key())
1552    }
1553
1554    pub fn prune_blob_cache(&mut self, max_bytes: i64) -> Result<i64> {
1555        if max_bytes <= 0 {
1556            return Ok(0);
1557        }
1558        let stats = self.blob_cache_stats()?;
1559        if stats.total_bytes <= max_bytes {
1560            return Ok(0);
1561        }
1562        let target = stats.total_bytes - max_bytes;
1563        let entries = sql_query(
1564            r#"
1565            select hash, size
1566            from sync_blob_cache
1567            order by last_accessed_at asc
1568            "#,
1569        )
1570        .load::<BlobCacheEntryRow>(&mut self.conn)?;
1571        let mut freed = 0i64;
1572        for entry in entries {
1573            if freed >= target {
1574                break;
1575            }
1576            sql_query("delete from sync_blob_cache where hash = ?1")
1577                .bind::<Text, _>(&entry.hash)
1578                .execute(&mut self.conn)?;
1579            freed += entry.size;
1580        }
1581        Ok(freed)
1582    }
1583
1584    pub fn prune_crdt_update_log(&mut self, cutoff: i64) -> Result<i64> {
1585        let deleted = sql_query(
1586            r#"
1587            delete from sync_crdt_update_log
1588            where status in ('acked', 'pruned')
1589              and coalesce(acked_at, flushed_at, created_at) <= ?1
1590            "#,
1591        )
1592        .bind::<BigInt, _>(cutoff)
1593        .execute(&mut self.conn)? as i64;
1594        refresh_all_crdt_document_counts(&mut self.conn)?;
1595        Ok(deleted)
1596    }
1597
1598    pub fn clear_blob_cache(&mut self) -> Result<()> {
1599        sql_query("delete from sync_blob_cache").execute(&mut self.conn)?;
1600        Ok(())
1601    }
1602
1603    pub fn encrypted_crdt_stream_stats(
1604        &mut self,
1605        partition_id: &str,
1606        stream_id: &str,
1607    ) -> Result<EncryptedCrdtStreamStats> {
1608        Ok(sql_query(
1609            r#"
1610            select
1611                (select count(*) from sync_crdt_updates
1612                 where partition_id = ?1 and stream_id = ?2) as update_count,
1613                (select count(*) from sync_crdt_checkpoints
1614                 where partition_id = ?1 and stream_id = ?2) as checkpoint_count,
1615                (select count(*) from sync_crdt_updates
1616                 where partition_id = ?1 and stream_id = ?2
1617                   and server_seq is not null
1618                   and server_seq > coalesce((
1619                       select max(covers_seq) from sync_crdt_checkpoints
1620                       where partition_id = ?1 and stream_id = ?2
1621                   ), 0)) as checkpointable_update_count,
1622                (select max(server_seq) from sync_crdt_updates
1623                 where partition_id = ?1 and stream_id = ?2) as max_server_seq,
1624                (select max(covers_seq) from sync_crdt_checkpoints
1625                 where partition_id = ?1 and stream_id = ?2) as latest_checkpoint_covers_seq
1626            "#,
1627        )
1628        .bind::<Text, _>(partition_id)
1629        .bind::<Text, _>(stream_id)
1630        .load::<EncryptedCrdtStreamStatsRow>(&mut self.conn)?
1631        .into_iter()
1632        .next()
1633        .map(Into::into)
1634        .unwrap_or_default())
1635    }
1636
1637    pub fn prune_encrypted_crdt_updates(&mut self) -> Result<i64> {
1638        Ok(sql_query(
1639            r#"
1640            delete from sync_crdt_updates
1641            where server_seq is not null
1642              and exists (
1643                select 1
1644                from sync_crdt_checkpoints
1645                where sync_crdt_checkpoints.partition_id = sync_crdt_updates.partition_id
1646                  and sync_crdt_checkpoints.stream_id = sync_crdt_updates.stream_id
1647                  and sync_crdt_checkpoints.key_id = sync_crdt_updates.key_id
1648                  and sync_crdt_checkpoints.server_seq is not null
1649                  and sync_crdt_checkpoints.covers_seq >= sync_crdt_updates.server_seq
1650              )
1651            "#,
1652        )
1653        .execute(&mut self.conn)? as i64)
1654    }
1655
1656    pub fn prune_encrypted_crdt_checkpoints(&mut self, keep_per_stream: i64) -> Result<i64> {
1657        Ok(sql_query(
1658            r#"
1659            delete from sync_crdt_checkpoints
1660            where checkpoint_id in (
1661                select checkpoint_id
1662                from (
1663                    select
1664                        checkpoint_id,
1665                        row_number() over (
1666                            partition by partition_id, stream_id, key_id
1667                            order by covers_seq desc, coalesce(server_seq, 0) desc, seq desc
1668                        ) as checkpoint_rank
1669                    from sync_crdt_checkpoints
1670                ) ranked
1671                where checkpoint_rank > ?1
1672            )
1673            "#,
1674        )
1675        .bind::<BigInt, _>(keep_per_stream)
1676        .execute(&mut self.conn)? as i64)
1677    }
1678
1679    pub fn compact_storage(
1680        &mut self,
1681        options: &StorageCompactionOptions,
1682    ) -> Result<StorageCompactionReport> {
1683        let cutoff = options.cutoff_ms_now()?;
1684        let mut report = StorageCompactionReport::default();
1685
1686        if options.should_prune_acked_outbox() {
1687            let cutoff = required_compaction_cutoff(cutoff, "acked outbox")?;
1688            report.acked_outbox_commits_deleted = sql_query(
1689                "delete from sync_outbox_commits where status = 'acked' and updated_at <= ?1",
1690            )
1691            .bind::<BigInt, _>(cutoff)
1692            .execute(&mut self.conn)? as i64;
1693        }
1694
1695        if options.should_prune_resolved_conflicts() {
1696            let cutoff = required_compaction_cutoff(cutoff, "resolved conflicts")?;
1697            report.resolved_conflicts_deleted = sql_query(
1698                "delete from sync_conflicts where resolved_at is not null and resolved_at <= ?1",
1699            )
1700            .bind::<BigInt, _>(cutoff)
1701            .execute(&mut self.conn)? as i64;
1702        }
1703
1704        if options.should_prune_failed_blob_uploads() {
1705            let cutoff = required_compaction_cutoff(cutoff, "failed blob uploads")?;
1706            report.failed_blob_uploads_deleted = sql_query(
1707                "delete from sync_blob_outbox where status = 'failed' and updated_at <= ?1",
1708            )
1709            .bind::<BigInt, _>(cutoff)
1710            .execute(&mut self.conn)? as i64;
1711        }
1712
1713        if options.should_prune_inactive_subscription_states() {
1714            let cutoff = required_compaction_cutoff(cutoff, "inactive subscription states")?;
1715            report.inactive_subscription_states_deleted = sql_query(
1716                "delete from sync_subscription_state where status != 'active' and updated_at <= ?1",
1717            )
1718            .bind::<BigInt, _>(cutoff)
1719            .execute(&mut self.conn)?
1720                as i64;
1721        }
1722
1723        if options.should_prune_tombstones() {
1724            let max_server_version = options.max_tombstone_server_version.ok_or_else(|| {
1725                SyncularError::config(
1726                    "storage compaction tombstone cleanup requires maxTombstoneServerVersion",
1727                )
1728            })?;
1729            for statement in
1730                tombstone_delete_statements(self.app_schema.app_table_metadata, max_server_version)?
1731            {
1732                report.tombstone_rows_deleted +=
1733                    sql_query(statement).execute(&mut self.conn)? as i64;
1734            }
1735        }
1736
1737        if let Some(max_bytes) = options.max_blob_cache_bytes {
1738            report.blob_cache_bytes_pruned = self.prune_blob_cache(max_bytes)?;
1739        }
1740
1741        if options.should_prune_encrypted_crdt_updates() {
1742            report.encrypted_crdt_updates_deleted = self.prune_encrypted_crdt_updates()?;
1743        }
1744
1745        if let Some(keep) = options.encrypted_crdt_checkpoint_keep_count()? {
1746            report.encrypted_crdt_checkpoints_deleted =
1747                self.prune_encrypted_crdt_checkpoints(keep)?;
1748        }
1749
1750        if options.should_prune_crdt_update_log() {
1751            let cutoff = required_compaction_cutoff(cutoff, "CRDT update log")?;
1752            report.crdt_update_log_deleted = self.prune_crdt_update_log(cutoff)?;
1753        }
1754
1755        Ok(report)
1756    }
1757
1758    pub fn compact_storage_json(&mut self, options_json: Option<&str>) -> Result<String> {
1759        let options = StorageCompactionOptions::from_json(options_json)?;
1760        Ok(serde_json::to_string(&self.compact_storage(&options)?)?)
1761    }
1762}
1763
1764pub fn apply_sqlite_runtime_pragmas(conn: &mut SqliteConnection) -> Result<()> {
1765    conn.batch_execute(&format!(
1766        r#"
1767        pragma busy_timeout = {SQLITE_BUSY_TIMEOUT_MS};
1768        pragma foreign_keys = on;
1769        pragma journal_mode = wal;
1770        pragma synchronous = normal;
1771        "#
1772    ))?;
1773    Ok(())
1774}
1775
1776fn sqlite_runtime_pragma_report(conn: &mut SqliteConnection) -> Result<SqliteRuntimePragmaReport> {
1777    Ok(SqliteRuntimePragmaReport {
1778        journal_mode: sql_query("pragma journal_mode")
1779            .load::<JournalModePragmaRow>(conn)?
1780            .into_iter()
1781            .next()
1782            .map(|row| row.journal_mode)
1783            .unwrap_or_default(),
1784        foreign_keys: sql_query("pragma foreign_keys")
1785            .load::<ForeignKeysPragmaRow>(conn)?
1786            .into_iter()
1787            .next()
1788            .map(|row| row.foreign_keys)
1789            .unwrap_or_default(),
1790        busy_timeout: sql_query("pragma busy_timeout")
1791            .load::<BusyTimeoutPragmaRow>(conn)?
1792            .into_iter()
1793            .next()
1794            .map(|row| row.timeout)
1795            .unwrap_or_default(),
1796        synchronous: sql_query("pragma synchronous")
1797            .load::<SynchronousPragmaRow>(conn)?
1798            .into_iter()
1799            .next()
1800            .map(|row| row.synchronous)
1801            .unwrap_or_default(),
1802    })
1803}
1804
1805fn cache_blob(conn: &mut SqliteConnection, blob: &BlobRef, data: &[u8]) -> Result<()> {
1806    validate_blob_bytes(blob, data)?;
1807    let now = now_ms();
1808    sql_query(
1809        r#"
1810        insert into sync_blob_cache
1811            (hash, size, mime_type, body, encrypted, key_id, cached_at, last_accessed_at)
1812        values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1813        on conflict(hash) do update set
1814            size = excluded.size,
1815            mime_type = excluded.mime_type,
1816            body = excluded.body,
1817            encrypted = excluded.encrypted,
1818            key_id = excluded.key_id,
1819            last_accessed_at = excluded.last_accessed_at
1820        "#,
1821    )
1822    .bind::<Text, _>(&blob.hash)
1823    .bind::<BigInt, _>(blob.size)
1824    .bind::<Text, _>(&blob.mime_type)
1825    .bind::<Binary, _>(data)
1826    .bind::<Integer, _>(if blob.encrypted { 1 } else { 0 })
1827    .bind::<diesel::sql_types::Nullable<Text>, _>(blob.key_id.as_deref())
1828    .bind::<BigInt, _>(now)
1829    .bind::<BigInt, _>(now)
1830    .execute(conn)?;
1831    Ok(())
1832}
1833
1834fn enqueue_blob_upload(conn: &mut SqliteConnection, blob: &BlobRef, data: &[u8]) -> Result<()> {
1835    let now = now_ms();
1836    sql_query(
1837        r#"
1838        insert into sync_blob_outbox
1839            (hash, size, mime_type, body, encrypted, key_id, status, attempt_count, error, created_at, updated_at, next_attempt_at)
1840        values (?1, ?2, ?3, ?4, ?5, ?6, 'pending', 0, null, ?7, ?8, 0)
1841        on conflict(hash) do nothing
1842        "#,
1843    )
1844    .bind::<Text, _>(&blob.hash)
1845    .bind::<BigInt, _>(blob.size)
1846    .bind::<Text, _>(&blob.mime_type)
1847    .bind::<Binary, _>(data)
1848    .bind::<Integer, _>(if blob.encrypted { 1 } else { 0 })
1849    .bind::<diesel::sql_types::Nullable<Text>, _>(blob.key_id.as_deref())
1850    .bind::<BigInt, _>(now)
1851    .bind::<BigInt, _>(now)
1852    .execute(conn)?;
1853    Ok(())
1854}
1855
1856fn mutation_has_server_merge_yjs_payload(
1857    mutation: &PendingSyncularMutation,
1858    metadata: &AppTableMetadata,
1859) -> bool {
1860    operation_payload_has_server_merge_yjs_payload(mutation.payload.as_ref(), metadata)
1861}
1862
1863fn operation_payload_has_server_merge_yjs_payload(
1864    payload: Option<&Value>,
1865    metadata: &AppTableMetadata,
1866) -> bool {
1867    let Some(Value::Object(payload)) = payload else {
1868        return false;
1869    };
1870    let Some(Value::Object(envelope)) = payload.get(YJS_PAYLOAD_KEY) else {
1871        return false;
1872    };
1873
1874    metadata.crdt_yjs_fields.iter().any(|field| {
1875        (field.sync_mode == "server-merge" || field.sync_mode.is_empty())
1876            && envelope.contains_key(field.field)
1877    })
1878}
1879
1880fn collect_server_merge_yjs_updates(
1881    app_schema: AppSchema,
1882    metadata: &'static AppTableMetadata,
1883    operation: &SyncOperation,
1884) -> Result<Vec<(CrdtField, YjsUpdateEnvelope)>> {
1885    let Some(Value::Object(payload)) = operation.payload.as_ref() else {
1886        return Ok(Vec::new());
1887    };
1888    let Some(Value::Object(envelope)) = payload.get(YJS_PAYLOAD_KEY) else {
1889        return Ok(Vec::new());
1890    };
1891
1892    let mut updates = Vec::new();
1893    for field_metadata in metadata.crdt_yjs_fields.iter().filter(|field| {
1894        (field.sync_mode == "server-merge" || field.sync_mode.is_empty())
1895            && envelope.contains_key(field.field)
1896    }) {
1897        let field = validate_crdt_field(
1898            app_schema,
1899            &CrdtFieldId::new(&operation.table, &operation.row_id, field_metadata.field),
1900        )?;
1901        let Some(value) = envelope.get(field_metadata.field) else {
1902            continue;
1903        };
1904        match value {
1905            Value::Array(items) => {
1906                for item in items {
1907                    updates.push((field.clone(), serde_json::from_value(item.clone())?));
1908                }
1909            }
1910            Value::Null => {}
1911            item => updates.push((field, serde_json::from_value(item.clone())?)),
1912        }
1913    }
1914
1915    Ok(updates)
1916}
1917
1918impl SyncStore for DieselSqliteStore {
1919    type Tx<'a> = DieselSqliteTx<'a>;
1920
1921    fn transaction<T>(&mut self, f: impl FnOnce(&mut Self::Tx<'_>) -> Result<T>) -> Result<T> {
1922        let app_schema = self.app_schema;
1923        self.conn.transaction::<T, SyncularError, _>(|conn| {
1924            let mut tx = DieselSqliteTx { conn, app_schema };
1925            f(&mut tx)
1926        })
1927    }
1928
1929    fn supports_sqlite_snapshot_artifacts(&self) -> bool {
1930        true
1931    }
1932
1933    fn decode_sqlite_snapshot_artifact_rows(
1934        &self,
1935        table: &str,
1936        artifact_bytes: &[u8],
1937    ) -> Result<Vec<Value>> {
1938        let mut artifact_conn = SqliteConnection::establish(":memory:")
1939            .map_err(|err| SyncularError::storage(err).context("open sqlite snapshot artifact"))?;
1940        artifact_conn
1941            .deserialize_readonly_database_from_buffer(artifact_bytes)
1942            .map_err(|err| {
1943                SyncularError::storage(err).context("deserialize sqlite snapshot artifact")
1944            })?;
1945        list_app_rows_json(&mut artifact_conn, self.app_schema, table)
1946    }
1947}
1948
#[cfg(feature = "demo-todo-native-fixture")]
impl DemoTaskStore for DieselSqliteStore {
    /// Adds a demo task inside a transaction by delegating to `DieselSqliteTx::add_task`.
    fn add_task(
        &mut self,
        actor_id: &str,
        project_id: Option<&str>,
        task_id: String,
        title_value: String,
    ) -> Result<()> {
        self.transaction(move |demo_tx| demo_tx.add_task(actor_id, project_id, task_id, title_value))
    }

    /// Lists tasks straight from the store connection (no explicit transaction).
    fn list_tasks(&mut self) -> Result<Vec<Task>> {
        let conn = &mut self.conn;
        list_tasks(conn)
    }

    /// Changes a task title inside a transaction by delegating to
    /// `DieselSqliteTx::patch_task_title`.
    fn patch_task_title(
        &mut self,
        project_id: Option<&str>,
        task_id: String,
        title_value: String,
    ) -> Result<()> {
        self.transaction(move |demo_tx| demo_tx.patch_task_title(project_id, task_id, title_value))
    }
}
1974
1975impl SyncStateStore for DieselSqliteStore {
1976    fn applied_migrations(&mut self) -> Result<Vec<AppliedMigration>> {
1977        let rows = sql_query(
1978            r#"
1979            select version, name, checksum, applied_at
1980            from sync_migrations
1981            order by version asc
1982            "#,
1983        )
1984        .load::<AppliedMigrationRow>(&mut self.conn)?;
1985
1986        Ok(rows.into_iter().map(AppliedMigration::from).collect())
1987    }
1988
1989    fn app_schema_state(&mut self, _current_schema_version: i32) -> Result<AppSchemaState> {
1990        DieselSqliteStore::app_schema_state(self)
1991    }
1992
1993    fn outbox_summaries(&mut self) -> Result<Vec<OutboxSummary>> {
1994        let rows = sql_query(
1995            r#"
1996            select client_commit_id, status, schema_version,
1997                   lease_id, lease_expires_at_ms, lease_status_at_enqueue,
1998                   lease_scope_summary_json, lease_token
1999            from sync_outbox_commits
2000            order by created_at asc
2001            "#,
2002        )
2003        .load::<OutboxSummaryRow>(&mut self.conn)?;
2004
2005        Ok(rows.into_iter().map(OutboxSummary::from).collect())
2006    }
2007
2008    fn next_outbox_retry_at(&mut self) -> Result<Option<i64>> {
2009        Ok(sql_query(
2010            r#"
2011            select min(next_attempt_at) as next_attempt_at
2012            from sync_outbox_commits
2013            where status = 'pending' and attempt_count > 0 and attempt_count < ?1
2014            "#,
2015        )
2016        .bind::<Integer, _>(MAX_SYNC_RETRIES)
2017        .load::<NextRetryAtRow>(&mut self.conn)?
2018        .into_iter()
2019        .next()
2020        .and_then(|row| row.next_attempt_at))
2021    }
2022
2023    fn next_blob_upload_retry_at(&mut self) -> Result<Option<i64>> {
2024        Ok(sql_query(
2025            r#"
2026            select min(next_attempt_at) as next_attempt_at
2027            from sync_blob_outbox
2028            where status = 'pending' and attempt_count > 0 and attempt_count < ?1
2029            "#,
2030        )
2031        .bind::<Integer, _>(MAX_BLOB_UPLOAD_RETRIES)
2032        .load::<NextRetryAtRow>(&mut self.conn)?
2033        .into_iter()
2034        .next()
2035        .and_then(|row| row.next_attempt_at))
2036    }
2037
2038    fn conflict_summaries(&mut self) -> Result<Vec<ConflictSummary>> {
2039        let rows = sql_query(
2040            r#"
2041            select id, client_commit_id, op_index, result_status, message, code, server_version,
2042                   resolved_at, resolution
2043            from sync_conflicts
2044            where resolved_at is null
2045            order by created_at desc
2046            "#,
2047        )
2048        .load::<ConflictSummaryRow>(&mut self.conn)?;
2049
2050        Ok(rows.into_iter().map(ConflictSummary::from).collect())
2051    }
2052
2053    fn blob_health_summary(&mut self) -> Result<Option<BlobHealthSummary>> {
2054        let upload = self.blob_upload_queue_stats()?;
2055        let cache = self.blob_cache_stats()?;
2056        let (checked_references, invalid_references) = self.blob_reference_health_counts()?;
2057        Ok(Some(BlobHealthSummary {
2058            cache_count: cache.count,
2059            cache_bytes: cache.total_bytes,
2060            upload_pending: upload.pending,
2061            upload_uploading: upload.uploading,
2062            upload_failed: upload.failed,
2063            checked_references,
2064            invalid_references,
2065        }))
2066    }
2067
2068    fn crdt_health_summary(&mut self) -> Result<Option<CrdtHealthSummary>> {
2069        Ok(Some(DieselSqliteStore::crdt_health_summary(self)?))
2070    }
2071
2072    fn scoped_rows_health_summary(
2073        &mut self,
2074        subscriptions: &[SubscriptionSpec],
2075    ) -> Result<Option<ScopedRowsHealthSummary>> {
2076        Ok(Some(DieselSqliteStore::scoped_rows_health_summary(
2077            self,
2078            subscriptions,
2079        )?))
2080    }
2081
2082    fn clear_orphaned_synced_rows(
2083        &mut self,
2084        subscriptions: &[SubscriptionSpec],
2085        tables: &[String],
2086    ) -> Result<ScopedRowsHealthSummary> {
2087        DieselSqliteStore::clear_orphaned_synced_rows(self, subscriptions, tables)
2088    }
2089
2090    fn resolve_conflict(&mut self, id_value: &str, resolution_value: &str) -> Result<()> {
2091        sql_query(
2092            r#"
2093            update sync_conflicts
2094            set resolved_at = ?1, resolution = ?2
2095            where id = ?3 and resolved_at is null
2096            "#,
2097        )
2098        .bind::<BigInt, _>(now_ms())
2099        .bind::<Text, _>(resolution_value)
2100        .bind::<Text, _>(id_value)
2101        .execute(&mut self.conn)?;
2102        Ok(())
2103    }
2104
2105    fn retry_conflict_keep_local(&mut self, id_value: &str) -> Result<String> {
2106        self.transaction(|tx| tx.retry_conflict_keep_local(id_value))
2107    }
2108}
2109
2110impl<'a> DieselSqliteTx<'a> {
2111    #[cfg(feature = "demo-todo-native-fixture")]
2112    fn add_task(
2113        &mut self,
2114        actor_id: &str,
2115        project_id: Option<&str>,
2116        task_id: String,
2117        title_value: String,
2118    ) -> Result<()> {
2119        let operation = insert_local_task(self.conn, actor_id, project_id, &task_id, &title_value)?;
2120        self.enqueue_outbox(vec![operation])?;
2121
2122        Ok(())
2123    }
2124
2125    #[cfg(feature = "demo-todo-native-fixture")]
2126    fn patch_task_title(
2127        &mut self,
2128        project_id: Option<&str>,
2129        task_id: String,
2130        title_value: String,
2131    ) -> Result<()> {
2132        let operation = patch_local_task_title(self.conn, project_id, &task_id, &title_value)?;
2133        self.enqueue_outbox(vec![operation])?;
2134
2135        Ok(())
2136    }
2137
2138    fn enqueue_outbox_receipt(
2139        &mut self,
2140        operations: Vec<SyncOperation>,
2141    ) -> Result<MutationReceipt> {
2142        use schema::sync_outbox_commits::dsl as o;
2143
2144        self.assert_outbox_capacity()?;
2145        let id = Uuid::new_v4().to_string();
2146        let client_commit_id = Uuid::new_v4().to_string();
2147        let now = now_ms();
2148        let row = NewOutboxCommit {
2149            id: id.clone(),
2150            client_commit_id: client_commit_id.clone(),
2151            status: "pending".to_string(),
2152            operations_json: sync_operations_json_for_outbox(&operations)?,
2153            last_response_json: None,
2154            error: None,
2155            created_at: now,
2156            updated_at: now,
2157            attempt_count: 0,
2158            acked_commit_seq: None,
2159            schema_version: self.app_schema.current_schema_version(),
2160            next_attempt_at: 0,
2161            lease_id: None,
2162            lease_expires_at_ms: None,
2163            lease_status_at_enqueue: None,
2164            lease_scope_summary_json: None,
2165            lease_token: None,
2166        };
2167
2168        diesel::insert_into(o::sync_outbox_commits)
2169            .values(row)
2170            .execute(self.conn)?;
2171
2172        Ok(MutationReceipt {
2173            commit_id: id,
2174            client_commit_id,
2175        })
2176    }
2177
2178    fn enqueue_outbox(&mut self, operations: Vec<SyncOperation>) -> Result<String> {
2179        Ok(self.enqueue_outbox_receipt(operations)?.client_commit_id)
2180    }
2181
2182    fn assert_outbox_capacity(&mut self) -> Result<()> {
2183        let unresolved = sql_query(
2184            r#"
2185            select count(*) as count
2186            from sync_outbox_commits
2187            where status <> 'acked'
2188            "#,
2189        )
2190        .load::<CountRow>(self.conn)?
2191        .into_iter()
2192        .next()
2193        .map(|row| row.count)
2194        .unwrap_or(0);
2195        validate_unresolved_outbox_capacity(usize::try_from(unresolved).unwrap_or(usize::MAX))
2196    }
2197
2198    fn retry_conflict_keep_local(&mut self, conflict_id: &str) -> Result<String> {
2199        let row = sql_query(
2200            r#"
2201            select c.server_version as server_version,
2202                   c.op_index as op_index,
2203                   o.operations_json as operations_json
2204            from sync_conflicts c
2205            join sync_outbox_commits o on o.id = c.outbox_commit_id
2206            where c.id = ?1 and c.resolved_at is null
2207            limit 1
2208            "#,
2209        )
2210        .bind::<Text, _>(conflict_id)
2211        .load::<ConflictRetryRow>(self.conn)?
2212        .into_iter()
2213        .next()
2214        .ok_or_else(|| {
2215            SyncularError::config(format!("pending conflict not found: {conflict_id}"))
2216        })?;
2217
2218        let mut operations: Vec<SyncOperation> = serde_json::from_str(&row.operations_json)?;
2219        let operation = operations.get_mut(row.op_index as usize).ok_or_else(|| {
2220            SyncularError::protocol_message(format!(
2221                "conflict op index {} out of bounds",
2222                row.op_index
2223            ))
2224        })?;
2225        operation.base_version = Some(row.server_version);
2226        let retry_client_commit_id = self.enqueue_outbox(vec![operation.clone()])?;
2227
2228        sql_query(
2229            r#"
2230            update sync_conflicts
2231            set resolved_at = ?1, resolution = 'keep-local'
2232            where id = ?2 and resolved_at is null
2233            "#,
2234        )
2235        .bind::<BigInt, _>(now_ms())
2236        .bind::<Text, _>(conflict_id)
2237        .execute(self.conn)?;
2238
2239        Ok(retry_client_commit_id)
2240    }
2241
2242    fn apply_local_operation(
2243        &mut self,
2244        operation: SyncOperation,
2245        local_row: Option<Value>,
2246    ) -> Result<String> {
2247        if is_encrypted_crdt_system_table(&operation.table) {
2248            match operation.op.as_str() {
2249                "upsert" => {
2250                    let row = apply_encrypted_crdt_system_row(
2251                        self.conn,
2252                        &operation.table,
2253                        &operation.row_id,
2254                        local_row.as_ref().or(operation.payload.as_ref()),
2255                        None,
2256                    )?;
2257                    materialize_encrypted_crdt_system_row(
2258                        self.conn,
2259                        self.app_schema,
2260                        &operation.table,
2261                        &row,
2262                    )?;
2263                }
2264                "delete" => delete_encrypted_crdt_system_row(
2265                    self.conn,
2266                    &operation.table,
2267                    &operation.row_id,
2268                )?,
2269                op => {
2270                    return Err(SyncularError::protocol_message(format!(
2271                        "unsupported local operation: {op}"
2272                    )));
2273                }
2274            }
2275            return self.enqueue_outbox(vec![operation]);
2276        }
2277
2278        let metadata = self
2279            .app_schema
2280            .table_metadata(&operation.table)
2281            .ok_or_else(|| {
2282                SyncularError::config(format!("unknown generated app table: {}", operation.table))
2283            })?;
2284        let current_row = self.current_row_json(&operation.table, &operation.row_id)?;
2285        let current_server_version =
2286            self.row_server_version(&operation.table, current_row.as_ref());
2287        let yjs_updates = collect_server_merge_yjs_updates(self.app_schema, metadata, &operation)?;
2288        let local_row = transform_local_row_for_metadata(
2289            &operation.table,
2290            &operation.row_id,
2291            local_row,
2292            operation.payload.as_ref(),
2293            current_row.as_ref(),
2294            metadata,
2295        )?;
2296
2297        match operation.op.as_str() {
2298            "upsert" => {
2299                let row = local_row.unwrap_or_else(|| {
2300                    merged_local_row(
2301                        metadata,
2302                        current_row,
2303                        &operation.row_id,
2304                        operation.payload.as_ref(),
2305                    )
2306                });
2307                let local_server_version = if operation_payload_has_server_merge_yjs_payload(
2308                    operation.payload.as_ref(),
2309                    metadata,
2310                ) {
2311                    current_server_version.or(operation.base_version)
2312                } else {
2313                    Some(operation.base_version.unwrap_or(0))
2314                };
2315                self.upsert_row(&operation.table, &row, local_server_version)?;
2316            }
2317            "delete" => {
2318                self.apply_change(&SyncChange {
2319                    table: operation.table.clone(),
2320                    row_id: operation.row_id.clone(),
2321                    op: "delete".to_string(),
2322                    row_json: None,
2323                    row_version: operation.base_version,
2324                    scopes: Map::new(),
2325                })?;
2326            }
2327            op => {
2328                return Err(SyncularError::protocol_message(format!(
2329                    "unsupported local operation: {op}"
2330                )));
2331            }
2332        }
2333
2334        let client_commit_id = self.enqueue_outbox(vec![operation])?;
2335        for (field, update) in yjs_updates {
2336            let row = self.current_row_json(field.table(), field.row_id())?;
2337            let state_base64 = crdt_field_state_base64(&field, row.as_ref());
2338            let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
2339            record_crdt_update_log(
2340                self.conn,
2341                &field,
2342                &update,
2343                Some(&client_commit_id),
2344                CrdtUpdateOrigin::Local,
2345                CrdtUpdateStatus::Pending,
2346                state_base64.as_deref(),
2347                &state_vector_base64,
2348            )?;
2349        }
2350        Ok(client_commit_id)
2351    }
2352
2353    fn apply_local_operation_with_active_auth_lease(
2354        &mut self,
2355        actor_id: Option<&str>,
2356        now_ms_value: i64,
2357        operation: SyncOperation,
2358        local_row: Option<Value>,
2359    ) -> Result<String> {
2360        let pre_delete_scope = if operation.op == "delete" {
2361            Some(self.operation_scope_for_current_row(&operation)?)
2362        } else {
2363            None
2364        };
2365        let client_commit_id = self.apply_local_operation(operation.clone(), local_row)?;
2366        let operation_scope = pre_delete_scope
2367            .map(Ok)
2368            .unwrap_or_else(|| self.operation_scope_for_current_row(&operation))?;
2369        let provenance = self.select_active_auth_lease_for_operations(
2370            ActiveAuthLeasePolicy {
2371                actor_id,
2372                now_ms: now_ms_value,
2373            },
2374            &[operation_scope],
2375        )?;
2376        self.set_outbox_auth_lease(&client_commit_id, Some(&provenance))?;
2377        Ok(client_commit_id)
2378    }
2379
2380    fn operation_scope_for_current_row(
2381        &mut self,
2382        operation: &SyncOperation,
2383    ) -> Result<MutationOperationScope> {
2384        if is_encrypted_crdt_system_table(&operation.table) {
2385            return Ok(system_table_operation_scope(operation));
2386        }
2387        let metadata = self
2388            .app_schema
2389            .table_metadata(&operation.table)
2390            .ok_or_else(|| {
2391                SyncularError::config(format!("unknown generated app table: {}", operation.table))
2392            })?;
2393        let row = self.current_row_json(&operation.table, &operation.row_id)?;
2394        Ok(app_table_operation_scope(
2395            metadata,
2396            operation,
2397            row.as_ref(),
2398            row.is_some() || operation.op == "upsert",
2399        ))
2400    }
2401
2402    fn apply_crdt_field_yjs_update(
2403        &mut self,
2404        field: &CrdtField,
2405        update: YjsUpdateEnvelope,
2406        max_pending_updates: i64,
2407    ) -> Result<String> {
2408        assert_crdt_document_capacity(self.conn, &field.document_key(), max_pending_updates)?;
2409        let mut envelope = Map::new();
2410        envelope.insert(field.field().to_string(), serde_json::to_value(&update)?);
2411        let mut payload = Map::new();
2412        payload.insert(YJS_PAYLOAD_KEY.to_string(), Value::Object(envelope));
2413        let operation = SyncOperation {
2414            table: field.table().to_string(),
2415            row_id: field.row_id().to_string(),
2416            op: "upsert".to_string(),
2417            payload: Some(Value::Object(payload)),
2418            base_version: None,
2419        };
2420        let client_commit_id = self.apply_local_operation(operation, None)?;
2421        let row = self.current_row_json(field.table(), field.row_id())?;
2422        let state_base64 = crdt_field_state_base64(field, row.as_ref());
2423        let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
2424        record_crdt_update_log(
2425            self.conn,
2426            field,
2427            &update,
2428            Some(&client_commit_id),
2429            CrdtUpdateOrigin::Local,
2430            CrdtUpdateStatus::Pending,
2431            state_base64.as_deref(),
2432            &state_vector_base64,
2433        )?;
2434        Ok(client_commit_id)
2435    }
2436
2437    fn apply_syncular_mutations(
2438        &mut self,
2439        mutations: Vec<PendingSyncularMutation>,
2440    ) -> Result<MutationReceipt> {
2441        self.apply_syncular_mutations_inner(mutations, None)
2442    }
2443
2444    fn apply_syncular_mutations_with_active_auth_lease(
2445        &mut self,
2446        actor_id: Option<&str>,
2447        now_ms_value: i64,
2448        mutations: Vec<PendingSyncularMutation>,
2449    ) -> Result<MutationReceipt> {
2450        self.apply_syncular_mutations_inner(
2451            mutations,
2452            Some(ActiveAuthLeasePolicy {
2453                actor_id,
2454                now_ms: now_ms_value,
2455            }),
2456        )
2457    }
2458
2459    fn apply_syncular_mutations_with_command_history(
2460        &mut self,
2461        mutation_scope: &str,
2462        actor_id: Option<&str>,
2463        now_ms_value: i64,
2464        mutations: Vec<PendingSyncularMutation>,
2465    ) -> Result<MutationReceipt> {
2466        let pending = self.command_history_pending_entries(&mutations)?;
2467        let receipt = match mutation_scope {
2468            "mutations" => self.apply_syncular_mutations_inner(mutations, None)?,
2469            "leasedMutations" => self.apply_syncular_mutations_inner(
2470                mutations,
2471                Some(ActiveAuthLeasePolicy {
2472                    actor_id,
2473                    now_ms: now_ms_value,
2474                }),
2475            )?,
2476            other => {
2477                return Err(SyncularError::config(format!(
2478                    "sync.command_history_scope_unsupported: {other}"
2479                )));
2480            }
2481        };
2482        let entries = self.command_history_committed_entries(pending)?;
2483        if !entries.is_empty() {
2484            insert_command_history_record(self.conn, mutation_scope, &entries, &receipt)?;
2485        }
2486        Ok(receipt)
2487    }
2488
2489    fn command_history_pending_entries(
2490        &mut self,
2491        mutations: &[PendingSyncularMutation],
2492    ) -> Result<Vec<PendingCommandHistoryEntry>> {
2493        let mut entries = Vec::new();
2494        for mutation in mutations {
2495            if entries.iter().any(|entry: &PendingCommandHistoryEntry| {
2496                entry.table == mutation.table && entry.row_id == mutation.row_id
2497            }) {
2498                continue;
2499            }
2500            let before = self.current_row_json(&mutation.table, &mutation.row_id)?;
2501            entries.push(PendingCommandHistoryEntry {
2502                table: mutation.table.clone(),
2503                row_id: mutation.row_id.clone(),
2504                before,
2505            });
2506        }
2507        Ok(entries)
2508    }
2509
2510    fn command_history_committed_entries(
2511        &mut self,
2512        pending: Vec<PendingCommandHistoryEntry>,
2513    ) -> Result<Vec<CommandHistoryEntry>> {
2514        let mut entries = Vec::new();
2515        for entry in pending {
2516            let after = self.current_row_json(&entry.table, &entry.row_id)?;
2517            if entry.before == after {
2518                continue;
2519            }
2520            entries.push(CommandHistoryEntry {
2521                table: entry.table,
2522                row_id: entry.row_id,
2523                before: entry.before,
2524                after,
2525            });
2526        }
2527        Ok(entries)
2528    }
2529
2530    fn apply_syncular_mutations_inner(
2531        &mut self,
2532        mutations: Vec<PendingSyncularMutation>,
2533        auth_lease_policy: Option<ActiveAuthLeasePolicy<'_>>,
2534    ) -> Result<MutationReceipt> {
2535        if mutations.is_empty() {
2536            return Err(SyncularError::config(
2537                "cannot commit an empty Syncular mutation batch",
2538            ));
2539        }
2540        validate_pending_mutation_batch_size(&mutations)?;
2541
2542        let mut operations = Vec::with_capacity(mutations.len());
2543        let mut operation_scopes = Vec::with_capacity(mutations.len());
2544        for mutation in mutations {
2545            if is_encrypted_crdt_system_table(&mutation.table) {
2546                let operation = mutation.operation(None);
2547                operation_scopes.push(system_table_operation_scope(&operation));
2548                match mutation.kind {
2549                    SyncularMutationKind::Delete => {
2550                        delete_encrypted_crdt_system_row(
2551                            self.conn,
2552                            &mutation.table,
2553                            &mutation.row_id,
2554                        )?;
2555                    }
2556                    SyncularMutationKind::Insert
2557                    | SyncularMutationKind::Update
2558                    | SyncularMutationKind::Upsert => {
2559                        let row = apply_encrypted_crdt_system_row(
2560                            self.conn,
2561                            &mutation.table,
2562                            &mutation.row_id,
2563                            mutation.local_row.as_ref().or(operation.payload.as_ref()),
2564                            None,
2565                        )?;
2566                        materialize_encrypted_crdt_system_row(
2567                            self.conn,
2568                            self.app_schema,
2569                            &mutation.table,
2570                            &row,
2571                        )?;
2572                    }
2573                }
2574                operations.push(operation);
2575                continue;
2576            }
2577
2578            if self.app_schema.table_metadata(&mutation.table).is_none() {
2579                return Err(SyncularError::config(format!(
2580                    "unknown generated app table: {}",
2581                    mutation.table
2582                )));
2583            }
2584
2585            let metadata = self
2586                .app_schema
2587                .table_metadata(&mutation.table)
2588                .expect("validated mutation table has metadata");
2589            let current_row = self.current_row_json(&mutation.table, &mutation.row_id)?;
2590            let current_server_version =
2591                self.row_server_version(&mutation.table, current_row.as_ref());
2592            let base_version = mutation.base_version.or_else(|| {
2593                if mutation_has_server_merge_yjs_payload(&mutation, metadata) {
2594                    return None;
2595                }
2596
2597                match mutation.kind {
2598                    SyncularMutationKind::Insert => None,
2599                    SyncularMutationKind::Update
2600                    | SyncularMutationKind::Upsert
2601                    | SyncularMutationKind::Delete => current_server_version,
2602                }
2603            });
2604            let operation = mutation.operation(base_version);
2605
2606            match mutation.kind {
2607                SyncularMutationKind::Delete => {
2608                    operation_scopes.push(app_table_operation_scope(
2609                        metadata,
2610                        &operation,
2611                        current_row.as_ref(),
2612                        current_row.is_some(),
2613                    ));
2614                    self.apply_change(&SyncChange {
2615                        table: mutation.table.clone(),
2616                        row_id: mutation.row_id.clone(),
2617                        op: "delete".to_string(),
2618                        row_json: None,
2619                        row_version: base_version,
2620                        scopes: Map::new(),
2621                    })?;
2622                }
2623                SyncularMutationKind::Insert
2624                | SyncularMutationKind::Update
2625                | SyncularMutationKind::Upsert => {
2626                    let local_row = transform_local_row_for_metadata(
2627                        &mutation.table,
2628                        &mutation.row_id,
2629                        mutation.local_row,
2630                        operation.payload.as_ref(),
2631                        current_row.as_ref(),
2632                        metadata,
2633                    )?;
2634                    let local_row = local_row.unwrap_or_else(|| {
2635                        merged_local_row(
2636                            metadata,
2637                            current_row,
2638                            &mutation.row_id,
2639                            operation.payload.as_ref(),
2640                        )
2641                    });
2642                    operation_scopes.push(app_table_operation_scope(
2643                        metadata,
2644                        &operation,
2645                        Some(&local_row),
2646                        true,
2647                    ));
2648                    self.upsert_row(
2649                        &mutation.table,
2650                        &local_row,
2651                        current_server_version.or(base_version),
2652                    )?;
2653                }
2654            }
2655
2656            operations.push(operation);
2657        }
2658
2659        let receipt = self.enqueue_outbox_receipt(operations)?;
2660        if let Some(policy) = auth_lease_policy {
2661            let provenance =
2662                self.select_active_auth_lease_for_operations(policy, &operation_scopes)?;
2663            self.set_outbox_auth_lease(&receipt.client_commit_id, Some(&provenance))?;
2664        }
2665        Ok(receipt)
2666    }
2667
2668    fn select_active_auth_lease_for_operations(
2669        &mut self,
2670        policy: ActiveAuthLeasePolicy<'_>,
2671        operations: &[MutationOperationScope],
2672    ) -> Result<AuthLeaseProvenance> {
2673        let candidate_leases = self.auth_lease_candidates_for_selection(policy.actor_id)?;
2674        select_auth_lease_for_operation_scopes(
2675            policy,
2676            candidate_leases,
2677            self.app_schema.current_schema_version(),
2678            operations,
2679        )
2680    }
2681
2682    fn auth_lease_candidates_for_selection(
2683        &mut self,
2684        actor_id_value: Option<&str>,
2685    ) -> Result<Vec<AuthLeaseRecord>> {
2686        use schema::sync_auth_leases::dsl as l;
2687
2688        let mut query = l::sync_auth_leases
2689            .select(AuthLeaseRecordRow::as_select())
2690            .filter(l::status.eq("active"))
2691            .into_boxed();
2692        if let Some(actor_id_value) = actor_id_value {
2693            query = query.filter(l::actor_id.eq(actor_id_value));
2694        }
2695        let rows = query.order(l::expires_at_ms.asc()).load(self.conn)?;
2696        Ok(rows.into_iter().map(AuthLeaseRecord::from).collect())
2697    }
2698
2699    fn current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
2700        current_app_row_json(self.conn, self.app_schema, table, row_id)
2701    }
2702
2703    fn row_server_version(&self, table: &str, row: Option<&Value>) -> Option<i64> {
2704        let metadata = self.app_schema.table_metadata(table)?;
2705        row.and_then(|row| {
2706            row.get(metadata.server_version_column)
2707                .and_then(Value::as_i64)
2708        })
2709    }
2710}
2711
2712fn merged_local_row(
2713    metadata: &AppTableMetadata,
2714    current_row: Option<Value>,
2715    row_id: &str,
2716    payload: Option<&Value>,
2717) -> Value {
2718    let mut row = current_row
2719        .and_then(|row| row.as_object().cloned())
2720        .unwrap_or_default();
2721    if let Some(payload) = payload.and_then(Value::as_object) {
2722        for (key, value) in payload {
2723            row.insert(key.clone(), value.clone());
2724        }
2725    }
2726    row.insert(
2727        metadata.primary_key_column.to_string(),
2728        Value::String(row_id.to_string()),
2729    );
2730    Value::Object(row)
2731}
2732
2733fn apply_encrypted_crdt_system_row(
2734    conn: &mut SqliteConnection,
2735    table: &str,
2736    row_id: &str,
2737    row: Option<&Value>,
2738    server_seq: Option<i64>,
2739) -> Result<Map<String, Value>> {
2740    let row = encrypted_crdt_normalize_row(table, row_id, row)?;
2741    let server_seq = server_seq
2742        .or_else(|| row.get("server_seq").and_then(Value::as_i64))
2743        .or_else(|| row.get("seq").and_then(Value::as_i64));
2744    let scopes_json = encrypted_crdt_scopes_json(&row)?;
2745    let partition_id = row
2746        .get("partition_id")
2747        .and_then(Value::as_str)
2748        .unwrap_or("default");
2749    let stream_id = row.get("stream_id").and_then(Value::as_str).unwrap();
2750    let app_table = row.get("app_table").and_then(Value::as_str).unwrap();
2751    let app_row_id = row.get("row_id").and_then(Value::as_str).unwrap();
2752    let field_name = row.get("field_name").and_then(Value::as_str).unwrap();
2753    let key_id = row.get("key_id").and_then(Value::as_str).unwrap();
2754    let ciphertext = row.get("ciphertext").and_then(Value::as_str).unwrap();
2755    let actor_id = row.get("actor_id").and_then(Value::as_str);
2756    let client_id = row.get("client_id").and_then(Value::as_str);
2757    let created_at = row
2758        .get("created_at")
2759        .and_then(Value::as_i64)
2760        .unwrap_or_else(now_ms);
2761
2762    match table {
2763        CRDT_UPDATES_TABLE => {
2764            let update_id = row.get("update_id").and_then(Value::as_str).unwrap();
2765            sql_query(
2766                r#"
2767                insert into sync_crdt_updates (
2768                    partition_id, stream_id, app_table, row_id, field_name,
2769                    update_id, actor_id, client_id, key_id, ciphertext, scopes, created_at,
2770                    server_seq
2771                ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
2772                on conflict (update_id) do update set
2773                    server_seq = coalesce(excluded.server_seq, sync_crdt_updates.server_seq)
2774                "#,
2775            )
2776            .bind::<Text, _>(partition_id)
2777            .bind::<Text, _>(stream_id)
2778            .bind::<Text, _>(app_table)
2779            .bind::<Text, _>(app_row_id)
2780            .bind::<Text, _>(field_name)
2781            .bind::<Text, _>(update_id)
2782            .bind::<diesel::sql_types::Nullable<Text>, _>(actor_id)
2783            .bind::<diesel::sql_types::Nullable<Text>, _>(client_id)
2784            .bind::<Text, _>(key_id)
2785            .bind::<Text, _>(ciphertext)
2786            .bind::<Text, _>(&scopes_json)
2787            .bind::<BigInt, _>(created_at)
2788            .bind::<diesel::sql_types::Nullable<BigInt>, _>(server_seq)
2789            .execute(conn)?;
2790        }
2791        CRDT_CHECKPOINTS_TABLE => {
2792            let checkpoint_id = row.get("checkpoint_id").and_then(Value::as_str).unwrap();
2793            let covers_seq = row.get("covers_seq").and_then(Value::as_i64).unwrap();
2794            sql_query(
2795                r#"
2796                insert into sync_crdt_checkpoints (
2797                    partition_id, stream_id, app_table, row_id, field_name,
2798                    checkpoint_id, covers_seq, actor_id, client_id, key_id,
2799                    ciphertext, scopes, created_at, server_seq
2800                ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
2801                on conflict (checkpoint_id) do update set
2802                    server_seq = coalesce(excluded.server_seq, sync_crdt_checkpoints.server_seq)
2803                "#,
2804            )
2805            .bind::<Text, _>(partition_id)
2806            .bind::<Text, _>(stream_id)
2807            .bind::<Text, _>(app_table)
2808            .bind::<Text, _>(app_row_id)
2809            .bind::<Text, _>(field_name)
2810            .bind::<Text, _>(checkpoint_id)
2811            .bind::<BigInt, _>(covers_seq)
2812            .bind::<diesel::sql_types::Nullable<Text>, _>(actor_id)
2813            .bind::<diesel::sql_types::Nullable<Text>, _>(client_id)
2814            .bind::<Text, _>(key_id)
2815            .bind::<Text, _>(ciphertext)
2816            .bind::<Text, _>(&scopes_json)
2817            .bind::<BigInt, _>(created_at)
2818            .bind::<diesel::sql_types::Nullable<BigInt>, _>(server_seq)
2819            .execute(conn)?;
2820        }
2821        _ => unreachable!("validated encrypted CRDT table"),
2822    }
2823    Ok(row)
2824}
2825
2826fn delete_encrypted_crdt_system_row(
2827    conn: &mut SqliteConnection,
2828    table: &str,
2829    row_id: &str,
2830) -> Result<()> {
2831    let identity = encrypted_crdt_identity_column(table)?;
2832    let sql = format!("delete from {table} where {identity} = ?1");
2833    sql_query(sql).bind::<Text, _>(row_id).execute(conn)?;
2834    Ok(())
2835}
2836
2837fn update_encrypted_crdt_system_server_seq(
2838    conn: &mut SqliteConnection,
2839    table: &str,
2840    row_id: &str,
2841    server_seq: i64,
2842) -> Result<()> {
2843    let identity = encrypted_crdt_identity_column(table)?;
2844    let sql = format!("update {table} set server_seq = ?1 where {identity} = ?2");
2845    sql_query(sql)
2846        .bind::<BigInt, _>(server_seq)
2847        .bind::<Text, _>(row_id)
2848        .execute(conn)?;
2849    Ok(())
2850}
2851
2852fn materialize_encrypted_crdt_system_row(
2853    conn: &mut SqliteConnection,
2854    app_schema: AppSchema,
2855    system_table: &str,
2856    system_row: &Map<String, Value>,
2857) -> Result<()> {
2858    let app_table = system_row
2859        .get("app_table")
2860        .and_then(Value::as_str)
2861        .ok_or_else(|| SyncularError::protocol_message("encrypted CRDT row missing app_table"))?;
2862    let app_row_id = system_row
2863        .get("row_id")
2864        .and_then(Value::as_str)
2865        .ok_or_else(|| SyncularError::protocol_message("encrypted CRDT row missing row_id"))?;
2866    let field_name = system_row
2867        .get("field_name")
2868        .and_then(Value::as_str)
2869        .ok_or_else(|| SyncularError::protocol_message("encrypted CRDT row missing field_name"))?;
2870    let Some(metadata) = app_schema.table_metadata(app_table) else {
2871        return Ok(());
2872    };
2873    if !metadata
2874        .crdt_yjs_fields
2875        .iter()
2876        .any(|field| field.field == field_name && field.sync_mode == "encrypted-update-log")
2877    {
2878        return Ok(());
2879    }
2880    let current_row = current_app_row_json(conn, app_schema, app_table, app_row_id)?;
2881    let Some(row) = apply_encrypted_crdt_plaintext_to_row(
2882        metadata,
2883        field_name,
2884        app_row_id,
2885        system_table,
2886        system_row,
2887        current_row,
2888    )?
2889    else {
2890        return Ok(());
2891    };
2892    let fallback_version = row
2893        .get(metadata.server_version_column)
2894        .and_then(Value::as_i64)
2895        .or(Some(0));
2896    upsert_app_row(conn, app_schema, app_table, &row, fallback_version)?;
2897    Ok(())
2898}
2899
2900fn current_app_row_json(
2901    conn: &mut SqliteConnection,
2902    app_schema: AppSchema,
2903    table: &str,
2904    row_id: &str,
2905) -> Result<Option<Value>> {
2906    let metadata = app_schema
2907        .table_metadata(table)
2908        .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
2909    get_app_row_json_generic(conn, metadata, row_id)
2910}
2911
2912fn get_app_row_json_generic(
2913    conn: &mut SqliteConnection,
2914    metadata: &AppTableMetadata,
2915    row_id: &str,
2916) -> Result<Option<Value>> {
2917    validate_app_table_metadata(metadata)?;
2918    let projection = json_object_projection(metadata)?;
2919    let sql = format!(
2920        "select {projection} as row_json from {table} where {pk} = {row_id} limit 1",
2921        table = metadata.name,
2922        pk = metadata.primary_key_column,
2923        row_id = sql_string(row_id)
2924    );
2925    Ok(rows_from_json_query(conn, sql)?.into_iter().next())
2926}
2927
2928fn crdt_state_vector_hints_for_subscription(
2929    conn: &mut SqliteConnection,
2930    app_schema: AppSchema,
2931    table: &str,
2932    scopes: &ScopeValues,
2933    limit: i64,
2934) -> Result<Vec<CrdtStateVectorHint>> {
2935    let metadata = app_schema
2936        .table_metadata(table)
2937        .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
2938    validate_app_table_metadata(metadata)?;
2939    let rows = sql_query(
2940        r#"
2941        select document_key, app_table, row_id, field_name, state_column, sync_mode,
2942               state_base64, state_vector_base64, pending_updates, flushed_updates,
2943               acked_updates, log_updates, updated_at, compacted_at
2944        from sync_crdt_documents
2945        where app_table = ?1 and state_vector_base64 != ''
2946        order by updated_at desc
2947        limit ?2
2948        "#,
2949    )
2950    .bind::<Text, _>(table)
2951    .bind::<BigInt, _>(limit.max(0))
2952    .load::<CrdtDocumentSnapshotRow>(conn)?;
2953
2954    let mut hints = Vec::new();
2955    for row in rows {
2956        let Some(app_row) = get_app_row_json_generic(conn, metadata, &row.row_id)? else {
2957            continue;
2958        };
2959        if !row_matches_scope_values(metadata, &app_row, scopes) {
2960            continue;
2961        }
2962        hints.push(CrdtStateVectorHint {
2963            row_id: row.row_id,
2964            field: row.field_name,
2965            state_column: row.state_column,
2966            state_vector_base64: row.state_vector_base64,
2967            sync_mode: row.sync_mode,
2968            updated_at: row.updated_at,
2969        });
2970    }
2971    Ok(hints)
2972}
2973
2974fn clear_encrypted_crdt_system_table_for_scopes(
2975    conn: &mut SqliteConnection,
2976    table: &str,
2977    scopes: &ScopeValues,
2978) -> Result<()> {
2979    let identity = encrypted_crdt_identity_column(table)?;
2980    if scopes.is_empty() {
2981        let sql = format!("delete from {table}");
2982        sql_query(sql).execute(conn)?;
2983        return Ok(());
2984    }
2985
2986    let sql = format!("select {identity} as identity, scopes from {table}");
2987    let rows = sql_query(sql).load::<EncryptedCrdtScopeRow>(conn)?;
2988    for row in rows {
2989        let stored_scopes: Value = serde_json::from_str(&row.scopes)?;
2990        let mut object = Map::new();
2991        object.insert("scopes".to_string(), stored_scopes);
2992        if encrypted_crdt_row_matches_scopes(&object, scopes) {
2993            delete_encrypted_crdt_system_row(conn, table, &row.identity)?;
2994        }
2995    }
2996    Ok(())
2997}
2998
2999fn list_app_rows_json(
3000    conn: &mut SqliteConnection,
3001    app_schema: AppSchema,
3002    table: &str,
3003) -> Result<Vec<Value>> {
3004    let metadata = app_schema
3005        .table_metadata(table)
3006        .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3007    match app_schema.adapter_for(table) {
3008        Ok(adapter) => adapter.list_rows_json(conn),
3009        Err(_) => list_rows_json_generic(conn, metadata),
3010    }
3011}
3012
3013fn clear_app_table_for_scopes(
3014    conn: &mut SqliteConnection,
3015    app_schema: AppSchema,
3016    table: &str,
3017    scopes: &ScopeValues,
3018) -> Result<()> {
3019    let metadata = app_schema
3020        .table_metadata(table)
3021        .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3022    match app_schema.adapter_for(table) {
3023        Ok(adapter) => adapter.clear_for_scopes(conn, scopes),
3024        Err(_) => clear_table_for_scopes_generic(conn, metadata, scopes),
3025    }
3026}
3027
3028fn clear_app_table_for_scopes_preserving_local_crdt(
3029    conn: &mut SqliteConnection,
3030    app_schema: AppSchema,
3031    table: &str,
3032    scopes: &ScopeValues,
3033) -> Result<()> {
3034    let metadata = app_schema
3035        .table_metadata(table)
3036        .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3037    let encrypted_fields = metadata
3038        .crdt_yjs_fields
3039        .iter()
3040        .filter(|field| field.sync_mode == "encrypted-update-log")
3041        .collect::<Vec<_>>();
3042    if encrypted_fields.is_empty() {
3043        return clear_app_table_for_scopes(conn, app_schema, table, scopes);
3044    }
3045
3046    validate_app_table_metadata(metadata)?;
3047    for field in &encrypted_fields {
3048        validate_identifier(field.state_column)?;
3049    }
3050    let mut filters = scope_filters(metadata, scopes)?;
3051    filters.extend(encrypted_fields.iter().map(|field| {
3052        format!(
3053            "({} is null or {} = '')",
3054            field.state_column, field.state_column
3055        )
3056    }));
3057    let where_clause = if filters.is_empty() {
3058        String::new()
3059    } else {
3060        format!(" where {}", filters.join(" and "))
3061    };
3062    let sql = format!("delete from {table}{where_clause}", table = metadata.name);
3063    sql_query(sql).execute(conn)?;
3064    Ok(())
3065}
3066
3067fn clear_synced_app_rows_for_scopes(
3068    conn: &mut SqliteConnection,
3069    app_schema: AppSchema,
3070    table: &str,
3071    scopes: &ScopeValues,
3072) -> Result<i64> {
3073    let metadata = app_schema
3074        .table_metadata(table)
3075        .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3076    clear_synced_rows_for_scopes_generic(conn, metadata, scopes)
3077}
3078
3079fn scoped_rows_health_summary_for_schema(
3080    conn: &mut SqliteConnection,
3081    app_schema: AppSchema,
3082    subscriptions: &[SubscriptionSpec],
3083) -> Result<ScopedRowsHealthSummary> {
3084    let mut summary = ScopedRowsHealthSummary::default();
3085    for metadata in app_schema.app_table_metadata {
3086        validate_app_table_metadata(metadata)?;
3087        validate_identifier(metadata.server_version_column)?;
3088        let checked_synced_rows = count_rows(
3089            conn,
3090            &format!(
3091                "select count(*) as count from {table} where {server_version} > 0",
3092                table = metadata.name,
3093                server_version = metadata.server_version_column
3094            ),
3095        )?;
3096        let table_subscriptions = subscriptions
3097            .iter()
3098            .filter(|subscription| subscription.table == metadata.name)
3099            .collect::<Vec<_>>();
3100        let orphaned_synced_rows = if checked_synced_rows == 0 {
3101            0
3102        } else if table_subscriptions.is_empty() {
3103            checked_synced_rows
3104        } else {
3105            let scope_clauses = table_subscriptions
3106                .iter()
3107                .map(|subscription| scope_clause(metadata, &subscription.scopes))
3108                .collect::<Result<Vec<_>>>()?;
3109            count_rows(
3110                conn,
3111                &format!(
3112                    "select count(*) as count from {table} where {server_version} > 0 and not ({scope_clause})",
3113                    table = metadata.name,
3114                    server_version = metadata.server_version_column,
3115                    scope_clause = scope_clauses.join(" or ")
3116                ),
3117            )?
3118        };
3119        summary.checked_synced_rows += checked_synced_rows;
3120        summary.orphaned_synced_rows += orphaned_synced_rows;
3121        summary.tables.push(ScopedRowsTableHealth {
3122            table: metadata.name.to_string(),
3123            checked_synced_rows,
3124            orphaned_synced_rows,
3125        });
3126    }
3127    Ok(summary)
3128}
3129
3130fn clear_orphaned_synced_rows_for_schema(
3131    conn: &mut SqliteConnection,
3132    app_schema: AppSchema,
3133    subscriptions: &[SubscriptionSpec],
3134    tables: &[String],
3135) -> Result<ScopedRowsHealthSummary> {
3136    validate_requested_app_tables(app_schema, tables)?;
3137    let mut summary = ScopedRowsHealthSummary::default();
3138    for metadata in app_schema
3139        .app_table_metadata
3140        .iter()
3141        .filter(|metadata| tables.is_empty() || tables.iter().any(|table| table == metadata.name))
3142    {
3143        validate_app_table_metadata(metadata)?;
3144        validate_identifier(metadata.server_version_column)?;
3145        let checked_synced_rows = count_rows(
3146            conn,
3147            &format!(
3148                "select count(*) as count from {table} where {server_version} > 0",
3149                table = metadata.name,
3150                server_version = metadata.server_version_column
3151            ),
3152        )?;
3153        let table_subscriptions = subscriptions
3154            .iter()
3155            .filter(|subscription| subscription.table == metadata.name)
3156            .collect::<Vec<_>>();
3157        let orphaned_synced_rows = if checked_synced_rows == 0 {
3158            0
3159        } else if table_subscriptions.is_empty() {
3160            delete_rows_with_count(
3161                conn,
3162                &format!(
3163                    "delete from {table} where {server_version} > 0",
3164                    table = metadata.name,
3165                    server_version = metadata.server_version_column
3166                ),
3167            )?
3168        } else {
3169            let scope_clauses = table_subscriptions
3170                .iter()
3171                .map(|subscription| scope_clause(metadata, &subscription.scopes))
3172                .collect::<Result<Vec<_>>>()?;
3173            delete_rows_with_count(
3174                conn,
3175                &format!(
3176                    "delete from {table} where {server_version} > 0 and not ({scope_clause})",
3177                    table = metadata.name,
3178                    server_version = metadata.server_version_column,
3179                    scope_clause = scope_clauses.join(" or ")
3180                ),
3181            )?
3182        };
3183        summary.checked_synced_rows += checked_synced_rows;
3184        summary.orphaned_synced_rows += orphaned_synced_rows;
3185        summary.tables.push(ScopedRowsTableHealth {
3186            table: metadata.name.to_string(),
3187            checked_synced_rows,
3188            orphaned_synced_rows,
3189        });
3190    }
3191    Ok(summary)
3192}
3193
3194fn validate_requested_app_tables(app_schema: AppSchema, tables: &[String]) -> Result<()> {
3195    for table in tables {
3196        if app_schema.table_metadata(table).is_none() {
3197            return Err(SyncularError::config(format!(
3198                "unknown generated app table: {table}"
3199            )));
3200        }
3201    }
3202    Ok(())
3203}
3204
3205fn count_rows(conn: &mut SqliteConnection, sql: &str) -> Result<i64> {
3206    Ok(sql_query(sql)
3207        .load::<CountRow>(conn)?
3208        .into_iter()
3209        .next()
3210        .map(|row| row.count)
3211        .unwrap_or_default())
3212}
3213
3214fn delete_rows_with_count(conn: &mut SqliteConnection, sql: &str) -> Result<i64> {
3215    Ok(sql_query(sql).execute(conn)? as i64)
3216}
3217
3218fn preserve_encrypted_crdt_materialized_columns(
3219    conn: &mut SqliteConnection,
3220    app_schema: AppSchema,
3221    metadata: &'static AppTableMetadata,
3222    row: Value,
3223) -> Result<Value> {
3224    if !metadata
3225        .crdt_yjs_fields
3226        .iter()
3227        .any(|field| field.sync_mode == "encrypted-update-log")
3228    {
3229        return Ok(row);
3230    }
3231    let Some(mut row_object) = row.as_object().cloned() else {
3232        return Ok(row);
3233    };
3234    let Some(row_id) = row_object
3235        .get(metadata.primary_key_column)
3236        .and_then(Value::as_str)
3237        .map(str::to_string)
3238    else {
3239        return Ok(Value::Object(row_object));
3240    };
3241    let Some(existing_row) = current_app_row_json(conn, app_schema, metadata.name, &row_id)? else {
3242        return Ok(Value::Object(row_object));
3243    };
3244    let Some(existing_object) = existing_row.as_object() else {
3245        return Ok(Value::Object(row_object));
3246    };
3247
3248    for field in metadata
3249        .crdt_yjs_fields
3250        .iter()
3251        .filter(|field| field.sync_mode == "encrypted-update-log")
3252    {
3253        let Some(state) = existing_object
3254            .get(field.state_column)
3255            .and_then(Value::as_str)
3256            .filter(|state| !state.is_empty())
3257        else {
3258            continue;
3259        };
3260        row_object.insert(
3261            field.state_column.to_string(),
3262            Value::String(state.to_string()),
3263        );
3264        if let Some(value) = existing_object.get(field.field) {
3265            row_object.insert(field.field.to_string(), value.clone());
3266        }
3267    }
3268    Ok(Value::Object(row_object))
3269}
3270
3271fn upsert_app_row(
3272    conn: &mut SqliteConnection,
3273    app_schema: AppSchema,
3274    table: &str,
3275    row: &Value,
3276    fallback_version: Option<i64>,
3277) -> Result<()> {
3278    let metadata = app_schema
3279        .table_metadata(table)
3280        .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3281    match app_schema.adapter_for(table) {
3282        Ok(adapter) => adapter.upsert_row(conn, row, fallback_version),
3283        Err(_) => upsert_row_generic(conn, metadata, row, fallback_version),
3284    }
3285}
3286
3287fn apply_app_change(
3288    conn: &mut SqliteConnection,
3289    app_schema: AppSchema,
3290    change: &SyncChange,
3291) -> Result<()> {
3292    let metadata = app_schema.table_metadata(&change.table).ok_or_else(|| {
3293        SyncularError::config(format!("unknown generated app table: {}", change.table))
3294    })?;
3295    if change.op == "upsert" && change.row_json.is_some() {
3296        let mut change = change.clone();
3297        let row = change.row_json.take().expect("checked row_json presence");
3298        change.row_json = Some(preserve_encrypted_crdt_materialized_columns(
3299            conn, app_schema, metadata, row,
3300        )?);
3301        return match app_schema.adapter_for(&change.table) {
3302            Ok(adapter) => adapter.apply_change(conn, &change),
3303            Err(_) => apply_change_generic(conn, metadata, &change),
3304        };
3305    }
3306    match app_schema.adapter_for(&change.table) {
3307        Ok(adapter) => adapter.apply_change(conn, change),
3308        Err(_) => apply_change_generic(conn, metadata, change),
3309    }
3310}
3311
3312fn list_rows_json_generic(
3313    conn: &mut SqliteConnection,
3314    metadata: &AppTableMetadata,
3315) -> Result<Vec<Value>> {
3316    validate_app_table_metadata(metadata)?;
3317    let projection = json_object_projection(metadata)?;
3318    let sql = format!(
3319        "select {projection} as row_json from {table} order by {pk} asc",
3320        table = metadata.name,
3321        pk = metadata.primary_key_column
3322    );
3323    rows_from_json_query(conn, sql)
3324}
3325
3326fn clear_table_for_scopes_generic(
3327    conn: &mut SqliteConnection,
3328    metadata: &AppTableMetadata,
3329    scopes: &ScopeValues,
3330) -> Result<()> {
3331    validate_app_table_metadata(metadata)?;
3332    let filters = scope_filters(metadata, scopes)?;
3333    let where_clause = if filters.is_empty() {
3334        String::new()
3335    } else {
3336        format!(" where {}", filters.join(" and "))
3337    };
3338    let sql = format!("delete from {table}{where_clause}", table = metadata.name);
3339    sql_query(sql).execute(conn)?;
3340    Ok(())
3341}
3342
3343fn clear_synced_rows_for_scopes_generic(
3344    conn: &mut SqliteConnection,
3345    metadata: &AppTableMetadata,
3346    scopes: &ScopeValues,
3347) -> Result<i64> {
3348    validate_app_table_metadata(metadata)?;
3349    validate_identifier(metadata.server_version_column)?;
3350    let mut filters = scope_filters(metadata, scopes)?;
3351    filters.push(format!("{} > 0", metadata.server_version_column));
3352    let sql = format!(
3353        "delete from {table} where {where_clause}",
3354        table = metadata.name,
3355        where_clause = filters.join(" and ")
3356    );
3357    Ok(sql_query(sql).execute(conn)? as i64)
3358}
3359
3360fn upsert_row_generic(
3361    conn: &mut SqliteConnection,
3362    metadata: &AppTableMetadata,
3363    row: &Value,
3364    fallback_version: Option<i64>,
3365) -> Result<()> {
3366    validate_app_table_metadata(metadata)?;
3367    let row = row.as_object().ok_or_else(|| {
3368        SyncularError::protocol_message(format!("row is not a JSON object: {row}"))
3369    })?;
3370    row.get(metadata.primary_key_column)
3371        .and_then(Value::as_str)
3372        .ok_or_else(|| {
3373            SyncularError::protocol_message(format!(
3374                "row for table {} is missing string primary key {}",
3375                metadata.name, metadata.primary_key_column
3376            ))
3377        })?;
3378
3379    let columns = syncable_columns(metadata);
3380    if columns.is_empty() {
3381        return Ok(());
3382    }
3383    let values = columns
3384        .iter()
3385        .map(|column| generic_column_sql_value(metadata, row, column, fallback_version))
3386        .collect::<Result<Vec<_>>>()?;
3387    let update_columns = columns
3388        .iter()
3389        .copied()
3390        .filter(|column| *column != metadata.primary_key_column)
3391        .collect::<Vec<_>>();
3392    let on_conflict = if update_columns.is_empty() {
3393        "do nothing".to_string()
3394    } else {
3395        format!(
3396            "do update set {}",
3397            update_columns
3398                .iter()
3399                .map(|column| format!("{column} = excluded.{column}"))
3400                .collect::<Vec<_>>()
3401                .join(", ")
3402        )
3403    };
3404
3405    let sql = format!(
3406        "insert into {table} ({columns}) values ({values}) on conflict({pk}) {on_conflict}",
3407        table = metadata.name,
3408        columns = columns.join(", "),
3409        values = values.join(", "),
3410        pk = metadata.primary_key_column,
3411    );
3412    sql_query(sql).execute(conn)?;
3413    Ok(())
3414}
3415
3416fn upsert_rows_generic_batch(
3417    conn: &mut SqliteConnection,
3418    metadata: &AppTableMetadata,
3419    rows: &[Map<String, Value>],
3420    fallback_version: Option<i64>,
3421) -> Result<()> {
3422    if rows.is_empty() {
3423        return Ok(());
3424    }
3425    validate_app_table_metadata(metadata)?;
3426
3427    let columns = syncable_columns(metadata);
3428    if columns.is_empty() {
3429        return Ok(());
3430    }
3431    let update_columns = columns
3432        .iter()
3433        .copied()
3434        .filter(|column| *column != metadata.primary_key_column)
3435        .collect::<Vec<_>>();
3436    let on_conflict = if update_columns.is_empty() {
3437        "do nothing".to_string()
3438    } else {
3439        format!(
3440            "do update set {}",
3441            update_columns
3442                .iter()
3443                .map(|column| format!("{column} = excluded.{column}"))
3444                .collect::<Vec<_>>()
3445                .join(", ")
3446        )
3447    };
3448
3449    for batch in rows.chunks(SNAPSHOT_UPSERT_BATCH_ROWS) {
3450        let value_groups = batch
3451            .iter()
3452            .map(|row| {
3453                row.get(metadata.primary_key_column)
3454                    .and_then(Value::as_str)
3455                    .ok_or_else(|| {
3456                        SyncularError::protocol_message(format!(
3457                            "row for table {} is missing string primary key {}",
3458                            metadata.name, metadata.primary_key_column
3459                        ))
3460                    })?;
3461                let values = columns
3462                    .iter()
3463                    .map(|column| generic_column_sql_value(metadata, row, column, fallback_version))
3464                    .collect::<Result<Vec<_>>>()?;
3465                Ok(format!("({})", values.join(", ")))
3466            })
3467            .collect::<Result<Vec<_>>>()?;
3468        let sql = format!(
3469            "insert into {table} ({columns}) values {values} on conflict({pk}) {on_conflict}",
3470            table = metadata.name,
3471            columns = columns.join(", "),
3472            values = value_groups.join(", "),
3473            pk = metadata.primary_key_column,
3474        );
3475        sql_query(sql).execute(conn)?;
3476    }
3477
3478    Ok(())
3479}
3480
3481fn upsert_binary_snapshot_rows_batch(
3482    conn: &mut SqliteConnection,
3483    metadata: &AppTableMetadata,
3484    rows: &DecodedBinarySnapshotRows,
3485) -> Result<()> {
3486    if rows.rows.is_empty() {
3487        return Ok(());
3488    }
3489    if rows.table != metadata.name {
3490        return Err(SyncularError::protocol_message(format!(
3491            "binary snapshot table mismatch: expected {}, got {}",
3492            metadata.name, rows.table
3493        )));
3494    }
3495    validate_app_table_metadata(metadata)?;
3496
3497    let columns = rows
3498        .columns
3499        .iter()
3500        .map(|column| {
3501            validate_identifier(&column.name)?;
3502            Ok(column.name.as_str())
3503        })
3504        .collect::<Result<Vec<_>>>()?;
3505    if !columns.contains(&metadata.primary_key_column) {
3506        return Err(SyncularError::protocol_message(format!(
3507            "binary snapshot for table {} is missing primary key {}",
3508            metadata.name, metadata.primary_key_column
3509        )));
3510    }
3511    if rows.rows.iter().any(|row| row.len() != columns.len()) {
3512        return Err(SyncularError::protocol_message(format!(
3513            "binary snapshot for table {} has a row with the wrong column count",
3514            metadata.name
3515        )));
3516    }
3517
3518    let update_columns = columns
3519        .iter()
3520        .copied()
3521        .filter(|column| *column != metadata.primary_key_column)
3522        .collect::<Vec<_>>();
3523    let on_conflict = if update_columns.is_empty() {
3524        "do nothing".to_string()
3525    } else {
3526        format!(
3527            "do update set {}",
3528            update_columns
3529                .iter()
3530                .map(|column| format!("{column} = excluded.{column}"))
3531                .collect::<Vec<_>>()
3532                .join(", ")
3533        )
3534    };
3535
3536    for batch in rows.rows.chunks(SNAPSHOT_UPSERT_BATCH_ROWS) {
3537        let value_groups = batch
3538            .iter()
3539            .map(|row| {
3540                let values = row
3541                    .iter()
3542                    .map(binary_snapshot_cell_sql_value)
3543                    .collect::<Result<Vec<_>>>()?;
3544                Ok(format!("({})", values.join(", ")))
3545            })
3546            .collect::<Result<Vec<_>>>()?;
3547        let sql = format!(
3548            "insert into {table} ({columns}) values {values} on conflict({pk}) {on_conflict}",
3549            table = metadata.name,
3550            columns = columns.join(", "),
3551            values = value_groups.join(", "),
3552            pk = metadata.primary_key_column,
3553        );
3554        sql_query(sql).execute(conn)?;
3555    }
3556
3557    Ok(())
3558}
3559
3560fn binary_snapshot_cell_sql_value(value: &BinarySnapshotCell) -> Result<String> {
3561    Ok(match value {
3562        BinarySnapshotCell::Null => "NULL".to_string(),
3563        BinarySnapshotCell::String(value) => sql_string(value),
3564        BinarySnapshotCell::Integer(value) => value.to_string(),
3565        BinarySnapshotCell::Float(value) => {
3566            if value.is_finite() {
3567                value.to_string()
3568            } else {
3569                return Err(SyncularError::protocol_message(
3570                    "binary snapshot float value must be finite",
3571                ));
3572            }
3573        }
3574        BinarySnapshotCell::Boolean(value) => i32::from(*value).to_string(),
3575        BinarySnapshotCell::Json(value) => sql_string(&value.to_string()),
3576        BinarySnapshotCell::Bytes(value) => {
3577            format!("X'{}'", hex::encode(value))
3578        }
3579    })
3580}
3581
3582fn apply_change_generic(
3583    conn: &mut SqliteConnection,
3584    metadata: &AppTableMetadata,
3585    change: &SyncChange,
3586) -> Result<()> {
3587    if change.table != metadata.name {
3588        return Err(SyncularError::schema(format!(
3589            "metadata for {} cannot apply change for {}",
3590            metadata.name, change.table
3591        )));
3592    }
3593    match change.op.as_str() {
3594        "delete" => delete_row_generic(conn, metadata, &change.row_id),
3595        "upsert" => {
3596            let row = change.row_json.as_ref().ok_or_else(|| {
3597                SyncularError::protocol_message(format!(
3598                    "upsert change missing row_json for {}",
3599                    change.row_id
3600                ))
3601            })?;
3602            upsert_row_generic(conn, metadata, row, change.row_version)
3603        }
3604        op => Err(SyncularError::protocol_message(format!(
3605            "unsupported sync change operation: {op}"
3606        ))),
3607    }
3608}
3609
3610fn delete_row_generic(
3611    conn: &mut SqliteConnection,
3612    metadata: &AppTableMetadata,
3613    row_id: &str,
3614) -> Result<()> {
3615    validate_app_table_metadata(metadata)?;
3616    let sql = format!(
3617        "delete from {table} where {pk} = {row_id}",
3618        table = metadata.name,
3619        pk = metadata.primary_key_column,
3620        row_id = sql_string(row_id),
3621    );
3622    sql_query(sql).execute(conn)?;
3623    Ok(())
3624}
3625
3626fn rows_from_json_query(conn: &mut SqliteConnection, sql: String) -> Result<Vec<Value>> {
3627    sql_query(sql)
3628        .load::<JsonObjectRow>(conn)?
3629        .into_iter()
3630        .map(|row| Ok(serde_json::from_str::<Value>(&row.row_json)?))
3631        .collect()
3632}
3633
3634fn json_object_projection(metadata: &AppTableMetadata) -> Result<String> {
3635    let columns = syncable_columns(metadata);
3636    if columns.is_empty() {
3637        return Err(SyncularError::schema(format!(
3638            "app table {} has no declared columns",
3639            metadata.name
3640        )));
3641    }
3642    let pairs = columns
3643        .iter()
3644        .map(|column| {
3645            validate_identifier(column)?;
3646            Ok(format!("{label}, {column}", label = sql_string(column)))
3647        })
3648        .collect::<Result<Vec<_>>>()?;
3649    Ok(format!("json_object({})", pairs.join(", ")))
3650}
3651
3652fn generic_column_sql_value(
3653    metadata: &AppTableMetadata,
3654    row: &Map<String, Value>,
3655    column: &str,
3656    fallback_version: Option<i64>,
3657) -> Result<String> {
3658    if column == metadata.server_version_column {
3659        if let Some(version) = fallback_version {
3660            return Ok(version.to_string());
3661        }
3662        return Ok(row
3663            .get(column)
3664            .map(sql_value)
3665            .unwrap_or_else(|| "0".to_string()));
3666    }
3667
3668    if let Some(value) = row.get(column) {
3669        return Ok(sql_value(value));
3670    }
3671
3672    if column == metadata.primary_key_column {
3673        return Err(SyncularError::protocol_message(format!(
3674            "row for table {} is missing primary key {}",
3675            metadata.name, metadata.primary_key_column
3676        )));
3677    }
3678
3679    let column_metadata = metadata
3680        .columns
3681        .iter()
3682        .find(|candidate| candidate.name == column);
3683    if column_metadata.is_some_and(|column| column.notnull_required) {
3684        return Err(SyncularError::protocol_message(format!(
3685            "row for table {} is missing required column {}",
3686            metadata.name, column
3687        )));
3688    }
3689
3690    Ok("NULL".to_string())
3691}
3692
3693fn scope_filters(metadata: &AppTableMetadata, scopes: &ScopeValues) -> Result<Vec<String>> {
3694    for scope_name in scopes.keys() {
3695        if !metadata.scopes.iter().any(|scope| scope.name == scope_name) {
3696            return Err(SyncularError::config(format!(
3697                "unknown scope {scope_name} for table {}",
3698                metadata.name
3699            )));
3700        }
3701    }
3702
3703    let mut filters = Vec::new();
3704    for scope in metadata.scopes {
3705        match scopes.get(scope.name) {
3706            Some(value) => filters.push(scope_filter(scope.column, value)?),
3707            None if scope.required => filters.push("0 = 1".to_string()),
3708            None => {}
3709        }
3710    }
3711    Ok(filters)
3712}
3713
3714fn scope_clause(metadata: &AppTableMetadata, scopes: &ScopeValues) -> Result<String> {
3715    let filters = scope_filters(metadata, scopes)?;
3716    if filters.is_empty() {
3717        Ok("1 = 1".to_string())
3718    } else {
3719        Ok(format!("({})", filters.join(" and ")))
3720    }
3721}
3722
3723fn scope_filter(column: &str, value: &Value) -> Result<String> {
3724    validate_identifier(column)?;
3725    Ok(match value {
3726        Value::Null => format!("{column} is null"),
3727        Value::Array(values) if values.is_empty() => "0 = 1".to_string(),
3728        Value::Array(values) => format!(
3729            "{column} in ({})",
3730            values.iter().map(sql_value).collect::<Vec<_>>().join(", ")
3731        ),
3732        value => format!("{column} = {}", sql_value(value)),
3733    })
3734}
3735
3736fn row_matches_scope_values(
3737    metadata: &AppTableMetadata,
3738    row: &Value,
3739    scopes: &ScopeValues,
3740) -> bool {
3741    metadata.scopes.iter().all(|scope| {
3742        let Some(expected) = scopes.get(scope.name) else {
3743            return !scope.required;
3744        };
3745        let actual = row.get(scope.column);
3746        match expected {
3747            Value::Array(values) => actual.is_some_and(|actual| values.iter().any(|v| v == actual)),
3748            value => actual == Some(value),
3749        }
3750    })
3751}
3752
3753fn syncable_columns(metadata: &AppTableMetadata) -> Vec<&'static str> {
3754    let mut columns = metadata
3755        .columns
3756        .iter()
3757        .map(|column| column.name)
3758        .collect::<Vec<_>>();
3759    if !columns.contains(&metadata.primary_key_column) {
3760        columns.insert(0, metadata.primary_key_column);
3761    }
3762    if !columns.contains(&metadata.server_version_column) {
3763        columns.push(metadata.server_version_column);
3764    }
3765    columns
3766}
3767
3768fn validate_app_table_metadata(metadata: &AppTableMetadata) -> Result<()> {
3769    validate_identifier(metadata.name)?;
3770    validate_identifier(metadata.primary_key_column)?;
3771    validate_identifier(metadata.server_version_column)?;
3772    for column in metadata.columns {
3773        validate_identifier(column.name)?;
3774    }
3775    for scope in metadata.scopes {
3776        validate_identifier(scope.column)?;
3777    }
3778    Ok(())
3779}
3780
3781fn validate_identifier(identifier: &str) -> Result<()> {
3782    if !identifier.is_empty()
3783        && identifier
3784            .bytes()
3785            .all(|byte| byte.is_ascii_alphanumeric() || byte == b'_')
3786    {
3787        Ok(())
3788    } else {
3789        Err(SyncularError::schema(format!(
3790            "invalid sqlite identifier: {identifier}"
3791        )))
3792    }
3793}
3794
3795fn add_column_if_missing(
3796    conn: &mut SqliteConnection,
3797    table: &str,
3798    column: &str,
3799    alter_sql: &str,
3800) -> Result<()> {
3801    let columns = sql_query(format!(
3802        "select name from pragma_table_info({})",
3803        sql_string(table)
3804    ))
3805    .load::<ColumnNameRow>(conn)?;
3806    if columns.iter().any(|row| row.name == column) {
3807        return Ok(());
3808    }
3809    sql_query(alter_sql).execute(conn)?;
3810    Ok(())
3811}
3812
/// Wraps `value` in single quotes, doubling every embedded single quote so the
/// result is a SQL string literal.
fn sql_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
3816
3817fn sql_value(value: &Value) -> String {
3818    match value {
3819        Value::Null => "NULL".to_string(),
3820        Value::Bool(value) => i32::from(*value).to_string(),
3821        Value::Number(value) => value.to_string(),
3822        Value::String(value) => sql_string(value),
3823        Value::Array(_) | Value::Object(_) => sql_string(&value.to_string()),
3824    }
3825}
3826
3827fn crdt_sync_mode_str(mode: crate::crdt_field::CrdtFieldSyncMode) -> &'static str {
3828    match mode {
3829        crate::crdt_field::CrdtFieldSyncMode::ServerMerge => "server-merge",
3830        crate::crdt_field::CrdtFieldSyncMode::EncryptedUpdateLog => "encrypted-update-log",
3831    }
3832}
3833
3834fn crdt_sync_mode_from_str(value: &str) -> Result<crate::crdt_field::CrdtFieldSyncMode> {
3835    match value {
3836        "server-merge" => Ok(crate::crdt_field::CrdtFieldSyncMode::ServerMerge),
3837        "encrypted-update-log" => Ok(crate::crdt_field::CrdtFieldSyncMode::EncryptedUpdateLog),
3838        other => Err(SyncularError::message(
3839            ErrorKind::Storage,
3840            format!("unknown CRDT document sync mode: {other}"),
3841        )),
3842    }
3843}
3844
3845fn crdt_update_origin_str(origin: CrdtUpdateOrigin) -> &'static str {
3846    match origin {
3847        CrdtUpdateOrigin::Local => "local",
3848        CrdtUpdateOrigin::Remote => "remote",
3849        CrdtUpdateOrigin::Compaction => "compaction",
3850    }
3851}
3852
3853fn crdt_update_origin_from_str(value: &str) -> Result<CrdtUpdateOrigin> {
3854    match value {
3855        "local" => Ok(CrdtUpdateOrigin::Local),
3856        "remote" => Ok(CrdtUpdateOrigin::Remote),
3857        "compaction" => Ok(CrdtUpdateOrigin::Compaction),
3858        other => Err(SyncularError::message(
3859            ErrorKind::Storage,
3860            format!("unknown CRDT update origin: {other}"),
3861        )),
3862    }
3863}
3864
3865fn crdt_update_status_str(status: CrdtUpdateStatus) -> &'static str {
3866    match status {
3867        CrdtUpdateStatus::Pending => "pending",
3868        CrdtUpdateStatus::Flushed => "flushed",
3869        CrdtUpdateStatus::Acked => "acked",
3870        CrdtUpdateStatus::Pruned => "pruned",
3871    }
3872}
3873
3874fn crdt_update_status_from_str(value: &str) -> Result<CrdtUpdateStatus> {
3875    match value {
3876        "pending" => Ok(CrdtUpdateStatus::Pending),
3877        "flushed" => Ok(CrdtUpdateStatus::Flushed),
3878        "acked" => Ok(CrdtUpdateStatus::Acked),
3879        "pruned" => Ok(CrdtUpdateStatus::Pruned),
3880        other => Err(SyncularError::message(
3881            ErrorKind::Storage,
3882            format!("unknown CRDT update status: {other}"),
3883        )),
3884    }
3885}
3886
3887fn crdt_field_state_base64(field: &CrdtField, row: Option<&Value>) -> Option<String> {
3888    row.and_then(|row| {
3889        row.get(field.state_column())
3890            .and_then(Value::as_str)
3891            .filter(|value| !value.is_empty())
3892            .map(str::to_string)
3893    })
3894}
3895
3896fn assert_crdt_document_capacity(
3897    conn: &mut SqliteConnection,
3898    document_key: &str,
3899    max_pending_updates: i64,
3900) -> Result<()> {
3901    if max_pending_updates < 1 {
3902        return Err(SyncularError::config(
3903            "CRDT update queue capacity must be at least 1",
3904        ));
3905    }
3906    let pending = sql_query(
3907        r#"
3908        select count(*) as count
3909        from sync_crdt_update_log
3910        where document_key = ?1 and status in ('pending', 'flushed')
3911        "#,
3912    )
3913    .bind::<Text, _>(document_key)
3914    .load::<CountRow>(conn)?
3915    .into_iter()
3916    .next()
3917    .map(|row| row.count)
3918    .unwrap_or(0);
3919    if pending >= max_pending_updates {
3920        return Err(SyncularError::message(ErrorKind::Storage, format!(
3921            "CRDT update queue is full for document {document_key}; pending={pending}, capacity={max_pending_updates}"
3922        )));
3923    }
3924    Ok(())
3925}
3926
3927fn upsert_crdt_document_snapshot(
3928    conn: &mut SqliteConnection,
3929    field: &CrdtField,
3930    state_base64: Option<&str>,
3931    state_vector_base64: &str,
3932    compacted_at: Option<i64>,
3933) -> Result<()> {
3934    let now = now_ms();
3935    sql_query(
3936        r#"
3937        insert into sync_crdt_documents (
3938          document_key, app_table, row_id, field_name, state_column, sync_mode,
3939          state_base64, state_vector_base64, pending_updates, flushed_updates,
3940          acked_updates, log_updates, created_at, updated_at, compacted_at
3941        ) values (
3942          ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8,
3943          0, 0, 0, 0, ?9, ?10, ?11
3944        )
3945        on conflict(document_key) do update set
3946          state_base64 = excluded.state_base64,
3947          state_vector_base64 = excluded.state_vector_base64,
3948          state_column = excluded.state_column,
3949          sync_mode = excluded.sync_mode,
3950          updated_at = excluded.updated_at,
3951          compacted_at = coalesce(excluded.compacted_at, sync_crdt_documents.compacted_at)
3952        "#,
3953    )
3954    .bind::<Text, _>(field.document_key())
3955    .bind::<Text, _>(field.table())
3956    .bind::<Text, _>(field.row_id())
3957    .bind::<Text, _>(field.field())
3958    .bind::<Text, _>(field.state_column())
3959    .bind::<Text, _>(crdt_sync_mode_str(field.sync_mode()))
3960    .bind::<Nullable<Text>, _>(state_base64)
3961    .bind::<Text, _>(state_vector_base64)
3962    .bind::<BigInt, _>(now)
3963    .bind::<BigInt, _>(now)
3964    .bind::<Nullable<BigInt>, _>(compacted_at)
3965    .execute(conn)?;
3966    refresh_crdt_document_counts(conn, &field.document_key())
3967}
3968
3969fn record_crdt_update_log(
3970    conn: &mut SqliteConnection,
3971    field: &CrdtField,
3972    update: &YjsUpdateEnvelope,
3973    client_commit_id: Option<&str>,
3974    origin: CrdtUpdateOrigin,
3975    status: CrdtUpdateStatus,
3976    state_base64: Option<&str>,
3977    state_vector_base64: &str,
3978) -> Result<()> {
3979    upsert_crdt_document_snapshot(conn, field, state_base64, state_vector_base64, None)?;
3980    let now = now_ms();
3981    let document_key = field.document_key();
3982    sql_query(
3983        r#"
3984        insert into sync_crdt_update_log (
3985          document_key, app_table, row_id, field_name, update_id, client_commit_id,
3986          origin, status, update_base64, state_vector_base64, created_at, flushed_at, acked_at
3987        ) values (
3988          ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11,
3989          case when ?8 in ('flushed', 'acked') then ?11 else null end,
3990          case when ?8 = 'acked' then ?11 else null end
3991        )
3992        on conflict(update_id) do update set
3993          state_vector_base64 = excluded.state_vector_base64,
3994          status = case
3995            when sync_crdt_update_log.status = 'acked' then sync_crdt_update_log.status
3996            else excluded.status
3997          end,
3998          flushed_at = coalesce(sync_crdt_update_log.flushed_at, excluded.flushed_at),
3999          acked_at = coalesce(sync_crdt_update_log.acked_at, excluded.acked_at)
4000        "#,
4001    )
4002    .bind::<Text, _>(&document_key)
4003    .bind::<Text, _>(field.table())
4004    .bind::<Text, _>(field.row_id())
4005    .bind::<Text, _>(field.field())
4006    .bind::<Text, _>(&update.update_id)
4007    .bind::<Nullable<Text>, _>(client_commit_id)
4008    .bind::<Text, _>(crdt_update_origin_str(origin))
4009    .bind::<Text, _>(crdt_update_status_str(status))
4010    .bind::<Text, _>(&update.update_base64)
4011    .bind::<Text, _>(state_vector_base64)
4012    .bind::<BigInt, _>(now)
4013    .execute(conn)?;
4014    refresh_crdt_document_counts(conn, &document_key)
4015}
4016
4017fn select_crdt_document_snapshot(
4018    conn: &mut SqliteConnection,
4019    document_key: &str,
4020) -> Result<CrdtDocumentSnapshot> {
4021    sql_query(
4022        r#"
4023        select document_key, app_table, row_id, field_name, state_column, sync_mode,
4024               state_base64, state_vector_base64, pending_updates, flushed_updates,
4025               acked_updates, log_updates, updated_at, compacted_at
4026        from sync_crdt_documents
4027        where document_key = ?1
4028        limit 1
4029        "#,
4030    )
4031    .bind::<Text, _>(document_key)
4032    .load::<CrdtDocumentSnapshotRow>(conn)?
4033    .into_iter()
4034    .next()
4035    .ok_or_else(|| {
4036        SyncularError::message(
4037            ErrorKind::Storage,
4038            format!("CRDT document not found: {document_key}"),
4039        )
4040    })?
4041    .try_into()
4042}
4043
4044fn refresh_crdt_document_counts(conn: &mut SqliteConnection, document_key: &str) -> Result<()> {
4045    sql_query(
4046        r#"
4047        update sync_crdt_documents
4048        set pending_updates = (
4049              select count(*) from sync_crdt_update_log
4050              where document_key = ?1 and status = 'pending'
4051            ),
4052            flushed_updates = (
4053              select count(*) from sync_crdt_update_log
4054              where document_key = ?1 and status = 'flushed'
4055            ),
4056            acked_updates = (
4057              select count(*) from sync_crdt_update_log
4058              where document_key = ?1 and status = 'acked'
4059            ),
4060            log_updates = (
4061              select count(*) from sync_crdt_update_log
4062              where document_key = ?1
4063            ),
4064            updated_at = ?2
4065        where document_key = ?1
4066        "#,
4067    )
4068    .bind::<Text, _>(document_key)
4069    .bind::<BigInt, _>(now_ms())
4070    .execute(conn)?;
4071    Ok(())
4072}
4073
4074fn refresh_all_crdt_document_counts(conn: &mut SqliteConnection) -> Result<()> {
4075    let keys = sql_query("select document_key as value from sync_crdt_documents")
4076        .load::<StringValueRow>(conn)?;
4077    for key in keys {
4078        refresh_crdt_document_counts(conn, &key.value)?;
4079    }
4080    Ok(())
4081}
4082
4083impl SyncStoreTx for DieselSqliteTx<'_> {
4084    fn pending_outbox(&mut self, limit: i64) -> Result<Vec<OutboxCommit>> {
4085        use schema::sync_outbox_commits::dsl as o;
4086        let now = now_ms();
4087
4088        let rows: Vec<OutboxCommitRow> = o::sync_outbox_commits
4089            .select(OutboxCommitRow::as_select())
4090            .filter(o::status.eq("pending"))
4091            .filter(o::attempt_count.lt(MAX_SYNC_RETRIES))
4092            .filter(o::next_attempt_at.le(now))
4093            .order(o::created_at.asc())
4094            .limit(limit)
4095            .load(self.conn)?;
4096
4097        Ok(rows.into_iter().map(OutboxCommit::from).collect())
4098    }
4099
4100    fn requeue_stale_outbox(&mut self) -> Result<()> {
4101        let now = now_ms();
4102        let stale_before = now - SYNC_SENDING_TIMEOUT_MS;
4103        sql_query(
4104            r#"
4105            update sync_outbox_commits
4106            set status = case when attempt_count >= ?1 then 'failed' else 'pending' end,
4107                next_attempt_at = case when attempt_count >= ?1 then 0 else ?2 end,
4108                error = case
4109                    when attempt_count >= ?1 then 'Sync attempt timed out while in sending state'
4110                    else 'Sync attempt timed out while in sending state; retrying'
4111                end,
4112                updated_at = ?2
4113            where status = 'sending' and updated_at < ?3
4114            "#,
4115        )
4116        .bind::<Integer, _>(MAX_SYNC_RETRIES)
4117        .bind::<BigInt, _>(now)
4118        .bind::<BigInt, _>(stale_before)
4119        .execute(self.conn)?;
4120        Ok(())
4121    }
4122
4123    fn mark_outbox_sending(&mut self, row_id: &str) -> Result<()> {
4124        use schema::sync_outbox_commits::dsl as o;
4125        let now = now_ms();
4126        diesel::update(o::sync_outbox_commits.filter(o::id.eq(row_id)))
4127            .set((
4128                o::status.eq("sending"),
4129                o::updated_at.eq(now),
4130                o::attempt_count.eq(o::attempt_count + 1),
4131                o::error.eq::<Option<String>>(None),
4132                o::next_attempt_at.eq(0),
4133            ))
4134            .execute(self.conn)?;
4135        sql_query(
4136            r#"
4137            update sync_crdt_update_log
4138            set status = 'flushed',
4139                flushed_at = coalesce(flushed_at, ?1)
4140            where status = 'pending'
4141              and client_commit_id = (
4142                select client_commit_id from sync_outbox_commits where id = ?2
4143              )
4144            "#,
4145        )
4146        .bind::<BigInt, _>(now)
4147        .bind::<Text, _>(row_id)
4148        .execute(self.conn)?;
4149        refresh_all_crdt_document_counts(self.conn)?;
4150        Ok(())
4151    }
4152
4153    fn mark_pushed_operation_server_versions(
4154        &mut self,
4155        outbox: &OutboxCommit,
4156        response: &PushCommitResponse,
4157    ) -> Result<()> {
4158        let operations: Vec<SyncOperation> = serde_json::from_str(&outbox.operations_json)?;
4159        if response.results.is_empty() {
4160            if let Some(server_seq) = response.commit_seq {
4161                for operation in &operations {
4162                    if is_encrypted_crdt_system_table(&operation.table) {
4163                        update_encrypted_crdt_system_server_seq(
4164                            self.conn,
4165                            &operation.table,
4166                            &operation.row_id,
4167                            server_seq,
4168                        )?;
4169                    }
4170                }
4171            }
4172            return Ok(());
4173        }
4174
4175        for result in &response.results {
4176            if !matches!(result.status.as_str(), "applied" | "cached") {
4177                continue;
4178            }
4179            let Some(server_seq) = result.server_version.or(response.commit_seq) else {
4180                continue;
4181            };
4182            let operation = operations.get(result.op_index as usize).ok_or_else(|| {
4183                SyncularError::protocol_message(format!(
4184                    "push response op_index {} out of bounds for local outbox commit {}",
4185                    result.op_index, outbox.client_commit_id
4186                ))
4187            })?;
4188            if is_encrypted_crdt_system_table(&operation.table) {
4189                update_encrypted_crdt_system_server_seq(
4190                    self.conn,
4191                    &operation.table,
4192                    &operation.row_id,
4193                    server_seq,
4194                )?;
4195            }
4196        }
4197        Ok(())
4198    }
4199
4200    fn mark_outbox_acked(&mut self, row_id: &str, response: &PushCommitResponse) -> Result<()> {
4201        use schema::sync_outbox_commits::dsl as o;
4202        let now = now_ms();
4203        diesel::update(o::sync_outbox_commits.filter(o::id.eq(row_id)))
4204            .set((
4205                o::status.eq("acked"),
4206                o::updated_at.eq(now),
4207                o::acked_commit_seq.eq(response.commit_seq),
4208                o::last_response_json.eq(Some(serde_json::to_string(response)?)),
4209                o::error.eq::<Option<String>>(None),
4210                o::next_attempt_at.eq(0),
4211            ))
4212            .execute(self.conn)?;
4213        sql_query(
4214            r#"
4215            update sync_crdt_update_log
4216            set status = 'acked',
4217                flushed_at = coalesce(flushed_at, ?1),
4218                acked_at = coalesce(acked_at, ?1)
4219            where status in ('pending', 'flushed')
4220              and client_commit_id = (
4221                select client_commit_id from sync_outbox_commits where id = ?2
4222              )
4223            "#,
4224        )
4225        .bind::<BigInt, _>(now)
4226        .bind::<Text, _>(row_id)
4227        .execute(self.conn)?;
4228        refresh_all_crdt_document_counts(self.conn)?;
4229        Ok(())
4230    }
4231
4232    fn mark_outbox_failed(
4233        &mut self,
4234        row_id: &str,
4235        error: &str,
4236        response: &PushCommitResponse,
4237    ) -> Result<()> {
4238        use schema::sync_outbox_commits::dsl as o;
4239        diesel::update(o::sync_outbox_commits.filter(o::id.eq(row_id)))
4240            .set((
4241                o::status.eq("failed"),
4242                o::updated_at.eq(now_ms()),
4243                o::last_response_json.eq(Some(serde_json::to_string(response)?)),
4244                o::error.eq(Some(error.to_string())),
4245                o::next_attempt_at.eq(0),
4246            ))
4247            .execute(self.conn)?;
4248        Ok(())
4249    }
4250
4251    fn mark_outbox_retry(
4252        &mut self,
4253        row_id: &str,
4254        error: &str,
4255        next_attempt_at: i64,
4256        failed: bool,
4257    ) -> Result<()> {
4258        use schema::sync_outbox_commits::dsl as o;
4259        let now = now_ms();
4260        diesel::update(o::sync_outbox_commits.filter(o::id.eq(row_id)))
4261            .set((
4262                o::status.eq(if failed { "failed" } else { "pending" }),
4263                o::updated_at.eq(now),
4264                o::error.eq(Some(error.to_string())),
4265                o::next_attempt_at.eq(if failed { 0 } else { next_attempt_at }),
4266            ))
4267            .execute(self.conn)?;
4268        if !failed {
4269            sql_query(
4270                r#"
4271                update sync_crdt_update_log
4272                set status = 'pending'
4273                where status = 'flushed'
4274                  and client_commit_id = (
4275                    select client_commit_id from sync_outbox_commits where id = ?1
4276                  )
4277                "#,
4278            )
4279            .bind::<Text, _>(row_id)
4280            .execute(self.conn)?;
4281            refresh_all_crdt_document_counts(self.conn)?;
4282        }
4283        Ok(())
4284    }
4285
4286    fn insert_conflict(&mut self, outbox: &OutboxCommit, result: &OperationResult) -> Result<()> {
4287        sql_query(
4288            r#"
4289            insert into sync_conflicts (
4290                id, outbox_commit_id, client_commit_id, op_index, result_status,
4291                message, code, server_version, server_row_json, created_at,
4292                resolved_at, resolution
4293            ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, null, null)
4294            "#,
4295        )
4296        .bind::<diesel::sql_types::Text, _>(Uuid::new_v4().to_string())
4297        .bind::<diesel::sql_types::Text, _>(&outbox.id)
4298        .bind::<diesel::sql_types::Text, _>(&outbox.client_commit_id)
4299        .bind::<diesel::sql_types::Integer, _>(result.op_index)
4300        .bind::<diesel::sql_types::Text, _>(&result.status)
4301        .bind::<diesel::sql_types::Text, _>(
4302            result
4303                .message
4304                .clone()
4305                .or_else(|| result.error.clone())
4306                .unwrap_or_else(|| result.status.clone()),
4307        )
4308        .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(result.code.clone())
4309        .bind::<diesel::sql_types::Nullable<diesel::sql_types::BigInt>, _>(result.server_version)
4310        .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
4311            result.server_row.as_ref().map(Value::to_string),
4312        )
4313        .bind::<diesel::sql_types::BigInt, _>(now_ms())
4314        .execute(self.conn)?;
4315
4316        Ok(())
4317    }
4318
4319    fn upsert_auth_lease(&mut self, lease: &AuthLeaseRecord) -> Result<()> {
4320        use schema::sync_auth_leases::dsl as l;
4321
4322        let row = AuthLeaseRecordRow::from(lease);
4323        diesel::insert_into(l::sync_auth_leases)
4324            .values(&row)
4325            .on_conflict(l::lease_id)
4326            .do_update()
4327            .set(&row)
4328            .execute(self.conn)?;
4329        Ok(())
4330    }
4331
4332    fn auth_lease(&mut self, lease_id_value: &str) -> Result<Option<AuthLeaseRecord>> {
4333        use schema::sync_auth_leases::dsl as l;
4334
4335        let row = l::sync_auth_leases
4336            .select(AuthLeaseRecordRow::as_select())
4337            .filter(l::lease_id.eq(lease_id_value))
4338            .first::<AuthLeaseRecordRow>(self.conn)
4339            .optional()?;
4340        Ok(row.map(AuthLeaseRecord::from))
4341    }
4342
4343    fn active_auth_leases(
4344        &mut self,
4345        actor_id_value: Option<&str>,
4346        now_ms_value: i64,
4347    ) -> Result<Vec<AuthLeaseRecord>> {
4348        use schema::sync_auth_leases::dsl as l;
4349
4350        let mut query = l::sync_auth_leases
4351            .select(AuthLeaseRecordRow::as_select())
4352            .filter(l::status.eq("active"))
4353            .filter(l::not_before_ms.le(now_ms_value))
4354            .filter(l::expires_at_ms.gt(now_ms_value))
4355            .into_boxed();
4356        if let Some(actor_id_value) = actor_id_value {
4357            query = query.filter(l::actor_id.eq(actor_id_value));
4358        }
4359        let rows = query.order(l::expires_at_ms.asc()).load(self.conn)?;
4360        Ok(rows.into_iter().map(AuthLeaseRecord::from).collect())
4361    }
4362
4363    fn set_outbox_auth_lease(
4364        &mut self,
4365        client_commit_id_value: &str,
4366        provenance: Option<&AuthLeaseProvenance>,
4367    ) -> Result<()> {
4368        use schema::sync_outbox_commits::dsl as o;
4369
4370        let lease_token_value = match provenance {
4371            Some(lease) => lease.lease_token.clone().or_else(|| {
4372                self.auth_lease(&lease.lease_id)
4373                    .ok()
4374                    .flatten()
4375                    .map(|record| record.token)
4376            }),
4377            None => None,
4378        };
4379        let affected = diesel::update(
4380            o::sync_outbox_commits.filter(o::client_commit_id.eq(client_commit_id_value)),
4381        )
4382        .set((
4383            o::lease_id.eq(provenance.map(|lease| lease.lease_id.clone())),
4384            o::lease_expires_at_ms.eq(provenance.map(|lease| lease.lease_expires_at_ms)),
4385            o::lease_status_at_enqueue
4386                .eq(provenance.map(|lease| lease.lease_status_at_enqueue.clone())),
4387            o::lease_scope_summary_json
4388                .eq(provenance.and_then(|lease| lease.lease_scope_summary_json.clone())),
4389            o::lease_token.eq(lease_token_value),
4390        ))
4391        .execute(self.conn)?;
4392        if affected == 0 {
4393            return Err(SyncularError::storage(anyhow::anyhow!(
4394                "outbox commit {client_commit_id_value} does not exist"
4395            )));
4396        }
4397        Ok(())
4398    }
4399
4400    fn subscription_state(
4401        &mut self,
4402        state_id_value: &str,
4403        subscription_id_value: &str,
4404    ) -> Result<Option<SubscriptionState>> {
4405        use schema::sync_subscription_state::dsl as s;
4406
4407        let row: Option<SubscriptionStateRow> = s::sync_subscription_state
4408            .select(SubscriptionStateRow::as_select())
4409            .filter(s::state_id.eq(state_id_value))
4410            .filter(s::subscription_id.eq(subscription_id_value))
4411            .first(self.conn)
4412            .optional()?;
4413
4414        Ok(row.map(SubscriptionState::from))
4415    }
4416
4417    fn subscription_states(&mut self, state_id_value: &str) -> Result<Vec<SubscriptionState>> {
4418        use schema::sync_subscription_state::dsl as s;
4419
4420        let rows: Vec<SubscriptionStateRow> = s::sync_subscription_state
4421            .select(SubscriptionStateRow::as_select())
4422            .filter(s::state_id.eq(state_id_value))
4423            .order(s::subscription_id.asc())
4424            .load(self.conn)?;
4425
4426        Ok(rows.into_iter().map(SubscriptionState::from).collect())
4427    }
4428
4429    fn upsert_subscription_state(&mut self, state: &SubscriptionState) -> Result<()> {
4430        sql_query(
4431            r#"
4432            insert into sync_subscription_state (
4433                state_id, subscription_id, "table", scopes_json, params_json,
4434                cursor, bootstrap_state_json, status, created_at, updated_at
4435            ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
4436            on conflict (state_id, subscription_id) do update set
4437                "table" = excluded."table",
4438                scopes_json = excluded.scopes_json,
4439                params_json = excluded.params_json,
4440                cursor = excluded.cursor,
4441                bootstrap_state_json = excluded.bootstrap_state_json,
4442                status = excluded.status,
4443                updated_at = excluded.updated_at
4444            "#,
4445        )
4446        .bind::<diesel::sql_types::Text, _>(&state.state_id)
4447        .bind::<diesel::sql_types::Text, _>(&state.subscription_id)
4448        .bind::<diesel::sql_types::Text, _>(&state.table)
4449        .bind::<diesel::sql_types::Text, _>(&state.scopes_json)
4450        .bind::<diesel::sql_types::Text, _>(&state.params_json)
4451        .bind::<diesel::sql_types::BigInt, _>(state.cursor)
4452        .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
4453            state.bootstrap_state_json.clone(),
4454        )
4455        .bind::<diesel::sql_types::Text, _>(&state.status)
4456        .bind::<diesel::sql_types::BigInt, _>(now_ms())
4457        .bind::<diesel::sql_types::BigInt, _>(now_ms())
4458        .execute(self.conn)?;
4459
4460        Ok(())
4461    }
4462
4463    fn delete_subscription_state(
4464        &mut self,
4465        state_id_value: &str,
4466        subscription_id_value: &str,
4467    ) -> Result<()> {
4468        use schema::sync_subscription_state::dsl as s;
4469
4470        diesel::delete(
4471            s::sync_subscription_state
4472                .filter(s::state_id.eq(state_id_value))
4473                .filter(s::subscription_id.eq(subscription_id_value)),
4474        )
4475        .execute(self.conn)?;
4476        Ok(())
4477    }
4478
4479    fn verified_root(
4480        &mut self,
4481        state_id_value: &str,
4482        subscription_id_value: &str,
4483    ) -> Result<Option<VerifiedRoot>> {
4484        let row: Option<VerifiedRootRow> = sql_query(
4485            r#"
4486            select state_id, subscription_id, partition_id, commit_seq, root
4487            from sync_verified_roots
4488            where state_id = ?1 and subscription_id = ?2
4489            limit 1
4490            "#,
4491        )
4492        .bind::<Text, _>(state_id_value)
4493        .bind::<Text, _>(subscription_id_value)
4494        .get_result(self.conn)
4495        .optional()?;
4496        Ok(row.map(VerifiedRoot::from))
4497    }
4498
4499    fn verified_roots(&mut self, state_id_value: &str) -> Result<Vec<VerifiedRoot>> {
4500        let rows: Vec<VerifiedRootRow> = sql_query(
4501            r#"
4502            select state_id, subscription_id, partition_id, commit_seq, root
4503            from sync_verified_roots
4504            where state_id = ?1
4505            order by subscription_id asc
4506            "#,
4507        )
4508        .bind::<Text, _>(state_id_value)
4509        .load(self.conn)?;
4510        Ok(rows.into_iter().map(VerifiedRoot::from).collect())
4511    }
4512
4513    fn upsert_verified_root(&mut self, root: &VerifiedRoot) -> Result<()> {
4514        let now = now_ms();
4515        sql_query(
4516            r#"
4517            insert into sync_verified_roots (
4518                state_id, subscription_id, partition_id, commit_seq, root,
4519                created_at, updated_at
4520            ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)
4521            on conflict (state_id, subscription_id) do update set
4522                partition_id = excluded.partition_id,
4523                commit_seq = excluded.commit_seq,
4524                root = excluded.root,
4525                updated_at = excluded.updated_at
4526            "#,
4527        )
4528        .bind::<Text, _>(&root.state_id)
4529        .bind::<Text, _>(&root.subscription_id)
4530        .bind::<Text, _>(&root.partition_id)
4531        .bind::<BigInt, _>(root.commit_seq)
4532        .bind::<Text, _>(&root.root)
4533        .bind::<BigInt, _>(now)
4534        .bind::<BigInt, _>(now)
4535        .execute(self.conn)?;
4536        Ok(())
4537    }
4538
4539    fn delete_verified_root(
4540        &mut self,
4541        state_id_value: &str,
4542        subscription_id_value: &str,
4543    ) -> Result<()> {
4544        sql_query(
4545            r#"
4546            delete from sync_verified_roots
4547            where state_id = ?1 and subscription_id = ?2
4548            "#,
4549        )
4550        .bind::<Text, _>(state_id_value)
4551        .bind::<Text, _>(subscription_id_value)
4552        .execute(self.conn)?;
4553        Ok(())
4554    }
4555
4556    fn crdt_state_vector_hints(
4557        &mut self,
4558        table: &str,
4559        scopes: &ScopeValues,
4560        limit: i64,
4561    ) -> Result<Vec<CrdtStateVectorHint>> {
4562        crdt_state_vector_hints_for_subscription(self.conn, self.app_schema, table, scopes, limit)
4563    }
4564
4565    fn clear_table_for_scopes(&mut self, table: &str, scopes: &ScopeValues) -> Result<()> {
4566        if is_encrypted_crdt_system_table(table) {
4567            return clear_encrypted_crdt_system_table_for_scopes(self.conn, table, scopes);
4568        }
4569        clear_app_table_for_scopes(self.conn, self.app_schema, table, scopes)
4570    }
4571
4572    fn clear_synced_rows_for_scopes(&mut self, table: &str, scopes: &ScopeValues) -> Result<i64> {
4573        clear_synced_app_rows_for_scopes(self.conn, self.app_schema, table, scopes)
4574    }
4575
4576    fn clear_table_for_scopes_preserving_local_crdt(
4577        &mut self,
4578        table: &str,
4579        scopes: &ScopeValues,
4580    ) -> Result<()> {
4581        if is_encrypted_crdt_system_table(table) {
4582            return clear_encrypted_crdt_system_table_for_scopes(self.conn, table, scopes);
4583        }
4584        clear_app_table_for_scopes_preserving_local_crdt(self.conn, self.app_schema, table, scopes)
4585    }
4586
4587    fn current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
4588        if is_encrypted_crdt_system_table(table) {
4589            return Ok(None);
4590        }
4591        current_app_row_json(self.conn, self.app_schema, table, row_id)
4592    }
4593
4594    fn upsert_row(
4595        &mut self,
4596        table: &str,
4597        row: &Value,
4598        fallback_version: Option<i64>,
4599    ) -> Result<()> {
4600        if is_encrypted_crdt_system_table(table) {
4601            let identity = encrypted_crdt_identity_column(table)?;
4602            let row_id = row.get(identity).and_then(Value::as_str).ok_or_else(|| {
4603                SyncularError::protocol_message(format!(
4604                    "encrypted CRDT row missing identity column {identity}"
4605                ))
4606            })?;
4607            let row = apply_encrypted_crdt_system_row(
4608                self.conn,
4609                table,
4610                row_id,
4611                Some(row),
4612                fallback_version,
4613            )?;
4614            materialize_encrypted_crdt_system_row(self.conn, self.app_schema, table, &row)?;
4615            return Ok(());
4616        }
4617
4618        let metadata = self.app_schema.table_metadata(table).ok_or_else(|| {
4619            SyncularError::config(format!("unknown generated app table: {table}"))
4620        })?;
4621        let row = materialize_row_for_metadata(table, None, row.clone(), metadata)?;
4622        let row = preserve_encrypted_crdt_materialized_columns(
4623            self.conn,
4624            self.app_schema,
4625            metadata,
4626            row,
4627        )?;
4628        upsert_app_row(self.conn, self.app_schema, table, &row, fallback_version)
4629    }
4630
4631    fn upsert_rows(
4632        &mut self,
4633        table: &str,
4634        rows: &[Value],
4635        fallback_version: Option<i64>,
4636    ) -> Result<()> {
4637        if rows.is_empty() {
4638            return Ok(());
4639        }
4640        if is_encrypted_crdt_system_table(table) {
4641            for row in rows {
4642                self.upsert_row(table, row, fallback_version)?;
4643            }
4644            return Ok(());
4645        }
4646
4647        let metadata = self.app_schema.table_metadata(table).ok_or_else(|| {
4648            SyncularError::config(format!("unknown generated app table: {table}"))
4649        })?;
4650        if metadata
4651            .crdt_yjs_fields
4652            .iter()
4653            .any(|field| field.sync_mode == "encrypted-update-log")
4654        {
4655            for row in rows {
4656                self.upsert_row(table, row, fallback_version)?;
4657            }
4658            return Ok(());
4659        }
4660
4661        let row_objects = rows
4662            .iter()
4663            .map(|row| {
4664                let row = materialize_row_for_metadata(table, None, row.clone(), metadata)?;
4665                row.as_object().cloned().ok_or_else(|| {
4666                    SyncularError::protocol_message(format!("row is not a JSON object: {row}"))
4667                })
4668            })
4669            .collect::<Result<Vec<_>>>()?;
4670
4671        upsert_rows_generic_batch(self.conn, metadata, &row_objects, fallback_version)
4672    }
4673
4674    fn upsert_snapshot_chunk_rows(
4675        &mut self,
4676        table: &str,
4677        rows: &SnapshotChunkRows,
4678        fallback_version: Option<i64>,
4679    ) -> Result<()> {
4680        match rows {
4681            SnapshotChunkRows::Json(rows) => self.upsert_rows(table, rows, fallback_version),
4682            SnapshotChunkRows::Binary(rows) => {
4683                if fallback_version.is_some() || is_encrypted_crdt_system_table(table) {
4684                    let rows = rows.clone().into_value_rows();
4685                    return self.upsert_rows(table, &rows, fallback_version);
4686                }
4687
4688                let metadata = self.app_schema.table_metadata(table).ok_or_else(|| {
4689                    SyncularError::config(format!("unknown generated app table: {table}"))
4690                })?;
4691                if metadata
4692                    .crdt_yjs_fields
4693                    .iter()
4694                    .any(|field| field.sync_mode == "encrypted-update-log")
4695                {
4696                    let rows = rows.clone().into_value_rows();
4697                    return self.upsert_rows(table, &rows, fallback_version);
4698                }
4699
4700                upsert_binary_snapshot_rows_batch(self.conn, metadata, rows)
4701            }
4702            SnapshotChunkRows::BinaryPayload(rows) => {
4703                let rows = rows.clone().into_decoded_rows()?;
4704                if fallback_version.is_some() || is_encrypted_crdt_system_table(table) {
4705                    let rows = rows.into_value_rows();
4706                    return self.upsert_rows(table, &rows, fallback_version);
4707                }
4708
4709                let metadata = self.app_schema.table_metadata(table).ok_or_else(|| {
4710                    SyncularError::config(format!("unknown generated app table: {table}"))
4711                })?;
4712                if metadata
4713                    .crdt_yjs_fields
4714                    .iter()
4715                    .any(|field| field.sync_mode == "encrypted-update-log")
4716                {
4717                    let rows = rows.into_value_rows();
4718                    return self.upsert_rows(table, &rows, fallback_version);
4719                }
4720
4721                upsert_binary_snapshot_rows_batch(self.conn, metadata, &rows)
4722            }
4723        }
4724    }
4725
4726    fn apply_change(&mut self, change: &SyncChange) -> Result<()> {
4727        if is_encrypted_crdt_system_table(&change.table) {
4728            if change.op == "delete" {
4729                return delete_encrypted_crdt_system_row(self.conn, &change.table, &change.row_id);
4730            }
4731            let row = apply_encrypted_crdt_system_row(
4732                self.conn,
4733                &change.table,
4734                &change.row_id,
4735                change.row_json.as_ref(),
4736                change.row_version,
4737            )?;
4738            materialize_encrypted_crdt_system_row(self.conn, self.app_schema, &change.table, &row)?;
4739            return Ok(());
4740        }
4741
4742        if change.op == "upsert" {
4743            let metadata = self
4744                .app_schema
4745                .table_metadata(&change.table)
4746                .ok_or_else(|| {
4747                    SyncularError::config(format!("unknown generated app table: {}", change.table))
4748                })?;
4749            if let Some(row) = change.row_json.as_ref() {
4750                let mut change = change.clone();
4751                change.row_json = Some(if has_yjs_payload(row) {
4752                    let existing_row = current_app_row_json(
4753                        self.conn,
4754                        self.app_schema,
4755                        &change.table,
4756                        &change.row_id,
4757                    )?;
4758                    transform_local_row_for_metadata(
4759                        &change.table,
4760                        &change.row_id,
4761                        None,
4762                        Some(row),
4763                        existing_row.as_ref(),
4764                        metadata,
4765                    )?
4766                    .ok_or_else(|| {
4767                        SyncularError::protocol_message(format!(
4768                            "server-merge Yjs change for {}.{} did not materialize a row",
4769                            change.table, change.row_id
4770                        ))
4771                    })?
4772                } else {
4773                    materialize_row_for_metadata(
4774                        &change.table,
4775                        Some(&change.row_id),
4776                        row.clone(),
4777                        metadata,
4778                    )?
4779                });
4780                return apply_app_change(self.conn, self.app_schema, &change);
4781            }
4782        }
4783        apply_app_change(self.conn, self.app_schema, change)
4784    }
4785}
4786
4787fn has_yjs_payload(value: &Value) -> bool {
4788    value
4789        .as_object()
4790        .is_some_and(|object| object.contains_key(YJS_PAYLOAD_KEY))
4791}