1use crate::app_schema::{
2 checksum, default_app_schema, split_sql_statements, AppSchema, AppTableMetadata,
3};
4use crate::auth_lease_selection::{
5 app_table_operation_scope,
6 select_active_auth_lease_for_operations as select_auth_lease_for_operation_scopes,
7 system_table_operation_scope, ActiveAuthLeasePolicy, MutationOperationScope,
8};
9use crate::binary_snapshot::{BinarySnapshotCell, DecodedBinarySnapshotRows, SnapshotChunkRows};
10use crate::client::SubscriptionSpec;
11use crate::command_history::{CommandHistoryEntry, CommandHistoryRecord, CommandHistoryState};
12use crate::compaction::{
13 required_compaction_cutoff, tombstone_delete_statements, StorageCompactionOptions,
14 StorageCompactionReport,
15};
16use crate::crdt_field::{
17 validate_crdt_field, CrdtDocumentSnapshot, CrdtField, CrdtFieldId, CrdtUpdateLogEntry,
18 CrdtUpdateOrigin, CrdtUpdateStatus,
19};
20use crate::crdt_yjs::{
21 materialize_row_for_metadata, transform_local_row_for_metadata, yjs_state_vector_base64,
22 YjsUpdateEnvelope, YJS_PAYLOAD_KEY,
23};
24use crate::encrypted_crdt::{
25 apply_encrypted_crdt_plaintext_to_row, encrypted_crdt_identity_column,
26 encrypted_crdt_normalize_row, encrypted_crdt_row_matches_scopes, encrypted_crdt_scopes_json,
27 is_encrypted_crdt_system_table, EncryptedCrdtStreamStats, CRDT_CHECKPOINTS_TABLE,
28 CRDT_UPDATES_TABLE,
29};
30use crate::error::{ErrorKind, Result, SyncularError};
31#[cfg(feature = "demo-todo-native-fixture")]
32use crate::fixtures::todo::tasks::{insert_local_task, list_tasks, patch_local_task_title};
33use crate::limits::validate_unresolved_outbox_capacity;
34use crate::protocol::*;
35use crate::protocol::{sync_operations_json_for_outbox, validate_pending_mutation_batch_size};
36use crate::runtime_schema::RUNTIME_SYSTEM_SCHEMA_SQL;
37use crate::schema;
38use crate::store::{
39 now_ms, AppSchemaState, AppliedMigration, AuthLeaseRecord, BlobHealthSummary, ConflictSummary,
40 CrdtHealthSummary, OutboxCommit, OutboxSummary, ScopedRowsHealthSummary, ScopedRowsTableHealth,
41 SubscriptionState, SyncStateStore, SyncStore, SyncStoreTx, VerifiedRoot, APP_SCHEMA_ID,
42 BLOB_UPLOAD_STALE_TIMEOUT_MS, MAX_BLOB_UPLOAD_RETRIES, MAX_SYNC_RETRIES,
43 SQLITE_BUSY_TIMEOUT_MS, SYNC_SENDING_TIMEOUT_MS,
44};
45#[cfg(feature = "demo-todo-fixture")]
46use crate::store::{DemoTaskStore, Task};
47use diesel::connection::SimpleConnection;
48use diesel::prelude::*;
49use diesel::sql_query;
50use diesel::sql_types::{BigInt, Binary, Bool, Integer, Nullable, Text};
51use diesel::sqlite::SqliteConnection;
52use serde::{Deserialize, Serialize};
53use serde_json::{Map, Value};
54use uuid::Uuid;
55
/// Row-count constant associated with snapshot upsert batching.
// NOTE(review): the consumer is outside this chunk; presumably the maximum number of rows
// written per snapshot upsert batch — confirm at the use site.
const SNAPSHOT_UPSERT_BATCH_ROWS: usize = 128;
57
/// Diesel row for the `sync_outbox_commits` table.
///
/// Converted into the store-level `OutboxCommit` by the `From` impl in this file, which
/// folds the five `lease_*` columns into a single optional auth-lease provenance value.
#[derive(Debug, Clone, Queryable, Selectable)]
#[diesel(table_name = schema::sync_outbox_commits)]
#[allow(dead_code)]
struct OutboxCommitRow {
    id: String,
    client_commit_id: String,
    status: String,
    operations_json: String,
    last_response_json: Option<String>,
    error: Option<String>,
    created_at: i64,
    updated_at: i64,
    attempt_count: i32,
    acked_commit_seq: Option<i64>,
    schema_version: i32,
    next_attempt_at: i64,
    // Auth-lease provenance columns; all nullable.
    lease_id: Option<String>,
    lease_expires_at_ms: Option<i64>,
    lease_status_at_enqueue: Option<String>,
    lease_scope_summary_json: Option<String>,
    lease_token: Option<String>,
}
80
/// Raw `sync_command_history` row as loaded through `sql_query`.
///
/// `state` and `entries_json` are still plain text here; they are decoded by
/// `TryFrom<CommandHistoryRow> for CommandHistoryRecord`.
#[derive(Debug, Clone, QueryableByName)]
struct CommandHistoryRow {
    #[diesel(sql_type = Text)]
    id: String,
    #[diesel(sql_type = Text)]
    mutation_scope: String,
    #[diesel(sql_type = Text)]
    state: String,
    // JSON-encoded `Vec<CommandHistoryEntry>`.
    #[diesel(sql_type = Text)]
    entries_json: String,
    #[diesel(sql_type = Text)]
    client_commit_id: String,
    #[diesel(sql_type = Nullable<Text>)]
    undo_client_commit_id: Option<String>,
    #[diesel(sql_type = Nullable<Text>)]
    redo_client_commit_id: Option<String>,
    #[diesel(sql_type = BigInt)]
    created_at: i64,
    #[diesel(sql_type = BigInt)]
    updated_at: i64,
}
102
/// A command-history entry under construction: a row identified by table and id, plus an
/// optional `before` JSON value.
// NOTE(review): usage is outside this chunk; `before` is presumably the row image captured
// prior to the mutation — confirm at the use site.
#[derive(Debug, Clone)]
struct PendingCommandHistoryEntry {
    table: String,
    row_id: String,
    before: Option<Value>,
}
109
110impl TryFrom<CommandHistoryRow> for CommandHistoryRecord {
111 type Error = SyncularError;
112
113 fn try_from(row: CommandHistoryRow) -> Result<Self> {
114 let entries =
115 serde_json::from_str::<Vec<CommandHistoryEntry>>(&row.entries_json).map_err(|err| {
116 SyncularError::storage(err).context("deserialize sync_command_history.entries_json")
117 })?;
118 Ok(Self {
119 id: row.id,
120 mutation_scope: row.mutation_scope,
121 state: CommandHistoryState::try_from(row.state.as_str())?,
122 entries,
123 client_commit_id: row.client_commit_id,
124 undo_client_commit_id: row.undo_client_commit_id,
125 redo_client_commit_id: row.redo_client_commit_id,
126 created_at: row.created_at,
127 updated_at: row.updated_at,
128 })
129 }
130}
131
132fn insert_command_history_record(
133 conn: &mut SqliteConnection,
134 mutation_scope: &str,
135 entries: &[CommandHistoryEntry],
136 receipt: &MutationReceipt,
137) -> Result<CommandHistoryRecord> {
138 let id = Uuid::new_v4().to_string();
139 let entries_json = serde_json::to_string(entries)?;
140 let created_at = now_ms();
141 sql_query("delete from sync_command_history where state = 'undone'").execute(conn)?;
142 sql_query(
143 r#"
144 insert into sync_command_history (
145 id,
146 mutation_scope,
147 state,
148 entries_json,
149 client_commit_id,
150 undo_client_commit_id,
151 redo_client_commit_id,
152 created_at,
153 updated_at
154 )
155 values (?1, ?2, 'done', ?3, ?4, null, null, ?5, ?5)
156 "#,
157 )
158 .bind::<Text, _>(&id)
159 .bind::<Text, _>(mutation_scope)
160 .bind::<Text, _>(&entries_json)
161 .bind::<Text, _>(&receipt.client_commit_id)
162 .bind::<BigInt, _>(created_at)
163 .execute(conn)?;
164 Ok(CommandHistoryRecord {
165 id,
166 mutation_scope: mutation_scope.to_string(),
167 state: CommandHistoryState::Done,
168 entries: entries.to_vec(),
169 client_commit_id: receipt.client_commit_id.clone(),
170 undo_client_commit_id: None,
171 redo_client_commit_id: None,
172 created_at,
173 updated_at: created_at,
174 })
175}
176
/// Raw CRDT document snapshot row as loaded through `sql_query`.
///
/// `sync_mode` is still text here; it is parsed by
/// `TryFrom<CrdtDocumentSnapshotRow> for CrdtDocumentSnapshot`, which also renames
/// `app_table` to `table` and `field_name` to `field`.
#[derive(Debug, Clone, QueryableByName)]
struct CrdtDocumentSnapshotRow {
    #[diesel(sql_type = Text)]
    document_key: String,
    #[diesel(sql_type = Text)]
    app_table: String,
    #[diesel(sql_type = Text)]
    row_id: String,
    #[diesel(sql_type = Text)]
    field_name: String,
    #[diesel(sql_type = Text)]
    state_column: String,
    #[diesel(sql_type = Text)]
    sync_mode: String,
    #[diesel(sql_type = Nullable<Text>)]
    state_base64: Option<String>,
    #[diesel(sql_type = Text)]
    state_vector_base64: String,
    #[diesel(sql_type = BigInt)]
    pending_updates: i64,
    #[diesel(sql_type = BigInt)]
    flushed_updates: i64,
    #[diesel(sql_type = BigInt)]
    acked_updates: i64,
    #[diesel(sql_type = BigInt)]
    log_updates: i64,
    #[diesel(sql_type = BigInt)]
    updated_at: i64,
    #[diesel(sql_type = Nullable<BigInt>)]
    compacted_at: Option<i64>,
}
208
/// Raw CRDT update-log row as loaded through `sql_query`.
///
/// `origin` and `status` are still text here; they are parsed by
/// `TryFrom<CrdtUpdateLogRow> for CrdtUpdateLogEntry`.
#[derive(Debug, Clone, QueryableByName)]
struct CrdtUpdateLogRow {
    #[diesel(sql_type = BigInt)]
    id: i64,
    #[diesel(sql_type = Text)]
    document_key: String,
    #[diesel(sql_type = Text)]
    update_id: String,
    #[diesel(sql_type = Nullable<Text>)]
    client_commit_id: Option<String>,
    #[diesel(sql_type = Text)]
    origin: String,
    #[diesel(sql_type = Text)]
    status: String,
    #[diesel(sql_type = Text)]
    update_base64: String,
    #[diesel(sql_type = Text)]
    state_vector_base64: String,
    #[diesel(sql_type = BigInt)]
    created_at: i64,
    #[diesel(sql_type = Nullable<BigInt>)]
    flushed_at: Option<i64>,
    #[diesel(sql_type = Nullable<BigInt>)]
    acked_at: Option<i64>,
}
234
235impl From<OutboxCommitRow> for OutboxCommit {
236 fn from(row: OutboxCommitRow) -> Self {
237 Self {
238 id: row.id,
239 client_commit_id: row.client_commit_id,
240 status: row.status,
241 operations_json: row.operations_json,
242 last_response_json: row.last_response_json,
243 error: row.error,
244 created_at: row.created_at,
245 updated_at: row.updated_at,
246 attempt_count: row.attempt_count,
247 acked_commit_seq: row.acked_commit_seq,
248 schema_version: row.schema_version,
249 next_attempt_at: row.next_attempt_at,
250 auth_lease: auth_lease_provenance_from_columns(
251 row.lease_id,
252 row.lease_expires_at_ms,
253 row.lease_status_at_enqueue,
254 row.lease_scope_summary_json,
255 row.lease_token,
256 ),
257 }
258 }
259}
260
261fn auth_lease_provenance_from_columns(
262 lease_id: Option<String>,
263 lease_expires_at_ms: Option<i64>,
264 lease_status_at_enqueue: Option<String>,
265 lease_scope_summary_json: Option<String>,
266 lease_token: Option<String>,
267) -> Option<AuthLeaseProvenance> {
268 Some(AuthLeaseProvenance {
269 lease_id: lease_id?,
270 lease_expires_at_ms: lease_expires_at_ms?,
271 lease_status_at_enqueue: lease_status_at_enqueue?,
272 lease_scope_summary_json,
273 lease_token,
274 })
275}
276
277impl TryFrom<CrdtDocumentSnapshotRow> for CrdtDocumentSnapshot {
278 type Error = SyncularError;
279
280 fn try_from(row: CrdtDocumentSnapshotRow) -> Result<Self> {
281 Ok(Self {
282 document_key: row.document_key,
283 table: row.app_table,
284 row_id: row.row_id,
285 field: row.field_name,
286 state_column: row.state_column,
287 sync_mode: crdt_sync_mode_from_str(&row.sync_mode)?,
288 state_base64: row.state_base64,
289 state_vector_base64: row.state_vector_base64,
290 pending_updates: row.pending_updates,
291 flushed_updates: row.flushed_updates,
292 acked_updates: row.acked_updates,
293 log_updates: row.log_updates,
294 updated_at: row.updated_at,
295 compacted_at: row.compacted_at,
296 })
297 }
298}
299
300impl TryFrom<CrdtUpdateLogRow> for CrdtUpdateLogEntry {
301 type Error = SyncularError;
302
303 fn try_from(row: CrdtUpdateLogRow) -> Result<Self> {
304 Ok(Self {
305 id: row.id,
306 document_key: row.document_key,
307 update_id: row.update_id,
308 client_commit_id: row.client_commit_id,
309 origin: crdt_update_origin_from_str(&row.origin)?,
310 status: crdt_update_status_from_str(&row.status)?,
311 update_base64: row.update_base64,
312 state_vector_base64: row.state_vector_base64,
313 created_at: row.created_at,
314 flushed_at: row.flushed_at,
315 acked_at: row.acked_at,
316 })
317 }
318}
319
/// Insertable counterpart of `OutboxCommitRow` for the `sync_outbox_commits` table.
///
/// Declares the same columns, with the same types, as `OutboxCommitRow`.
#[derive(Insertable)]
#[diesel(table_name = schema::sync_outbox_commits)]
struct NewOutboxCommit {
    id: String,
    client_commit_id: String,
    status: String,
    operations_json: String,
    last_response_json: Option<String>,
    error: Option<String>,
    created_at: i64,
    updated_at: i64,
    attempt_count: i32,
    acked_commit_seq: Option<i64>,
    schema_version: i32,
    next_attempt_at: i64,
    // Auth-lease provenance columns; all nullable.
    lease_id: Option<String>,
    lease_expires_at_ms: Option<i64>,
    lease_status_at_enqueue: Option<String>,
    lease_scope_summary_json: Option<String>,
    lease_token: Option<String>,
}
341
/// Diesel row for the `sync_auth_leases` table, usable for select, insert, and update.
///
/// Mirrors `AuthLeaseRecord` field-for-field; the two `From` impls in this file convert in
/// both directions.
#[derive(Debug, Clone, Queryable, Selectable, Insertable, AsChangeset)]
#[diesel(table_name = schema::sync_auth_leases)]
struct AuthLeaseRecordRow {
    lease_id: String,
    kid: String,
    actor_id: String,
    issued_at_ms: i64,
    not_before_ms: i64,
    expires_at_ms: i64,
    schema_version: i32,
    payload_json: String,
    token: String,
    status: String,
    last_validation_error: Option<String>,
    created_at_ms: i64,
    updated_at_ms: i64,
}
359
360impl From<AuthLeaseRecordRow> for AuthLeaseRecord {
361 fn from(row: AuthLeaseRecordRow) -> Self {
362 Self {
363 lease_id: row.lease_id,
364 kid: row.kid,
365 actor_id: row.actor_id,
366 issued_at_ms: row.issued_at_ms,
367 not_before_ms: row.not_before_ms,
368 expires_at_ms: row.expires_at_ms,
369 schema_version: row.schema_version,
370 payload_json: row.payload_json,
371 token: row.token,
372 status: row.status,
373 last_validation_error: row.last_validation_error,
374 created_at_ms: row.created_at_ms,
375 updated_at_ms: row.updated_at_ms,
376 }
377 }
378}
379
380impl From<&AuthLeaseRecord> for AuthLeaseRecordRow {
381 fn from(record: &AuthLeaseRecord) -> Self {
382 Self {
383 lease_id: record.lease_id.clone(),
384 kid: record.kid.clone(),
385 actor_id: record.actor_id.clone(),
386 issued_at_ms: record.issued_at_ms,
387 not_before_ms: record.not_before_ms,
388 expires_at_ms: record.expires_at_ms,
389 schema_version: record.schema_version,
390 payload_json: record.payload_json.clone(),
391 token: record.token.clone(),
392 status: record.status.clone(),
393 last_validation_error: record.last_validation_error.clone(),
394 created_at_ms: record.created_at_ms,
395 updated_at_ms: record.updated_at_ms,
396 }
397 }
398}
399
/// Diesel row for the `sync_subscription_state` table.
///
/// Converted into `SubscriptionState` by the `From` impl in this file. That conversion does
/// not carry `created_at` / `updated_at` across, which is why `dead_code` is allowed here.
#[derive(Debug, Clone, Queryable, Selectable)]
#[diesel(table_name = schema::sync_subscription_state)]
#[allow(dead_code)]
struct SubscriptionStateRow {
    state_id: String,
    subscription_id: String,
    table_name: String,
    scopes_json: String,
    params_json: String,
    cursor: i64,
    bootstrap_state_json: Option<String>,
    status: String,
    created_at: i64,
    updated_at: i64,
}
415
416impl From<SubscriptionStateRow> for SubscriptionState {
417 fn from(row: SubscriptionStateRow) -> Self {
418 Self {
419 state_id: row.state_id,
420 subscription_id: row.subscription_id,
421 table: row.table_name,
422 scopes_json: row.scopes_json,
423 params_json: row.params_json,
424 cursor: row.cursor,
425 bootstrap_state_json: row.bootstrap_state_json,
426 status: row.status,
427 }
428 }
429}
430
/// Raw verified-root row as loaded through `sql_query`; converted to `VerifiedRoot` by the
/// `From` impl in this file.
#[derive(Debug, Clone, QueryableByName)]
struct VerifiedRootRow {
    #[diesel(sql_type = Text)]
    state_id: String,
    #[diesel(sql_type = Text)]
    subscription_id: String,
    #[diesel(sql_type = Text)]
    partition_id: String,
    #[diesel(sql_type = BigInt)]
    commit_seq: i64,
    #[diesel(sql_type = Text)]
    root: String,
}
444
445impl From<VerifiedRootRow> for VerifiedRoot {
446 fn from(row: VerifiedRootRow) -> Self {
447 Self {
448 state_id: row.state_id,
449 subscription_id: row.subscription_id,
450 partition_id: row.partition_id,
451 commit_seq: row.commit_seq,
452 root: row.root,
453 }
454 }
455}
456
/// `(version, checksum)` pair read from `sync_migrations`; `ensure_schema` uses it to detect
/// already-applied migrations and checksum drift.
#[derive(QueryableByName)]
struct MigrationVersionRow {
    #[diesel(sql_type = Text)]
    version: String,
    #[diesel(sql_type = Text)]
    checksum: String,
}
464
/// Full migration row (`version`, `name`, `checksum`, `applied_at`) as loaded through
/// `sql_query`; converted to `AppliedMigration` by the `From` impl in this file.
#[derive(QueryableByName)]
struct AppliedMigrationRow {
    #[diesel(sql_type = Text)]
    version: String,
    #[diesel(sql_type = Text)]
    name: String,
    #[diesel(sql_type = Text)]
    checksum: String,
    #[diesel(sql_type = BigInt)]
    applied_at: i64,
}
476
477impl From<AppliedMigrationRow> for AppliedMigration {
478 fn from(row: AppliedMigrationRow) -> Self {
479 Self {
480 version: row.version,
481 name: row.name,
482 checksum: row.checksum,
483 applied_at: row.applied_at,
484 }
485 }
486}
487
/// `(schema_version, updated_at)` pair read from `syncular_app_schema` by
/// `app_schema_state_row`.
#[derive(QueryableByName)]
struct AppSchemaStateRow {
    #[diesel(sql_type = Integer)]
    schema_version: i32,
    #[diesel(sql_type = BigInt)]
    updated_at: i64,
}
495
/// Reduced outbox projection as loaded through `sql_query`: commit id, status, schema version,
/// and the nullable `lease_*` columns.
///
/// Converted to `OutboxSummary` by the `From` impl in this file.
#[derive(QueryableByName)]
struct OutboxSummaryRow {
    #[diesel(sql_type = Text)]
    client_commit_id: String,
    #[diesel(sql_type = Text)]
    status: String,
    #[diesel(sql_type = Integer)]
    schema_version: i32,
    #[diesel(sql_type = Nullable<Text>)]
    lease_id: Option<String>,
    #[diesel(sql_type = Nullable<BigInt>)]
    lease_expires_at_ms: Option<i64>,
    #[diesel(sql_type = Nullable<Text>)]
    lease_status_at_enqueue: Option<String>,
    #[diesel(sql_type = Nullable<Text>)]
    lease_scope_summary_json: Option<String>,
    #[diesel(sql_type = Nullable<Text>)]
    lease_token: Option<String>,
}
515
516impl From<OutboxSummaryRow> for OutboxSummary {
517 fn from(row: OutboxSummaryRow) -> Self {
518 Self {
519 client_commit_id: row.client_commit_id,
520 status: row.status,
521 schema_version: row.schema_version,
522 auth_lease: auth_lease_provenance_from_columns(
523 row.lease_id,
524 row.lease_expires_at_ms,
525 row.lease_status_at_enqueue,
526 row.lease_scope_summary_json,
527 row.lease_token,
528 ),
529 }
530 }
531}
532
533#[derive(QueryableByName)]
534struct NextRetryAtRow {
535 #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
536 next_attempt_at: Option<i64>,
537}
538
539#[derive(QueryableByName)]
540struct ConflictSummaryRow {
541 #[diesel(sql_type = Text)]
542 id: String,
543 #[diesel(sql_type = Text)]
544 client_commit_id: String,
545 #[diesel(sql_type = Integer)]
546 op_index: i32,
547 #[diesel(sql_type = Text)]
548 result_status: String,
549 #[diesel(sql_type = Text)]
550 message: String,
551 #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
552 code: Option<String>,
553 #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
554 server_version: Option<i64>,
555 #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
556 resolved_at: Option<i64>,
557 #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
558 resolution: Option<String>,
559}
560
/// Query result carrying a non-null `server_version`, an `op_index`, and an
/// `operations_json` text column.
// NOTE(review): the query that fills this row is outside this chunk; presumably it feeds
// conflict retry handling — confirm at the use site.
#[derive(QueryableByName)]
struct ConflictRetryRow {
    #[diesel(sql_type = BigInt)]
    server_version: i64,
    #[diesel(sql_type = Integer)]
    op_index: i32,
    #[diesel(sql_type = Text)]
    operations_json: String,
}
570
/// Single-column query result carrying a binary `body` column.
#[derive(QueryableByName)]
struct BlobBodyRow {
    #[diesel(sql_type = Binary)]
    body: Vec<u8>,
}
576
/// Single-column query result carrying an integer `found` column.
// NOTE(review): presumably an existence flag (0/1); the producing query is not in this chunk.
#[derive(QueryableByName)]
struct BlobFoundRow {
    #[diesel(sql_type = Integer)]
    found: i32,
}
582
/// Single-column query result carrying a `row_json` text column.
#[derive(QueryableByName)]
struct JsonObjectRow {
    #[diesel(sql_type = Text)]
    row_json: String,
}
588
/// Two-column query result carrying `identity` and `scopes` text columns.
#[derive(QueryableByName)]
struct EncryptedCrdtScopeRow {
    #[diesel(sql_type = Text)]
    identity: String,
    #[diesel(sql_type = Text)]
    scopes: String,
}
596
597#[derive(QueryableByName)]
598struct EncryptedCrdtStreamStatsRow {
599 #[diesel(sql_type = BigInt)]
600 update_count: i64,
601 #[diesel(sql_type = BigInt)]
602 checkpoint_count: i64,
603 #[diesel(sql_type = BigInt)]
604 checkpointable_update_count: i64,
605 #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
606 max_server_seq: Option<i64>,
607 #[diesel(sql_type = diesel::sql_types::Nullable<BigInt>)]
608 latest_checkpoint_covers_seq: Option<i64>,
609}
610
611impl From<EncryptedCrdtStreamStatsRow> for EncryptedCrdtStreamStats {
612 fn from(row: EncryptedCrdtStreamStatsRow) -> Self {
613 Self {
614 update_count: row.update_count,
615 checkpoint_count: row.checkpoint_count,
616 checkpointable_update_count: row.checkpointable_update_count,
617 max_server_seq: row.max_server_seq,
618 latest_checkpoint_covers_seq: row.latest_checkpoint_covers_seq,
619 }
620 }
621}
622
/// Public query result describing one queued blob upload: content hash, size, MIME type, raw
/// body bytes, encryption columns, and the attempt counter.
// NOTE(review): `encrypted` is an integer column, presumably a 0/1 flag — confirm against the
// schema.
#[derive(QueryableByName)]
pub struct PendingBlobUploadRow {
    #[diesel(sql_type = Text)]
    pub hash: String,
    #[diesel(sql_type = BigInt)]
    pub size: i64,
    #[diesel(sql_type = Text)]
    pub mime_type: String,
    #[diesel(sql_type = Binary)]
    pub body: Vec<u8>,
    #[diesel(sql_type = Integer)]
    pub encrypted: i32,
    #[diesel(sql_type = Nullable<Text>)]
    pub key_id: Option<String>,
    #[diesel(sql_type = Integer)]
    pub attempt_count: i32,
}
640
/// Two-column query result pairing a `status` text value with a `count`.
#[derive(QueryableByName)]
struct BlobQueueStatsRow {
    #[diesel(sql_type = Text)]
    status: String,
    #[diesel(sql_type = BigInt)]
    count: i64,
}
648
/// Two-column query result carrying `count` and `total_bytes`; same field names as the public
/// `BlobCacheStats`.
#[derive(QueryableByName)]
struct BlobCacheStatsRow {
    #[diesel(sql_type = BigInt)]
    count: i64,
    #[diesel(sql_type = BigInt)]
    total_bytes: i64,
}
656
/// Two-column query result carrying a blob `hash` and its `size`.
#[derive(QueryableByName)]
struct BlobCacheEntryRow {
    #[diesel(sql_type = Text)]
    hash: String,
    #[diesel(sql_type = BigInt)]
    size: i64,
}
664
/// Single-column query result for queries that select a `count` column.
#[derive(QueryableByName)]
struct CountRow {
    #[diesel(sql_type = BigInt)]
    count: i64,
}
670
/// Single-column query result for queries that select a non-null text column named `value`.
#[derive(QueryableByName)]
struct StringValueRow {
    #[diesel(sql_type = Text)]
    value: String,
}
676
/// Single-column query result for queries that select a text column named `name`.
// NOTE(review): presumably read from a table-info style pragma when checking for existing
// columns; the producing query is not in this chunk.
#[derive(QueryableByName)]
struct ColumnNameRow {
    #[diesel(sql_type = Text)]
    name: String,
}
682
/// Single-column query result for queries that select a nullable text column named `value`.
#[derive(QueryableByName)]
struct NullableStringValueRow {
    #[diesel(sql_type = Nullable<Text>)]
    value: Option<String>,
}
688
/// Query result carrying a document count plus pending, flushed, acked, and log update
/// counters.
#[derive(QueryableByName)]
struct CrdtHealthStatsRow {
    #[diesel(sql_type = BigInt)]
    document_count: i64,
    #[diesel(sql_type = BigInt)]
    pending_updates: i64,
    #[diesel(sql_type = BigInt)]
    flushed_updates: i64,
    #[diesel(sql_type = BigInt)]
    acked_updates: i64,
    #[diesel(sql_type = BigInt)]
    log_updates: i64,
}
702
/// Two-column query result carrying `app_table` and `row_id` text columns.
#[derive(QueryableByName)]
struct CrdtDocumentIdentityRow {
    #[diesel(sql_type = Text)]
    app_table: String,
    #[diesel(sql_type = Text)]
    row_id: String,
}
710
/// Public counters named `pending`, `uploading`, and `failed`.
///
/// Serialized and deserialized with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BlobUploadQueueStats {
    pub pending: i64,
    pub uploading: i64,
    pub failed: i64,
}
718
/// Public blob-cache totals: an entry `count` and `total_bytes`.
///
/// Serialized and deserialized with camelCase keys, so `total_bytes` appears as `totalBytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BlobCacheStats {
    pub count: i64,
    pub total_bytes: i64,
}
725
/// Public pair of counters named `uploaded` and `failed`.
///
/// Serialized and deserialized with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BlobUploadQueueResult {
    pub uploaded: i32,
    pub failed: i32,
}
732
733impl From<ConflictSummaryRow> for ConflictSummary {
734 fn from(row: ConflictSummaryRow) -> Self {
735 Self {
736 id: row.id,
737 client_commit_id: row.client_commit_id,
738 op_index: row.op_index,
739 result_status: row.result_status,
740 message: row.message,
741 code: row.code,
742 server_version: row.server_version,
743 resolved_at: row.resolved_at,
744 resolution: row.resolution,
745 }
746 }
747}
748
/// Sync store that owns a Diesel `SqliteConnection` together with the application schema
/// description it was opened with.
pub struct DieselSqliteStore {
    conn: SqliteConnection,
    app_schema: AppSchema,
}
753
/// Transaction-scoped view of the store: a mutable borrow of the connection for lifetime `'a`
/// plus the application schema description.
pub struct DieselSqliteTx<'a> {
    conn: &'a mut SqliteConnection,
    app_schema: AppSchema,
}
758
/// Pragma values reported for a connection: `journal_mode`, `foreign_keys`, `busy_timeout`,
/// and `synchronous`.
///
/// Returned by `DieselSqliteStore::runtime_pragma_report`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct SqliteRuntimePragmaReport {
    pub journal_mode: String,
    pub foreign_keys: i32,
    pub busy_timeout: i32,
    pub synchronous: i32,
}
766
/// Single-column query result carrying a `journal_mode` text column.
#[derive(QueryableByName)]
struct JournalModePragmaRow {
    #[diesel(sql_type = Text)]
    journal_mode: String,
}
772
/// Single-column query result carrying a `foreign_keys` integer column.
#[derive(QueryableByName)]
struct ForeignKeysPragmaRow {
    #[diesel(sql_type = Integer)]
    foreign_keys: i32,
}
778
/// Single-column query result for the busy-timeout pragma. Unlike the sibling pragma rows,
/// the column read here is named `timeout`, not `busy_timeout`.
#[derive(QueryableByName)]
struct BusyTimeoutPragmaRow {
    #[diesel(sql_type = Integer)]
    timeout: i32,
}
784
/// Single-column query result carrying a `synchronous` integer column.
#[derive(QueryableByName)]
struct SynchronousPragmaRow {
    #[diesel(sql_type = Integer)]
    synchronous: i32,
}
790
791impl DieselSqliteStore {
792 pub fn open(path: &str) -> Result<Self> {
793 Self::open_with_schema(path, default_app_schema())
794 }
795
796 pub fn open_with_schema(path: &str, app_schema: AppSchema) -> Result<Self> {
797 let mut conn = SqliteConnection::establish(path).map_err(|err| {
798 SyncularError::storage(err).context(format!("open sqlite database at {path}"))
799 })?;
800 apply_sqlite_runtime_pragmas(&mut conn)?;
801 let mut store = Self { conn, app_schema };
802 store.ensure_schema()?;
803 Ok(store)
804 }
805
806 pub fn ensure_schema(&mut self) -> Result<()> {
807 sql_query(
808 r#"
809 create table if not exists sync_migrations (
810 version text primary key,
811 name text not null,
812 checksum text not null,
813 applied_at bigint not null
814 )
815 "#,
816 )
817 .execute(&mut self.conn)?;
818 self.ensure_app_schema_state_table()?;
819 self.reject_future_app_schema_state()?;
820 self.ensure_runtime_system_schema()?;
821
822 for migration in self.app_schema.migrations {
823 let applied = sql_query(
824 r#"
825 select version, checksum
826 from sync_migrations
827 where version = ?1
828 limit 1
829 "#,
830 )
831 .bind::<Text, _>(migration.version)
832 .load::<MigrationVersionRow>(&mut self.conn)?
833 .into_iter()
834 .next();
835 let expected_checksum = checksum(migration.up_sql);
836
837 if let Some(applied) = applied {
838 if applied.checksum != expected_checksum {
839 return Err(SyncularError::schema(format!(
840 "migration {} checksum mismatch",
841 applied.version
842 )));
843 }
844 continue;
845 }
846
847 self.conn.transaction::<(), SyncularError, _>(|conn| {
848 for statement in split_sql_statements(migration.up_sql) {
849 sql_query(statement).execute(conn)?;
850 }
851 sql_query(
852 r#"
853 insert into sync_migrations (version, name, checksum, applied_at)
854 values (?1, ?2, ?3, ?4)
855 "#,
856 )
857 .bind::<Text, _>(migration.version)
858 .bind::<Text, _>(migration.name)
859 .bind::<Text, _>(&expected_checksum)
860 .bind::<BigInt, _>(now_ms())
861 .execute(conn)?;
862 Ok(())
863 })?;
864 }
865
866 self.record_app_schema_state()?;
867 Ok(())
868 }
869
870 fn ensure_app_schema_state_table(&mut self) -> Result<()> {
871 sql_query(
872 r#"
873 create table if not exists syncular_app_schema (
874 schema_id text primary key,
875 schema_version integer not null,
876 updated_at bigint not null
877 )
878 "#,
879 )
880 .execute(&mut self.conn)?;
881 Ok(())
882 }
883
884 fn app_schema_state_row(&mut self) -> Result<Option<AppSchemaStateRow>> {
885 Ok(sql_query(
886 r#"
887 select schema_version, updated_at
888 from syncular_app_schema
889 where schema_id = ?1
890 limit 1
891 "#,
892 )
893 .bind::<Text, _>(APP_SCHEMA_ID)
894 .load::<AppSchemaStateRow>(&mut self.conn)?
895 .into_iter()
896 .next())
897 }
898
899 fn reject_future_app_schema_state(&mut self) -> Result<()> {
900 let current = self.app_schema.current_schema_version();
901 if let Some(row) = self.app_schema_state_row()? {
902 if row.schema_version > current {
903 return Err(SyncularError::schema(format!(
904 "Syncular app schema version mismatch: local {}, generated {}",
905 row.schema_version, current
906 )));
907 }
908 }
909 Ok(())
910 }
911
912 fn record_app_schema_state(&mut self) -> Result<()> {
913 let current = self.app_schema.current_schema_version();
914 sql_query(
915 r#"
916 insert into syncular_app_schema (schema_id, schema_version, updated_at)
917 values (?1, ?2, ?3)
918 on conflict (schema_id) do update set
919 schema_version = excluded.schema_version,
920 updated_at = excluded.updated_at
921 "#,
922 )
923 .bind::<Text, _>(APP_SCHEMA_ID)
924 .bind::<Integer, _>(current)
925 .bind::<BigInt, _>(now_ms())
926 .execute(&mut self.conn)?;
927 Ok(())
928 }
929
930 pub fn app_schema_state(&mut self) -> Result<AppSchemaState> {
931 self.ensure_app_schema_state_table()?;
932 let row = self.app_schema_state_row()?;
933 Ok(AppSchemaState {
934 schema_id: APP_SCHEMA_ID.to_string(),
935 schema_version: row.as_ref().map(|row| row.schema_version),
936 current_schema_version: self.app_schema.current_schema_version(),
937 updated_at: row.as_ref().map(|row| row.updated_at),
938 })
939 }
940
941 pub fn runtime_pragma_report(&mut self) -> Result<SqliteRuntimePragmaReport> {
942 sqlite_runtime_pragma_report(&mut self.conn)
943 }
944
945 fn ensure_runtime_system_schema(&mut self) -> Result<()> {
946 for statement in split_sql_statements(RUNTIME_SYSTEM_SCHEMA_SQL) {
947 sql_query(statement).execute(&mut self.conn)?;
948 }
949 self.ensure_runtime_system_schema_upgrades()?;
950 Ok(())
951 }
952
953 fn ensure_runtime_system_schema_upgrades(&mut self) -> Result<()> {
954 add_column_if_missing(
955 &mut self.conn,
956 "sync_outbox_commits",
957 "lease_id",
958 "alter table sync_outbox_commits add column lease_id text null",
959 )?;
960 add_column_if_missing(
961 &mut self.conn,
962 "sync_outbox_commits",
963 "lease_expires_at_ms",
964 "alter table sync_outbox_commits add column lease_expires_at_ms bigint null",
965 )?;
966 add_column_if_missing(
967 &mut self.conn,
968 "sync_outbox_commits",
969 "lease_status_at_enqueue",
970 "alter table sync_outbox_commits add column lease_status_at_enqueue text null",
971 )?;
972 add_column_if_missing(
973 &mut self.conn,
974 "sync_outbox_commits",
975 "lease_scope_summary_json",
976 "alter table sync_outbox_commits add column lease_scope_summary_json text null",
977 )?;
978 add_column_if_missing(
979 &mut self.conn,
980 "sync_outbox_commits",
981 "lease_token",
982 "alter table sync_outbox_commits add column lease_token text null",
983 )
984 }
985
986 pub fn list_table_json(&mut self, table: &str) -> Result<Vec<Value>> {
987 if self.app_schema.table_metadata(table).is_none() {
988 return Err(SyncularError::config(format!(
989 "unknown generated app table: {table}"
990 )));
991 }
992
993 list_app_rows_json(&mut self.conn, self.app_schema, table)
994 }
995
996 pub fn read_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
997 current_app_row_json(&mut self.conn, self.app_schema, table, row_id)
998 }
999
1000 pub fn record_command_history(
1001 &mut self,
1002 mutation_scope: &str,
1003 entries: &[CommandHistoryEntry],
1004 receipt: &MutationReceipt,
1005 ) -> Result<CommandHistoryRecord> {
1006 self.conn
1007 .transaction::<CommandHistoryRecord, SyncularError, _>(|conn| {
1008 insert_command_history_record(conn, mutation_scope, entries, receipt)
1009 })
1010 }
1011
1012 pub fn apply_syncular_mutations_with_command_history(
1013 &mut self,
1014 mutation_scope: &str,
1015 actor_id: Option<&str>,
1016 now_ms_value: i64,
1017 mutations: Vec<PendingSyncularMutation>,
1018 ) -> Result<MutationReceipt> {
1019 self.transaction(|tx| {
1020 tx.apply_syncular_mutations_with_command_history(
1021 mutation_scope,
1022 actor_id,
1023 now_ms_value,
1024 mutations,
1025 )
1026 })
1027 }
1028
1029 pub fn latest_command_history(
1030 &mut self,
1031 state: CommandHistoryState,
1032 ) -> Result<Option<CommandHistoryRecord>> {
1033 sql_query(
1034 r#"
1035 select id, mutation_scope, state, entries_json, client_commit_id,
1036 undo_client_commit_id, redo_client_commit_id, created_at, updated_at
1037 from sync_command_history
1038 where state = ?1
1039 order by updated_at desc, created_at desc, id desc
1040 limit 1
1041 "#,
1042 )
1043 .bind::<Text, _>(state.as_str())
1044 .load::<CommandHistoryRow>(&mut self.conn)?
1045 .into_iter()
1046 .next()
1047 .map(CommandHistoryRecord::try_from)
1048 .transpose()
1049 }
1050
1051 pub fn mark_command_history(
1052 &mut self,
1053 id: &str,
1054 state: CommandHistoryState,
1055 receipt: &MutationReceipt,
1056 ) -> Result<()> {
1057 let replay_column = match state {
1058 CommandHistoryState::Done => "redo_client_commit_id",
1059 CommandHistoryState::Undone => "undo_client_commit_id",
1060 };
1061 let statement = format!(
1062 "update sync_command_history set state = ?1, updated_at = ?2, {replay_column} = ?3 where id = ?4"
1063 );
1064 sql_query(statement)
1065 .bind::<Text, _>(state.as_str())
1066 .bind::<BigInt, _>(now_ms())
1067 .bind::<Text, _>(&receipt.client_commit_id)
1068 .bind::<Text, _>(id)
1069 .execute(&mut self.conn)?;
1070 Ok(())
1071 }
1072
1073 pub fn read<'query, Q, Row>(&mut self, query: Q) -> Result<Vec<Row>>
1074 where
1075 Q: diesel::query_dsl::LoadQuery<'query, SqliteConnection, Row>,
1076 {
1077 query.load(&mut self.conn).map_err(Into::into)
1078 }
1079
1080 pub fn apply_local_operation(
1081 &mut self,
1082 operation: SyncOperation,
1083 local_row: Option<Value>,
1084 ) -> Result<String> {
1085 if self.app_schema.table_metadata(&operation.table).is_none()
1086 && !is_encrypted_crdt_system_table(&operation.table)
1087 {
1088 return Err(SyncularError::config(format!(
1089 "unknown generated app table: {}",
1090 operation.table
1091 )));
1092 }
1093
1094 self.transaction(|tx| tx.apply_local_operation(operation, local_row))
1095 }
1096
1097 pub fn apply_local_operation_with_active_auth_lease(
1098 &mut self,
1099 actor_id: Option<&str>,
1100 now_ms_value: i64,
1101 operation: SyncOperation,
1102 local_row: Option<Value>,
1103 ) -> Result<String> {
1104 self.transaction(|tx| {
1105 tx.apply_local_operation_with_active_auth_lease(
1106 actor_id,
1107 now_ms_value,
1108 operation,
1109 local_row,
1110 )
1111 })
1112 }
1113
1114 pub fn apply_syncular_mutations(
1115 &mut self,
1116 mutations: Vec<PendingSyncularMutation>,
1117 ) -> Result<MutationReceipt> {
1118 self.transaction(|tx| tx.apply_syncular_mutations(mutations))
1119 }
1120
1121 pub fn apply_syncular_mutations_with_active_auth_lease(
1122 &mut self,
1123 actor_id: Option<&str>,
1124 now_ms_value: i64,
1125 mutations: Vec<PendingSyncularMutation>,
1126 ) -> Result<MutationReceipt> {
1127 self.transaction(|tx| {
1128 tx.apply_syncular_mutations_with_active_auth_lease(actor_id, now_ms_value, mutations)
1129 })
1130 }
1131
1132 pub fn upsert_auth_lease(&mut self, lease: &AuthLeaseRecord) -> Result<()> {
1133 self.transaction(|tx| tx.upsert_auth_lease(lease))
1134 }
1135
1136 pub fn auth_lease(&mut self, lease_id: &str) -> Result<Option<AuthLeaseRecord>> {
1137 self.transaction(|tx| tx.auth_lease(lease_id))
1138 }
1139
1140 pub fn active_auth_leases(
1141 &mut self,
1142 actor_id: Option<&str>,
1143 now_ms_value: i64,
1144 ) -> Result<Vec<AuthLeaseRecord>> {
1145 self.transaction(|tx| tx.active_auth_leases(actor_id, now_ms_value))
1146 }
1147
1148 pub fn set_outbox_auth_lease(
1149 &mut self,
1150 client_commit_id: &str,
1151 provenance: Option<&AuthLeaseProvenance>,
1152 ) -> Result<()> {
1153 self.transaction(|tx| tx.set_outbox_auth_lease(client_commit_id, provenance))
1154 }
1155
1156 pub fn store_blob_bytes(
1157 &mut self,
1158 data: &[u8],
1159 mime_type: &str,
1160 enqueue_upload: bool,
1161 ) -> Result<BlobRef> {
1162 let size = i64::try_from(data.len()).map_err(|_| {
1163 SyncularError::protocol_message("blob is too large for SQLite size metadata")
1164 })?;
1165 validate_blob_size_bytes(size)?;
1166 let blob = BlobRef {
1167 hash: blob_hash(data),
1168 size,
1169 mime_type: normalize_blob_mime_type(mime_type),
1170 encrypted: false,
1171 key_id: None,
1172 };
1173
1174 self.store_blob_body(&blob, data, enqueue_upload)?;
1175 Ok(blob)
1176 }
1177
1178 pub fn store_blob_body(
1179 &mut self,
1180 blob: &BlobRef,
1181 data: &[u8],
1182 enqueue_upload: bool,
1183 ) -> Result<()> {
1184 self.conn.transaction::<(), SyncularError, _>(|conn| {
1185 cache_blob(conn, blob, data)?;
1186 if enqueue_upload {
1187 enqueue_blob_upload(conn, blob, data)?;
1188 }
1189 Ok(())
1190 })
1191 }
1192
1193 pub fn cache_blob_bytes(&mut self, blob: &BlobRef, data: &[u8]) -> Result<()> {
1194 cache_blob(&mut self.conn, blob, data)
1195 }
1196
1197 pub fn read_cached_blob(&mut self, hash: &str) -> Result<Option<Vec<u8>>> {
1198 validate_blob_hash(hash)?;
1199 let row = sql_query("select body from sync_blob_cache where hash = ?1 limit 1")
1200 .bind::<Text, _>(hash)
1201 .load::<BlobBodyRow>(&mut self.conn)?
1202 .into_iter()
1203 .next();
1204 let Some(row) = row else {
1205 return Ok(None);
1206 };
1207 sql_query("update sync_blob_cache set last_accessed_at = ?1 where hash = ?2")
1208 .bind::<BigInt, _>(now_ms())
1209 .bind::<Text, _>(hash)
1210 .execute(&mut self.conn)?;
1211 Ok(Some(row.body))
1212 }
1213
1214 pub fn is_blob_local(&mut self, hash: &str) -> Result<bool> {
1215 validate_blob_hash(hash)?;
1216 let row = sql_query("select 1 as found from sync_blob_cache where hash = ?1 limit 1")
1217 .bind::<Text, _>(hash)
1218 .load::<BlobFoundRow>(&mut self.conn)?
1219 .into_iter()
1220 .next();
1221 Ok(row.is_some_and(|row| row.found == 1))
1222 }
1223
1224 pub fn requeue_stale_blob_uploads(&mut self) -> Result<()> {
1225 let now = now_ms();
1226 let stale_before = now - BLOB_UPLOAD_STALE_TIMEOUT_MS;
1227 sql_query(
1228 r#"
1229 update sync_blob_outbox
1230 set status = case
1231 when attempt_count >= ?1 then 'failed'
1232 else 'pending'
1233 end,
1234 error = case
1235 when attempt_count >= ?1 then 'Upload timed out while in uploading state'
1236 else 'Upload timed out while in uploading state; retrying'
1237 end,
1238 next_attempt_at = case
1239 when attempt_count >= ?1 then 0
1240 else ?2
1241 end,
1242 updated_at = ?2
1243 where status = 'uploading' and updated_at < ?3
1244 "#,
1245 )
1246 .bind::<Integer, _>(MAX_BLOB_UPLOAD_RETRIES)
1247 .bind::<BigInt, _>(now)
1248 .bind::<BigInt, _>(stale_before)
1249 .execute(&mut self.conn)?;
1250 Ok(())
1251 }
1252
1253 pub fn pending_blob_uploads(
1254 &mut self,
1255 limit: i64,
1256 retry_now: bool,
1257 ) -> Result<Vec<PendingBlobUploadRow>> {
1258 let now = now_ms();
1259 Ok(sql_query(
1260 r#"
1261 select hash, size, mime_type, body, encrypted, key_id, attempt_count
1262 from sync_blob_outbox
1263 where status = 'pending' and attempt_count < ?1 and (?2 or next_attempt_at <= ?3)
1264 order by created_at asc
1265 limit ?4
1266 "#,
1267 )
1268 .bind::<Integer, _>(MAX_BLOB_UPLOAD_RETRIES)
1269 .bind::<Bool, _>(retry_now)
1270 .bind::<BigInt, _>(now)
1271 .bind::<BigInt, _>(limit)
1272 .load::<PendingBlobUploadRow>(&mut self.conn)?)
1273 }
1274
1275 pub fn mark_blob_uploading(&mut self, hash: &str, attempt_count: i32) -> Result<()> {
1276 sql_query(
1277 r#"
1278 update sync_blob_outbox
1279 set status = 'uploading',
1280 attempt_count = ?1,
1281 error = null,
1282 next_attempt_at = 0,
1283 updated_at = ?2
1284 where hash = ?3 and status = 'pending'
1285 "#,
1286 )
1287 .bind::<Integer, _>(attempt_count)
1288 .bind::<BigInt, _>(now_ms())
1289 .bind::<Text, _>(hash)
1290 .execute(&mut self.conn)?;
1291 Ok(())
1292 }
1293
1294 pub fn mark_blob_upload_error(
1295 &mut self,
1296 hash: &str,
1297 status: &str,
1298 error: &str,
1299 next_attempt_at: i64,
1300 ) -> Result<()> {
1301 sql_query(
1302 r#"
1303 update sync_blob_outbox
1304 set status = ?1, error = ?2, next_attempt_at = ?3, updated_at = ?4
1305 where hash = ?5
1306 "#,
1307 )
1308 .bind::<Text, _>(status)
1309 .bind::<Text, _>(error)
1310 .bind::<BigInt, _>(next_attempt_at)
1311 .bind::<BigInt, _>(now_ms())
1312 .bind::<Text, _>(hash)
1313 .execute(&mut self.conn)?;
1314 Ok(())
1315 }
1316
1317 pub fn delete_blob_upload(&mut self, hash: &str) -> Result<()> {
1318 sql_query("delete from sync_blob_outbox where hash = ?1")
1319 .bind::<Text, _>(hash)
1320 .execute(&mut self.conn)?;
1321 Ok(())
1322 }
1323
1324 pub fn blob_upload_queue_stats(&mut self) -> Result<BlobUploadQueueStats> {
1325 let rows = sql_query(
1326 r#"
1327 select status, count(hash) as count
1328 from sync_blob_outbox
1329 group by status
1330 "#,
1331 )
1332 .load::<BlobQueueStatsRow>(&mut self.conn)?;
1333 let mut stats = BlobUploadQueueStats {
1334 pending: 0,
1335 uploading: 0,
1336 failed: 0,
1337 };
1338 for row in rows {
1339 match row.status.as_str() {
1340 "pending" => stats.pending = row.count,
1341 "uploading" => stats.uploading = row.count,
1342 "failed" => stats.failed = row.count,
1343 _ => {}
1344 }
1345 }
1346 Ok(stats)
1347 }
1348
1349 pub fn blob_cache_stats(&mut self) -> Result<BlobCacheStats> {
1350 let row = sql_query(
1351 r#"
1352 select count(hash) as count, coalesce(sum(size), 0) as total_bytes
1353 from sync_blob_cache
1354 "#,
1355 )
1356 .load::<BlobCacheStatsRow>(&mut self.conn)?
1357 .into_iter()
1358 .next()
1359 .unwrap_or(BlobCacheStatsRow {
1360 count: 0,
1361 total_bytes: 0,
1362 });
1363 Ok(BlobCacheStats {
1364 count: row.count,
1365 total_bytes: row.total_bytes,
1366 })
1367 }
1368
1369 fn blob_reference_health_counts(&mut self) -> Result<(i64, i64)> {
1370 let mut checked = 0i64;
1371 let mut invalid = 0i64;
1372 for metadata in self.app_schema.app_table_metadata {
1373 validate_app_table_metadata(metadata)?;
1374 for column in metadata.blob_columns {
1375 validate_identifier(column)?;
1376 let sql = format!(
1377 "select {column} as value from {table} where {column} is not null and {column} <> ''",
1378 table = metadata.name
1379 );
1380 let rows = sql_query(sql).load::<NullableStringValueRow>(&mut self.conn)?;
1381 for row in rows {
1382 let Some(value) = row.value else {
1383 continue;
1384 };
1385 checked += 1;
1386 let parsed = serde_json::from_str::<BlobRef>(&value);
1387 match parsed {
1388 Ok(blob) if validate_blob_ref_size(&blob).is_ok() => {}
1389 _ => invalid += 1,
1390 }
1391 }
1392 }
1393 }
1394 Ok((checked, invalid))
1395 }
1396
1397 fn scoped_rows_health_summary(
1398 &mut self,
1399 subscriptions: &[SubscriptionSpec],
1400 ) -> Result<ScopedRowsHealthSummary> {
1401 scoped_rows_health_summary_for_schema(&mut self.conn, self.app_schema, subscriptions)
1402 }
1403
1404 fn clear_orphaned_synced_rows(
1405 &mut self,
1406 subscriptions: &[SubscriptionSpec],
1407 tables: &[String],
1408 ) -> Result<ScopedRowsHealthSummary> {
1409 self.conn.transaction(|conn| {
1410 clear_orphaned_synced_rows_for_schema(conn, self.app_schema, subscriptions, tables)
1411 })
1412 }
1413
1414 pub fn crdt_health_summary(&mut self) -> Result<CrdtHealthSummary> {
1415 let stats = sql_query(
1416 r#"
1417 select
1418 count(*) as document_count,
1419 coalesce(sum(pending_updates), 0) as pending_updates,
1420 coalesce(sum(flushed_updates), 0) as flushed_updates,
1421 coalesce(sum(acked_updates), 0) as acked_updates,
1422 coalesce(sum(log_updates), 0) as log_updates
1423 from sync_crdt_documents
1424 "#,
1425 )
1426 .load::<CrdtHealthStatsRow>(&mut self.conn)?
1427 .into_iter()
1428 .next()
1429 .unwrap_or(CrdtHealthStatsRow {
1430 document_count: 0,
1431 pending_updates: 0,
1432 flushed_updates: 0,
1433 acked_updates: 0,
1434 log_updates: 0,
1435 });
1436 let orphaned_log_entries = sql_query(
1437 r#"
1438 select count(*) as count
1439 from sync_crdt_update_log log
1440 left join sync_crdt_documents documents
1441 on documents.document_key = log.document_key
1442 where documents.document_key is null
1443 "#,
1444 )
1445 .load::<CountRow>(&mut self.conn)?
1446 .into_iter()
1447 .next()
1448 .map(|row| row.count)
1449 .unwrap_or(0);
1450
1451 Ok(CrdtHealthSummary {
1452 document_count: stats.document_count,
1453 pending_updates: stats.pending_updates,
1454 flushed_updates: stats.flushed_updates,
1455 acked_updates: stats.acked_updates,
1456 log_updates: stats.log_updates,
1457 orphaned_documents: self.orphaned_crdt_document_count()?,
1458 orphaned_log_entries,
1459 })
1460 }
1461
1462 fn orphaned_crdt_document_count(&mut self) -> Result<i64> {
1463 let documents = sql_query(
1464 r#"
1465 select app_table, row_id
1466 from sync_crdt_documents
1467 order by app_table asc, row_id asc
1468 "#,
1469 )
1470 .load::<CrdtDocumentIdentityRow>(&mut self.conn)?;
1471 let mut orphaned = 0i64;
1472 for document in documents {
1473 let Some(metadata) = self.app_schema.table_metadata(&document.app_table) else {
1474 orphaned += 1;
1475 continue;
1476 };
1477 if get_app_row_json_generic(&mut self.conn, metadata, &document.row_id)?.is_none() {
1478 orphaned += 1;
1479 }
1480 }
1481 Ok(orphaned)
1482 }
1483
1484 pub fn apply_crdt_field_yjs_update(
1485 &mut self,
1486 field: &CrdtField,
1487 update: YjsUpdateEnvelope,
1488 max_pending_updates: i64,
1489 ) -> Result<String> {
1490 self.transaction(|tx| tx.apply_crdt_field_yjs_update(field, update, max_pending_updates))
1491 }
1492
1493 pub fn crdt_document_snapshot(&mut self, field: &CrdtField) -> Result<CrdtDocumentSnapshot> {
1494 let row = current_app_row_json(
1495 &mut self.conn,
1496 self.app_schema,
1497 field.table(),
1498 field.row_id(),
1499 )?;
1500 let state_base64 = crdt_field_state_base64(field, row.as_ref());
1501 let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
1502 upsert_crdt_document_snapshot(
1503 &mut self.conn,
1504 field,
1505 state_base64.as_deref(),
1506 &state_vector_base64,
1507 None,
1508 )?;
1509 select_crdt_document_snapshot(&mut self.conn, &field.document_key())
1510 }
1511
1512 pub fn crdt_update_log(
1513 &mut self,
1514 field: &CrdtField,
1515 limit: i64,
1516 ) -> Result<Vec<CrdtUpdateLogEntry>> {
1517 sql_query(
1518 r#"
1519 select id, document_key, update_id, client_commit_id, origin, status, update_base64,
1520 state_vector_base64, created_at, flushed_at, acked_at
1521 from sync_crdt_update_log
1522 where document_key = ?1
1523 order by id asc
1524 limit ?2
1525 "#,
1526 )
1527 .bind::<Text, _>(field.document_key())
1528 .bind::<BigInt, _>(limit.max(0))
1529 .load::<CrdtUpdateLogRow>(&mut self.conn)?
1530 .into_iter()
1531 .map(CrdtUpdateLogEntry::try_from)
1532 .collect()
1533 }
1534
1535 pub fn compact_crdt_document(&mut self, field: &CrdtField) -> Result<CrdtDocumentSnapshot> {
1536 let row = current_app_row_json(
1537 &mut self.conn,
1538 self.app_schema,
1539 field.table(),
1540 field.row_id(),
1541 )?;
1542 let state_base64 = crdt_field_state_base64(field, row.as_ref());
1543 let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
1544 upsert_crdt_document_snapshot(
1545 &mut self.conn,
1546 field,
1547 state_base64.as_deref(),
1548 &state_vector_base64,
1549 Some(now_ms()),
1550 )?;
1551 select_crdt_document_snapshot(&mut self.conn, &field.document_key())
1552 }
1553
1554 pub fn prune_blob_cache(&mut self, max_bytes: i64) -> Result<i64> {
1555 if max_bytes <= 0 {
1556 return Ok(0);
1557 }
1558 let stats = self.blob_cache_stats()?;
1559 if stats.total_bytes <= max_bytes {
1560 return Ok(0);
1561 }
1562 let target = stats.total_bytes - max_bytes;
1563 let entries = sql_query(
1564 r#"
1565 select hash, size
1566 from sync_blob_cache
1567 order by last_accessed_at asc
1568 "#,
1569 )
1570 .load::<BlobCacheEntryRow>(&mut self.conn)?;
1571 let mut freed = 0i64;
1572 for entry in entries {
1573 if freed >= target {
1574 break;
1575 }
1576 sql_query("delete from sync_blob_cache where hash = ?1")
1577 .bind::<Text, _>(&entry.hash)
1578 .execute(&mut self.conn)?;
1579 freed += entry.size;
1580 }
1581 Ok(freed)
1582 }
1583
1584 pub fn prune_crdt_update_log(&mut self, cutoff: i64) -> Result<i64> {
1585 let deleted = sql_query(
1586 r#"
1587 delete from sync_crdt_update_log
1588 where status in ('acked', 'pruned')
1589 and coalesce(acked_at, flushed_at, created_at) <= ?1
1590 "#,
1591 )
1592 .bind::<BigInt, _>(cutoff)
1593 .execute(&mut self.conn)? as i64;
1594 refresh_all_crdt_document_counts(&mut self.conn)?;
1595 Ok(deleted)
1596 }
1597
1598 pub fn clear_blob_cache(&mut self) -> Result<()> {
1599 sql_query("delete from sync_blob_cache").execute(&mut self.conn)?;
1600 Ok(())
1601 }
1602
1603 pub fn encrypted_crdt_stream_stats(
1604 &mut self,
1605 partition_id: &str,
1606 stream_id: &str,
1607 ) -> Result<EncryptedCrdtStreamStats> {
1608 Ok(sql_query(
1609 r#"
1610 select
1611 (select count(*) from sync_crdt_updates
1612 where partition_id = ?1 and stream_id = ?2) as update_count,
1613 (select count(*) from sync_crdt_checkpoints
1614 where partition_id = ?1 and stream_id = ?2) as checkpoint_count,
1615 (select count(*) from sync_crdt_updates
1616 where partition_id = ?1 and stream_id = ?2
1617 and server_seq is not null
1618 and server_seq > coalesce((
1619 select max(covers_seq) from sync_crdt_checkpoints
1620 where partition_id = ?1 and stream_id = ?2
1621 ), 0)) as checkpointable_update_count,
1622 (select max(server_seq) from sync_crdt_updates
1623 where partition_id = ?1 and stream_id = ?2) as max_server_seq,
1624 (select max(covers_seq) from sync_crdt_checkpoints
1625 where partition_id = ?1 and stream_id = ?2) as latest_checkpoint_covers_seq
1626 "#,
1627 )
1628 .bind::<Text, _>(partition_id)
1629 .bind::<Text, _>(stream_id)
1630 .load::<EncryptedCrdtStreamStatsRow>(&mut self.conn)?
1631 .into_iter()
1632 .next()
1633 .map(Into::into)
1634 .unwrap_or_default())
1635 }
1636
1637 pub fn prune_encrypted_crdt_updates(&mut self) -> Result<i64> {
1638 Ok(sql_query(
1639 r#"
1640 delete from sync_crdt_updates
1641 where server_seq is not null
1642 and exists (
1643 select 1
1644 from sync_crdt_checkpoints
1645 where sync_crdt_checkpoints.partition_id = sync_crdt_updates.partition_id
1646 and sync_crdt_checkpoints.stream_id = sync_crdt_updates.stream_id
1647 and sync_crdt_checkpoints.key_id = sync_crdt_updates.key_id
1648 and sync_crdt_checkpoints.server_seq is not null
1649 and sync_crdt_checkpoints.covers_seq >= sync_crdt_updates.server_seq
1650 )
1651 "#,
1652 )
1653 .execute(&mut self.conn)? as i64)
1654 }
1655
1656 pub fn prune_encrypted_crdt_checkpoints(&mut self, keep_per_stream: i64) -> Result<i64> {
1657 Ok(sql_query(
1658 r#"
1659 delete from sync_crdt_checkpoints
1660 where checkpoint_id in (
1661 select checkpoint_id
1662 from (
1663 select
1664 checkpoint_id,
1665 row_number() over (
1666 partition by partition_id, stream_id, key_id
1667 order by covers_seq desc, coalesce(server_seq, 0) desc, seq desc
1668 ) as checkpoint_rank
1669 from sync_crdt_checkpoints
1670 ) ranked
1671 where checkpoint_rank > ?1
1672 )
1673 "#,
1674 )
1675 .bind::<BigInt, _>(keep_per_stream)
1676 .execute(&mut self.conn)? as i64)
1677 }
1678
/// Runs every storage compaction step enabled in `options` and reports how
/// much each step removed.
///
/// Steps run in a fixed order: acked outbox commits, resolved conflicts,
/// failed blob uploads, inactive subscription states, tombstones, blob cache
/// size, encrypted CRDT updates, encrypted CRDT checkpoints, CRDT update log.
/// Steps that are not enabled leave their report field at its default.
///
/// # Errors
/// Returns an error when an enabled age-based step cannot obtain a cutoff from
/// `required_compaction_cutoff`, when tombstone cleanup is enabled without
/// `max_tombstone_server_version`, or when any statement fails. The steps are
/// not wrapped in a shared transaction here, so earlier steps stay applied if
/// a later one fails.
pub fn compact_storage(
    &mut self,
    options: &StorageCompactionOptions,
) -> Result<StorageCompactionReport> {
    // Cutoff derived from the options; each age-based step below validates it
    // individually through `required_compaction_cutoff`.
    let cutoff = options.cutoff_ms_now()?;
    let mut report = StorageCompactionReport::default();

    // Outbox commits already acknowledged and not updated since the cutoff.
    if options.should_prune_acked_outbox() {
        let cutoff = required_compaction_cutoff(cutoff, "acked outbox")?;
        report.acked_outbox_commits_deleted = sql_query(
            "delete from sync_outbox_commits where status = 'acked' and updated_at <= ?1",
        )
        .bind::<BigInt, _>(cutoff)
        .execute(&mut self.conn)? as i64;
    }

    // Conflicts resolved at or before the cutoff.
    if options.should_prune_resolved_conflicts() {
        let cutoff = required_compaction_cutoff(cutoff, "resolved conflicts")?;
        report.resolved_conflicts_deleted = sql_query(
            "delete from sync_conflicts where resolved_at is not null and resolved_at <= ?1",
        )
        .bind::<BigInt, _>(cutoff)
        .execute(&mut self.conn)? as i64;
    }

    // Blob uploads in the `failed` state, not updated since the cutoff.
    if options.should_prune_failed_blob_uploads() {
        let cutoff = required_compaction_cutoff(cutoff, "failed blob uploads")?;
        report.failed_blob_uploads_deleted = sql_query(
            "delete from sync_blob_outbox where status = 'failed' and updated_at <= ?1",
        )
        .bind::<BigInt, _>(cutoff)
        .execute(&mut self.conn)? as i64;
    }

    // Subscription state rows whose status is anything other than `active`.
    if options.should_prune_inactive_subscription_states() {
        let cutoff = required_compaction_cutoff(cutoff, "inactive subscription states")?;
        report.inactive_subscription_states_deleted = sql_query(
            "delete from sync_subscription_state where status != 'active' and updated_at <= ?1",
        )
        .bind::<BigInt, _>(cutoff)
        .execute(&mut self.conn)?
            as i64;
    }

    // Tombstone cleanup is bounded by a server version rather than by age.
    if options.should_prune_tombstones() {
        let max_server_version = options.max_tombstone_server_version.ok_or_else(|| {
            SyncularError::config(
                "storage compaction tombstone cleanup requires maxTombstoneServerVersion",
            )
        })?;
        // One generated delete statement per entry returned by the helper.
        for statement in
            tombstone_delete_statements(self.app_schema.app_table_metadata, max_server_version)?
        {
            report.tombstone_rows_deleted +=
                sql_query(statement).execute(&mut self.conn)? as i64;
        }
    }

    // Shrink the blob cache to the byte budget, when one is configured.
    if let Some(max_bytes) = options.max_blob_cache_bytes {
        report.blob_cache_bytes_pruned = self.prune_blob_cache(max_bytes)?;
    }

    if options.should_prune_encrypted_crdt_updates() {
        report.encrypted_crdt_updates_deleted = self.prune_encrypted_crdt_updates()?;
    }

    // `Some(keep)` enables checkpoint pruning with that per-stream keep count.
    if let Some(keep) = options.encrypted_crdt_checkpoint_keep_count()? {
        report.encrypted_crdt_checkpoints_deleted =
            self.prune_encrypted_crdt_checkpoints(keep)?;
    }

    if options.should_prune_crdt_update_log() {
        let cutoff = required_compaction_cutoff(cutoff, "CRDT update log")?;
        report.crdt_update_log_deleted = self.prune_crdt_update_log(cutoff)?;
    }

    Ok(report)
}
1757
1758 pub fn compact_storage_json(&mut self, options_json: Option<&str>) -> Result<String> {
1759 let options = StorageCompactionOptions::from_json(options_json)?;
1760 Ok(serde_json::to_string(&self.compact_storage(&options)?)?)
1761 }
1762}
1763
1764pub fn apply_sqlite_runtime_pragmas(conn: &mut SqliteConnection) -> Result<()> {
1765 conn.batch_execute(&format!(
1766 r#"
1767 pragma busy_timeout = {SQLITE_BUSY_TIMEOUT_MS};
1768 pragma foreign_keys = on;
1769 pragma journal_mode = wal;
1770 pragma synchronous = normal;
1771 "#
1772 ))?;
1773 Ok(())
1774}
1775
1776fn sqlite_runtime_pragma_report(conn: &mut SqliteConnection) -> Result<SqliteRuntimePragmaReport> {
1777 Ok(SqliteRuntimePragmaReport {
1778 journal_mode: sql_query("pragma journal_mode")
1779 .load::<JournalModePragmaRow>(conn)?
1780 .into_iter()
1781 .next()
1782 .map(|row| row.journal_mode)
1783 .unwrap_or_default(),
1784 foreign_keys: sql_query("pragma foreign_keys")
1785 .load::<ForeignKeysPragmaRow>(conn)?
1786 .into_iter()
1787 .next()
1788 .map(|row| row.foreign_keys)
1789 .unwrap_or_default(),
1790 busy_timeout: sql_query("pragma busy_timeout")
1791 .load::<BusyTimeoutPragmaRow>(conn)?
1792 .into_iter()
1793 .next()
1794 .map(|row| row.timeout)
1795 .unwrap_or_default(),
1796 synchronous: sql_query("pragma synchronous")
1797 .load::<SynchronousPragmaRow>(conn)?
1798 .into_iter()
1799 .next()
1800 .map(|row| row.synchronous)
1801 .unwrap_or_default(),
1802 })
1803}
1804
1805fn cache_blob(conn: &mut SqliteConnection, blob: &BlobRef, data: &[u8]) -> Result<()> {
1806 validate_blob_bytes(blob, data)?;
1807 let now = now_ms();
1808 sql_query(
1809 r#"
1810 insert into sync_blob_cache
1811 (hash, size, mime_type, body, encrypted, key_id, cached_at, last_accessed_at)
1812 values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
1813 on conflict(hash) do update set
1814 size = excluded.size,
1815 mime_type = excluded.mime_type,
1816 body = excluded.body,
1817 encrypted = excluded.encrypted,
1818 key_id = excluded.key_id,
1819 last_accessed_at = excluded.last_accessed_at
1820 "#,
1821 )
1822 .bind::<Text, _>(&blob.hash)
1823 .bind::<BigInt, _>(blob.size)
1824 .bind::<Text, _>(&blob.mime_type)
1825 .bind::<Binary, _>(data)
1826 .bind::<Integer, _>(if blob.encrypted { 1 } else { 0 })
1827 .bind::<diesel::sql_types::Nullable<Text>, _>(blob.key_id.as_deref())
1828 .bind::<BigInt, _>(now)
1829 .bind::<BigInt, _>(now)
1830 .execute(conn)?;
1831 Ok(())
1832}
1833
1834fn enqueue_blob_upload(conn: &mut SqliteConnection, blob: &BlobRef, data: &[u8]) -> Result<()> {
1835 let now = now_ms();
1836 sql_query(
1837 r#"
1838 insert into sync_blob_outbox
1839 (hash, size, mime_type, body, encrypted, key_id, status, attempt_count, error, created_at, updated_at, next_attempt_at)
1840 values (?1, ?2, ?3, ?4, ?5, ?6, 'pending', 0, null, ?7, ?8, 0)
1841 on conflict(hash) do nothing
1842 "#,
1843 )
1844 .bind::<Text, _>(&blob.hash)
1845 .bind::<BigInt, _>(blob.size)
1846 .bind::<Text, _>(&blob.mime_type)
1847 .bind::<Binary, _>(data)
1848 .bind::<Integer, _>(if blob.encrypted { 1 } else { 0 })
1849 .bind::<diesel::sql_types::Nullable<Text>, _>(blob.key_id.as_deref())
1850 .bind::<BigInt, _>(now)
1851 .bind::<BigInt, _>(now)
1852 .execute(conn)?;
1853 Ok(())
1854}
1855
1856fn mutation_has_server_merge_yjs_payload(
1857 mutation: &PendingSyncularMutation,
1858 metadata: &AppTableMetadata,
1859) -> bool {
1860 operation_payload_has_server_merge_yjs_payload(mutation.payload.as_ref(), metadata)
1861}
1862
1863fn operation_payload_has_server_merge_yjs_payload(
1864 payload: Option<&Value>,
1865 metadata: &AppTableMetadata,
1866) -> bool {
1867 let Some(Value::Object(payload)) = payload else {
1868 return false;
1869 };
1870 let Some(Value::Object(envelope)) = payload.get(YJS_PAYLOAD_KEY) else {
1871 return false;
1872 };
1873
1874 metadata.crdt_yjs_fields.iter().any(|field| {
1875 (field.sync_mode == "server-merge" || field.sync_mode.is_empty())
1876 && envelope.contains_key(field.field)
1877 })
1878}
1879
1880fn collect_server_merge_yjs_updates(
1881 app_schema: AppSchema,
1882 metadata: &'static AppTableMetadata,
1883 operation: &SyncOperation,
1884) -> Result<Vec<(CrdtField, YjsUpdateEnvelope)>> {
1885 let Some(Value::Object(payload)) = operation.payload.as_ref() else {
1886 return Ok(Vec::new());
1887 };
1888 let Some(Value::Object(envelope)) = payload.get(YJS_PAYLOAD_KEY) else {
1889 return Ok(Vec::new());
1890 };
1891
1892 let mut updates = Vec::new();
1893 for field_metadata in metadata.crdt_yjs_fields.iter().filter(|field| {
1894 (field.sync_mode == "server-merge" || field.sync_mode.is_empty())
1895 && envelope.contains_key(field.field)
1896 }) {
1897 let field = validate_crdt_field(
1898 app_schema,
1899 &CrdtFieldId::new(&operation.table, &operation.row_id, field_metadata.field),
1900 )?;
1901 let Some(value) = envelope.get(field_metadata.field) else {
1902 continue;
1903 };
1904 match value {
1905 Value::Array(items) => {
1906 for item in items {
1907 updates.push((field.clone(), serde_json::from_value(item.clone())?));
1908 }
1909 }
1910 Value::Null => {}
1911 item => updates.push((field, serde_json::from_value(item.clone())?)),
1912 }
1913 }
1914
1915 Ok(updates)
1916}
1917
1918impl SyncStore for DieselSqliteStore {
1919 type Tx<'a> = DieselSqliteTx<'a>;
1920
1921 fn transaction<T>(&mut self, f: impl FnOnce(&mut Self::Tx<'_>) -> Result<T>) -> Result<T> {
1922 let app_schema = self.app_schema;
1923 self.conn.transaction::<T, SyncularError, _>(|conn| {
1924 let mut tx = DieselSqliteTx { conn, app_schema };
1925 f(&mut tx)
1926 })
1927 }
1928
1929 fn supports_sqlite_snapshot_artifacts(&self) -> bool {
1930 true
1931 }
1932
1933 fn decode_sqlite_snapshot_artifact_rows(
1934 &self,
1935 table: &str,
1936 artifact_bytes: &[u8],
1937 ) -> Result<Vec<Value>> {
1938 let mut artifact_conn = SqliteConnection::establish(":memory:")
1939 .map_err(|err| SyncularError::storage(err).context("open sqlite snapshot artifact"))?;
1940 artifact_conn
1941 .deserialize_readonly_database_from_buffer(artifact_bytes)
1942 .map_err(|err| {
1943 SyncularError::storage(err).context("deserialize sqlite snapshot artifact")
1944 })?;
1945 list_app_rows_json(&mut artifact_conn, self.app_schema, table)
1946 }
1947}
1948
/// Demo todo fixture operations, compiled only with the
/// `demo-todo-native-fixture` feature.
#[cfg(feature = "demo-todo-native-fixture")]
impl DemoTaskStore for DieselSqliteStore {
    /// Adds a task through the Tx-level `add_task`, inside one transaction.
    fn add_task(
        &mut self,
        actor_id: &str,
        project_id: Option<&str>,
        task_id: String,
        title_value: String,
    ) -> Result<()> {
        self.transaction(|store_tx| {
            let added = store_tx.add_task(actor_id, project_id, task_id, title_value);
            added
        })
    }

    /// Lists the tasks by querying the connection directly.
    fn list_tasks(&mut self) -> Result<Vec<Task>> {
        let conn = &mut self.conn;
        list_tasks(conn)
    }

    /// Changes a task title through the Tx-level `patch_task_title`, inside
    /// one transaction.
    fn patch_task_title(
        &mut self,
        project_id: Option<&str>,
        task_id: String,
        title_value: String,
    ) -> Result<()> {
        self.transaction(|store_tx| {
            let patched = store_tx.patch_task_title(project_id, task_id, title_value);
            patched
        })
    }
}
1974
1975impl SyncStateStore for DieselSqliteStore {
1976 fn applied_migrations(&mut self) -> Result<Vec<AppliedMigration>> {
1977 let rows = sql_query(
1978 r#"
1979 select version, name, checksum, applied_at
1980 from sync_migrations
1981 order by version asc
1982 "#,
1983 )
1984 .load::<AppliedMigrationRow>(&mut self.conn)?;
1985
1986 Ok(rows.into_iter().map(AppliedMigration::from).collect())
1987 }
1988
1989 fn app_schema_state(&mut self, _current_schema_version: i32) -> Result<AppSchemaState> {
1990 DieselSqliteStore::app_schema_state(self)
1991 }
1992
1993 fn outbox_summaries(&mut self) -> Result<Vec<OutboxSummary>> {
1994 let rows = sql_query(
1995 r#"
1996 select client_commit_id, status, schema_version,
1997 lease_id, lease_expires_at_ms, lease_status_at_enqueue,
1998 lease_scope_summary_json, lease_token
1999 from sync_outbox_commits
2000 order by created_at asc
2001 "#,
2002 )
2003 .load::<OutboxSummaryRow>(&mut self.conn)?;
2004
2005 Ok(rows.into_iter().map(OutboxSummary::from).collect())
2006 }
2007
2008 fn next_outbox_retry_at(&mut self) -> Result<Option<i64>> {
2009 Ok(sql_query(
2010 r#"
2011 select min(next_attempt_at) as next_attempt_at
2012 from sync_outbox_commits
2013 where status = 'pending' and attempt_count > 0 and attempt_count < ?1
2014 "#,
2015 )
2016 .bind::<Integer, _>(MAX_SYNC_RETRIES)
2017 .load::<NextRetryAtRow>(&mut self.conn)?
2018 .into_iter()
2019 .next()
2020 .and_then(|row| row.next_attempt_at))
2021 }
2022
2023 fn next_blob_upload_retry_at(&mut self) -> Result<Option<i64>> {
2024 Ok(sql_query(
2025 r#"
2026 select min(next_attempt_at) as next_attempt_at
2027 from sync_blob_outbox
2028 where status = 'pending' and attempt_count > 0 and attempt_count < ?1
2029 "#,
2030 )
2031 .bind::<Integer, _>(MAX_BLOB_UPLOAD_RETRIES)
2032 .load::<NextRetryAtRow>(&mut self.conn)?
2033 .into_iter()
2034 .next()
2035 .and_then(|row| row.next_attempt_at))
2036 }
2037
2038 fn conflict_summaries(&mut self) -> Result<Vec<ConflictSummary>> {
2039 let rows = sql_query(
2040 r#"
2041 select id, client_commit_id, op_index, result_status, message, code, server_version,
2042 resolved_at, resolution
2043 from sync_conflicts
2044 where resolved_at is null
2045 order by created_at desc
2046 "#,
2047 )
2048 .load::<ConflictSummaryRow>(&mut self.conn)?;
2049
2050 Ok(rows.into_iter().map(ConflictSummary::from).collect())
2051 }
2052
2053 fn blob_health_summary(&mut self) -> Result<Option<BlobHealthSummary>> {
2054 let upload = self.blob_upload_queue_stats()?;
2055 let cache = self.blob_cache_stats()?;
2056 let (checked_references, invalid_references) = self.blob_reference_health_counts()?;
2057 Ok(Some(BlobHealthSummary {
2058 cache_count: cache.count,
2059 cache_bytes: cache.total_bytes,
2060 upload_pending: upload.pending,
2061 upload_uploading: upload.uploading,
2062 upload_failed: upload.failed,
2063 checked_references,
2064 invalid_references,
2065 }))
2066 }
2067
2068 fn crdt_health_summary(&mut self) -> Result<Option<CrdtHealthSummary>> {
2069 Ok(Some(DieselSqliteStore::crdt_health_summary(self)?))
2070 }
2071
2072 fn scoped_rows_health_summary(
2073 &mut self,
2074 subscriptions: &[SubscriptionSpec],
2075 ) -> Result<Option<ScopedRowsHealthSummary>> {
2076 Ok(Some(DieselSqliteStore::scoped_rows_health_summary(
2077 self,
2078 subscriptions,
2079 )?))
2080 }
2081
2082 fn clear_orphaned_synced_rows(
2083 &mut self,
2084 subscriptions: &[SubscriptionSpec],
2085 tables: &[String],
2086 ) -> Result<ScopedRowsHealthSummary> {
2087 DieselSqliteStore::clear_orphaned_synced_rows(self, subscriptions, tables)
2088 }
2089
2090 fn resolve_conflict(&mut self, id_value: &str, resolution_value: &str) -> Result<()> {
2091 sql_query(
2092 r#"
2093 update sync_conflicts
2094 set resolved_at = ?1, resolution = ?2
2095 where id = ?3 and resolved_at is null
2096 "#,
2097 )
2098 .bind::<BigInt, _>(now_ms())
2099 .bind::<Text, _>(resolution_value)
2100 .bind::<Text, _>(id_value)
2101 .execute(&mut self.conn)?;
2102 Ok(())
2103 }
2104
2105 fn retry_conflict_keep_local(&mut self, id_value: &str) -> Result<String> {
2106 self.transaction(|tx| tx.retry_conflict_keep_local(id_value))
2107 }
2108}
2109
2110impl<'a> DieselSqliteTx<'a> {
/// Inserts a demo task locally and queues the resulting operation in the
/// outbox. Compiled only with the `demo-todo-native-fixture` feature.
#[cfg(feature = "demo-todo-native-fixture")]
fn add_task(
    &mut self,
    actor_id: &str,
    project_id: Option<&str>,
    task_id: String,
    title_value: String,
) -> Result<()> {
    let inserted = insert_local_task(self.conn, actor_id, project_id, &task_id, &title_value)?;
    // The generated client commit id is not needed by callers of this method.
    self.enqueue_outbox(vec![inserted]).map(|_| ())
}
2124
/// Updates a demo task's title locally and queues the resulting operation in
/// the outbox. Compiled only with the `demo-todo-native-fixture` feature.
#[cfg(feature = "demo-todo-native-fixture")]
fn patch_task_title(
    &mut self,
    project_id: Option<&str>,
    task_id: String,
    title_value: String,
) -> Result<()> {
    let patched = patch_local_task_title(self.conn, project_id, &task_id, &title_value)?;
    // The generated client commit id is not needed by callers of this method.
    self.enqueue_outbox(vec![patched]).map(|_| ())
}
2137
2138 fn enqueue_outbox_receipt(
2139 &mut self,
2140 operations: Vec<SyncOperation>,
2141 ) -> Result<MutationReceipt> {
2142 use schema::sync_outbox_commits::dsl as o;
2143
2144 self.assert_outbox_capacity()?;
2145 let id = Uuid::new_v4().to_string();
2146 let client_commit_id = Uuid::new_v4().to_string();
2147 let now = now_ms();
2148 let row = NewOutboxCommit {
2149 id: id.clone(),
2150 client_commit_id: client_commit_id.clone(),
2151 status: "pending".to_string(),
2152 operations_json: sync_operations_json_for_outbox(&operations)?,
2153 last_response_json: None,
2154 error: None,
2155 created_at: now,
2156 updated_at: now,
2157 attempt_count: 0,
2158 acked_commit_seq: None,
2159 schema_version: self.app_schema.current_schema_version(),
2160 next_attempt_at: 0,
2161 lease_id: None,
2162 lease_expires_at_ms: None,
2163 lease_status_at_enqueue: None,
2164 lease_scope_summary_json: None,
2165 lease_token: None,
2166 };
2167
2168 diesel::insert_into(o::sync_outbox_commits)
2169 .values(row)
2170 .execute(self.conn)?;
2171
2172 Ok(MutationReceipt {
2173 commit_id: id,
2174 client_commit_id,
2175 })
2176 }
2177
2178 fn enqueue_outbox(&mut self, operations: Vec<SyncOperation>) -> Result<String> {
2179 Ok(self.enqueue_outbox_receipt(operations)?.client_commit_id)
2180 }
2181
2182 fn assert_outbox_capacity(&mut self) -> Result<()> {
2183 let unresolved = sql_query(
2184 r#"
2185 select count(*) as count
2186 from sync_outbox_commits
2187 where status <> 'acked'
2188 "#,
2189 )
2190 .load::<CountRow>(self.conn)?
2191 .into_iter()
2192 .next()
2193 .map(|row| row.count)
2194 .unwrap_or(0);
2195 validate_unresolved_outbox_capacity(usize::try_from(unresolved).unwrap_or(usize::MAX))
2196 }
2197
2198 fn retry_conflict_keep_local(&mut self, conflict_id: &str) -> Result<String> {
2199 let row = sql_query(
2200 r#"
2201 select c.server_version as server_version,
2202 c.op_index as op_index,
2203 o.operations_json as operations_json
2204 from sync_conflicts c
2205 join sync_outbox_commits o on o.id = c.outbox_commit_id
2206 where c.id = ?1 and c.resolved_at is null
2207 limit 1
2208 "#,
2209 )
2210 .bind::<Text, _>(conflict_id)
2211 .load::<ConflictRetryRow>(self.conn)?
2212 .into_iter()
2213 .next()
2214 .ok_or_else(|| {
2215 SyncularError::config(format!("pending conflict not found: {conflict_id}"))
2216 })?;
2217
2218 let mut operations: Vec<SyncOperation> = serde_json::from_str(&row.operations_json)?;
2219 let operation = operations.get_mut(row.op_index as usize).ok_or_else(|| {
2220 SyncularError::protocol_message(format!(
2221 "conflict op index {} out of bounds",
2222 row.op_index
2223 ))
2224 })?;
2225 operation.base_version = Some(row.server_version);
2226 let retry_client_commit_id = self.enqueue_outbox(vec![operation.clone()])?;
2227
2228 sql_query(
2229 r#"
2230 update sync_conflicts
2231 set resolved_at = ?1, resolution = 'keep-local'
2232 where id = ?2 and resolved_at is null
2233 "#,
2234 )
2235 .bind::<BigInt, _>(now_ms())
2236 .bind::<Text, _>(conflict_id)
2237 .execute(self.conn)?;
2238
2239 Ok(retry_client_commit_id)
2240 }
2241
    /// Applies one local operation to the local database and queues it in the
    /// outbox.
    ///
    /// Encrypted CRDT system tables are handled first and return early. Every
    /// other table must have generated app-table metadata, otherwise a config
    /// error is returned. Only the `"upsert"` and `"delete"` ops are accepted;
    /// anything else yields a protocol error.
    ///
    /// Returns the client commit id of the outbox commit created for
    /// `operation`.
    fn apply_local_operation(
        &mut self,
        operation: SyncOperation,
        local_row: Option<Value>,
    ) -> Result<String> {
        if is_encrypted_crdt_system_table(&operation.table) {
            match operation.op.as_str() {
                "upsert" => {
                    // Prefer the caller-supplied row; fall back to the operation payload.
                    let row = apply_encrypted_crdt_system_row(
                        self.conn,
                        &operation.table,
                        &operation.row_id,
                        local_row.as_ref().or(operation.payload.as_ref()),
                        None,
                    )?;
                    materialize_encrypted_crdt_system_row(
                        self.conn,
                        self.app_schema,
                        &operation.table,
                        &row,
                    )?;
                }
                "delete" => delete_encrypted_crdt_system_row(
                    self.conn,
                    &operation.table,
                    &operation.row_id,
                )?,
                op => {
                    return Err(SyncularError::protocol_message(format!(
                        "unsupported local operation: {op}"
                    )));
                }
            }
            return self.enqueue_outbox(vec![operation]);
        }

        let metadata = self
            .app_schema
            .table_metadata(&operation.table)
            .ok_or_else(|| {
                SyncularError::config(format!("unknown generated app table: {}", operation.table))
            })?;
        // Snapshot the stored row and its server version before any local write.
        let current_row = self.current_row_json(&operation.table, &operation.row_id)?;
        let current_server_version =
            self.row_server_version(&operation.table, current_row.as_ref());
        // Collected up front; logged only after the outbox commit id is known.
        let yjs_updates = collect_server_merge_yjs_updates(self.app_schema, metadata, &operation)?;
        let local_row = transform_local_row_for_metadata(
            &operation.table,
            &operation.row_id,
            local_row,
            operation.payload.as_ref(),
            current_row.as_ref(),
            metadata,
        )?;

        match operation.op.as_str() {
            "upsert" => {
                // Without a transformed row, overlay the payload on the stored row.
                let row = local_row.unwrap_or_else(|| {
                    merged_local_row(
                        metadata,
                        current_row,
                        &operation.row_id,
                        operation.payload.as_ref(),
                    )
                });
                // Server-merge Yjs payloads keep the stored server version (falling
                // back to the base version); all other upserts use the base version,
                // or 0 when none was given.
                let local_server_version = if operation_payload_has_server_merge_yjs_payload(
                    operation.payload.as_ref(),
                    metadata,
                ) {
                    current_server_version.or(operation.base_version)
                } else {
                    Some(operation.base_version.unwrap_or(0))
                };
                self.upsert_row(&operation.table, &row, local_server_version)?;
            }
            "delete" => {
                // Local deletes reuse the change-application path with an empty scope map.
                self.apply_change(&SyncChange {
                    table: operation.table.clone(),
                    row_id: operation.row_id.clone(),
                    op: "delete".to_string(),
                    row_json: None,
                    row_version: operation.base_version,
                    scopes: Map::new(),
                })?;
            }
            op => {
                return Err(SyncularError::protocol_message(format!(
                    "unsupported local operation: {op}"
                )));
            }
        }

        let client_commit_id = self.enqueue_outbox(vec![operation])?;
        // Log each collected Yjs update as a pending local entry tied to the new
        // commit, using the field state read back after the write above.
        for (field, update) in yjs_updates {
            let row = self.current_row_json(field.table(), field.row_id())?;
            let state_base64 = crdt_field_state_base64(&field, row.as_ref());
            let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
            record_crdt_update_log(
                self.conn,
                &field,
                &update,
                Some(&client_commit_id),
                CrdtUpdateOrigin::Local,
                CrdtUpdateStatus::Pending,
                state_base64.as_deref(),
                &state_vector_base64,
            )?;
        }
        Ok(client_commit_id)
    }
2352
2353 fn apply_local_operation_with_active_auth_lease(
2354 &mut self,
2355 actor_id: Option<&str>,
2356 now_ms_value: i64,
2357 operation: SyncOperation,
2358 local_row: Option<Value>,
2359 ) -> Result<String> {
2360 let pre_delete_scope = if operation.op == "delete" {
2361 Some(self.operation_scope_for_current_row(&operation)?)
2362 } else {
2363 None
2364 };
2365 let client_commit_id = self.apply_local_operation(operation.clone(), local_row)?;
2366 let operation_scope = pre_delete_scope
2367 .map(Ok)
2368 .unwrap_or_else(|| self.operation_scope_for_current_row(&operation))?;
2369 let provenance = self.select_active_auth_lease_for_operations(
2370 ActiveAuthLeasePolicy {
2371 actor_id,
2372 now_ms: now_ms_value,
2373 },
2374 &[operation_scope],
2375 )?;
2376 self.set_outbox_auth_lease(&client_commit_id, Some(&provenance))?;
2377 Ok(client_commit_id)
2378 }
2379
2380 fn operation_scope_for_current_row(
2381 &mut self,
2382 operation: &SyncOperation,
2383 ) -> Result<MutationOperationScope> {
2384 if is_encrypted_crdt_system_table(&operation.table) {
2385 return Ok(system_table_operation_scope(operation));
2386 }
2387 let metadata = self
2388 .app_schema
2389 .table_metadata(&operation.table)
2390 .ok_or_else(|| {
2391 SyncularError::config(format!("unknown generated app table: {}", operation.table))
2392 })?;
2393 let row = self.current_row_json(&operation.table, &operation.row_id)?;
2394 Ok(app_table_operation_scope(
2395 metadata,
2396 operation,
2397 row.as_ref(),
2398 row.is_some() || operation.op == "upsert",
2399 ))
2400 }
2401
2402 fn apply_crdt_field_yjs_update(
2403 &mut self,
2404 field: &CrdtField,
2405 update: YjsUpdateEnvelope,
2406 max_pending_updates: i64,
2407 ) -> Result<String> {
2408 assert_crdt_document_capacity(self.conn, &field.document_key(), max_pending_updates)?;
2409 let mut envelope = Map::new();
2410 envelope.insert(field.field().to_string(), serde_json::to_value(&update)?);
2411 let mut payload = Map::new();
2412 payload.insert(YJS_PAYLOAD_KEY.to_string(), Value::Object(envelope));
2413 let operation = SyncOperation {
2414 table: field.table().to_string(),
2415 row_id: field.row_id().to_string(),
2416 op: "upsert".to_string(),
2417 payload: Some(Value::Object(payload)),
2418 base_version: None,
2419 };
2420 let client_commit_id = self.apply_local_operation(operation, None)?;
2421 let row = self.current_row_json(field.table(), field.row_id())?;
2422 let state_base64 = crdt_field_state_base64(field, row.as_ref());
2423 let state_vector_base64 = yjs_state_vector_base64(state_base64.as_deref())?;
2424 record_crdt_update_log(
2425 self.conn,
2426 field,
2427 &update,
2428 Some(&client_commit_id),
2429 CrdtUpdateOrigin::Local,
2430 CrdtUpdateStatus::Pending,
2431 state_base64.as_deref(),
2432 &state_vector_base64,
2433 )?;
2434 Ok(client_commit_id)
2435 }
2436
    /// Applies a mutation batch without an auth-lease policy.
    ///
    /// Thin wrapper over `apply_syncular_mutations_inner` with the policy set
    /// to `None`.
    fn apply_syncular_mutations(
        &mut self,
        mutations: Vec<PendingSyncularMutation>,
    ) -> Result<MutationReceipt> {
        self.apply_syncular_mutations_inner(mutations, None)
    }
2443
2444 fn apply_syncular_mutations_with_active_auth_lease(
2445 &mut self,
2446 actor_id: Option<&str>,
2447 now_ms_value: i64,
2448 mutations: Vec<PendingSyncularMutation>,
2449 ) -> Result<MutationReceipt> {
2450 self.apply_syncular_mutations_inner(
2451 mutations,
2452 Some(ActiveAuthLeasePolicy {
2453 actor_id,
2454 now_ms: now_ms_value,
2455 }),
2456 )
2457 }
2458
2459 fn apply_syncular_mutations_with_command_history(
2460 &mut self,
2461 mutation_scope: &str,
2462 actor_id: Option<&str>,
2463 now_ms_value: i64,
2464 mutations: Vec<PendingSyncularMutation>,
2465 ) -> Result<MutationReceipt> {
2466 let pending = self.command_history_pending_entries(&mutations)?;
2467 let receipt = match mutation_scope {
2468 "mutations" => self.apply_syncular_mutations_inner(mutations, None)?,
2469 "leasedMutations" => self.apply_syncular_mutations_inner(
2470 mutations,
2471 Some(ActiveAuthLeasePolicy {
2472 actor_id,
2473 now_ms: now_ms_value,
2474 }),
2475 )?,
2476 other => {
2477 return Err(SyncularError::config(format!(
2478 "sync.command_history_scope_unsupported: {other}"
2479 )));
2480 }
2481 };
2482 let entries = self.command_history_committed_entries(pending)?;
2483 if !entries.is_empty() {
2484 insert_command_history_record(self.conn, mutation_scope, &entries, &receipt)?;
2485 }
2486 Ok(receipt)
2487 }
2488
2489 fn command_history_pending_entries(
2490 &mut self,
2491 mutations: &[PendingSyncularMutation],
2492 ) -> Result<Vec<PendingCommandHistoryEntry>> {
2493 let mut entries = Vec::new();
2494 for mutation in mutations {
2495 if entries.iter().any(|entry: &PendingCommandHistoryEntry| {
2496 entry.table == mutation.table && entry.row_id == mutation.row_id
2497 }) {
2498 continue;
2499 }
2500 let before = self.current_row_json(&mutation.table, &mutation.row_id)?;
2501 entries.push(PendingCommandHistoryEntry {
2502 table: mutation.table.clone(),
2503 row_id: mutation.row_id.clone(),
2504 before,
2505 });
2506 }
2507 Ok(entries)
2508 }
2509
2510 fn command_history_committed_entries(
2511 &mut self,
2512 pending: Vec<PendingCommandHistoryEntry>,
2513 ) -> Result<Vec<CommandHistoryEntry>> {
2514 let mut entries = Vec::new();
2515 for entry in pending {
2516 let after = self.current_row_json(&entry.table, &entry.row_id)?;
2517 if entry.before == after {
2518 continue;
2519 }
2520 entries.push(CommandHistoryEntry {
2521 table: entry.table,
2522 row_id: entry.row_id,
2523 before: entry.before,
2524 after,
2525 });
2526 }
2527 Ok(entries)
2528 }
2529
2530 fn apply_syncular_mutations_inner(
2531 &mut self,
2532 mutations: Vec<PendingSyncularMutation>,
2533 auth_lease_policy: Option<ActiveAuthLeasePolicy<'_>>,
2534 ) -> Result<MutationReceipt> {
2535 if mutations.is_empty() {
2536 return Err(SyncularError::config(
2537 "cannot commit an empty Syncular mutation batch",
2538 ));
2539 }
2540 validate_pending_mutation_batch_size(&mutations)?;
2541
2542 let mut operations = Vec::with_capacity(mutations.len());
2543 let mut operation_scopes = Vec::with_capacity(mutations.len());
2544 for mutation in mutations {
2545 if is_encrypted_crdt_system_table(&mutation.table) {
2546 let operation = mutation.operation(None);
2547 operation_scopes.push(system_table_operation_scope(&operation));
2548 match mutation.kind {
2549 SyncularMutationKind::Delete => {
2550 delete_encrypted_crdt_system_row(
2551 self.conn,
2552 &mutation.table,
2553 &mutation.row_id,
2554 )?;
2555 }
2556 SyncularMutationKind::Insert
2557 | SyncularMutationKind::Update
2558 | SyncularMutationKind::Upsert => {
2559 let row = apply_encrypted_crdt_system_row(
2560 self.conn,
2561 &mutation.table,
2562 &mutation.row_id,
2563 mutation.local_row.as_ref().or(operation.payload.as_ref()),
2564 None,
2565 )?;
2566 materialize_encrypted_crdt_system_row(
2567 self.conn,
2568 self.app_schema,
2569 &mutation.table,
2570 &row,
2571 )?;
2572 }
2573 }
2574 operations.push(operation);
2575 continue;
2576 }
2577
2578 if self.app_schema.table_metadata(&mutation.table).is_none() {
2579 return Err(SyncularError::config(format!(
2580 "unknown generated app table: {}",
2581 mutation.table
2582 )));
2583 }
2584
2585 let metadata = self
2586 .app_schema
2587 .table_metadata(&mutation.table)
2588 .expect("validated mutation table has metadata");
2589 let current_row = self.current_row_json(&mutation.table, &mutation.row_id)?;
2590 let current_server_version =
2591 self.row_server_version(&mutation.table, current_row.as_ref());
2592 let base_version = mutation.base_version.or_else(|| {
2593 if mutation_has_server_merge_yjs_payload(&mutation, metadata) {
2594 return None;
2595 }
2596
2597 match mutation.kind {
2598 SyncularMutationKind::Insert => None,
2599 SyncularMutationKind::Update
2600 | SyncularMutationKind::Upsert
2601 | SyncularMutationKind::Delete => current_server_version,
2602 }
2603 });
2604 let operation = mutation.operation(base_version);
2605
2606 match mutation.kind {
2607 SyncularMutationKind::Delete => {
2608 operation_scopes.push(app_table_operation_scope(
2609 metadata,
2610 &operation,
2611 current_row.as_ref(),
2612 current_row.is_some(),
2613 ));
2614 self.apply_change(&SyncChange {
2615 table: mutation.table.clone(),
2616 row_id: mutation.row_id.clone(),
2617 op: "delete".to_string(),
2618 row_json: None,
2619 row_version: base_version,
2620 scopes: Map::new(),
2621 })?;
2622 }
2623 SyncularMutationKind::Insert
2624 | SyncularMutationKind::Update
2625 | SyncularMutationKind::Upsert => {
2626 let local_row = transform_local_row_for_metadata(
2627 &mutation.table,
2628 &mutation.row_id,
2629 mutation.local_row,
2630 operation.payload.as_ref(),
2631 current_row.as_ref(),
2632 metadata,
2633 )?;
2634 let local_row = local_row.unwrap_or_else(|| {
2635 merged_local_row(
2636 metadata,
2637 current_row,
2638 &mutation.row_id,
2639 operation.payload.as_ref(),
2640 )
2641 });
2642 operation_scopes.push(app_table_operation_scope(
2643 metadata,
2644 &operation,
2645 Some(&local_row),
2646 true,
2647 ));
2648 self.upsert_row(
2649 &mutation.table,
2650 &local_row,
2651 current_server_version.or(base_version),
2652 )?;
2653 }
2654 }
2655
2656 operations.push(operation);
2657 }
2658
2659 let receipt = self.enqueue_outbox_receipt(operations)?;
2660 if let Some(policy) = auth_lease_policy {
2661 let provenance =
2662 self.select_active_auth_lease_for_operations(policy, &operation_scopes)?;
2663 self.set_outbox_auth_lease(&receipt.client_commit_id, Some(&provenance))?;
2664 }
2665 Ok(receipt)
2666 }
2667
2668 fn select_active_auth_lease_for_operations(
2669 &mut self,
2670 policy: ActiveAuthLeasePolicy<'_>,
2671 operations: &[MutationOperationScope],
2672 ) -> Result<AuthLeaseProvenance> {
2673 let candidate_leases = self.auth_lease_candidates_for_selection(policy.actor_id)?;
2674 select_auth_lease_for_operation_scopes(
2675 policy,
2676 candidate_leases,
2677 self.app_schema.current_schema_version(),
2678 operations,
2679 )
2680 }
2681
2682 fn auth_lease_candidates_for_selection(
2683 &mut self,
2684 actor_id_value: Option<&str>,
2685 ) -> Result<Vec<AuthLeaseRecord>> {
2686 use schema::sync_auth_leases::dsl as l;
2687
2688 let mut query = l::sync_auth_leases
2689 .select(AuthLeaseRecordRow::as_select())
2690 .filter(l::status.eq("active"))
2691 .into_boxed();
2692 if let Some(actor_id_value) = actor_id_value {
2693 query = query.filter(l::actor_id.eq(actor_id_value));
2694 }
2695 let rows = query.order(l::expires_at_ms.asc()).load(self.conn)?;
2696 Ok(rows.into_iter().map(AuthLeaseRecord::from).collect())
2697 }
2698
    /// Reads the stored row `row_id` of `table` as JSON, if it exists.
    ///
    /// Thin wrapper over `current_app_row_json` using this transaction's
    /// connection and app schema.
    fn current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
        current_app_row_json(self.conn, self.app_schema, table, row_id)
    }
2702
2703 fn row_server_version(&self, table: &str, row: Option<&Value>) -> Option<i64> {
2704 let metadata = self.app_schema.table_metadata(table)?;
2705 row.and_then(|row| {
2706 row.get(metadata.server_version_column)
2707 .and_then(Value::as_i64)
2708 })
2709 }
2710}
2711
2712fn merged_local_row(
2713 metadata: &AppTableMetadata,
2714 current_row: Option<Value>,
2715 row_id: &str,
2716 payload: Option<&Value>,
2717) -> Value {
2718 let mut row = current_row
2719 .and_then(|row| row.as_object().cloned())
2720 .unwrap_or_default();
2721 if let Some(payload) = payload.and_then(Value::as_object) {
2722 for (key, value) in payload {
2723 row.insert(key.clone(), value.clone());
2724 }
2725 }
2726 row.insert(
2727 metadata.primary_key_column.to_string(),
2728 Value::String(row_id.to_string()),
2729 );
2730 Value::Object(row)
2731}
2732
2733fn apply_encrypted_crdt_system_row(
2734 conn: &mut SqliteConnection,
2735 table: &str,
2736 row_id: &str,
2737 row: Option<&Value>,
2738 server_seq: Option<i64>,
2739) -> Result<Map<String, Value>> {
2740 let row = encrypted_crdt_normalize_row(table, row_id, row)?;
2741 let server_seq = server_seq
2742 .or_else(|| row.get("server_seq").and_then(Value::as_i64))
2743 .or_else(|| row.get("seq").and_then(Value::as_i64));
2744 let scopes_json = encrypted_crdt_scopes_json(&row)?;
2745 let partition_id = row
2746 .get("partition_id")
2747 .and_then(Value::as_str)
2748 .unwrap_or("default");
2749 let stream_id = row.get("stream_id").and_then(Value::as_str).unwrap();
2750 let app_table = row.get("app_table").and_then(Value::as_str).unwrap();
2751 let app_row_id = row.get("row_id").and_then(Value::as_str).unwrap();
2752 let field_name = row.get("field_name").and_then(Value::as_str).unwrap();
2753 let key_id = row.get("key_id").and_then(Value::as_str).unwrap();
2754 let ciphertext = row.get("ciphertext").and_then(Value::as_str).unwrap();
2755 let actor_id = row.get("actor_id").and_then(Value::as_str);
2756 let client_id = row.get("client_id").and_then(Value::as_str);
2757 let created_at = row
2758 .get("created_at")
2759 .and_then(Value::as_i64)
2760 .unwrap_or_else(now_ms);
2761
2762 match table {
2763 CRDT_UPDATES_TABLE => {
2764 let update_id = row.get("update_id").and_then(Value::as_str).unwrap();
2765 sql_query(
2766 r#"
2767 insert into sync_crdt_updates (
2768 partition_id, stream_id, app_table, row_id, field_name,
2769 update_id, actor_id, client_id, key_id, ciphertext, scopes, created_at,
2770 server_seq
2771 ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
2772 on conflict (update_id) do update set
2773 server_seq = coalesce(excluded.server_seq, sync_crdt_updates.server_seq)
2774 "#,
2775 )
2776 .bind::<Text, _>(partition_id)
2777 .bind::<Text, _>(stream_id)
2778 .bind::<Text, _>(app_table)
2779 .bind::<Text, _>(app_row_id)
2780 .bind::<Text, _>(field_name)
2781 .bind::<Text, _>(update_id)
2782 .bind::<diesel::sql_types::Nullable<Text>, _>(actor_id)
2783 .bind::<diesel::sql_types::Nullable<Text>, _>(client_id)
2784 .bind::<Text, _>(key_id)
2785 .bind::<Text, _>(ciphertext)
2786 .bind::<Text, _>(&scopes_json)
2787 .bind::<BigInt, _>(created_at)
2788 .bind::<diesel::sql_types::Nullable<BigInt>, _>(server_seq)
2789 .execute(conn)?;
2790 }
2791 CRDT_CHECKPOINTS_TABLE => {
2792 let checkpoint_id = row.get("checkpoint_id").and_then(Value::as_str).unwrap();
2793 let covers_seq = row.get("covers_seq").and_then(Value::as_i64).unwrap();
2794 sql_query(
2795 r#"
2796 insert into sync_crdt_checkpoints (
2797 partition_id, stream_id, app_table, row_id, field_name,
2798 checkpoint_id, covers_seq, actor_id, client_id, key_id,
2799 ciphertext, scopes, created_at, server_seq
2800 ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
2801 on conflict (checkpoint_id) do update set
2802 server_seq = coalesce(excluded.server_seq, sync_crdt_checkpoints.server_seq)
2803 "#,
2804 )
2805 .bind::<Text, _>(partition_id)
2806 .bind::<Text, _>(stream_id)
2807 .bind::<Text, _>(app_table)
2808 .bind::<Text, _>(app_row_id)
2809 .bind::<Text, _>(field_name)
2810 .bind::<Text, _>(checkpoint_id)
2811 .bind::<BigInt, _>(covers_seq)
2812 .bind::<diesel::sql_types::Nullable<Text>, _>(actor_id)
2813 .bind::<diesel::sql_types::Nullable<Text>, _>(client_id)
2814 .bind::<Text, _>(key_id)
2815 .bind::<Text, _>(ciphertext)
2816 .bind::<Text, _>(&scopes_json)
2817 .bind::<BigInt, _>(created_at)
2818 .bind::<diesel::sql_types::Nullable<BigInt>, _>(server_seq)
2819 .execute(conn)?;
2820 }
2821 _ => unreachable!("validated encrypted CRDT table"),
2822 }
2823 Ok(row)
2824}
2825
2826fn delete_encrypted_crdt_system_row(
2827 conn: &mut SqliteConnection,
2828 table: &str,
2829 row_id: &str,
2830) -> Result<()> {
2831 let identity = encrypted_crdt_identity_column(table)?;
2832 let sql = format!("delete from {table} where {identity} = ?1");
2833 sql_query(sql).bind::<Text, _>(row_id).execute(conn)?;
2834 Ok(())
2835}
2836
2837fn update_encrypted_crdt_system_server_seq(
2838 conn: &mut SqliteConnection,
2839 table: &str,
2840 row_id: &str,
2841 server_seq: i64,
2842) -> Result<()> {
2843 let identity = encrypted_crdt_identity_column(table)?;
2844 let sql = format!("update {table} set server_seq = ?1 where {identity} = ?2");
2845 sql_query(sql)
2846 .bind::<BigInt, _>(server_seq)
2847 .bind::<Text, _>(row_id)
2848 .execute(conn)?;
2849 Ok(())
2850}
2851
2852fn materialize_encrypted_crdt_system_row(
2853 conn: &mut SqliteConnection,
2854 app_schema: AppSchema,
2855 system_table: &str,
2856 system_row: &Map<String, Value>,
2857) -> Result<()> {
2858 let app_table = system_row
2859 .get("app_table")
2860 .and_then(Value::as_str)
2861 .ok_or_else(|| SyncularError::protocol_message("encrypted CRDT row missing app_table"))?;
2862 let app_row_id = system_row
2863 .get("row_id")
2864 .and_then(Value::as_str)
2865 .ok_or_else(|| SyncularError::protocol_message("encrypted CRDT row missing row_id"))?;
2866 let field_name = system_row
2867 .get("field_name")
2868 .and_then(Value::as_str)
2869 .ok_or_else(|| SyncularError::protocol_message("encrypted CRDT row missing field_name"))?;
2870 let Some(metadata) = app_schema.table_metadata(app_table) else {
2871 return Ok(());
2872 };
2873 if !metadata
2874 .crdt_yjs_fields
2875 .iter()
2876 .any(|field| field.field == field_name && field.sync_mode == "encrypted-update-log")
2877 {
2878 return Ok(());
2879 }
2880 let current_row = current_app_row_json(conn, app_schema, app_table, app_row_id)?;
2881 let Some(row) = apply_encrypted_crdt_plaintext_to_row(
2882 metadata,
2883 field_name,
2884 app_row_id,
2885 system_table,
2886 system_row,
2887 current_row,
2888 )?
2889 else {
2890 return Ok(());
2891 };
2892 let fallback_version = row
2893 .get(metadata.server_version_column)
2894 .and_then(Value::as_i64)
2895 .or(Some(0));
2896 upsert_app_row(conn, app_schema, app_table, &row, fallback_version)?;
2897 Ok(())
2898}
2899
2900fn current_app_row_json(
2901 conn: &mut SqliteConnection,
2902 app_schema: AppSchema,
2903 table: &str,
2904 row_id: &str,
2905) -> Result<Option<Value>> {
2906 let metadata = app_schema
2907 .table_metadata(table)
2908 .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
2909 get_app_row_json_generic(conn, metadata, row_id)
2910}
2911
2912fn get_app_row_json_generic(
2913 conn: &mut SqliteConnection,
2914 metadata: &AppTableMetadata,
2915 row_id: &str,
2916) -> Result<Option<Value>> {
2917 validate_app_table_metadata(metadata)?;
2918 let projection = json_object_projection(metadata)?;
2919 let sql = format!(
2920 "select {projection} as row_json from {table} where {pk} = {row_id} limit 1",
2921 table = metadata.name,
2922 pk = metadata.primary_key_column,
2923 row_id = sql_string(row_id)
2924 );
2925 Ok(rows_from_json_query(conn, sql)?.into_iter().next())
2926}
2927
2928fn crdt_state_vector_hints_for_subscription(
2929 conn: &mut SqliteConnection,
2930 app_schema: AppSchema,
2931 table: &str,
2932 scopes: &ScopeValues,
2933 limit: i64,
2934) -> Result<Vec<CrdtStateVectorHint>> {
2935 let metadata = app_schema
2936 .table_metadata(table)
2937 .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
2938 validate_app_table_metadata(metadata)?;
2939 let rows = sql_query(
2940 r#"
2941 select document_key, app_table, row_id, field_name, state_column, sync_mode,
2942 state_base64, state_vector_base64, pending_updates, flushed_updates,
2943 acked_updates, log_updates, updated_at, compacted_at
2944 from sync_crdt_documents
2945 where app_table = ?1 and state_vector_base64 != ''
2946 order by updated_at desc
2947 limit ?2
2948 "#,
2949 )
2950 .bind::<Text, _>(table)
2951 .bind::<BigInt, _>(limit.max(0))
2952 .load::<CrdtDocumentSnapshotRow>(conn)?;
2953
2954 let mut hints = Vec::new();
2955 for row in rows {
2956 let Some(app_row) = get_app_row_json_generic(conn, metadata, &row.row_id)? else {
2957 continue;
2958 };
2959 if !row_matches_scope_values(metadata, &app_row, scopes) {
2960 continue;
2961 }
2962 hints.push(CrdtStateVectorHint {
2963 row_id: row.row_id,
2964 field: row.field_name,
2965 state_column: row.state_column,
2966 state_vector_base64: row.state_vector_base64,
2967 sync_mode: row.sync_mode,
2968 updated_at: row.updated_at,
2969 });
2970 }
2971 Ok(hints)
2972}
2973
2974fn clear_encrypted_crdt_system_table_for_scopes(
2975 conn: &mut SqliteConnection,
2976 table: &str,
2977 scopes: &ScopeValues,
2978) -> Result<()> {
2979 let identity = encrypted_crdt_identity_column(table)?;
2980 if scopes.is_empty() {
2981 let sql = format!("delete from {table}");
2982 sql_query(sql).execute(conn)?;
2983 return Ok(());
2984 }
2985
2986 let sql = format!("select {identity} as identity, scopes from {table}");
2987 let rows = sql_query(sql).load::<EncryptedCrdtScopeRow>(conn)?;
2988 for row in rows {
2989 let stored_scopes: Value = serde_json::from_str(&row.scopes)?;
2990 let mut object = Map::new();
2991 object.insert("scopes".to_string(), stored_scopes);
2992 if encrypted_crdt_row_matches_scopes(&object, scopes) {
2993 delete_encrypted_crdt_system_row(conn, table, &row.identity)?;
2994 }
2995 }
2996 Ok(())
2997}
2998
2999fn list_app_rows_json(
3000 conn: &mut SqliteConnection,
3001 app_schema: AppSchema,
3002 table: &str,
3003) -> Result<Vec<Value>> {
3004 let metadata = app_schema
3005 .table_metadata(table)
3006 .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3007 match app_schema.adapter_for(table) {
3008 Ok(adapter) => adapter.list_rows_json(conn),
3009 Err(_) => list_rows_json_generic(conn, metadata),
3010 }
3011}
3012
3013fn clear_app_table_for_scopes(
3014 conn: &mut SqliteConnection,
3015 app_schema: AppSchema,
3016 table: &str,
3017 scopes: &ScopeValues,
3018) -> Result<()> {
3019 let metadata = app_schema
3020 .table_metadata(table)
3021 .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3022 match app_schema.adapter_for(table) {
3023 Ok(adapter) => adapter.clear_for_scopes(conn, scopes),
3024 Err(_) => clear_table_for_scopes_generic(conn, metadata, scopes),
3025 }
3026}
3027
3028fn clear_app_table_for_scopes_preserving_local_crdt(
3029 conn: &mut SqliteConnection,
3030 app_schema: AppSchema,
3031 table: &str,
3032 scopes: &ScopeValues,
3033) -> Result<()> {
3034 let metadata = app_schema
3035 .table_metadata(table)
3036 .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3037 let encrypted_fields = metadata
3038 .crdt_yjs_fields
3039 .iter()
3040 .filter(|field| field.sync_mode == "encrypted-update-log")
3041 .collect::<Vec<_>>();
3042 if encrypted_fields.is_empty() {
3043 return clear_app_table_for_scopes(conn, app_schema, table, scopes);
3044 }
3045
3046 validate_app_table_metadata(metadata)?;
3047 for field in &encrypted_fields {
3048 validate_identifier(field.state_column)?;
3049 }
3050 let mut filters = scope_filters(metadata, scopes)?;
3051 filters.extend(encrypted_fields.iter().map(|field| {
3052 format!(
3053 "({} is null or {} = '')",
3054 field.state_column, field.state_column
3055 )
3056 }));
3057 let where_clause = if filters.is_empty() {
3058 String::new()
3059 } else {
3060 format!(" where {}", filters.join(" and "))
3061 };
3062 let sql = format!("delete from {table}{where_clause}", table = metadata.name);
3063 sql_query(sql).execute(conn)?;
3064 Ok(())
3065}
3066
3067fn clear_synced_app_rows_for_scopes(
3068 conn: &mut SqliteConnection,
3069 app_schema: AppSchema,
3070 table: &str,
3071 scopes: &ScopeValues,
3072) -> Result<i64> {
3073 let metadata = app_schema
3074 .table_metadata(table)
3075 .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3076 clear_synced_rows_for_scopes_generic(conn, metadata, scopes)
3077}
3078
3079fn scoped_rows_health_summary_for_schema(
3080 conn: &mut SqliteConnection,
3081 app_schema: AppSchema,
3082 subscriptions: &[SubscriptionSpec],
3083) -> Result<ScopedRowsHealthSummary> {
3084 let mut summary = ScopedRowsHealthSummary::default();
3085 for metadata in app_schema.app_table_metadata {
3086 validate_app_table_metadata(metadata)?;
3087 validate_identifier(metadata.server_version_column)?;
3088 let checked_synced_rows = count_rows(
3089 conn,
3090 &format!(
3091 "select count(*) as count from {table} where {server_version} > 0",
3092 table = metadata.name,
3093 server_version = metadata.server_version_column
3094 ),
3095 )?;
3096 let table_subscriptions = subscriptions
3097 .iter()
3098 .filter(|subscription| subscription.table == metadata.name)
3099 .collect::<Vec<_>>();
3100 let orphaned_synced_rows = if checked_synced_rows == 0 {
3101 0
3102 } else if table_subscriptions.is_empty() {
3103 checked_synced_rows
3104 } else {
3105 let scope_clauses = table_subscriptions
3106 .iter()
3107 .map(|subscription| scope_clause(metadata, &subscription.scopes))
3108 .collect::<Result<Vec<_>>>()?;
3109 count_rows(
3110 conn,
3111 &format!(
3112 "select count(*) as count from {table} where {server_version} > 0 and not ({scope_clause})",
3113 table = metadata.name,
3114 server_version = metadata.server_version_column,
3115 scope_clause = scope_clauses.join(" or ")
3116 ),
3117 )?
3118 };
3119 summary.checked_synced_rows += checked_synced_rows;
3120 summary.orphaned_synced_rows += orphaned_synced_rows;
3121 summary.tables.push(ScopedRowsTableHealth {
3122 table: metadata.name.to_string(),
3123 checked_synced_rows,
3124 orphaned_synced_rows,
3125 });
3126 }
3127 Ok(summary)
3128}
3129
3130fn clear_orphaned_synced_rows_for_schema(
3131 conn: &mut SqliteConnection,
3132 app_schema: AppSchema,
3133 subscriptions: &[SubscriptionSpec],
3134 tables: &[String],
3135) -> Result<ScopedRowsHealthSummary> {
3136 validate_requested_app_tables(app_schema, tables)?;
3137 let mut summary = ScopedRowsHealthSummary::default();
3138 for metadata in app_schema
3139 .app_table_metadata
3140 .iter()
3141 .filter(|metadata| tables.is_empty() || tables.iter().any(|table| table == metadata.name))
3142 {
3143 validate_app_table_metadata(metadata)?;
3144 validate_identifier(metadata.server_version_column)?;
3145 let checked_synced_rows = count_rows(
3146 conn,
3147 &format!(
3148 "select count(*) as count from {table} where {server_version} > 0",
3149 table = metadata.name,
3150 server_version = metadata.server_version_column
3151 ),
3152 )?;
3153 let table_subscriptions = subscriptions
3154 .iter()
3155 .filter(|subscription| subscription.table == metadata.name)
3156 .collect::<Vec<_>>();
3157 let orphaned_synced_rows = if checked_synced_rows == 0 {
3158 0
3159 } else if table_subscriptions.is_empty() {
3160 delete_rows_with_count(
3161 conn,
3162 &format!(
3163 "delete from {table} where {server_version} > 0",
3164 table = metadata.name,
3165 server_version = metadata.server_version_column
3166 ),
3167 )?
3168 } else {
3169 let scope_clauses = table_subscriptions
3170 .iter()
3171 .map(|subscription| scope_clause(metadata, &subscription.scopes))
3172 .collect::<Result<Vec<_>>>()?;
3173 delete_rows_with_count(
3174 conn,
3175 &format!(
3176 "delete from {table} where {server_version} > 0 and not ({scope_clause})",
3177 table = metadata.name,
3178 server_version = metadata.server_version_column,
3179 scope_clause = scope_clauses.join(" or ")
3180 ),
3181 )?
3182 };
3183 summary.checked_synced_rows += checked_synced_rows;
3184 summary.orphaned_synced_rows += orphaned_synced_rows;
3185 summary.tables.push(ScopedRowsTableHealth {
3186 table: metadata.name.to_string(),
3187 checked_synced_rows,
3188 orphaned_synced_rows,
3189 });
3190 }
3191 Ok(summary)
3192}
3193
3194fn validate_requested_app_tables(app_schema: AppSchema, tables: &[String]) -> Result<()> {
3195 for table in tables {
3196 if app_schema.table_metadata(table).is_none() {
3197 return Err(SyncularError::config(format!(
3198 "unknown generated app table: {table}"
3199 )));
3200 }
3201 }
3202 Ok(())
3203}
3204
3205fn count_rows(conn: &mut SqliteConnection, sql: &str) -> Result<i64> {
3206 Ok(sql_query(sql)
3207 .load::<CountRow>(conn)?
3208 .into_iter()
3209 .next()
3210 .map(|row| row.count)
3211 .unwrap_or_default())
3212}
3213
3214fn delete_rows_with_count(conn: &mut SqliteConnection, sql: &str) -> Result<i64> {
3215 Ok(sql_query(sql).execute(conn)? as i64)
3216}
3217
3218fn preserve_encrypted_crdt_materialized_columns(
3219 conn: &mut SqliteConnection,
3220 app_schema: AppSchema,
3221 metadata: &'static AppTableMetadata,
3222 row: Value,
3223) -> Result<Value> {
3224 if !metadata
3225 .crdt_yjs_fields
3226 .iter()
3227 .any(|field| field.sync_mode == "encrypted-update-log")
3228 {
3229 return Ok(row);
3230 }
3231 let Some(mut row_object) = row.as_object().cloned() else {
3232 return Ok(row);
3233 };
3234 let Some(row_id) = row_object
3235 .get(metadata.primary_key_column)
3236 .and_then(Value::as_str)
3237 .map(str::to_string)
3238 else {
3239 return Ok(Value::Object(row_object));
3240 };
3241 let Some(existing_row) = current_app_row_json(conn, app_schema, metadata.name, &row_id)? else {
3242 return Ok(Value::Object(row_object));
3243 };
3244 let Some(existing_object) = existing_row.as_object() else {
3245 return Ok(Value::Object(row_object));
3246 };
3247
3248 for field in metadata
3249 .crdt_yjs_fields
3250 .iter()
3251 .filter(|field| field.sync_mode == "encrypted-update-log")
3252 {
3253 let Some(state) = existing_object
3254 .get(field.state_column)
3255 .and_then(Value::as_str)
3256 .filter(|state| !state.is_empty())
3257 else {
3258 continue;
3259 };
3260 row_object.insert(
3261 field.state_column.to_string(),
3262 Value::String(state.to_string()),
3263 );
3264 if let Some(value) = existing_object.get(field.field) {
3265 row_object.insert(field.field.to_string(), value.clone());
3266 }
3267 }
3268 Ok(Value::Object(row_object))
3269}
3270
3271fn upsert_app_row(
3272 conn: &mut SqliteConnection,
3273 app_schema: AppSchema,
3274 table: &str,
3275 row: &Value,
3276 fallback_version: Option<i64>,
3277) -> Result<()> {
3278 let metadata = app_schema
3279 .table_metadata(table)
3280 .ok_or_else(|| SyncularError::config(format!("unknown generated app table: {table}")))?;
3281 match app_schema.adapter_for(table) {
3282 Ok(adapter) => adapter.upsert_row(conn, row, fallback_version),
3283 Err(_) => upsert_row_generic(conn, metadata, row, fallback_version),
3284 }
3285}
3286
3287fn apply_app_change(
3288 conn: &mut SqliteConnection,
3289 app_schema: AppSchema,
3290 change: &SyncChange,
3291) -> Result<()> {
3292 let metadata = app_schema.table_metadata(&change.table).ok_or_else(|| {
3293 SyncularError::config(format!("unknown generated app table: {}", change.table))
3294 })?;
3295 if change.op == "upsert" && change.row_json.is_some() {
3296 let mut change = change.clone();
3297 let row = change.row_json.take().expect("checked row_json presence");
3298 change.row_json = Some(preserve_encrypted_crdt_materialized_columns(
3299 conn, app_schema, metadata, row,
3300 )?);
3301 return match app_schema.adapter_for(&change.table) {
3302 Ok(adapter) => adapter.apply_change(conn, &change),
3303 Err(_) => apply_change_generic(conn, metadata, &change),
3304 };
3305 }
3306 match app_schema.adapter_for(&change.table) {
3307 Ok(adapter) => adapter.apply_change(conn, change),
3308 Err(_) => apply_change_generic(conn, metadata, change),
3309 }
3310}
3311
3312fn list_rows_json_generic(
3313 conn: &mut SqliteConnection,
3314 metadata: &AppTableMetadata,
3315) -> Result<Vec<Value>> {
3316 validate_app_table_metadata(metadata)?;
3317 let projection = json_object_projection(metadata)?;
3318 let sql = format!(
3319 "select {projection} as row_json from {table} order by {pk} asc",
3320 table = metadata.name,
3321 pk = metadata.primary_key_column
3322 );
3323 rows_from_json_query(conn, sql)
3324}
3325
3326fn clear_table_for_scopes_generic(
3327 conn: &mut SqliteConnection,
3328 metadata: &AppTableMetadata,
3329 scopes: &ScopeValues,
3330) -> Result<()> {
3331 validate_app_table_metadata(metadata)?;
3332 let filters = scope_filters(metadata, scopes)?;
3333 let where_clause = if filters.is_empty() {
3334 String::new()
3335 } else {
3336 format!(" where {}", filters.join(" and "))
3337 };
3338 let sql = format!("delete from {table}{where_clause}", table = metadata.name);
3339 sql_query(sql).execute(conn)?;
3340 Ok(())
3341}
3342
3343fn clear_synced_rows_for_scopes_generic(
3344 conn: &mut SqliteConnection,
3345 metadata: &AppTableMetadata,
3346 scopes: &ScopeValues,
3347) -> Result<i64> {
3348 validate_app_table_metadata(metadata)?;
3349 validate_identifier(metadata.server_version_column)?;
3350 let mut filters = scope_filters(metadata, scopes)?;
3351 filters.push(format!("{} > 0", metadata.server_version_column));
3352 let sql = format!(
3353 "delete from {table} where {where_clause}",
3354 table = metadata.name,
3355 where_clause = filters.join(" and ")
3356 );
3357 Ok(sql_query(sql).execute(conn)? as i64)
3358}
3359
3360fn upsert_row_generic(
3361 conn: &mut SqliteConnection,
3362 metadata: &AppTableMetadata,
3363 row: &Value,
3364 fallback_version: Option<i64>,
3365) -> Result<()> {
3366 validate_app_table_metadata(metadata)?;
3367 let row = row.as_object().ok_or_else(|| {
3368 SyncularError::protocol_message(format!("row is not a JSON object: {row}"))
3369 })?;
3370 row.get(metadata.primary_key_column)
3371 .and_then(Value::as_str)
3372 .ok_or_else(|| {
3373 SyncularError::protocol_message(format!(
3374 "row for table {} is missing string primary key {}",
3375 metadata.name, metadata.primary_key_column
3376 ))
3377 })?;
3378
3379 let columns = syncable_columns(metadata);
3380 if columns.is_empty() {
3381 return Ok(());
3382 }
3383 let values = columns
3384 .iter()
3385 .map(|column| generic_column_sql_value(metadata, row, column, fallback_version))
3386 .collect::<Result<Vec<_>>>()?;
3387 let update_columns = columns
3388 .iter()
3389 .copied()
3390 .filter(|column| *column != metadata.primary_key_column)
3391 .collect::<Vec<_>>();
3392 let on_conflict = if update_columns.is_empty() {
3393 "do nothing".to_string()
3394 } else {
3395 format!(
3396 "do update set {}",
3397 update_columns
3398 .iter()
3399 .map(|column| format!("{column} = excluded.{column}"))
3400 .collect::<Vec<_>>()
3401 .join(", ")
3402 )
3403 };
3404
3405 let sql = format!(
3406 "insert into {table} ({columns}) values ({values}) on conflict({pk}) {on_conflict}",
3407 table = metadata.name,
3408 columns = columns.join(", "),
3409 values = values.join(", "),
3410 pk = metadata.primary_key_column,
3411 );
3412 sql_query(sql).execute(conn)?;
3413 Ok(())
3414}
3415
3416fn upsert_rows_generic_batch(
3417 conn: &mut SqliteConnection,
3418 metadata: &AppTableMetadata,
3419 rows: &[Map<String, Value>],
3420 fallback_version: Option<i64>,
3421) -> Result<()> {
3422 if rows.is_empty() {
3423 return Ok(());
3424 }
3425 validate_app_table_metadata(metadata)?;
3426
3427 let columns = syncable_columns(metadata);
3428 if columns.is_empty() {
3429 return Ok(());
3430 }
3431 let update_columns = columns
3432 .iter()
3433 .copied()
3434 .filter(|column| *column != metadata.primary_key_column)
3435 .collect::<Vec<_>>();
3436 let on_conflict = if update_columns.is_empty() {
3437 "do nothing".to_string()
3438 } else {
3439 format!(
3440 "do update set {}",
3441 update_columns
3442 .iter()
3443 .map(|column| format!("{column} = excluded.{column}"))
3444 .collect::<Vec<_>>()
3445 .join(", ")
3446 )
3447 };
3448
3449 for batch in rows.chunks(SNAPSHOT_UPSERT_BATCH_ROWS) {
3450 let value_groups = batch
3451 .iter()
3452 .map(|row| {
3453 row.get(metadata.primary_key_column)
3454 .and_then(Value::as_str)
3455 .ok_or_else(|| {
3456 SyncularError::protocol_message(format!(
3457 "row for table {} is missing string primary key {}",
3458 metadata.name, metadata.primary_key_column
3459 ))
3460 })?;
3461 let values = columns
3462 .iter()
3463 .map(|column| generic_column_sql_value(metadata, row, column, fallback_version))
3464 .collect::<Result<Vec<_>>>()?;
3465 Ok(format!("({})", values.join(", ")))
3466 })
3467 .collect::<Result<Vec<_>>>()?;
3468 let sql = format!(
3469 "insert into {table} ({columns}) values {values} on conflict({pk}) {on_conflict}",
3470 table = metadata.name,
3471 columns = columns.join(", "),
3472 values = value_groups.join(", "),
3473 pk = metadata.primary_key_column,
3474 );
3475 sql_query(sql).execute(conn)?;
3476 }
3477
3478 Ok(())
3479}
3480
3481fn upsert_binary_snapshot_rows_batch(
3482 conn: &mut SqliteConnection,
3483 metadata: &AppTableMetadata,
3484 rows: &DecodedBinarySnapshotRows,
3485) -> Result<()> {
3486 if rows.rows.is_empty() {
3487 return Ok(());
3488 }
3489 if rows.table != metadata.name {
3490 return Err(SyncularError::protocol_message(format!(
3491 "binary snapshot table mismatch: expected {}, got {}",
3492 metadata.name, rows.table
3493 )));
3494 }
3495 validate_app_table_metadata(metadata)?;
3496
3497 let columns = rows
3498 .columns
3499 .iter()
3500 .map(|column| {
3501 validate_identifier(&column.name)?;
3502 Ok(column.name.as_str())
3503 })
3504 .collect::<Result<Vec<_>>>()?;
3505 if !columns.contains(&metadata.primary_key_column) {
3506 return Err(SyncularError::protocol_message(format!(
3507 "binary snapshot for table {} is missing primary key {}",
3508 metadata.name, metadata.primary_key_column
3509 )));
3510 }
3511 if rows.rows.iter().any(|row| row.len() != columns.len()) {
3512 return Err(SyncularError::protocol_message(format!(
3513 "binary snapshot for table {} has a row with the wrong column count",
3514 metadata.name
3515 )));
3516 }
3517
3518 let update_columns = columns
3519 .iter()
3520 .copied()
3521 .filter(|column| *column != metadata.primary_key_column)
3522 .collect::<Vec<_>>();
3523 let on_conflict = if update_columns.is_empty() {
3524 "do nothing".to_string()
3525 } else {
3526 format!(
3527 "do update set {}",
3528 update_columns
3529 .iter()
3530 .map(|column| format!("{column} = excluded.{column}"))
3531 .collect::<Vec<_>>()
3532 .join(", ")
3533 )
3534 };
3535
3536 for batch in rows.rows.chunks(SNAPSHOT_UPSERT_BATCH_ROWS) {
3537 let value_groups = batch
3538 .iter()
3539 .map(|row| {
3540 let values = row
3541 .iter()
3542 .map(binary_snapshot_cell_sql_value)
3543 .collect::<Result<Vec<_>>>()?;
3544 Ok(format!("({})", values.join(", ")))
3545 })
3546 .collect::<Result<Vec<_>>>()?;
3547 let sql = format!(
3548 "insert into {table} ({columns}) values {values} on conflict({pk}) {on_conflict}",
3549 table = metadata.name,
3550 columns = columns.join(", "),
3551 values = value_groups.join(", "),
3552 pk = metadata.primary_key_column,
3553 );
3554 sql_query(sql).execute(conn)?;
3555 }
3556
3557 Ok(())
3558}
3559
3560fn binary_snapshot_cell_sql_value(value: &BinarySnapshotCell) -> Result<String> {
3561 Ok(match value {
3562 BinarySnapshotCell::Null => "NULL".to_string(),
3563 BinarySnapshotCell::String(value) => sql_string(value),
3564 BinarySnapshotCell::Integer(value) => value.to_string(),
3565 BinarySnapshotCell::Float(value) => {
3566 if value.is_finite() {
3567 value.to_string()
3568 } else {
3569 return Err(SyncularError::protocol_message(
3570 "binary snapshot float value must be finite",
3571 ));
3572 }
3573 }
3574 BinarySnapshotCell::Boolean(value) => i32::from(*value).to_string(),
3575 BinarySnapshotCell::Json(value) => sql_string(&value.to_string()),
3576 BinarySnapshotCell::Bytes(value) => {
3577 format!("X'{}'", hex::encode(value))
3578 }
3579 })
3580}
3581
3582fn apply_change_generic(
3583 conn: &mut SqliteConnection,
3584 metadata: &AppTableMetadata,
3585 change: &SyncChange,
3586) -> Result<()> {
3587 if change.table != metadata.name {
3588 return Err(SyncularError::schema(format!(
3589 "metadata for {} cannot apply change for {}",
3590 metadata.name, change.table
3591 )));
3592 }
3593 match change.op.as_str() {
3594 "delete" => delete_row_generic(conn, metadata, &change.row_id),
3595 "upsert" => {
3596 let row = change.row_json.as_ref().ok_or_else(|| {
3597 SyncularError::protocol_message(format!(
3598 "upsert change missing row_json for {}",
3599 change.row_id
3600 ))
3601 })?;
3602 upsert_row_generic(conn, metadata, row, change.row_version)
3603 }
3604 op => Err(SyncularError::protocol_message(format!(
3605 "unsupported sync change operation: {op}"
3606 ))),
3607 }
3608}
3609
3610fn delete_row_generic(
3611 conn: &mut SqliteConnection,
3612 metadata: &AppTableMetadata,
3613 row_id: &str,
3614) -> Result<()> {
3615 validate_app_table_metadata(metadata)?;
3616 let sql = format!(
3617 "delete from {table} where {pk} = {row_id}",
3618 table = metadata.name,
3619 pk = metadata.primary_key_column,
3620 row_id = sql_string(row_id),
3621 );
3622 sql_query(sql).execute(conn)?;
3623 Ok(())
3624}
3625
3626fn rows_from_json_query(conn: &mut SqliteConnection, sql: String) -> Result<Vec<Value>> {
3627 sql_query(sql)
3628 .load::<JsonObjectRow>(conn)?
3629 .into_iter()
3630 .map(|row| Ok(serde_json::from_str::<Value>(&row.row_json)?))
3631 .collect()
3632}
3633
3634fn json_object_projection(metadata: &AppTableMetadata) -> Result<String> {
3635 let columns = syncable_columns(metadata);
3636 if columns.is_empty() {
3637 return Err(SyncularError::schema(format!(
3638 "app table {} has no declared columns",
3639 metadata.name
3640 )));
3641 }
3642 let pairs = columns
3643 .iter()
3644 .map(|column| {
3645 validate_identifier(column)?;
3646 Ok(format!("{label}, {column}", label = sql_string(column)))
3647 })
3648 .collect::<Result<Vec<_>>>()?;
3649 Ok(format!("json_object({})", pairs.join(", ")))
3650}
3651
3652fn generic_column_sql_value(
3653 metadata: &AppTableMetadata,
3654 row: &Map<String, Value>,
3655 column: &str,
3656 fallback_version: Option<i64>,
3657) -> Result<String> {
3658 if column == metadata.server_version_column {
3659 if let Some(version) = fallback_version {
3660 return Ok(version.to_string());
3661 }
3662 return Ok(row
3663 .get(column)
3664 .map(sql_value)
3665 .unwrap_or_else(|| "0".to_string()));
3666 }
3667
3668 if let Some(value) = row.get(column) {
3669 return Ok(sql_value(value));
3670 }
3671
3672 if column == metadata.primary_key_column {
3673 return Err(SyncularError::protocol_message(format!(
3674 "row for table {} is missing primary key {}",
3675 metadata.name, metadata.primary_key_column
3676 )));
3677 }
3678
3679 let column_metadata = metadata
3680 .columns
3681 .iter()
3682 .find(|candidate| candidate.name == column);
3683 if column_metadata.is_some_and(|column| column.notnull_required) {
3684 return Err(SyncularError::protocol_message(format!(
3685 "row for table {} is missing required column {}",
3686 metadata.name, column
3687 )));
3688 }
3689
3690 Ok("NULL".to_string())
3691}
3692
3693fn scope_filters(metadata: &AppTableMetadata, scopes: &ScopeValues) -> Result<Vec<String>> {
3694 for scope_name in scopes.keys() {
3695 if !metadata.scopes.iter().any(|scope| scope.name == scope_name) {
3696 return Err(SyncularError::config(format!(
3697 "unknown scope {scope_name} for table {}",
3698 metadata.name
3699 )));
3700 }
3701 }
3702
3703 let mut filters = Vec::new();
3704 for scope in metadata.scopes {
3705 match scopes.get(scope.name) {
3706 Some(value) => filters.push(scope_filter(scope.column, value)?),
3707 None if scope.required => filters.push("0 = 1".to_string()),
3708 None => {}
3709 }
3710 }
3711 Ok(filters)
3712}
3713
3714fn scope_clause(metadata: &AppTableMetadata, scopes: &ScopeValues) -> Result<String> {
3715 let filters = scope_filters(metadata, scopes)?;
3716 if filters.is_empty() {
3717 Ok("1 = 1".to_string())
3718 } else {
3719 Ok(format!("({})", filters.join(" and ")))
3720 }
3721}
3722
3723fn scope_filter(column: &str, value: &Value) -> Result<String> {
3724 validate_identifier(column)?;
3725 Ok(match value {
3726 Value::Null => format!("{column} is null"),
3727 Value::Array(values) if values.is_empty() => "0 = 1".to_string(),
3728 Value::Array(values) => format!(
3729 "{column} in ({})",
3730 values.iter().map(sql_value).collect::<Vec<_>>().join(", ")
3731 ),
3732 value => format!("{column} = {}", sql_value(value)),
3733 })
3734}
3735
3736fn row_matches_scope_values(
3737 metadata: &AppTableMetadata,
3738 row: &Value,
3739 scopes: &ScopeValues,
3740) -> bool {
3741 metadata.scopes.iter().all(|scope| {
3742 let Some(expected) = scopes.get(scope.name) else {
3743 return !scope.required;
3744 };
3745 let actual = row.get(scope.column);
3746 match expected {
3747 Value::Array(values) => actual.is_some_and(|actual| values.iter().any(|v| v == actual)),
3748 value => actual == Some(value),
3749 }
3750 })
3751}
3752
3753fn syncable_columns(metadata: &AppTableMetadata) -> Vec<&'static str> {
3754 let mut columns = metadata
3755 .columns
3756 .iter()
3757 .map(|column| column.name)
3758 .collect::<Vec<_>>();
3759 if !columns.contains(&metadata.primary_key_column) {
3760 columns.insert(0, metadata.primary_key_column);
3761 }
3762 if !columns.contains(&metadata.server_version_column) {
3763 columns.push(metadata.server_version_column);
3764 }
3765 columns
3766}
3767
3768fn validate_app_table_metadata(metadata: &AppTableMetadata) -> Result<()> {
3769 validate_identifier(metadata.name)?;
3770 validate_identifier(metadata.primary_key_column)?;
3771 validate_identifier(metadata.server_version_column)?;
3772 for column in metadata.columns {
3773 validate_identifier(column.name)?;
3774 }
3775 for scope in metadata.scopes {
3776 validate_identifier(scope.column)?;
3777 }
3778 Ok(())
3779}
3780
3781fn validate_identifier(identifier: &str) -> Result<()> {
3782 if !identifier.is_empty()
3783 && identifier
3784 .bytes()
3785 .all(|byte| byte.is_ascii_alphanumeric() || byte == b'_')
3786 {
3787 Ok(())
3788 } else {
3789 Err(SyncularError::schema(format!(
3790 "invalid sqlite identifier: {identifier}"
3791 )))
3792 }
3793}
3794
3795fn add_column_if_missing(
3796 conn: &mut SqliteConnection,
3797 table: &str,
3798 column: &str,
3799 alter_sql: &str,
3800) -> Result<()> {
3801 let columns = sql_query(format!(
3802 "select name from pragma_table_info({})",
3803 sql_string(table)
3804 ))
3805 .load::<ColumnNameRow>(conn)?;
3806 if columns.iter().any(|row| row.name == column) {
3807 return Ok(());
3808 }
3809 sql_query(alter_sql).execute(conn)?;
3810 Ok(())
3811}
3812
/// Wraps `value` in single quotes as a SQL string literal, doubling each embedded
/// single quote.
fn sql_string(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            quoted.push('\'');
        }
        quoted.push(ch);
    }
    quoted.push('\'');
    quoted
}
3816
3817fn sql_value(value: &Value) -> String {
3818 match value {
3819 Value::Null => "NULL".to_string(),
3820 Value::Bool(value) => i32::from(*value).to_string(),
3821 Value::Number(value) => value.to_string(),
3822 Value::String(value) => sql_string(value),
3823 Value::Array(_) | Value::Object(_) => sql_string(&value.to_string()),
3824 }
3825}
3826
3827fn crdt_sync_mode_str(mode: crate::crdt_field::CrdtFieldSyncMode) -> &'static str {
3828 match mode {
3829 crate::crdt_field::CrdtFieldSyncMode::ServerMerge => "server-merge",
3830 crate::crdt_field::CrdtFieldSyncMode::EncryptedUpdateLog => "encrypted-update-log",
3831 }
3832}
3833
3834fn crdt_sync_mode_from_str(value: &str) -> Result<crate::crdt_field::CrdtFieldSyncMode> {
3835 match value {
3836 "server-merge" => Ok(crate::crdt_field::CrdtFieldSyncMode::ServerMerge),
3837 "encrypted-update-log" => Ok(crate::crdt_field::CrdtFieldSyncMode::EncryptedUpdateLog),
3838 other => Err(SyncularError::message(
3839 ErrorKind::Storage,
3840 format!("unknown CRDT document sync mode: {other}"),
3841 )),
3842 }
3843}
3844
3845fn crdt_update_origin_str(origin: CrdtUpdateOrigin) -> &'static str {
3846 match origin {
3847 CrdtUpdateOrigin::Local => "local",
3848 CrdtUpdateOrigin::Remote => "remote",
3849 CrdtUpdateOrigin::Compaction => "compaction",
3850 }
3851}
3852
3853fn crdt_update_origin_from_str(value: &str) -> Result<CrdtUpdateOrigin> {
3854 match value {
3855 "local" => Ok(CrdtUpdateOrigin::Local),
3856 "remote" => Ok(CrdtUpdateOrigin::Remote),
3857 "compaction" => Ok(CrdtUpdateOrigin::Compaction),
3858 other => Err(SyncularError::message(
3859 ErrorKind::Storage,
3860 format!("unknown CRDT update origin: {other}"),
3861 )),
3862 }
3863}
3864
3865fn crdt_update_status_str(status: CrdtUpdateStatus) -> &'static str {
3866 match status {
3867 CrdtUpdateStatus::Pending => "pending",
3868 CrdtUpdateStatus::Flushed => "flushed",
3869 CrdtUpdateStatus::Acked => "acked",
3870 CrdtUpdateStatus::Pruned => "pruned",
3871 }
3872}
3873
3874fn crdt_update_status_from_str(value: &str) -> Result<CrdtUpdateStatus> {
3875 match value {
3876 "pending" => Ok(CrdtUpdateStatus::Pending),
3877 "flushed" => Ok(CrdtUpdateStatus::Flushed),
3878 "acked" => Ok(CrdtUpdateStatus::Acked),
3879 "pruned" => Ok(CrdtUpdateStatus::Pruned),
3880 other => Err(SyncularError::message(
3881 ErrorKind::Storage,
3882 format!("unknown CRDT update status: {other}"),
3883 )),
3884 }
3885}
3886
3887fn crdt_field_state_base64(field: &CrdtField, row: Option<&Value>) -> Option<String> {
3888 row.and_then(|row| {
3889 row.get(field.state_column())
3890 .and_then(Value::as_str)
3891 .filter(|value| !value.is_empty())
3892 .map(str::to_string)
3893 })
3894}
3895
3896fn assert_crdt_document_capacity(
3897 conn: &mut SqliteConnection,
3898 document_key: &str,
3899 max_pending_updates: i64,
3900) -> Result<()> {
3901 if max_pending_updates < 1 {
3902 return Err(SyncularError::config(
3903 "CRDT update queue capacity must be at least 1",
3904 ));
3905 }
3906 let pending = sql_query(
3907 r#"
3908 select count(*) as count
3909 from sync_crdt_update_log
3910 where document_key = ?1 and status in ('pending', 'flushed')
3911 "#,
3912 )
3913 .bind::<Text, _>(document_key)
3914 .load::<CountRow>(conn)?
3915 .into_iter()
3916 .next()
3917 .map(|row| row.count)
3918 .unwrap_or(0);
3919 if pending >= max_pending_updates {
3920 return Err(SyncularError::message(ErrorKind::Storage, format!(
3921 "CRDT update queue is full for document {document_key}; pending={pending}, capacity={max_pending_updates}"
3922 )));
3923 }
3924 Ok(())
3925}
3926
3927fn upsert_crdt_document_snapshot(
3928 conn: &mut SqliteConnection,
3929 field: &CrdtField,
3930 state_base64: Option<&str>,
3931 state_vector_base64: &str,
3932 compacted_at: Option<i64>,
3933) -> Result<()> {
3934 let now = now_ms();
3935 sql_query(
3936 r#"
3937 insert into sync_crdt_documents (
3938 document_key, app_table, row_id, field_name, state_column, sync_mode,
3939 state_base64, state_vector_base64, pending_updates, flushed_updates,
3940 acked_updates, log_updates, created_at, updated_at, compacted_at
3941 ) values (
3942 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8,
3943 0, 0, 0, 0, ?9, ?10, ?11
3944 )
3945 on conflict(document_key) do update set
3946 state_base64 = excluded.state_base64,
3947 state_vector_base64 = excluded.state_vector_base64,
3948 state_column = excluded.state_column,
3949 sync_mode = excluded.sync_mode,
3950 updated_at = excluded.updated_at,
3951 compacted_at = coalesce(excluded.compacted_at, sync_crdt_documents.compacted_at)
3952 "#,
3953 )
3954 .bind::<Text, _>(field.document_key())
3955 .bind::<Text, _>(field.table())
3956 .bind::<Text, _>(field.row_id())
3957 .bind::<Text, _>(field.field())
3958 .bind::<Text, _>(field.state_column())
3959 .bind::<Text, _>(crdt_sync_mode_str(field.sync_mode()))
3960 .bind::<Nullable<Text>, _>(state_base64)
3961 .bind::<Text, _>(state_vector_base64)
3962 .bind::<BigInt, _>(now)
3963 .bind::<BigInt, _>(now)
3964 .bind::<Nullable<BigInt>, _>(compacted_at)
3965 .execute(conn)?;
3966 refresh_crdt_document_counts(conn, &field.document_key())
3967}
3968
3969fn record_crdt_update_log(
3970 conn: &mut SqliteConnection,
3971 field: &CrdtField,
3972 update: &YjsUpdateEnvelope,
3973 client_commit_id: Option<&str>,
3974 origin: CrdtUpdateOrigin,
3975 status: CrdtUpdateStatus,
3976 state_base64: Option<&str>,
3977 state_vector_base64: &str,
3978) -> Result<()> {
3979 upsert_crdt_document_snapshot(conn, field, state_base64, state_vector_base64, None)?;
3980 let now = now_ms();
3981 let document_key = field.document_key();
3982 sql_query(
3983 r#"
3984 insert into sync_crdt_update_log (
3985 document_key, app_table, row_id, field_name, update_id, client_commit_id,
3986 origin, status, update_base64, state_vector_base64, created_at, flushed_at, acked_at
3987 ) values (
3988 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11,
3989 case when ?8 in ('flushed', 'acked') then ?11 else null end,
3990 case when ?8 = 'acked' then ?11 else null end
3991 )
3992 on conflict(update_id) do update set
3993 state_vector_base64 = excluded.state_vector_base64,
3994 status = case
3995 when sync_crdt_update_log.status = 'acked' then sync_crdt_update_log.status
3996 else excluded.status
3997 end,
3998 flushed_at = coalesce(sync_crdt_update_log.flushed_at, excluded.flushed_at),
3999 acked_at = coalesce(sync_crdt_update_log.acked_at, excluded.acked_at)
4000 "#,
4001 )
4002 .bind::<Text, _>(&document_key)
4003 .bind::<Text, _>(field.table())
4004 .bind::<Text, _>(field.row_id())
4005 .bind::<Text, _>(field.field())
4006 .bind::<Text, _>(&update.update_id)
4007 .bind::<Nullable<Text>, _>(client_commit_id)
4008 .bind::<Text, _>(crdt_update_origin_str(origin))
4009 .bind::<Text, _>(crdt_update_status_str(status))
4010 .bind::<Text, _>(&update.update_base64)
4011 .bind::<Text, _>(state_vector_base64)
4012 .bind::<BigInt, _>(now)
4013 .execute(conn)?;
4014 refresh_crdt_document_counts(conn, &document_key)
4015}
4016
4017fn select_crdt_document_snapshot(
4018 conn: &mut SqliteConnection,
4019 document_key: &str,
4020) -> Result<CrdtDocumentSnapshot> {
4021 sql_query(
4022 r#"
4023 select document_key, app_table, row_id, field_name, state_column, sync_mode,
4024 state_base64, state_vector_base64, pending_updates, flushed_updates,
4025 acked_updates, log_updates, updated_at, compacted_at
4026 from sync_crdt_documents
4027 where document_key = ?1
4028 limit 1
4029 "#,
4030 )
4031 .bind::<Text, _>(document_key)
4032 .load::<CrdtDocumentSnapshotRow>(conn)?
4033 .into_iter()
4034 .next()
4035 .ok_or_else(|| {
4036 SyncularError::message(
4037 ErrorKind::Storage,
4038 format!("CRDT document not found: {document_key}"),
4039 )
4040 })?
4041 .try_into()
4042}
4043
4044fn refresh_crdt_document_counts(conn: &mut SqliteConnection, document_key: &str) -> Result<()> {
4045 sql_query(
4046 r#"
4047 update sync_crdt_documents
4048 set pending_updates = (
4049 select count(*) from sync_crdt_update_log
4050 where document_key = ?1 and status = 'pending'
4051 ),
4052 flushed_updates = (
4053 select count(*) from sync_crdt_update_log
4054 where document_key = ?1 and status = 'flushed'
4055 ),
4056 acked_updates = (
4057 select count(*) from sync_crdt_update_log
4058 where document_key = ?1 and status = 'acked'
4059 ),
4060 log_updates = (
4061 select count(*) from sync_crdt_update_log
4062 where document_key = ?1
4063 ),
4064 updated_at = ?2
4065 where document_key = ?1
4066 "#,
4067 )
4068 .bind::<Text, _>(document_key)
4069 .bind::<BigInt, _>(now_ms())
4070 .execute(conn)?;
4071 Ok(())
4072}
4073
4074fn refresh_all_crdt_document_counts(conn: &mut SqliteConnection) -> Result<()> {
4075 let keys = sql_query("select document_key as value from sync_crdt_documents")
4076 .load::<StringValueRow>(conn)?;
4077 for key in keys {
4078 refresh_crdt_document_counts(conn, &key.value)?;
4079 }
4080 Ok(())
4081}
4082
4083impl SyncStoreTx for DieselSqliteTx<'_> {
4084 fn pending_outbox(&mut self, limit: i64) -> Result<Vec<OutboxCommit>> {
4085 use schema::sync_outbox_commits::dsl as o;
4086 let now = now_ms();
4087
4088 let rows: Vec<OutboxCommitRow> = o::sync_outbox_commits
4089 .select(OutboxCommitRow::as_select())
4090 .filter(o::status.eq("pending"))
4091 .filter(o::attempt_count.lt(MAX_SYNC_RETRIES))
4092 .filter(o::next_attempt_at.le(now))
4093 .order(o::created_at.asc())
4094 .limit(limit)
4095 .load(self.conn)?;
4096
4097 Ok(rows.into_iter().map(OutboxCommit::from).collect())
4098 }
4099
4100 fn requeue_stale_outbox(&mut self) -> Result<()> {
4101 let now = now_ms();
4102 let stale_before = now - SYNC_SENDING_TIMEOUT_MS;
4103 sql_query(
4104 r#"
4105 update sync_outbox_commits
4106 set status = case when attempt_count >= ?1 then 'failed' else 'pending' end,
4107 next_attempt_at = case when attempt_count >= ?1 then 0 else ?2 end,
4108 error = case
4109 when attempt_count >= ?1 then 'Sync attempt timed out while in sending state'
4110 else 'Sync attempt timed out while in sending state; retrying'
4111 end,
4112 updated_at = ?2
4113 where status = 'sending' and updated_at < ?3
4114 "#,
4115 )
4116 .bind::<Integer, _>(MAX_SYNC_RETRIES)
4117 .bind::<BigInt, _>(now)
4118 .bind::<BigInt, _>(stale_before)
4119 .execute(self.conn)?;
4120 Ok(())
4121 }
4122
4123 fn mark_outbox_sending(&mut self, row_id: &str) -> Result<()> {
4124 use schema::sync_outbox_commits::dsl as o;
4125 let now = now_ms();
4126 diesel::update(o::sync_outbox_commits.filter(o::id.eq(row_id)))
4127 .set((
4128 o::status.eq("sending"),
4129 o::updated_at.eq(now),
4130 o::attempt_count.eq(o::attempt_count + 1),
4131 o::error.eq::<Option<String>>(None),
4132 o::next_attempt_at.eq(0),
4133 ))
4134 .execute(self.conn)?;
4135 sql_query(
4136 r#"
4137 update sync_crdt_update_log
4138 set status = 'flushed',
4139 flushed_at = coalesce(flushed_at, ?1)
4140 where status = 'pending'
4141 and client_commit_id = (
4142 select client_commit_id from sync_outbox_commits where id = ?2
4143 )
4144 "#,
4145 )
4146 .bind::<BigInt, _>(now)
4147 .bind::<Text, _>(row_id)
4148 .execute(self.conn)?;
4149 refresh_all_crdt_document_counts(self.conn)?;
4150 Ok(())
4151 }
4152
4153 fn mark_pushed_operation_server_versions(
4154 &mut self,
4155 outbox: &OutboxCommit,
4156 response: &PushCommitResponse,
4157 ) -> Result<()> {
4158 let operations: Vec<SyncOperation> = serde_json::from_str(&outbox.operations_json)?;
4159 if response.results.is_empty() {
4160 if let Some(server_seq) = response.commit_seq {
4161 for operation in &operations {
4162 if is_encrypted_crdt_system_table(&operation.table) {
4163 update_encrypted_crdt_system_server_seq(
4164 self.conn,
4165 &operation.table,
4166 &operation.row_id,
4167 server_seq,
4168 )?;
4169 }
4170 }
4171 }
4172 return Ok(());
4173 }
4174
4175 for result in &response.results {
4176 if !matches!(result.status.as_str(), "applied" | "cached") {
4177 continue;
4178 }
4179 let Some(server_seq) = result.server_version.or(response.commit_seq) else {
4180 continue;
4181 };
4182 let operation = operations.get(result.op_index as usize).ok_or_else(|| {
4183 SyncularError::protocol_message(format!(
4184 "push response op_index {} out of bounds for local outbox commit {}",
4185 result.op_index, outbox.client_commit_id
4186 ))
4187 })?;
4188 if is_encrypted_crdt_system_table(&operation.table) {
4189 update_encrypted_crdt_system_server_seq(
4190 self.conn,
4191 &operation.table,
4192 &operation.row_id,
4193 server_seq,
4194 )?;
4195 }
4196 }
4197 Ok(())
4198 }
4199
4200 fn mark_outbox_acked(&mut self, row_id: &str, response: &PushCommitResponse) -> Result<()> {
4201 use schema::sync_outbox_commits::dsl as o;
4202 let now = now_ms();
4203 diesel::update(o::sync_outbox_commits.filter(o::id.eq(row_id)))
4204 .set((
4205 o::status.eq("acked"),
4206 o::updated_at.eq(now),
4207 o::acked_commit_seq.eq(response.commit_seq),
4208 o::last_response_json.eq(Some(serde_json::to_string(response)?)),
4209 o::error.eq::<Option<String>>(None),
4210 o::next_attempt_at.eq(0),
4211 ))
4212 .execute(self.conn)?;
4213 sql_query(
4214 r#"
4215 update sync_crdt_update_log
4216 set status = 'acked',
4217 flushed_at = coalesce(flushed_at, ?1),
4218 acked_at = coalesce(acked_at, ?1)
4219 where status in ('pending', 'flushed')
4220 and client_commit_id = (
4221 select client_commit_id from sync_outbox_commits where id = ?2
4222 )
4223 "#,
4224 )
4225 .bind::<BigInt, _>(now)
4226 .bind::<Text, _>(row_id)
4227 .execute(self.conn)?;
4228 refresh_all_crdt_document_counts(self.conn)?;
4229 Ok(())
4230 }
4231
4232 fn mark_outbox_failed(
4233 &mut self,
4234 row_id: &str,
4235 error: &str,
4236 response: &PushCommitResponse,
4237 ) -> Result<()> {
4238 use schema::sync_outbox_commits::dsl as o;
4239 diesel::update(o::sync_outbox_commits.filter(o::id.eq(row_id)))
4240 .set((
4241 o::status.eq("failed"),
4242 o::updated_at.eq(now_ms()),
4243 o::last_response_json.eq(Some(serde_json::to_string(response)?)),
4244 o::error.eq(Some(error.to_string())),
4245 o::next_attempt_at.eq(0),
4246 ))
4247 .execute(self.conn)?;
4248 Ok(())
4249 }
4250
4251 fn mark_outbox_retry(
4252 &mut self,
4253 row_id: &str,
4254 error: &str,
4255 next_attempt_at: i64,
4256 failed: bool,
4257 ) -> Result<()> {
4258 use schema::sync_outbox_commits::dsl as o;
4259 let now = now_ms();
4260 diesel::update(o::sync_outbox_commits.filter(o::id.eq(row_id)))
4261 .set((
4262 o::status.eq(if failed { "failed" } else { "pending" }),
4263 o::updated_at.eq(now),
4264 o::error.eq(Some(error.to_string())),
4265 o::next_attempt_at.eq(if failed { 0 } else { next_attempt_at }),
4266 ))
4267 .execute(self.conn)?;
4268 if !failed {
4269 sql_query(
4270 r#"
4271 update sync_crdt_update_log
4272 set status = 'pending'
4273 where status = 'flushed'
4274 and client_commit_id = (
4275 select client_commit_id from sync_outbox_commits where id = ?1
4276 )
4277 "#,
4278 )
4279 .bind::<Text, _>(row_id)
4280 .execute(self.conn)?;
4281 refresh_all_crdt_document_counts(self.conn)?;
4282 }
4283 Ok(())
4284 }
4285
4286 fn insert_conflict(&mut self, outbox: &OutboxCommit, result: &OperationResult) -> Result<()> {
4287 sql_query(
4288 r#"
4289 insert into sync_conflicts (
4290 id, outbox_commit_id, client_commit_id, op_index, result_status,
4291 message, code, server_version, server_row_json, created_at,
4292 resolved_at, resolution
4293 ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, null, null)
4294 "#,
4295 )
4296 .bind::<diesel::sql_types::Text, _>(Uuid::new_v4().to_string())
4297 .bind::<diesel::sql_types::Text, _>(&outbox.id)
4298 .bind::<diesel::sql_types::Text, _>(&outbox.client_commit_id)
4299 .bind::<diesel::sql_types::Integer, _>(result.op_index)
4300 .bind::<diesel::sql_types::Text, _>(&result.status)
4301 .bind::<diesel::sql_types::Text, _>(
4302 result
4303 .message
4304 .clone()
4305 .or_else(|| result.error.clone())
4306 .unwrap_or_else(|| result.status.clone()),
4307 )
4308 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(result.code.clone())
4309 .bind::<diesel::sql_types::Nullable<diesel::sql_types::BigInt>, _>(result.server_version)
4310 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
4311 result.server_row.as_ref().map(Value::to_string),
4312 )
4313 .bind::<diesel::sql_types::BigInt, _>(now_ms())
4314 .execute(self.conn)?;
4315
4316 Ok(())
4317 }
4318
4319 fn upsert_auth_lease(&mut self, lease: &AuthLeaseRecord) -> Result<()> {
4320 use schema::sync_auth_leases::dsl as l;
4321
4322 let row = AuthLeaseRecordRow::from(lease);
4323 diesel::insert_into(l::sync_auth_leases)
4324 .values(&row)
4325 .on_conflict(l::lease_id)
4326 .do_update()
4327 .set(&row)
4328 .execute(self.conn)?;
4329 Ok(())
4330 }
4331
4332 fn auth_lease(&mut self, lease_id_value: &str) -> Result<Option<AuthLeaseRecord>> {
4333 use schema::sync_auth_leases::dsl as l;
4334
4335 let row = l::sync_auth_leases
4336 .select(AuthLeaseRecordRow::as_select())
4337 .filter(l::lease_id.eq(lease_id_value))
4338 .first::<AuthLeaseRecordRow>(self.conn)
4339 .optional()?;
4340 Ok(row.map(AuthLeaseRecord::from))
4341 }
4342
4343 fn active_auth_leases(
4344 &mut self,
4345 actor_id_value: Option<&str>,
4346 now_ms_value: i64,
4347 ) -> Result<Vec<AuthLeaseRecord>> {
4348 use schema::sync_auth_leases::dsl as l;
4349
4350 let mut query = l::sync_auth_leases
4351 .select(AuthLeaseRecordRow::as_select())
4352 .filter(l::status.eq("active"))
4353 .filter(l::not_before_ms.le(now_ms_value))
4354 .filter(l::expires_at_ms.gt(now_ms_value))
4355 .into_boxed();
4356 if let Some(actor_id_value) = actor_id_value {
4357 query = query.filter(l::actor_id.eq(actor_id_value));
4358 }
4359 let rows = query.order(l::expires_at_ms.asc()).load(self.conn)?;
4360 Ok(rows.into_iter().map(AuthLeaseRecord::from).collect())
4361 }
4362
4363 fn set_outbox_auth_lease(
4364 &mut self,
4365 client_commit_id_value: &str,
4366 provenance: Option<&AuthLeaseProvenance>,
4367 ) -> Result<()> {
4368 use schema::sync_outbox_commits::dsl as o;
4369
4370 let lease_token_value = match provenance {
4371 Some(lease) => lease.lease_token.clone().or_else(|| {
4372 self.auth_lease(&lease.lease_id)
4373 .ok()
4374 .flatten()
4375 .map(|record| record.token)
4376 }),
4377 None => None,
4378 };
4379 let affected = diesel::update(
4380 o::sync_outbox_commits.filter(o::client_commit_id.eq(client_commit_id_value)),
4381 )
4382 .set((
4383 o::lease_id.eq(provenance.map(|lease| lease.lease_id.clone())),
4384 o::lease_expires_at_ms.eq(provenance.map(|lease| lease.lease_expires_at_ms)),
4385 o::lease_status_at_enqueue
4386 .eq(provenance.map(|lease| lease.lease_status_at_enqueue.clone())),
4387 o::lease_scope_summary_json
4388 .eq(provenance.and_then(|lease| lease.lease_scope_summary_json.clone())),
4389 o::lease_token.eq(lease_token_value),
4390 ))
4391 .execute(self.conn)?;
4392 if affected == 0 {
4393 return Err(SyncularError::storage(anyhow::anyhow!(
4394 "outbox commit {client_commit_id_value} does not exist"
4395 )));
4396 }
4397 Ok(())
4398 }
4399
4400 fn subscription_state(
4401 &mut self,
4402 state_id_value: &str,
4403 subscription_id_value: &str,
4404 ) -> Result<Option<SubscriptionState>> {
4405 use schema::sync_subscription_state::dsl as s;
4406
4407 let row: Option<SubscriptionStateRow> = s::sync_subscription_state
4408 .select(SubscriptionStateRow::as_select())
4409 .filter(s::state_id.eq(state_id_value))
4410 .filter(s::subscription_id.eq(subscription_id_value))
4411 .first(self.conn)
4412 .optional()?;
4413
4414 Ok(row.map(SubscriptionState::from))
4415 }
4416
4417 fn subscription_states(&mut self, state_id_value: &str) -> Result<Vec<SubscriptionState>> {
4418 use schema::sync_subscription_state::dsl as s;
4419
4420 let rows: Vec<SubscriptionStateRow> = s::sync_subscription_state
4421 .select(SubscriptionStateRow::as_select())
4422 .filter(s::state_id.eq(state_id_value))
4423 .order(s::subscription_id.asc())
4424 .load(self.conn)?;
4425
4426 Ok(rows.into_iter().map(SubscriptionState::from).collect())
4427 }
4428
4429 fn upsert_subscription_state(&mut self, state: &SubscriptionState) -> Result<()> {
4430 sql_query(
4431 r#"
4432 insert into sync_subscription_state (
4433 state_id, subscription_id, "table", scopes_json, params_json,
4434 cursor, bootstrap_state_json, status, created_at, updated_at
4435 ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
4436 on conflict (state_id, subscription_id) do update set
4437 "table" = excluded."table",
4438 scopes_json = excluded.scopes_json,
4439 params_json = excluded.params_json,
4440 cursor = excluded.cursor,
4441 bootstrap_state_json = excluded.bootstrap_state_json,
4442 status = excluded.status,
4443 updated_at = excluded.updated_at
4444 "#,
4445 )
4446 .bind::<diesel::sql_types::Text, _>(&state.state_id)
4447 .bind::<diesel::sql_types::Text, _>(&state.subscription_id)
4448 .bind::<diesel::sql_types::Text, _>(&state.table)
4449 .bind::<diesel::sql_types::Text, _>(&state.scopes_json)
4450 .bind::<diesel::sql_types::Text, _>(&state.params_json)
4451 .bind::<diesel::sql_types::BigInt, _>(state.cursor)
4452 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
4453 state.bootstrap_state_json.clone(),
4454 )
4455 .bind::<diesel::sql_types::Text, _>(&state.status)
4456 .bind::<diesel::sql_types::BigInt, _>(now_ms())
4457 .bind::<diesel::sql_types::BigInt, _>(now_ms())
4458 .execute(self.conn)?;
4459
4460 Ok(())
4461 }
4462
4463 fn delete_subscription_state(
4464 &mut self,
4465 state_id_value: &str,
4466 subscription_id_value: &str,
4467 ) -> Result<()> {
4468 use schema::sync_subscription_state::dsl as s;
4469
4470 diesel::delete(
4471 s::sync_subscription_state
4472 .filter(s::state_id.eq(state_id_value))
4473 .filter(s::subscription_id.eq(subscription_id_value)),
4474 )
4475 .execute(self.conn)?;
4476 Ok(())
4477 }
4478
4479 fn verified_root(
4480 &mut self,
4481 state_id_value: &str,
4482 subscription_id_value: &str,
4483 ) -> Result<Option<VerifiedRoot>> {
4484 let row: Option<VerifiedRootRow> = sql_query(
4485 r#"
4486 select state_id, subscription_id, partition_id, commit_seq, root
4487 from sync_verified_roots
4488 where state_id = ?1 and subscription_id = ?2
4489 limit 1
4490 "#,
4491 )
4492 .bind::<Text, _>(state_id_value)
4493 .bind::<Text, _>(subscription_id_value)
4494 .get_result(self.conn)
4495 .optional()?;
4496 Ok(row.map(VerifiedRoot::from))
4497 }
4498
4499 fn verified_roots(&mut self, state_id_value: &str) -> Result<Vec<VerifiedRoot>> {
4500 let rows: Vec<VerifiedRootRow> = sql_query(
4501 r#"
4502 select state_id, subscription_id, partition_id, commit_seq, root
4503 from sync_verified_roots
4504 where state_id = ?1
4505 order by subscription_id asc
4506 "#,
4507 )
4508 .bind::<Text, _>(state_id_value)
4509 .load(self.conn)?;
4510 Ok(rows.into_iter().map(VerifiedRoot::from).collect())
4511 }
4512
4513 fn upsert_verified_root(&mut self, root: &VerifiedRoot) -> Result<()> {
4514 let now = now_ms();
4515 sql_query(
4516 r#"
4517 insert into sync_verified_roots (
4518 state_id, subscription_id, partition_id, commit_seq, root,
4519 created_at, updated_at
4520 ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)
4521 on conflict (state_id, subscription_id) do update set
4522 partition_id = excluded.partition_id,
4523 commit_seq = excluded.commit_seq,
4524 root = excluded.root,
4525 updated_at = excluded.updated_at
4526 "#,
4527 )
4528 .bind::<Text, _>(&root.state_id)
4529 .bind::<Text, _>(&root.subscription_id)
4530 .bind::<Text, _>(&root.partition_id)
4531 .bind::<BigInt, _>(root.commit_seq)
4532 .bind::<Text, _>(&root.root)
4533 .bind::<BigInt, _>(now)
4534 .bind::<BigInt, _>(now)
4535 .execute(self.conn)?;
4536 Ok(())
4537 }
4538
4539 fn delete_verified_root(
4540 &mut self,
4541 state_id_value: &str,
4542 subscription_id_value: &str,
4543 ) -> Result<()> {
4544 sql_query(
4545 r#"
4546 delete from sync_verified_roots
4547 where state_id = ?1 and subscription_id = ?2
4548 "#,
4549 )
4550 .bind::<Text, _>(state_id_value)
4551 .bind::<Text, _>(subscription_id_value)
4552 .execute(self.conn)?;
4553 Ok(())
4554 }
4555
    /// Returns CRDT state vector hints for `table` within `scopes`.
    ///
    /// Delegates to `crdt_state_vector_hints_for_subscription`, passing
    /// `self.conn`, `self.app_schema` and the arguments through unchanged.
    /// NOTE(review): `limit` presumably caps the number of hints returned —
    /// confirm against the helper.
    fn crdt_state_vector_hints(
        &mut self,
        table: &str,
        scopes: &ScopeValues,
        limit: i64,
    ) -> Result<Vec<CrdtStateVectorHint>> {
        crdt_state_vector_hints_for_subscription(self.conn, self.app_schema, table, scopes, limit)
    }
4564
4565 fn clear_table_for_scopes(&mut self, table: &str, scopes: &ScopeValues) -> Result<()> {
4566 if is_encrypted_crdt_system_table(table) {
4567 return clear_encrypted_crdt_system_table_for_scopes(self.conn, table, scopes);
4568 }
4569 clear_app_table_for_scopes(self.conn, self.app_schema, table, scopes)
4570 }
4571
    /// Clears synced rows of `table` within `scopes` by delegating to
    /// `clear_synced_app_rows_for_scopes` with `self.conn` and
    /// `self.app_schema`.
    ///
    /// NOTE(review): the returned `i64` is presumably the number of rows
    /// removed — confirm against the helper.
    fn clear_synced_rows_for_scopes(&mut self, table: &str, scopes: &ScopeValues) -> Result<i64> {
        clear_synced_app_rows_for_scopes(self.conn, self.app_schema, table, scopes)
    }
4575
4576 fn clear_table_for_scopes_preserving_local_crdt(
4577 &mut self,
4578 table: &str,
4579 scopes: &ScopeValues,
4580 ) -> Result<()> {
4581 if is_encrypted_crdt_system_table(table) {
4582 return clear_encrypted_crdt_system_table_for_scopes(self.conn, table, scopes);
4583 }
4584 clear_app_table_for_scopes_preserving_local_crdt(self.conn, self.app_schema, table, scopes)
4585 }
4586
4587 fn current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
4588 if is_encrypted_crdt_system_table(table) {
4589 return Ok(None);
4590 }
4591 current_app_row_json(self.conn, self.app_schema, table, row_id)
4592 }
4593
4594 fn upsert_row(
4595 &mut self,
4596 table: &str,
4597 row: &Value,
4598 fallback_version: Option<i64>,
4599 ) -> Result<()> {
4600 if is_encrypted_crdt_system_table(table) {
4601 let identity = encrypted_crdt_identity_column(table)?;
4602 let row_id = row.get(identity).and_then(Value::as_str).ok_or_else(|| {
4603 SyncularError::protocol_message(format!(
4604 "encrypted CRDT row missing identity column {identity}"
4605 ))
4606 })?;
4607 let row = apply_encrypted_crdt_system_row(
4608 self.conn,
4609 table,
4610 row_id,
4611 Some(row),
4612 fallback_version,
4613 )?;
4614 materialize_encrypted_crdt_system_row(self.conn, self.app_schema, table, &row)?;
4615 return Ok(());
4616 }
4617
4618 let metadata = self.app_schema.table_metadata(table).ok_or_else(|| {
4619 SyncularError::config(format!("unknown generated app table: {table}"))
4620 })?;
4621 let row = materialize_row_for_metadata(table, None, row.clone(), metadata)?;
4622 let row = preserve_encrypted_crdt_materialized_columns(
4623 self.conn,
4624 self.app_schema,
4625 metadata,
4626 row,
4627 )?;
4628 upsert_app_row(self.conn, self.app_schema, table, &row, fallback_version)
4629 }
4630
4631 fn upsert_rows(
4632 &mut self,
4633 table: &str,
4634 rows: &[Value],
4635 fallback_version: Option<i64>,
4636 ) -> Result<()> {
4637 if rows.is_empty() {
4638 return Ok(());
4639 }
4640 if is_encrypted_crdt_system_table(table) {
4641 for row in rows {
4642 self.upsert_row(table, row, fallback_version)?;
4643 }
4644 return Ok(());
4645 }
4646
4647 let metadata = self.app_schema.table_metadata(table).ok_or_else(|| {
4648 SyncularError::config(format!("unknown generated app table: {table}"))
4649 })?;
4650 if metadata
4651 .crdt_yjs_fields
4652 .iter()
4653 .any(|field| field.sync_mode == "encrypted-update-log")
4654 {
4655 for row in rows {
4656 self.upsert_row(table, row, fallback_version)?;
4657 }
4658 return Ok(());
4659 }
4660
4661 let row_objects = rows
4662 .iter()
4663 .map(|row| {
4664 let row = materialize_row_for_metadata(table, None, row.clone(), metadata)?;
4665 row.as_object().cloned().ok_or_else(|| {
4666 SyncularError::protocol_message(format!("row is not a JSON object: {row}"))
4667 })
4668 })
4669 .collect::<Result<Vec<_>>>()?;
4670
4671 upsert_rows_generic_batch(self.conn, metadata, &row_objects, fallback_version)
4672 }
4673
4674 fn upsert_snapshot_chunk_rows(
4675 &mut self,
4676 table: &str,
4677 rows: &SnapshotChunkRows,
4678 fallback_version: Option<i64>,
4679 ) -> Result<()> {
4680 match rows {
4681 SnapshotChunkRows::Json(rows) => self.upsert_rows(table, rows, fallback_version),
4682 SnapshotChunkRows::Binary(rows) => {
4683 if fallback_version.is_some() || is_encrypted_crdt_system_table(table) {
4684 let rows = rows.clone().into_value_rows();
4685 return self.upsert_rows(table, &rows, fallback_version);
4686 }
4687
4688 let metadata = self.app_schema.table_metadata(table).ok_or_else(|| {
4689 SyncularError::config(format!("unknown generated app table: {table}"))
4690 })?;
4691 if metadata
4692 .crdt_yjs_fields
4693 .iter()
4694 .any(|field| field.sync_mode == "encrypted-update-log")
4695 {
4696 let rows = rows.clone().into_value_rows();
4697 return self.upsert_rows(table, &rows, fallback_version);
4698 }
4699
4700 upsert_binary_snapshot_rows_batch(self.conn, metadata, rows)
4701 }
4702 SnapshotChunkRows::BinaryPayload(rows) => {
4703 let rows = rows.clone().into_decoded_rows()?;
4704 if fallback_version.is_some() || is_encrypted_crdt_system_table(table) {
4705 let rows = rows.into_value_rows();
4706 return self.upsert_rows(table, &rows, fallback_version);
4707 }
4708
4709 let metadata = self.app_schema.table_metadata(table).ok_or_else(|| {
4710 SyncularError::config(format!("unknown generated app table: {table}"))
4711 })?;
4712 if metadata
4713 .crdt_yjs_fields
4714 .iter()
4715 .any(|field| field.sync_mode == "encrypted-update-log")
4716 {
4717 let rows = rows.into_value_rows();
4718 return self.upsert_rows(table, &rows, fallback_version);
4719 }
4720
4721 upsert_binary_snapshot_rows_batch(self.conn, metadata, &rows)
4722 }
4723 }
4724 }
4725
4726 fn apply_change(&mut self, change: &SyncChange) -> Result<()> {
4727 if is_encrypted_crdt_system_table(&change.table) {
4728 if change.op == "delete" {
4729 return delete_encrypted_crdt_system_row(self.conn, &change.table, &change.row_id);
4730 }
4731 let row = apply_encrypted_crdt_system_row(
4732 self.conn,
4733 &change.table,
4734 &change.row_id,
4735 change.row_json.as_ref(),
4736 change.row_version,
4737 )?;
4738 materialize_encrypted_crdt_system_row(self.conn, self.app_schema, &change.table, &row)?;
4739 return Ok(());
4740 }
4741
4742 if change.op == "upsert" {
4743 let metadata = self
4744 .app_schema
4745 .table_metadata(&change.table)
4746 .ok_or_else(|| {
4747 SyncularError::config(format!("unknown generated app table: {}", change.table))
4748 })?;
4749 if let Some(row) = change.row_json.as_ref() {
4750 let mut change = change.clone();
4751 change.row_json = Some(if has_yjs_payload(row) {
4752 let existing_row = current_app_row_json(
4753 self.conn,
4754 self.app_schema,
4755 &change.table,
4756 &change.row_id,
4757 )?;
4758 transform_local_row_for_metadata(
4759 &change.table,
4760 &change.row_id,
4761 None,
4762 Some(row),
4763 existing_row.as_ref(),
4764 metadata,
4765 )?
4766 .ok_or_else(|| {
4767 SyncularError::protocol_message(format!(
4768 "server-merge Yjs change for {}.{} did not materialize a row",
4769 change.table, change.row_id
4770 ))
4771 })?
4772 } else {
4773 materialize_row_for_metadata(
4774 &change.table,
4775 Some(&change.row_id),
4776 row.clone(),
4777 metadata,
4778 )?
4779 });
4780 return apply_app_change(self.conn, self.app_schema, &change);
4781 }
4782 }
4783 apply_app_change(self.conn, self.app_schema, change)
4784 }
4785}
4786
4787fn has_yjs_payload(value: &Value) -> bool {
4788 value
4789 .as_object()
4790 .is_some_and(|object| object.contains_key(YJS_PAYLOAD_KEY))
4791}