1use crate::app_schema::{app_schema_from_json, default_app_schema, AppSchema, AppTableMetadata};
2use crate::client::{
3 sync_changed_crdt_field_from_metadata, sync_changed_row_for_local_operation, BootstrapStatus,
4 CrdtFieldCompactionReceipt, CrdtFieldMaterialization, CrdtFieldWriteReceipt, SubscriptionSpec,
5 SyncChangedRow, SyncReport, SyncularClient, SyncularClientConfig,
6};
7use crate::crdt_field::{CrdtField, CrdtFieldId, CrdtFieldSyncMode};
8use crate::crdt_yjs::{
9 validate_crdt_request_json_size, validate_yjs_text_input_size,
10 validate_yjs_update_envelope_size, YjsUpdateEnvelope,
11};
12use crate::diesel_sqlite::DieselSqliteStore;
13use crate::encrypted_crdt::{EncryptedCrdt, CRDT_CHECKPOINTS_TABLE, CRDT_UPDATES_TABLE};
14use crate::encryption::{encryption_helpers_json, BlobEncryption, FieldEncryption};
15use crate::error::{ErrorKind, Result, SyncularError};
16use crate::limits::{
17 runtime_default_limits, RuntimeLimits, DEFAULT_CRDT_UPDATE_LOG_LIMIT,
18 DEFAULT_NATIVE_EVENT_STREAM_CAPACITY, DEFAULT_NATIVE_RECENT_EVENT_LIMIT,
19 DEFAULT_READONLY_QUERY_STATEMENT_CACHE_CAPACITY,
20 MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES,
21};
22use crate::protocol::{
23 validate_mutation_json_input_size, AuthLeaseProvenance, BlobRef, BootstrapState,
24};
25use crate::runtime_schema::runtime_schema_version;
26use crate::sqlite_query::ReadonlySqlQueryExecutor;
27use crate::store::{now_ms, AuthLeaseRecord, ConflictSummary, OutboxSummary};
28use crate::transport::{
29 HttpSyncTransport, RealtimeEvent, RealtimePresenceEntry, RealtimePresenceEvent,
30 SyncAuthHeaderStore, SyncAuthHeaders, SyncTransportConfig,
31};
32use crate::worker::{
33 PersistentRealtimeWorker, SyncWorker, SyncWorkerEvent, SyncWorkerEventSubscription,
34};
35use serde::{Deserialize, Serialize};
36use serde_json::{json, Map, Value};
37use std::collections::{BTreeMap, BTreeSet, VecDeque};
38use std::path::Path;
39use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
40use std::sync::{Arc, Condvar, Mutex};
41use std::thread::{self, JoinHandle};
42use std::time::Duration;
43use uuid::Uuid;
44
/// Serializable configuration accepted by the `open_native*` constructors.
///
/// The constructors convert this value into the core `SyncularClientConfig`
/// with `Into`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NativeClientConfig {
    /// Database path. NOTE(review): presumably the local SQLite file — confirm.
    pub db_path: String,
    /// Base URL of the sync server.
    pub base_url: String,
    /// Identifier of this client.
    pub client_id: String,
    /// Identifier of the acting user/actor.
    pub actor_id: String,
    /// Optional project identifier.
    pub project_id: Option<String>,
    /// Optional app schema as JSON text; absent keys deserialize to `None`.
    // NOTE(review): presumably what `config.app_schema()` parses — confirm.
    #[serde(default)]
    pub app_schema_json: Option<String>,
}
55
/// Behavioural switches for a [`NativeSyncularClient`].
#[derive(Debug, Clone)]
pub struct NativeClientOptions {
    /// Flag forwarded to the worker with every queued local write.
    pub auto_sync_local_writes: bool,
}

impl Default for NativeClientOptions {
    /// Default options: automatic sync after local writes is enabled.
    fn default() -> Self {
        let auto_sync_local_writes = true;
        Self {
            auto_sync_local_writes,
        }
    }
}
68
/// Native-facing client facade: owns a foreground `writer` client, an optional
/// background sync worker, an optional realtime worker and the event hub.
pub struct NativeSyncularClient {
    /// Core configuration; cloned whenever a worker client or transport is built.
    config: SyncularClientConfig,
    /// Foreground client used for direct (non-queued) operations.
    writer: SyncularClient<DieselSqliteStore, HttpSyncTransport>,
    /// Background sync worker; `None` while the worker is paused.
    worker: Option<SyncWorker>,
    /// Join handle of the thread started by `start_worker_event_pump`.
    worker_event_pump: Option<JoinHandle<()>>,
    /// Realtime worker; `None` until `start_realtime_worker` is called.
    realtime_worker: Option<PersistentRealtimeWorker>,
    /// Presence entries keyed by scope key, shared with the realtime event handler.
    presence_by_scope: Arc<Mutex<BTreeMap<String, Vec<NativePresenceEntry>>>>,
    /// Last auth headers set; re-applied to resumed workers and new transports.
    auth_headers: SyncAuthHeaders,
    /// Last field-encryption setting; re-applied when the worker is resumed.
    field_encryption: Option<FieldEncryption>,
    /// Last encrypted-CRDT setting; re-applied when the worker is resumed.
    encrypted_crdt: Option<EncryptedCrdt>,
    /// Last blob-encryption setting; re-applied when the worker is resumed.
    blob_encryption: Option<BlobEncryption>,
    /// Copied from `NativeClientOptions`; forwarded with queued local writes.
    auto_sync_local_writes: bool,
    /// Initialised to `false` by `open_with_options_and_schema`.
    // NOTE(review): presumably consulted by a `Drop` impl outside this view — confirm.
    shutdown_on_drop: bool,
    /// NOTE(review): presumably the counter behind `next_command_id` — confirm.
    command_seq: Mutex<u64>,
    /// Event hub shared with the worker event pump and realtime handler.
    events: NativeEventHub,
    /// Subscription created at open time with the default stream capacity.
    default_events: NativeEventSubscription,
    /// Read-only SQL executor opened on the same database path.
    read_executor: Mutex<ReadonlySqlQueryExecutor>,
}
87
/// Handle for an asynchronous open, as returned by
/// `NativeSyncularClient::open_native_async_with_options`.
pub struct NativeClientOpenTask {
    /// Command identifier associated with this open task.
    command_id: String,
    /// Channel on which the open result arrives.
    // NOTE(review): presumably cleared once the result has been received — confirm.
    result_rx: Option<Receiver<Result<NativeSyncularClient>>>,
    /// Received result that has not yet been handed to the caller.
    completed: Option<Result<NativeSyncularClient>>,
    /// NOTE(review): presumably set once the open attempt has finished — confirm.
    finished: bool,
    /// NOTE(review): presumably set once the result has been taken — confirm.
    taken: bool,
}
95
/// Builder for [`NativeSyncularClient`], created by `NativeSyncularClient::builder`.
///
/// The build logic lives outside this view; field notes describe only the
/// stored values.
#[derive(Debug, Clone)]
pub struct NativeSyncularClientBuilder {
    /// Native configuration the client will be opened with.
    config: NativeClientConfig,
    /// Client options (auto-sync behaviour).
    options: NativeClientOptions,
    /// NOTE(review): presumably whether to start the realtime worker — confirm.
    realtime: bool,
    /// Auth headers to apply, if any.
    auth_headers: Option<SyncAuthHeaders>,
    /// Subscriptions to apply, if any.
    subscriptions: Option<Vec<SubscriptionSpec>>,
    /// NOTE(review): presumably requests a sync right after open — confirm.
    initial_sync: bool,
    /// NOTE(review): presumably requests a websocket sync right after open — confirm.
    initial_websocket_sync: bool,
    /// NOTE(review): presumably queues blob-upload processing on open — confirm.
    process_blob_uploads_on_open: bool,
    /// Value for the client's `shutdown_on_drop` flag.
    shutdown_on_drop: bool,
}
108
/// Handle tied to a presence scope that mutably borrows the client for `'a`.
pub struct NativePresenceHandle<'a> {
    /// Client the presence was joined on.
    client: &'a mut NativeSyncularClient,
    /// Scope key of the joined presence.
    scope_key: String,
    /// NOTE(review): presumably `true` until the presence has been left — confirm.
    active: bool,
}
114
/// FFI ABI version reported as `ffi_abi_version` in the runtime manifest.
pub const NATIVE_FFI_ABI_VERSION: u32 = 2;
116
/// Static description of the native runtime, produced by
/// `native_runtime_manifest()`. Serialized with snake_case field names.
#[derive(Debug, Clone, Serialize)]
pub struct NativeRuntimeManifest {
    /// FFI ABI version (`NATIVE_FFI_ABI_VERSION`).
    pub ffi_abi_version: u32,
    /// Crate name taken from `CARGO_PKG_NAME` at compile time.
    pub crate_name: &'static str,
    /// Crate version taken from `CARGO_PKG_VERSION` at compile time.
    pub crate_version: &'static str,
    /// Runtime schema version (`runtime_schema_version()`).
    pub schema_version: i32,
    /// Name of the storage backend.
    pub storage_backend: &'static str,
    /// Names of the supported transport backends.
    pub transport_backends: &'static [&'static str],
    /// Name of the worker model.
    pub worker_model: &'static str,
    /// String encoding identifier.
    pub string_encoding: &'static str,
    /// Identifier of the error payload shape.
    pub error_shape: &'static str,
    /// Identifier of the event model.
    pub event_model: &'static str,
    /// Default runtime limits.
    pub limits: RuntimeLimits,
    /// Capability flags advertised by this runtime.
    pub capabilities: &'static [&'static str],
    /// App table names (empty in the base manifest).
    pub app_tables: &'static [&'static str],
    /// App table metadata (empty in the base manifest).
    pub app_table_metadata: &'static [AppTableMetadata],
}
134
/// Discriminator carried in `NativeEvent::kind`.
///
/// No serde rename is applied, so variants serialize under their Rust names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NativeEventKind {
    // Sync lifecycle.
    SyncStarted,
    SyncCompleted,
    SyncFailed,
    AuthExpired,
    // Local writes and conflict resolution.
    LocalWriteCommitted,
    LocalWriteFailed,
    ConflictResolutionCompleted,
    ConflictResolutionFailed,
    SnapshotReady,
    // CRDT fields.
    CrdtFieldChanged,
    CrdtFieldCompacted,
    // Queued worker commands.
    WorkerCommandCompleted,
    WorkerCommandFailed,
    // Change notifications.
    RowsChanged,
    QueriesChanged,
    ConflictsChanged,
    PresenceChanged,
    BlobUploadsChanged,
    // Diagnostics and stream health.
    Diagnostic,
    EventsOverflowed,
}
158
/// Coarse lifecycle phase reported in `NativeLifecycleState::phase`.
///
/// Variants serialize in camelCase (for example `authRequired`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum NativeLifecyclePhase {
    Offline,
    Syncing,
    Recovering,
    AuthRequired,
    Degraded,
    Complete,
    Closed,
}
170
/// Bootstrap progress summary embedded in `NativeLifecycleState` (camelCase JSON).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeLifecycleBootstrap {
    /// Whether bootstrap is complete.
    pub complete: bool,
    /// Whether the critical portion is ready.
    pub critical_ready: bool,
    /// Whether the interactive portion is ready.
    pub interactive_ready: bool,
    /// Whether a bootstrap is currently in progress.
    pub is_bootstrapping: bool,
    /// Progress as a percentage. NOTE(review): range presumably 0..=100 — confirm.
    pub progress_percent: i64,
}
180
/// Outbox summary embedded in `NativeLifecycleState` (camelCase JSON).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeLifecycleOutbox {
    /// Number of pending outbox entries.
    pub pending: usize,
}
186
/// Conflict summary embedded in `NativeLifecycleState` (camelCase JSON).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeLifecycleConflicts {
    /// Number of unresolved conflicts.
    pub unresolved: usize,
}
192
/// Blob-upload queue summary embedded in `NativeLifecycleState` (camelCase JSON).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeLifecycleBlobUploads {
    /// Uploads waiting to start.
    pub pending: i64,
    /// Uploads currently in progress.
    pub uploading: i64,
    /// Uploads that failed.
    pub failed: i64,
}
200
/// Aggregated lifecycle state attached to events (camelCase JSON).
///
/// Every optional section defaults to `None` on input and is omitted from the
/// output when `None`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeLifecycleState {
    /// Current coarse phase.
    pub phase: NativeLifecyclePhase,
    /// Whether the client is considered online.
    pub online: bool,
    /// Whether the host application needs to act.
    pub requires_action: bool,
    /// Number of pending requests.
    pub pending_requests: usize,
    /// Bootstrap summary, if available.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub bootstrap: Option<NativeLifecycleBootstrap>,
    /// Outbox summary, if available.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub outbox: Option<NativeLifecycleOutbox>,
    /// Conflict summary, if available.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub conflicts: Option<NativeLifecycleConflicts>,
    /// Blob-upload summary, if available.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blob_uploads: Option<NativeLifecycleBlobUploads>,
    /// Most recent error, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_error: Option<NativeErrorInfo>,
    /// Most recent diagnostic, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_diagnostic: Option<NativeDiagnostic>,
}
221
/// Structured error payload exposed to native hosts.
///
/// Fields keep their Rust names in JSON except `recommended_action`, which is
/// renamed to `recommendedAction`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NativeErrorInfo {
    /// Error kind from the crate's error module.
    pub kind: ErrorKind,
    /// Error code string.
    pub code: String,
    /// Error category string.
    pub category: String,
    /// Whether retrying may succeed.
    pub retryable: bool,
    /// Suggested action for the host.
    #[serde(rename = "recommendedAction")]
    pub recommended_action: String,
    /// Human-readable message.
    pub message: String,
    /// Optional debug detail; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub debug: Option<String>,
}
234
/// Event delivered to native subscribers.
///
/// Fields serialize under their Rust (snake_case) names unless an explicit
/// `rename` is given. Most optional fields default to `None` on input and are
/// omitted from the output when `None`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NativeEvent {
    /// Sequence number of the event; defaults to 0 when absent on input.
    #[serde(default)]
    pub event_seq: u64,
    /// What happened.
    pub kind: NativeEventKind,
    /// Error details, when the event reports a failure.
    pub error: Option<NativeErrorInfo>,
    /// Identifier of the worker command this event relates to.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub command_id: Option<String>,
    /// Client commit identifier, when applicable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub client_commit_id: Option<String>,
    /// Outbox entry count, when reported.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub outbox_count: Option<usize>,
    /// Conflict count, when reported.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub conflict_count: Option<usize>,
    /// Whether a retry has been scheduled.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub retry_scheduled: Option<bool>,
    /// Duration of the operation in milliseconds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duration_ms: Option<u64>,
    /// Number of dropped events; JSON key `droppedCount`.
    #[serde(
        default,
        rename = "droppedCount",
        skip_serializing_if = "Option::is_none"
    )]
    pub dropped_count: Option<usize>,
    /// Whether the consumer must resync; JSON key `resyncRequired`.
    #[serde(
        default,
        rename = "resyncRequired",
        skip_serializing_if = "Option::is_none"
    )]
    pub resync_required: Option<bool>,
    /// Auth context for auth-related events.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth: Option<NativeAuthInfo>,
    /// Diagnostic payload for diagnostic events.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub diagnostic: Option<NativeDiagnostic>,
    /// Affected table names (required on input, always serialized).
    pub tables: Vec<String>,
    /// Changed rows; JSON key `changedRows`, omitted when empty.
    #[serde(default, rename = "changedRows", skip_serializing_if = "Vec::is_empty")]
    pub changed_rows: Vec<SyncChangedRow>,
    /// Affected observed-query ids; defaults to empty on input.
    #[serde(default)]
    pub queries: Vec<String>,
    /// Bootstrap status snapshot, when attached.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub bootstrap: Option<BootstrapStatus>,
    /// Lifecycle state snapshot, when attached.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub lifecycle: Option<NativeLifecycleState>,
    /// Free-form JSON payload, when attached.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload_json: Option<Value>,
}
281
/// Structured diagnostic record.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NativeDiagnostic {
    /// Timestamp. NOTE(review): presumably epoch milliseconds — confirm.
    pub at: i64,
    /// Severity level string.
    pub level: String,
    /// Component that produced the diagnostic.
    pub source: String,
    /// Diagnostic code.
    pub code: String,
    /// Human-readable message.
    pub message: String,
    /// Extra key/value details; defaults to empty and is omitted when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub details: BTreeMap<String, Value>,
}
292
/// Point-in-time diagnostic snapshot of the client (camelCase JSON, serialize-only).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeDiagnosticSnapshot {
    /// When the snapshot was generated. NOTE(review): presumably epoch ms — confirm.
    pub generated_at: i64,
    /// Runtime manifest.
    pub runtime: NativeRuntimeManifest,
    /// Worker and connection flags.
    pub connection: NativeDiagnosticConnectionSnapshot,
    /// Per-subscription status.
    pub subscriptions: Vec<NativeDiagnosticSubscriptionSnapshot>,
    /// Recently emitted events.
    pub recent_events: Vec<NativeEvent>,
    /// Recently emitted diagnostics.
    pub recent_diagnostics: Vec<NativeDiagnostic>,
    /// Recent sync timing records.
    pub recent_sync_timings: Vec<NativeSyncTimingSnapshot>,
    /// Runtime limits in effect.
    pub limits: RuntimeLimits,
    /// Bootstrap status.
    pub bootstrap: BootstrapStatus,
    /// Outbox counters.
    pub outbox_stats: NativeOutboxStats,
    /// Conflict counters.
    pub conflict_stats: NativeConflictStats,
    /// Blob upload queue statistics as free-form JSON.
    pub blob_upload_queue_stats: Value,
    /// Blob cache statistics as free-form JSON.
    pub blob_cache_stats: Value,
    /// Currently registered observed queries.
    pub observed_queries: Vec<NativeObservedQuery>,
}
311
/// Worker and connection flags reported in a diagnostic snapshot (camelCase JSON).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeDiagnosticConnectionSnapshot {
    /// Whether the background sync worker is running.
    pub sync_worker_running: bool,
    /// Whether the realtime worker is running.
    pub realtime_worker_running: bool,
    /// Value of the client's `auto_sync_local_writes` flag.
    pub auto_sync_local_writes: bool,
    /// Number of event subscribers.
    pub event_subscriber_count: usize,
    /// Number of observed queries.
    pub observed_query_count: usize,
}
321
/// Per-subscription diagnostic view (camelCase JSON).
///
/// Scope and params are reported as key lists and value counts rather than as
/// the values themselves.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeDiagnosticSubscriptionSnapshot {
    /// Subscription identifier.
    pub id: String,
    /// Subscribed table.
    pub table: String,
    /// Keys present in the subscription scope.
    pub scope_keys: Vec<String>,
    /// Number of scope values.
    pub scope_value_count: usize,
    /// Keys present in the subscription params.
    pub params_keys: Vec<String>,
    /// Number of params values.
    pub params_value_count: usize,
    /// Status string, if known.
    pub status: Option<String>,
    /// Whether the subscription is ready.
    pub ready: bool,
    /// Phase label.
    pub phase: String,
    /// Progress as a percentage.
    pub progress_percent: i64,
    /// Sync cursor, if any.
    pub cursor: Option<i64>,
    /// Numeric bootstrap phase.
    pub bootstrap_phase: i64,
    /// Detailed bootstrap state, if any.
    pub bootstrap_state: Option<BootstrapState>,
}
339
/// Outbox counters by state (camelCase JSON); `Default` yields all zeros.
#[derive(Debug, Clone, Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeOutboxStats {
    /// Entries waiting to be sent.
    pub pending: usize,
    /// Entries currently being sent.
    pub sending: usize,
    /// Entries that failed.
    pub failed: usize,
    /// Entries acknowledged.
    pub acked: usize,
    /// Total number of entries.
    pub total: usize,
}
349
/// Conflict counters (camelCase JSON); `Default` yields all zeros.
#[derive(Debug, Clone, Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeConflictStats {
    /// Conflicts not yet resolved.
    pub unresolved: usize,
    /// Conflicts already resolved.
    pub resolved: usize,
    /// Total number of conflicts.
    pub total: usize,
}
357
/// Timing record for one sync-related event (camelCase JSON).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeSyncTimingSnapshot {
    /// Sequence number of the originating event.
    pub event_seq: u64,
    /// Event kind as a string.
    pub kind: String,
    /// Related worker command identifier, if any.
    pub command_id: Option<String>,
    /// Total duration in milliseconds.
    pub total_ms: u64,
    /// Whether the operation succeeded.
    pub success: bool,
    /// Whether a retry was scheduled.
    pub retry_scheduled: Option<bool>,
    /// Outbox count reported with the event.
    pub outbox_count: Option<usize>,
    /// Conflict count reported with the event.
    pub conflict_count: Option<usize>,
}
370
/// Auth context attached to auth-related events.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NativeAuthInfo {
    /// Operation that hit the auth problem.
    pub operation: String,
    /// Status code. NOTE(review): presumably the HTTP status — confirm.
    pub status: u16,
}
376
/// One participant in a presence scope (camelCase JSON).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativePresenceEntry {
    /// Client identifier of the participant.
    pub client_id: String,
    /// Actor identifier of the participant.
    pub actor_id: String,
    /// Join time. NOTE(review): presumably epoch milliseconds — confirm.
    pub joined_at: i64,
    /// Optional application metadata; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
386
/// Dependency hint for an observed query: a table, optionally narrowed to
/// specific rows and fields (camelCase JSON).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeObservedQueryDependencyHint {
    /// Table the query depends on.
    pub table: String,
    /// Row ids of interest; defaults to empty and is omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub row_ids: Vec<String>,
    /// Field names of interest; defaults to empty and is omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub fields: Vec<String>,
}
396
/// A registered observed query, as stored in the event hub.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NativeObservedQuery {
    /// Observer identifier.
    pub id: String,
    /// Tables the query reads from.
    pub tables: Vec<String>,
    /// Optional finer-grained hints; JSON key `dependencyHints`, omitted when empty.
    #[serde(
        default,
        rename = "dependencyHints",
        skip_serializing_if = "Vec::is_empty"
    )]
    pub dependency_hints: Vec<NativeObservedQueryDependencyHint>,
    /// Optional human-readable label; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
}
410
/// Shared event hub. Every field is behind an `Arc`, so clones of the hub
/// share the same underlying state.
#[derive(Clone, Default)]
struct NativeEventHub {
    /// Counter used for event sequence numbers.
    event_seq: Arc<Mutex<u64>>,
    /// Counter used for subscriber ids.
    subscriber_seq: Arc<Mutex<u64>>,
    /// Per-subscriber queues keyed by subscriber id.
    subscribers: Arc<Mutex<BTreeMap<u64, Arc<NativeEventQueue>>>>,
    /// Registered observed queries keyed by observer id.
    query_observers: Arc<Mutex<BTreeMap<String, NativeObservedQuery>>>,
    /// Buffer of recently emitted events.
    recent_events: Arc<Mutex<VecDeque<NativeEvent>>>,
}
419
/// A subscriber's handle onto the event hub.
pub struct NativeEventSubscription {
    /// Hub this subscription belongs to.
    hub: NativeEventHub,
    /// Identifier of this subscriber within the hub.
    subscriber_id: u64,
    /// Queue holding this subscriber's events.
    queue: Arc<NativeEventQueue>,
}
425
/// Wrapper around a [`NativeEventSubscription`].
// NOTE(review): presumably yields events as JSON strings — the impl is outside this view.
pub struct NativeEventJsonIterator {
    /// Underlying subscription.
    subscription: NativeEventSubscription,
}
429
/// Per-subscriber event queue guarded by a mutex, with a condition variable
/// alongside it.
struct NativeEventQueue {
    /// Configured capacity of the queue.
    capacity: usize,
    /// Queue contents and closed flag.
    state: Mutex<NativeEventQueueState>,
    /// Condition variable paired with `state`.
    // NOTE(review): presumably notified when events arrive or the queue closes — confirm.
    ready: Condvar,
}
435
/// Mutable state of a [`NativeEventQueue`].
struct NativeEventQueueState {
    /// Buffered events in delivery order.
    events: VecDeque<NativeEvent>,
    /// Whether the queue has been closed.
    closed: bool,
}
440
/// JSON request shape for registering an observed query.
#[derive(Debug, Clone, Deserialize)]
struct NativeObservedQueryRegistration {
    /// Optional caller-chosen id.
    // NOTE(review): presumably generated when absent — confirm at the use site.
    #[serde(default)]
    id: Option<String>,
    /// Tables the query reads from (required).
    tables: Vec<String>,
    /// Optional dependency hints; JSON key `dependencyHints`.
    #[serde(default, rename = "dependencyHints")]
    dependency_hints: Vec<NativeObservedQueryDependencyHint>,
    /// Optional human-readable label.
    #[serde(default)]
    label: Option<String>,
}
451
/// Minimal view of an encrypted-CRDT request: only `table` is read (camelCase JSON).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct NativeEncryptedCrdtRequest {
    /// Target table name.
    table: String,
}
457
/// Request addressing one CRDT field by table, row and field name (camelCase JSON).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct NativeCrdtFieldRequest {
    /// Table containing the row.
    table: String,
    /// Row identifier (JSON key `rowId`).
    row_id: String,
    /// Field (column) name.
    field: String,
}
465
/// Request to set the text of a CRDT field (camelCase JSON).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct NativeCrdtFieldTextRequest {
    /// Table containing the row.
    table: String,
    /// Row identifier (JSON key `rowId`).
    row_id: String,
    /// Field (column) name.
    field: String,
    /// Desired text (JSON key `nextText`); size-checked before queuing.
    next_text: String,
}
474
/// Request to apply a Yjs update to a CRDT field (camelCase JSON).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct NativeCrdtFieldYjsUpdateRequest {
    /// Table containing the row.
    table: String,
    /// Row identifier (JSON key `rowId`).
    row_id: String,
    /// Field (column) name.
    field: String,
    /// Yjs update envelope; size-checked before queuing.
    update: YjsUpdateEnvelope,
}
483
/// Request to compact a CRDT field's update log (camelCase JSON).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct NativeCrdtFieldCompactionRequest {
    /// Table containing the row.
    table: String,
    /// Row identifier (JSON key `rowId`).
    row_id: String,
    /// Field (column) name.
    field: String,
    /// Optional threshold (JSON key `minUncheckpointedUpdates`).
    // NOTE(review): presumably the minimum number of uncheckpointed updates
    // required before compaction runs — confirm.
    #[serde(default)]
    min_uncheckpointed_updates: Option<i64>,
}
493
/// Request to read a CRDT field's update log (camelCase JSON).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct NativeCrdtFieldLogRequest {
    /// Table containing the row.
    table: String,
    /// Row identifier (JSON key `rowId`).
    row_id: String,
    /// Field (column) name.
    field: String,
    /// Optional maximum number of entries to return.
    #[serde(default)]
    limit: Option<i64>,
}
503
/// Optional settings for storing a blob (camelCase JSON); every field may be omitted.
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeBlobStoreOptions {
    /// MIME type of the blob (JSON key `mimeType`).
    pub mime_type: Option<String>,
    /// NOTE(review): presumably uploads immediately instead of queuing — confirm.
    pub immediate: Option<bool>,
    /// NOTE(review): presumably keeps a copy in the local cache — confirm.
    pub cache_local: Option<bool>,
}
511
/// Optional settings for retrieving a blob (camelCase JSON).
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NativeBlobRetrieveOptions {
    /// NOTE(review): presumably caches the retrieved blob locally — confirm.
    pub cache_local: Option<bool>,
}
517
518pub fn native_runtime_manifest() -> NativeRuntimeManifest {
519 NativeRuntimeManifest {
520 ffi_abi_version: NATIVE_FFI_ABI_VERSION,
521 crate_name: env!("CARGO_PKG_NAME"),
522 crate_version: env!("CARGO_PKG_VERSION"),
523 schema_version: runtime_schema_version(),
524 storage_backend: "diesel-sqlite",
525 transport_backends: &["http", "websocket"],
526 worker_model: "background-sync-worker",
527 string_encoding: "utf-8-json",
528 error_shape: "native-error-info-v1",
529 event_model: "native-event-stream-json-v1",
530 limits: runtime_default_limits(),
531 capabilities: &[
532 "dynamic-auth-headers",
533 "dynamic-subscriptions",
534 "auth-expired-events",
535 "generated-app-table-metadata",
536 "generated-app-schema-state",
537 "generated-json-table-reads",
538 "generated-json-local-operations",
539 "generated-json-mutations",
540 "generated-json-leased-mutations",
541 "queued-json-local-operations",
542 "queued-json-leased-mutations",
543 "auth-lease-issue",
544 "queued-yjs-updates",
545 "queued-snapshot-refresh",
546 "queued-storage-compaction",
547 "queued-blob-cache-work",
548 "worker-command-queue",
549 "ordered-native-events",
550 "native-event-stream",
551 "read-only-query-json",
552 "outbox-json",
553 "conflicts-json",
554 "table-level-rows-changed-events",
555 "query-observer-events",
556 "conflicts-changed-events",
557 "realtime-presence",
558 "presence-changed-events",
559 "blob-file-api",
560 "background-worker-lifecycle",
561 "background-resume-recovery",
562 "structured-diagnostics",
563 "diagnostic-snapshot",
564 "runtime-limits",
565 "storage-compaction",
566 "streaming-blob-file-api",
567 "crdt-yjs",
568 "field-encryption",
569 "encrypted-crdt",
570 "queued-encrypted-crdt",
571 "generic-crdt-field-api",
572 "queued-crdt-field-updates",
573 "encryption-key-sharing",
574 "async-native-open",
575 ],
576 app_tables: &[],
577 app_table_metadata: &[],
578 }
579}
580
581pub fn native_runtime_manifest_json() -> Result<String> {
582 Ok(serde_json::to_string(&native_runtime_manifest())?)
583}
584
585impl NativeSyncularClient {
    /// Returns a [`NativeSyncularClientBuilder`] seeded with `config`.
    pub fn builder(config: NativeClientConfig) -> NativeSyncularClientBuilder {
        NativeSyncularClientBuilder::new(config)
    }
589
    /// Opens a client from a core config with default options and the default
    /// app schema (delegates to `open_with_options`).
    pub fn open(config: SyncularClientConfig) -> Result<Self> {
        Self::open_with_options(config, NativeClientOptions::default())
    }
593
594 pub fn open_native(config: NativeClientConfig) -> Result<Self> {
595 Self::open(config.into())
596 }
597
598 pub fn open_native_with_options(
599 config: NativeClientConfig,
600 options: NativeClientOptions,
601 ) -> Result<Self> {
602 let app_schema = config.app_schema()?;
603 Self::open_with_options_and_schema(config.into(), options, app_schema)
604 }
605
    /// Starts an asynchronous open and returns its task handle (delegates to
    /// `NativeClientOpenTask::open_native_with_options`).
    pub fn open_native_async_with_options(
        config: NativeClientConfig,
        options: NativeClientOptions,
    ) -> NativeClientOpenTask {
        NativeClientOpenTask::open_native_with_options(config, options)
    }
612
    /// Opens a client from a core config with explicit options, using
    /// `default_app_schema()` as the schema.
    pub fn open_with_options(
        config: SyncularClientConfig,
        options: NativeClientOptions,
    ) -> Result<Self> {
        Self::open_with_options_and_schema(config, options, default_app_schema())
    }
619
620 pub fn open_with_options_and_schema(
621 config: SyncularClientConfig,
622 options: NativeClientOptions,
623 app_schema: AppSchema,
624 ) -> Result<Self> {
625 let writer = SyncularClient::open_with_schema(config.clone(), app_schema)?;
626 let worker_client = SyncularClient::open_with_schema(config.clone(), app_schema)?;
627 let events = NativeEventHub::default();
628 let default_events = events.subscribe(DEFAULT_NATIVE_EVENT_STREAM_CAPACITY);
629 let worker = SyncWorker::start(worker_client);
630 let worker_event_pump = Some(start_worker_event_pump(
631 events.clone(),
632 worker.event_source(),
633 ));
634 let read_executor = ReadonlySqlQueryExecutor::open(
635 &config.db_path,
636 app_schema,
637 DEFAULT_READONLY_QUERY_STATEMENT_CACHE_CAPACITY,
638 )?;
639 let presence_by_scope = Arc::new(Mutex::new(BTreeMap::new()));
640
641 Ok(Self {
642 config,
643 writer,
644 worker: Some(worker),
645 worker_event_pump,
646 realtime_worker: None,
647 presence_by_scope,
648 auth_headers: SyncAuthHeaders::new(),
649 field_encryption: None,
650 encrypted_crdt: None,
651 blob_encryption: None,
652 auto_sync_local_writes: options.auto_sync_local_writes,
653 shutdown_on_drop: false,
654 command_seq: Mutex::new(0),
655 events,
656 default_events,
657 read_executor: Mutex::new(read_executor),
658 })
659 }
660
    /// Asks the background worker to run a sync; fails if `self.worker()` fails.
    pub fn trigger_sync(&self) -> Result<()> {
        self.worker()?.trigger_sync()
    }
664
    /// Asks the background worker to run a websocket sync; fails if
    /// `self.worker()` fails.
    pub fn trigger_sync_websocket(&self) -> Result<()> {
        self.worker()?.trigger_sync_websocket()
    }
668
669 pub fn enqueue_sync_now(&self) -> Result<String> {
670 let command_id = self.next_command_id("sync")?;
671 self.worker()?.enqueue_sync_now(command_id.clone())?;
672 Ok(command_id)
673 }
674
675 pub fn enqueue_sync_websocket(&self) -> Result<String> {
676 let command_id = self.next_command_id("sync-ws")?;
677 self.worker()?.enqueue_sync_websocket(command_id.clone())?;
678 Ok(command_id)
679 }
680
681 pub fn enqueue_mutation_json(
682 &self,
683 mutation_json: &str,
684 local_row_json: Option<&str>,
685 ) -> Result<String> {
686 let command_id = self.next_command_id("mutation")?;
687 self.worker()?.enqueue_mutation_json(
688 command_id.clone(),
689 mutation_json.to_string(),
690 local_row_json.map(str::to_string),
691 self.auto_sync_local_writes,
692 )?;
693 Ok(command_id)
694 }
695
696 pub fn enqueue_leased_mutation_json(
697 &self,
698 mutation_json: &str,
699 local_row_json: Option<&str>,
700 ) -> Result<String> {
701 let command_id = self.next_command_id("leased-mutation")?;
702 self.worker()?.enqueue_leased_mutation_json(
703 command_id.clone(),
704 mutation_json.to_string(),
705 local_row_json.map(str::to_string),
706 self.auto_sync_local_writes,
707 )?;
708 Ok(command_id)
709 }
710
711 pub fn enqueue_yjs_update_json(&self, update_json: &str) -> Result<String> {
712 validate_crdt_request_json_size(update_json)?;
713 let command_id = self.next_command_id("yjs")?;
714 self.worker()?.enqueue_yjs_update_json(
715 command_id.clone(),
716 update_json.to_string(),
717 self.auto_sync_local_writes,
718 )?;
719 Ok(command_id)
720 }
721
722 pub fn enqueue_crdt_field_yjs_update_json(&self, request_json: &str) -> Result<String> {
723 validate_crdt_request_json_size(request_json)?;
724 let request: NativeCrdtFieldYjsUpdateRequest = serde_json::from_str(request_json)?;
725 validate_yjs_update_envelope_size(&request.update)?;
726 let field = self.writer.open_crdt_field(request.id())?;
727 match field.sync_mode() {
728 CrdtFieldSyncMode::ServerMerge => self.enqueue_yjs_update_json(request_json),
729 CrdtFieldSyncMode::EncryptedUpdateLog => {
730 self.enqueue_encrypted_crdt_update_json(request_json)
731 }
732 }
733 }
734
735 pub fn enqueue_crdt_field_text_json(&self, request_json: &str) -> Result<String> {
736 validate_crdt_request_json_size(request_json)?;
737 let request: NativeCrdtFieldTextRequest = serde_json::from_str(request_json)?;
738 validate_yjs_text_input_size(&request.next_text)?;
739 self.writer.open_crdt_field(request.id())?;
740 let command_id = self.next_command_id("crdt-text")?;
741 self.worker()?.enqueue_crdt_field_text_json(
742 command_id.clone(),
743 request_json.to_string(),
744 self.auto_sync_local_writes,
745 )?;
746 Ok(command_id)
747 }
748
749 pub fn enqueue_crdt_field_compaction_json(&self, request_json: &str) -> Result<String> {
750 validate_crdt_request_json_size(request_json)?;
751 let request: NativeCrdtFieldCompactionRequest = serde_json::from_str(request_json)?;
752 self.writer.open_crdt_field(request.id())?;
753 let command_id = self.next_command_id("crdt-compact")?;
754 self.worker()?.enqueue_crdt_field_compaction_json(
755 command_id.clone(),
756 request_json.to_string(),
757 self.auto_sync_local_writes,
758 )?;
759 Ok(command_id)
760 }
761
762 pub fn enqueue_encrypted_crdt_update_json(&self, request_json: &str) -> Result<String> {
763 validate_crdt_request_json_size(request_json)?;
764 let command_id = self.next_command_id("encrypted-crdt")?;
765 self.worker()?.enqueue_encrypted_crdt_update_json(
766 command_id.clone(),
767 request_json.to_string(),
768 self.auto_sync_local_writes,
769 )?;
770 Ok(command_id)
771 }
772
773 pub fn enqueue_encrypted_crdt_checkpoint_json(&self, request_json: &str) -> Result<String> {
774 validate_crdt_request_json_size(request_json)?;
775 let command_id = self.next_command_id("encrypted-crdt-checkpoint")?;
776 self.worker()?.enqueue_encrypted_crdt_checkpoint_json(
777 command_id.clone(),
778 request_json.to_string(),
779 self.auto_sync_local_writes,
780 )?;
781 Ok(command_id)
782 }
783
784 pub fn enqueue_resolve_conflict(&self, id: &str, resolution: &str) -> Result<String> {
785 let command_id = self.next_command_id("conflict")?;
786 self.worker()?.enqueue_conflict_resolution(
787 command_id.clone(),
788 id.to_string(),
789 resolution.to_string(),
790 self.auto_sync_local_writes,
791 )?;
792 Ok(command_id)
793 }
794
795 pub fn enqueue_refresh_snapshot_json(&self, request_json: &str) -> Result<String> {
796 let command_id = self.next_command_id("snapshot")?;
797 self.worker()?
798 .enqueue_refresh_snapshot_json(command_id.clone(), request_json.to_string())?;
799 Ok(command_id)
800 }
801
802 pub fn enqueue_compact_storage_json(&self, options_json: Option<&str>) -> Result<String> {
803 let command_id = self.next_command_id("compact")?;
804 self.worker()?
805 .enqueue_compact_storage_json(command_id.clone(), options_json.map(str::to_string))?;
806 Ok(command_id)
807 }
808
809 pub fn enqueue_store_blob_file_json(
810 &self,
811 path: &str,
812 options_json: Option<&str>,
813 ) -> Result<String> {
814 let command_id = self.next_command_id("blob-store")?;
815 self.worker()?.enqueue_store_blob_file_json(
816 command_id.clone(),
817 path.to_string(),
818 options_json.map(str::to_string),
819 )?;
820 Ok(command_id)
821 }
822
823 pub fn enqueue_retrieve_blob_file_json(
824 &self,
825 ref_json: &str,
826 path: &str,
827 options_json: Option<&str>,
828 ) -> Result<String> {
829 let command_id = self.next_command_id("blob-retrieve")?;
830 self.worker()?.enqueue_retrieve_blob_file_json(
831 command_id.clone(),
832 ref_json.to_string(),
833 path.to_string(),
834 options_json.map(str::to_string),
835 )?;
836 Ok(command_id)
837 }
838
839 pub fn enqueue_process_blob_upload_queue(&self) -> Result<String> {
840 let command_id = self.next_command_id("blob-upload")?;
841 self.worker()?
842 .enqueue_process_blob_upload_queue(command_id.clone())?;
843 Ok(command_id)
844 }
845
846 pub fn enqueue_prune_blob_cache(&self, max_bytes: i64) -> Result<String> {
847 let command_id = self.next_command_id("blob-prune")?;
848 self.worker()?
849 .enqueue_prune_blob_cache(command_id.clone(), max_bytes)?;
850 Ok(command_id)
851 }
852
853 pub fn enqueue_clear_blob_cache(&self) -> Result<String> {
854 let command_id = self.next_command_id("blob-clear")?;
855 self.worker()?
856 .enqueue_clear_blob_cache(command_id.clone())?;
857 Ok(command_id)
858 }
859
860 pub fn set_auth_headers(&mut self, headers: SyncAuthHeaders) -> Result<()> {
861 self.auth_headers = headers.clone();
862 self.writer.set_auth_headers(headers.clone());
863 if let Some(worker) = &self.worker {
864 worker.set_auth_headers(headers)?;
865 }
866 if let Some(realtime_worker) = &self.realtime_worker {
867 realtime_worker.set_auth_headers(self.auth_headers.clone())?;
868 }
869 Ok(())
870 }
871
872 pub fn set_auth_headers_json(&mut self, headers_json: &str) -> Result<()> {
873 let headers: SyncAuthHeaders = serde_json::from_str(headers_json)?;
874 self.set_auth_headers(headers)
875 }
876
877 pub fn set_subscriptions(&mut self, subscriptions: Vec<SubscriptionSpec>) -> Result<()> {
878 self.writer.set_subscriptions(subscriptions.clone())?;
879 if let Some(worker) = &self.worker {
880 worker.set_subscriptions(subscriptions)?;
881 }
882 Ok(())
883 }
884
885 pub fn set_subscriptions_json(&mut self, subscriptions_json: &str) -> Result<()> {
886 let subscriptions: Vec<SubscriptionSpec> = serde_json::from_str(subscriptions_json)?;
887 self.set_subscriptions(subscriptions)
888 }
889
    /// Forwards to the writer's `force_subscriptions_bootstrap` for the given
    /// subscription ids and returns its `usize` result.
    // NOTE(review): unlike `set_subscriptions`, the background worker is not
    // notified here — confirm that is intended.
    pub fn force_subscriptions_bootstrap(
        &mut self,
        subscription_ids: Vec<String>,
    ) -> Result<usize> {
        self.writer.force_subscriptions_bootstrap(&subscription_ids)
    }
896
897 pub fn force_subscriptions_bootstrap_json(
898 &mut self,
899 subscription_ids_json: &str,
900 ) -> Result<String> {
901 let subscription_ids: Vec<String> = serde_json::from_str(subscription_ids_json)?;
902 Ok(serde_json::to_string(
903 &self.force_subscriptions_bootstrap(subscription_ids)?,
904 )?)
905 }
906
907 pub fn set_field_encryption(&mut self, encryption: Option<FieldEncryption>) -> Result<()> {
908 self.writer.set_field_encryption(encryption.clone())?;
909 self.field_encryption = encryption.clone();
910 if let Some(worker) = &self.worker {
911 worker.set_field_encryption(encryption)?;
912 }
913 Ok(())
914 }
915
916 pub fn set_field_encryption_json(&mut self, config_json: &str) -> Result<()> {
917 self.set_field_encryption(FieldEncryption::from_static_config_json(config_json)?)
918 }
919
920 pub fn set_encrypted_crdt(&mut self, encryption: Option<EncryptedCrdt>) -> Result<()> {
921 self.writer.set_encrypted_crdt(encryption.clone())?;
922 self.encrypted_crdt = encryption.clone();
923 if let Some(worker) = &self.worker {
924 worker.set_encrypted_crdt(encryption)?;
925 }
926 Ok(())
927 }
928
929 pub fn set_encrypted_crdt_json(&mut self, config_json: &str) -> Result<()> {
930 self.set_encrypted_crdt(EncryptedCrdt::from_static_config_json(config_json)?)
931 }
932
933 pub fn set_blob_encryption(&mut self, encryption: Option<BlobEncryption>) -> Result<()> {
934 self.writer.set_blob_encryption(encryption.clone())?;
935 self.blob_encryption = encryption.clone();
936 if let Some(worker) = &self.worker {
937 worker.set_blob_encryption(encryption)?;
938 }
939 Ok(())
940 }
941
942 pub fn set_blob_encryption_json(&mut self, config_json: &str) -> Result<()> {
943 self.set_blob_encryption(BlobEncryption::from_static_config_json(config_json)?)
944 }
945
    /// Forwards `method` and `args_json` to the crate-level
    /// `encryption_helpers_json` function; no client state is read.
    pub fn encryption_helper_json(&self, method: &str, args_json: &str) -> Result<String> {
        encryption_helpers_json(method, args_json)
    }
949
950 pub fn pause_sync_worker(&mut self) -> Result<()> {
951 self.stop_realtime_worker()?;
952 if let Some(worker) = self.worker.take() {
953 worker.stop()?;
954 }
955 self.join_worker_event_pump()?;
956 Ok(())
957 }
958
959 pub fn resume_sync_worker(&mut self) -> Result<()> {
960 if self.worker.is_some() {
961 return Ok(());
962 }
963 let mut worker_client =
964 SyncularClient::open_with_schema(self.config.clone(), self.writer.app_schema())?;
965 worker_client.set_subscriptions(self.writer.subscriptions().to_vec())?;
966 worker_client.set_auth_headers(self.auth_headers.clone());
967 worker_client.set_field_encryption(self.field_encryption.clone())?;
968 worker_client.set_encrypted_crdt(self.encrypted_crdt.clone())?;
969 worker_client.set_blob_encryption(self.blob_encryption.clone())?;
970 let worker = SyncWorker::start(worker_client);
971 self.worker_event_pump = Some(start_worker_event_pump(
972 self.events.clone(),
973 worker.event_source(),
974 ));
975 self.worker = Some(worker);
976 Ok(())
977 }
978
979 pub fn resume_from_background(&mut self) -> Result<String> {
980 self.resume_sync_worker()?;
981 self.start_realtime_worker()?;
982 self.enqueue_sync_now()
983 }
984
    /// Shuts the client down; an alias for `self.close()`.
    pub fn shutdown(&mut self) -> Result<()> {
        self.close()
    }
988
989 pub fn start_realtime_worker(&mut self) -> Result<()> {
990 if self.realtime_worker.is_some() {
991 return Ok(());
992 }
993 let trigger = self.worker()?.trigger_handle();
994 let mut transport = HttpSyncTransport::new(SyncTransportConfig::new(
995 self.config.base_url.clone(),
996 self.config.client_id.clone(),
997 self.config.actor_id.clone(),
998 ))
999 .with_schema_version(self.writer.app_schema().current_schema_version());
1000 transport.set_auth_headers(self.auth_headers.clone());
1001 let events = self.events.clone();
1002 let presence_by_scope = self.presence_by_scope.clone();
1003 self.realtime_worker = Some(PersistentRealtimeWorker::start_with_event_handler(
1004 transport,
1005 trigger,
1006 Some(Arc::new(move |event| {
1007 if let RealtimeEvent::Presence(presence) = event {
1008 apply_native_presence_event(&presence_by_scope, &events, presence);
1009 }
1010 })),
1011 ));
1012 if let Some(realtime_worker) = &self.realtime_worker {
1013 for (scope_key, metadata) in self.local_presence_metadata()? {
1014 realtime_worker.send_presence("join", scope_key, metadata)?;
1015 }
1016 }
1017 Ok(())
1018 }
1019
1020 pub fn stop_realtime_worker(&mut self) -> Result<()> {
1021 if let Some(mut realtime_worker) = self.realtime_worker.take() {
1022 realtime_worker.stop()?;
1023 }
1024 Ok(())
1025 }
1026
1027 pub fn sync_worker_running(&self) -> bool {
1028 self.worker.is_some()
1029 }
1030
1031 pub fn join_presence(&mut self, scope_key: &str, metadata: Option<Value>) -> Result<()> {
1032 let event = RealtimePresenceEvent {
1033 action: "join".to_string(),
1034 scope_key: scope_key.to_string(),
1035 client_id: Some(self.config.client_id.clone()),
1036 actor_id: Some(self.config.actor_id.clone()),
1037 metadata: metadata.clone(),
1038 entries: Vec::new(),
1039 };
1040 apply_native_presence_event(&self.presence_by_scope, &self.events, event);
1041 if let Some(realtime_worker) = &self.realtime_worker {
1042 realtime_worker.send_presence("join", scope_key, metadata)?;
1043 }
1044 Ok(())
1045 }
1046
1047 pub fn join_presence_handle(
1048 &mut self,
1049 scope_key: &str,
1050 metadata: Option<Value>,
1051 ) -> Result<NativePresenceHandle<'_>> {
1052 self.join_presence(scope_key, metadata)?;
1053 Ok(NativePresenceHandle {
1054 client: self,
1055 scope_key: scope_key.to_string(),
1056 active: true,
1057 })
1058 }
1059
1060 pub fn leave_presence(&mut self, scope_key: &str) -> Result<()> {
1061 let event = RealtimePresenceEvent {
1062 action: "leave".to_string(),
1063 scope_key: scope_key.to_string(),
1064 client_id: Some(self.config.client_id.clone()),
1065 actor_id: Some(self.config.actor_id.clone()),
1066 metadata: None,
1067 entries: Vec::new(),
1068 };
1069 apply_native_presence_event(&self.presence_by_scope, &self.events, event);
1070 if let Some(realtime_worker) = &self.realtime_worker {
1071 realtime_worker.send_presence("leave", scope_key, None)?;
1072 }
1073 Ok(())
1074 }
1075
1076 pub fn update_presence_metadata(&mut self, scope_key: &str, metadata: Value) -> Result<()> {
1077 let event = RealtimePresenceEvent {
1078 action: "update".to_string(),
1079 scope_key: scope_key.to_string(),
1080 client_id: Some(self.config.client_id.clone()),
1081 actor_id: Some(self.config.actor_id.clone()),
1082 metadata: Some(metadata.clone()),
1083 entries: Vec::new(),
1084 };
1085 apply_native_presence_event(&self.presence_by_scope, &self.events, event);
1086 if let Some(realtime_worker) = &self.realtime_worker {
1087 realtime_worker.send_presence("update", scope_key, Some(metadata))?;
1088 }
1089 Ok(())
1090 }
1091
1092 pub fn presence_json(&self, scope_key: &str) -> Result<String> {
1093 let presence = self
1094 .presence_by_scope
1095 .lock()
1096 .map_err(|_| {
1097 SyncularError::message(ErrorKind::Internal, "native presence is poisoned")
1098 })?
1099 .get(scope_key)
1100 .cloned()
1101 .unwrap_or_default();
1102 Ok(serde_json::to_string(&presence)?)
1103 }
1104
1105 fn local_presence_metadata(&self) -> Result<Vec<(String, Option<Value>)>> {
1106 let client_id = self.config.client_id.as_str();
1107 let joined = self
1108 .presence_by_scope
1109 .lock()
1110 .map_err(|_| {
1111 SyncularError::message(ErrorKind::Internal, "native presence is poisoned")
1112 })?
1113 .iter()
1114 .filter_map(|(scope_key, entries)| {
1115 entries
1116 .iter()
1117 .find(|entry| entry.client_id == client_id)
1118 .map(|entry| (scope_key.clone(), entry.metadata.clone()))
1119 })
1120 .collect();
1121 Ok(joined)
1122 }
1123
1124 pub fn subscribe_events(&self, capacity: usize) -> NativeEventSubscription {
1125 self.events.subscribe(capacity)
1126 }
1127
1128 pub fn event_receiver(&self, capacity: usize) -> NativeEventSubscription {
1129 self.subscribe_events(capacity)
1130 }
1131
1132 pub fn next_event(&self) -> Option<NativeEvent> {
1133 self.default_events.next_event()
1134 }
1135
1136 pub fn next_event_timeout(&self, timeout: Duration) -> Option<NativeEvent> {
1137 self.default_events.next_event_timeout(timeout)
1138 }
1139
1140 pub fn next_event_json(&self) -> Option<Result<String>> {
1141 self.default_events.next_event_json()
1142 }
1143
1144 pub fn next_event_json_timeout(&self, timeout: Duration) -> Option<Result<String>> {
1145 self.default_events.next_event_json_timeout(timeout)
1146 }
1147
1148 pub fn apply_mutation_json(
1149 &mut self,
1150 mutation_json: &str,
1151 local_row_json: Option<&str>,
1152 ) -> Result<String> {
1153 validate_mutation_json_input_size(mutation_json, local_row_json)?;
1154 let operation: crate::protocol::SyncOperation = serde_json::from_str(mutation_json)?;
1155 let table = operation.table.clone();
1156 let previous_row = self
1157 .writer
1158 .current_row_json(&operation.table, &operation.row_id)?;
1159 let local_row = local_row_json.map(serde_json::from_str).transpose()?;
1160 let client_commit_id = self
1161 .writer
1162 .apply_mutation_json(mutation_json, local_row_json)?;
1163 let changed_rows = sync_changed_row_for_local_operation(
1164 self.writer.app_schema(),
1165 &operation,
1166 previous_row.as_ref(),
1167 local_row.as_ref(),
1168 Some(client_commit_id.clone()),
1169 )
1170 .into_iter()
1171 .collect();
1172 self.events.push_rows_changed_events_with_details(
1173 [table.as_str()],
1174 changed_rows,
1175 Some("localWrite"),
1176 );
1177 self.trigger_after_local_write()?;
1178 Ok(client_commit_id)
1179 }
1180
1181 pub fn apply_leased_mutation_json(
1182 &mut self,
1183 mutation_json: &str,
1184 local_row_json: Option<&str>,
1185 ) -> Result<String> {
1186 validate_mutation_json_input_size(mutation_json, local_row_json)?;
1187 let operation: crate::protocol::SyncOperation = serde_json::from_str(mutation_json)?;
1188 let table = operation.table.clone();
1189 let previous_row = self
1190 .writer
1191 .current_row_json(&operation.table, &operation.row_id)?;
1192 let local_row = local_row_json.map(serde_json::from_str).transpose()?;
1193 let client_commit_id = self
1194 .writer
1195 .apply_leased_mutation_json(mutation_json, local_row_json)?;
1196 let changed_rows = sync_changed_row_for_local_operation(
1197 self.writer.app_schema(),
1198 &operation,
1199 previous_row.as_ref(),
1200 local_row.as_ref(),
1201 Some(client_commit_id.clone()),
1202 )
1203 .into_iter()
1204 .collect();
1205 self.events.push_rows_changed_events_with_details(
1206 [table.as_str()],
1207 changed_rows,
1208 Some("localWrite"),
1209 );
1210 self.trigger_after_local_write()?;
1211 Ok(client_commit_id)
1212 }
1213
1214 pub fn open_crdt_field_json(&self, request_json: &str) -> Result<String> {
1215 validate_crdt_request_json_size(request_json)?;
1216 let request: NativeCrdtFieldRequest = serde_json::from_str(request_json)?;
1217 let field = self.writer.open_crdt_field(request.id())?;
1218 Ok(serde_json::to_string(&crdt_field_descriptor(&field))?)
1219 }
1220
1221 pub fn apply_crdt_field_text_json(&mut self, request_json: &str) -> Result<String> {
1222 validate_crdt_request_json_size(request_json)?;
1223 let request: NativeCrdtFieldTextRequest = serde_json::from_str(request_json)?;
1224 validate_yjs_text_input_size(&request.next_text)?;
1225 let field = self.writer.open_crdt_field(request.id())?;
1226 let receipt = self
1227 .writer
1228 .apply_crdt_field_text(&field, &request.next_text)?;
1229 self.after_crdt_field_write(&field, Some(receipt.client_commit_id.clone()))?;
1230 crdt_field_write_receipt_json(receipt)
1231 }
1232
1233 pub fn apply_crdt_field_yjs_update_json(&mut self, request_json: &str) -> Result<String> {
1234 validate_crdt_request_json_size(request_json)?;
1235 let request: NativeCrdtFieldYjsUpdateRequest = serde_json::from_str(request_json)?;
1236 validate_yjs_update_envelope_size(&request.update)?;
1237 let field = self.writer.open_crdt_field(request.id())?;
1238 let receipt = self
1239 .writer
1240 .apply_crdt_field_yjs_update(&field, request.update)?;
1241 self.after_crdt_field_write(&field, Some(receipt.client_commit_id.clone()))?;
1242 crdt_field_write_receipt_json(receipt)
1243 }
1244
1245 pub fn materialize_crdt_field_json(&mut self, request_json: &str) -> Result<String> {
1246 validate_crdt_request_json_size(request_json)?;
1247 let request: NativeCrdtFieldRequest = serde_json::from_str(request_json)?;
1248 let field = self.writer.open_crdt_field(request.id())?;
1249 crdt_field_materialization_json(self.writer.materialize_crdt_field(&field)?)
1250 }
1251
1252 pub fn crdt_document_snapshot_json(&mut self, request_json: &str) -> Result<String> {
1253 validate_crdt_request_json_size(request_json)?;
1254 let request: NativeCrdtFieldRequest = serde_json::from_str(request_json)?;
1255 let field = self.writer.open_crdt_field(request.id())?;
1256 self.writer.crdt_document_snapshot_json(&field)
1257 }
1258
1259 pub fn crdt_update_log_json(&mut self, request_json: &str) -> Result<String> {
1260 validate_crdt_request_json_size(request_json)?;
1261 let request: NativeCrdtFieldLogRequest = serde_json::from_str(request_json)?;
1262 let field = self.writer.open_crdt_field(request.id())?;
1263 self.writer.crdt_update_log_json(
1264 &field,
1265 request.limit.unwrap_or(DEFAULT_CRDT_UPDATE_LOG_LIMIT),
1266 )
1267 }
1268
1269 pub fn snapshot_crdt_field_state_vector_json(&mut self, request_json: &str) -> Result<String> {
1270 validate_crdt_request_json_size(request_json)?;
1271 let request: NativeCrdtFieldRequest = serde_json::from_str(request_json)?;
1272 let field = self.writer.open_crdt_field(request.id())?;
1273 Ok(serde_json::to_string(&json!({
1274 "stateVectorBase64": self.writer.snapshot_crdt_field_state_vector_base64(&field)?
1275 }))?)
1276 }
1277
1278 pub fn compact_crdt_field_json(&mut self, request_json: &str) -> Result<String> {
1279 validate_crdt_request_json_size(request_json)?;
1280 let request: NativeCrdtFieldCompactionRequest = serde_json::from_str(request_json)?;
1281 let field = self.writer.open_crdt_field(request.id())?;
1282 let receipt = self
1283 .writer
1284 .compact_crdt_field(&field, request.min_uncheckpointed_updates.unwrap_or(1))?;
1285 let should_emit_compaction_event =
1286 receipt.checkpoint_created || field.sync_mode() == CrdtFieldSyncMode::ServerMerge;
1287 if should_emit_compaction_event {
1288 let extra_payload_json = crdt_field_compaction_event_payload(
1289 &mut self.writer,
1290 &field,
1291 &receipt,
1292 receipt.checkpoint_created,
1293 request.min_uncheckpointed_updates.unwrap_or(1),
1294 );
1295 self.events.publish_event(crdt_field_compacted_event(
1296 &field,
1297 receipt.client_commit_id.clone(),
1298 crdt_field_compaction_tables(&field)
1299 .into_iter()
1300 .map(str::to_string)
1301 .collect(),
1302 None,
1303 None,
1304 receipt.checkpoint_created,
1305 extra_payload_json,
1306 ));
1307 self.events.push_rows_changed_events_with_details(
1308 crdt_field_compaction_tables(&field),
1309 vec![crdt_field_compacted_row(
1310 &field,
1311 receipt.client_commit_id.clone(),
1312 )],
1313 Some("localWrite"),
1314 );
1315 self.trigger_after_local_write()?;
1316 }
1317 crdt_field_compaction_receipt_json(receipt)
1318 }
1319
1320 pub fn apply_encrypted_crdt_update_json(&mut self, request_json: &str) -> Result<String> {
1321 validate_crdt_request_json_size(request_json)?;
1322 let request: NativeEncryptedCrdtRequest = serde_json::from_str(request_json)?;
1323 let receipt = self.writer.apply_encrypted_crdt_update_json(request_json)?;
1324 self.events
1325 .push_rows_changed_events([request.table.as_str(), CRDT_UPDATES_TABLE]);
1326 self.trigger_after_local_write()?;
1327 Ok(receipt.client_commit_id)
1328 }
1329
1330 pub fn apply_encrypted_crdt_checkpoint_json(&mut self, request_json: &str) -> Result<String> {
1331 validate_crdt_request_json_size(request_json)?;
1332 let _request: NativeEncryptedCrdtRequest = serde_json::from_str(request_json)?;
1333 let receipt = self
1334 .writer
1335 .apply_encrypted_crdt_checkpoint_json(request_json)?;
1336 if let Some(receipt) = receipt {
1337 self.events
1338 .push_rows_changed_events([CRDT_CHECKPOINTS_TABLE]);
1339 self.trigger_after_local_write()?;
1340 Ok(serde_json::to_string(&json!({
1341 "checkpointed": true,
1342 "clientCommitId": receipt.client_commit_id,
1343 "commitId": receipt.commit_id
1344 }))?)
1345 } else {
1346 Ok(serde_json::to_string(&json!({ "checkpointed": false }))?)
1347 }
1348 }
1349
1350 pub fn list_table_json(&mut self, table: &str) -> Result<String> {
1351 self.writer.list_table_json(table)
1352 }
1353
1354 pub fn query_json(&self, request_json: &str) -> Result<String> {
1355 self.read_executor
1356 .lock()
1357 .map_err(|_| {
1358 SyncularError::message(ErrorKind::Internal, "native read executor lock poisoned")
1359 })?
1360 .execute_json(request_json)
1361 }
1362
1363 pub fn store_blob_file_json(
1364 &mut self,
1365 path: &str,
1366 options_json: Option<&str>,
1367 ) -> Result<String> {
1368 let options: NativeBlobStoreOptions = options_json
1369 .filter(|value| !value.trim().is_empty())
1370 .map(serde_json::from_str)
1371 .transpose()?
1372 .unwrap_or_default();
1373 let blob = self.writer.store_blob_file(
1374 Path::new(path),
1375 options
1376 .mime_type
1377 .as_deref()
1378 .unwrap_or("application/octet-stream"),
1379 options.immediate.unwrap_or(false),
1380 options.cache_local.unwrap_or(true),
1381 )?;
1382 self.events.push_diagnostic(blob_store_diagnostic(
1383 &blob,
1384 options.immediate.unwrap_or(false),
1385 options.cache_local.unwrap_or(true),
1386 ));
1387 Ok(serde_json::to_string(&blob)?)
1388 }
1389
1390 pub fn retrieve_blob_file(&mut self, ref_json: &str, path: &str) -> Result<()> {
1391 self.retrieve_blob_file_with_options(ref_json, path, None)
1392 }
1393
1394 pub fn retrieve_blob_file_with_options(
1395 &mut self,
1396 ref_json: &str,
1397 path: &str,
1398 options_json: Option<&str>,
1399 ) -> Result<()> {
1400 let options: NativeBlobRetrieveOptions = options_json
1401 .filter(|value| !value.trim().is_empty())
1402 .map(serde_json::from_str)
1403 .transpose()?
1404 .unwrap_or_default();
1405 let blob: BlobRef = serde_json::from_str(ref_json)?;
1406 let was_local = self.writer.is_blob_local(&blob.hash).unwrap_or(false);
1407 match self.writer.retrieve_blob_file(
1408 &blob,
1409 Path::new(path),
1410 options.cache_local.unwrap_or(true),
1411 ) {
1412 Ok(()) => {
1413 self.events
1414 .push_diagnostic(blob_cache_retrieve_diagnostic(&blob, was_local));
1415 Ok(())
1416 }
1417 Err(error) => {
1418 self.events
1419 .push_diagnostic(blob_download_failed_diagnostic(&blob, &error));
1420 Err(error)
1421 }
1422 }
1423 }
1424
1425 pub fn is_blob_local(&mut self, hash: &str) -> Result<bool> {
1426 let local = self.writer.is_blob_local(hash)?;
1427 self.events
1428 .push_diagnostic(blob_cache_lookup_diagnostic(hash, local));
1429 Ok(local)
1430 }
1431
1432 pub fn process_blob_upload_queue_json(&mut self) -> Result<String> {
1433 let result = self.writer.process_blob_upload_queue()?;
1434 self.events
1435 .push_diagnostic(blob_upload_queue_processed_diagnostic(
1436 serde_json::to_value(&result)?,
1437 ));
1438 Ok(serde_json::to_string(&result)?)
1439 }
1440
1441 pub fn blob_upload_queue_stats_json(&mut self) -> Result<String> {
1442 Ok(serde_json::to_string(
1443 &self.writer.blob_upload_queue_stats()?,
1444 )?)
1445 }
1446
1447 pub fn blob_cache_stats_json(&mut self) -> Result<String> {
1448 Ok(serde_json::to_string(&self.writer.blob_cache_stats()?)?)
1449 }
1450
1451 pub fn prune_blob_cache(&mut self, max_bytes: i64) -> Result<i64> {
1452 let pruned_bytes = self.writer.prune_blob_cache(max_bytes)?;
1453 self.events
1454 .push_diagnostic(blob_cache_pruned_diagnostic(pruned_bytes, Some(max_bytes)));
1455 Ok(pruned_bytes)
1456 }
1457
1458 pub fn clear_blob_cache(&mut self) -> Result<()> {
1459 self.writer.clear_blob_cache()?;
1460 self.events.push_diagnostic(blob_cache_cleared_diagnostic());
1461 Ok(())
1462 }
1463
1464 pub fn compact_storage_json(&mut self, options_json: Option<&str>) -> Result<String> {
1465 self.writer.compact_storage_json(options_json)
1466 }
1467
1468 pub fn app_tables(&self) -> Vec<String> {
1469 self.writer
1470 .app_schema()
1471 .app_table_metadata
1472 .iter()
1473 .map(|metadata| metadata.name.to_string())
1474 .collect()
1475 }
1476
1477 pub fn app_tables_json(&self) -> Result<String> {
1478 Ok(serde_json::to_string(self.writer.app_schema().app_tables)?)
1479 }
1480
1481 pub fn app_table_metadata_json(&self) -> Result<String> {
1482 Ok(serde_json::to_string(
1483 self.writer.app_schema().app_table_metadata,
1484 )?)
1485 }
1486
1487 pub fn app_schema_state_json(&mut self) -> Result<String> {
1488 self.writer.app_schema_state_json()
1489 }
1490
1491 pub fn local_health_check_json(&mut self) -> Result<String> {
1492 self.writer.local_health_check_json()
1493 }
1494
1495 pub fn export_local_support_bundle_json(&mut self) -> Result<String> {
1496 self.writer.export_local_support_bundle_json()
1497 }
1498
1499 pub fn import_local_support_bundle_json(&mut self, bundle_json: &str) -> Result<String> {
1500 self.writer.import_local_support_bundle_json(bundle_json)
1501 }
1502
1503 pub fn repair_local_health_json(&mut self, request_json: &str) -> Result<String> {
1504 self.writer.repair_local_health_json(request_json)
1505 }
1506
1507 pub fn reset_local_sync_state_json(&mut self, request_json: &str) -> Result<String> {
1508 self.writer.reset_local_sync_state_json(request_json)
1509 }
1510
1511 pub fn register_query_json(&self, query_json: &str) -> Result<String> {
1512 let registration: NativeObservedQueryRegistration = serde_json::from_str(query_json)?;
1513 let observed_query = registration.into_observed_query(self.writer.app_schema())?;
1514 let id = observed_query.id.clone();
1515 let mut observers = self.events.query_observers.lock().map_err(|_| {
1516 SyncularError::message(
1517 ErrorKind::Internal,
1518 "native query observer registry is poisoned",
1519 )
1520 })?;
1521 observers.insert(id.clone(), observed_query);
1522 Ok(id)
1523 }
1524
1525 pub fn unregister_query(&self, id: &str) -> Result<()> {
1526 let mut observers = self.events.query_observers.lock().map_err(|_| {
1527 SyncularError::message(
1528 ErrorKind::Internal,
1529 "native query observer registry is poisoned",
1530 )
1531 })?;
1532 observers.remove(id);
1533 Ok(())
1534 }
1535
1536 pub fn observed_queries(&self) -> Result<Vec<NativeObservedQuery>> {
1537 let observers = self.events.query_observers.lock().map_err(|_| {
1538 SyncularError::message(
1539 ErrorKind::Internal,
1540 "native query observer registry is poisoned",
1541 )
1542 })?;
1543 Ok(observers.values().cloned().collect())
1544 }
1545
1546 pub fn observed_queries_json(&self) -> Result<String> {
1547 Ok(serde_json::to_string(&self.observed_queries()?)?)
1548 }
1549
1550 pub fn diagnostic_snapshot(&mut self) -> Result<NativeDiagnosticSnapshot> {
1551 let bootstrap = self.writer.bootstrap_status()?;
1552 let outbox = self.writer.outbox_summaries()?;
1553 let conflicts = self.writer.conflict_summaries()?;
1554 let observed_queries = self.observed_queries()?;
1555 let recent_events = self.events.recent_events();
1556 let recent_diagnostics = recent_events
1557 .iter()
1558 .filter_map(|event| event.diagnostic.clone())
1559 .collect();
1560 let recent_sync_timings = native_sync_timing_snapshots(&recent_events);
1561 let connection = NativeDiagnosticConnectionSnapshot {
1562 sync_worker_running: self.sync_worker_running(),
1563 realtime_worker_running: self.realtime_worker.is_some(),
1564 auto_sync_local_writes: self.auto_sync_local_writes,
1565 event_subscriber_count: self.events.subscriber_count(),
1566 observed_query_count: observed_queries.len(),
1567 };
1568 Ok(NativeDiagnosticSnapshot {
1569 generated_at: now_ms(),
1570 runtime: native_runtime_manifest(),
1571 connection,
1572 subscriptions: native_diagnostic_subscription_snapshots(
1573 self.writer.subscriptions(),
1574 &bootstrap,
1575 ),
1576 recent_events,
1577 recent_diagnostics,
1578 recent_sync_timings,
1579 limits: runtime_default_limits(),
1580 bootstrap,
1581 outbox_stats: native_outbox_stats(&outbox),
1582 conflict_stats: native_conflict_stats(&conflicts),
1583 blob_upload_queue_stats: serde_json::to_value(self.writer.blob_upload_queue_stats()?)?,
1584 blob_cache_stats: serde_json::to_value(self.writer.blob_cache_stats()?)?,
1585 observed_queries,
1586 })
1587 }
1588
1589 pub fn diagnostic_snapshot_json(&mut self) -> Result<String> {
1590 Ok(serde_json::to_string(&self.diagnostic_snapshot()?)?)
1591 }
1592
1593 pub fn outbox_summaries(&mut self) -> Result<Vec<OutboxSummary>> {
1594 self.writer.outbox_summaries()
1595 }
1596
1597 pub fn outbox_summaries_json(&mut self) -> Result<String> {
1598 Ok(serde_json::to_string(&self.outbox_summaries()?)?)
1599 }
1600
1601 pub fn upsert_auth_lease_json(&mut self, lease_json: &str) -> Result<()> {
1602 let lease: AuthLeaseRecord = serde_json::from_str(lease_json)?;
1603 self.writer.upsert_auth_lease(&lease)
1604 }
1605
1606 pub fn issue_auth_lease_json(&mut self, request_json: &str) -> Result<String> {
1607 self.writer.issue_auth_lease_json(request_json)
1608 }
1609
1610 pub fn auth_lease_json(&mut self, lease_id: &str) -> Result<String> {
1611 Ok(serde_json::to_string(&self.writer.auth_lease(lease_id)?)?)
1612 }
1613
1614 pub fn active_auth_leases_json(
1615 &mut self,
1616 actor_id: Option<&str>,
1617 now_ms_value: i64,
1618 ) -> Result<String> {
1619 Ok(serde_json::to_string(
1620 &self.writer.active_auth_leases(actor_id, now_ms_value)?,
1621 )?)
1622 }
1623
1624 pub fn set_outbox_auth_lease_json(
1625 &mut self,
1626 client_commit_id: &str,
1627 provenance_json: Option<&str>,
1628 ) -> Result<()> {
1629 let provenance: Option<AuthLeaseProvenance> =
1630 provenance_json.map(serde_json::from_str).transpose()?;
1631 self.writer
1632 .set_outbox_auth_lease(client_commit_id, provenance.as_ref())
1633 }
1634
1635 pub fn conflict_summaries(&mut self) -> Result<Vec<ConflictSummary>> {
1636 self.writer.conflict_summaries()
1637 }
1638
1639 pub fn conflict_summaries_json(&mut self) -> Result<String> {
1640 Ok(serde_json::to_string(&self.conflict_summaries()?)?)
1641 }
1642
1643 pub fn resolve_conflict(&mut self, id: &str, resolution: &str) -> Result<()> {
1644 self.writer.resolve_conflict(id, resolution)?;
1645 self.events.publish_event(conflicts_changed_event());
1646 Ok(())
1647 }
1648
1649 pub fn retry_conflict_keep_local(&mut self, id: &str) -> Result<String> {
1650 let client_commit_id = self.writer.retry_conflict_keep_local(id)?;
1651 self.events.publish_event(conflicts_changed_event());
1652 self.trigger_after_local_write()?;
1653 Ok(client_commit_id)
1654 }
1655
1656 pub fn close(&mut self) -> Result<()> {
1657 self.stop_realtime_worker()?;
1658 if let Some(worker) = self.worker.take() {
1659 worker.stop()?;
1660 }
1661 self.join_worker_event_pump()?;
1662 Ok(())
1663 }
1664
1665 fn join_worker_event_pump(&mut self) -> Result<()> {
1666 if let Some(join) = self.worker_event_pump.take() {
1667 join.join().map_err(|_| {
1668 SyncularError::message(ErrorKind::Internal, "native worker event pump panicked")
1669 })?;
1670 }
1671 Ok(())
1672 }
1673
1674 fn trigger_after_local_write(&self) -> Result<()> {
1675 if self.auto_sync_local_writes && self.worker.is_some() {
1676 self.trigger_sync()
1677 .map_err(|err| err.context("local write succeeded but sync trigger failed"))?;
1678 }
1679 Ok(())
1680 }
1681
1682 fn next_command_id(&self, prefix: &str) -> Result<String> {
1683 let mut seq = self.command_seq.lock().map_err(|_| {
1684 SyncularError::message(ErrorKind::Internal, "native command sequence is poisoned")
1685 })?;
1686 *seq = seq.saturating_add(1);
1687 Ok(format!("native-{prefix}-{seq}"))
1688 }
1689
1690 fn worker(&self) -> Result<&SyncWorker> {
1691 self.worker.as_ref().ok_or_else(|| {
1692 SyncularError::message(ErrorKind::Internal, "native sync client is closed")
1693 })
1694 }
1695
1696 fn after_crdt_field_write(
1697 &mut self,
1698 field: &CrdtField,
1699 client_commit_id: Option<String>,
1700 ) -> Result<()> {
1701 let extra_payload_json = crdt_field_event_payload(&mut self.writer, field);
1702 self.events.publish_event(crdt_field_changed_event(
1703 field,
1704 client_commit_id.clone(),
1705 crdt_field_write_tables(field)
1706 .into_iter()
1707 .map(str::to_string)
1708 .collect(),
1709 None,
1710 None,
1711 extra_payload_json,
1712 ));
1713 self.events.push_rows_changed_events_with_details(
1714 crdt_field_write_tables(field),
1715 vec![crdt_field_changed_row(field, client_commit_id)],
1716 Some("localWrite"),
1717 );
1718 self.trigger_after_local_write()
1719 }
1720}
1721
1722impl NativeSyncularClientBuilder {
1723 pub fn new(config: NativeClientConfig) -> Self {
1724 Self {
1725 config,
1726 options: NativeClientOptions::default(),
1727 realtime: true,
1728 auth_headers: None,
1729 subscriptions: None,
1730 initial_sync: false,
1731 initial_websocket_sync: false,
1732 process_blob_uploads_on_open: false,
1733 shutdown_on_drop: false,
1734 }
1735 }
1736
1737 pub fn auto_sync_local_writes(mut self, enabled: bool) -> Self {
1738 self.options.auto_sync_local_writes = enabled;
1739 self
1740 }
1741
1742 pub fn realtime(mut self, enabled: bool) -> Self {
1743 self.realtime = enabled;
1744 self
1745 }
1746
1747 pub fn auth_headers(mut self, headers: SyncAuthHeaders) -> Self {
1748 self.auth_headers = Some(headers);
1749 self
1750 }
1751
1752 pub fn auth_headers_json(mut self, headers_json: &str) -> Result<Self> {
1753 self.auth_headers = Some(serde_json::from_str(headers_json)?);
1754 Ok(self)
1755 }
1756
1757 pub fn subscriptions(mut self, subscriptions: Vec<SubscriptionSpec>) -> Self {
1758 self.subscriptions = Some(subscriptions);
1759 self
1760 }
1761
1762 pub fn subscriptions_json(mut self, subscriptions_json: &str) -> Result<Self> {
1763 self.subscriptions = Some(serde_json::from_str(subscriptions_json)?);
1764 Ok(self)
1765 }
1766
1767 pub fn initial_sync(mut self, enabled: bool) -> Self {
1768 self.initial_sync = enabled;
1769 self
1770 }
1771
1772 pub fn initial_websocket_sync(mut self, enabled: bool) -> Self {
1773 self.initial_websocket_sync = enabled;
1774 self
1775 }
1776
1777 pub fn process_blob_uploads_on_open(mut self, enabled: bool) -> Self {
1778 self.process_blob_uploads_on_open = enabled;
1779 self
1780 }
1781
1782 pub fn shutdown_on_drop(mut self, enabled: bool) -> Self {
1783 self.shutdown_on_drop = enabled;
1784 self
1785 }
1786
1787 pub fn open(self) -> Result<NativeSyncularClient> {
1788 let mut client = NativeSyncularClient::open_native_with_options(self.config, self.options)?;
1789 client.shutdown_on_drop = self.shutdown_on_drop;
1790 if let Some(headers) = self.auth_headers {
1791 client.set_auth_headers(headers)?;
1792 }
1793 if let Some(subscriptions) = self.subscriptions {
1794 client.set_subscriptions(subscriptions)?;
1795 }
1796 if self.realtime {
1797 client.start_realtime_worker()?;
1798 }
1799 if self.initial_websocket_sync {
1800 client.trigger_sync_websocket()?;
1801 } else if self.initial_sync {
1802 client.trigger_sync()?;
1803 }
1804 if self.process_blob_uploads_on_open {
1805 let _ = client.process_blob_upload_queue_json()?;
1806 }
1807 Ok(client)
1808 }
1809}
1810
1811impl Drop for NativeSyncularClient {
1812 fn drop(&mut self) {
1813 if self.shutdown_on_drop {
1814 let _ = self.close();
1815 }
1816 }
1817}
1818
1819impl NativePresenceHandle<'_> {
1820 pub fn update_metadata(&mut self, metadata: Value) -> Result<()> {
1821 self.client
1822 .update_presence_metadata(&self.scope_key, metadata)
1823 }
1824
1825 pub fn leave(mut self) -> Result<()> {
1826 self.leave_inner()
1827 }
1828
1829 fn leave_inner(&mut self) -> Result<()> {
1830 if !self.active {
1831 return Ok(());
1832 }
1833 self.active = false;
1834 self.client.leave_presence(&self.scope_key)
1835 }
1836}
1837
1838impl Drop for NativePresenceHandle<'_> {
1839 fn drop(&mut self) {
1840 let _ = self.leave_inner();
1841 }
1842}
1843
1844impl NativeClientOpenTask {
1845 pub fn open_native_with_options(
1846 config: NativeClientConfig,
1847 options: NativeClientOptions,
1848 ) -> Self {
1849 let command_id = format!("native-open-{}", Uuid::new_v4());
1850 let (result_tx, result_rx) = mpsc::channel();
1851 thread::spawn(move || {
1852 let _ = result_tx.send(NativeSyncularClient::open_native_with_options(
1853 config, options,
1854 ));
1855 });
1856 Self {
1857 command_id,
1858 result_rx: Some(result_rx),
1859 completed: None,
1860 finished: false,
1861 taken: false,
1862 }
1863 }
1864
1865 pub fn command_id(&self) -> &str {
1866 &self.command_id
1867 }
1868
1869 pub fn is_finished(&mut self) -> bool {
1870 self.fill_completed(Duration::ZERO);
1871 self.finished
1872 }
1873
1874 pub fn take_client_timeout(
1875 &mut self,
1876 timeout: Duration,
1877 ) -> Option<Result<NativeSyncularClient>> {
1878 if self.taken {
1879 return Some(Err(SyncularError::message(
1880 ErrorKind::Internal,
1881 "native async open result was already taken",
1882 )));
1883 }
1884 self.fill_completed(timeout);
1885 let result = self.completed.take();
1886 if result.is_some() {
1887 self.taken = true;
1888 }
1889 result
1890 }
1891
1892 fn fill_completed(&mut self, timeout: Duration) {
1893 if self.completed.is_some() {
1894 return;
1895 }
1896 let Some(result_rx) = &self.result_rx else {
1897 return;
1898 };
1899 match result_rx.recv_timeout(timeout) {
1900 Ok(result) => {
1901 self.completed = Some(result);
1902 self.result_rx = None;
1903 self.finished = true;
1904 }
1905 Err(RecvTimeoutError::Timeout) => {}
1906 Err(RecvTimeoutError::Disconnected) => {
1907 self.completed = Some(Err(SyncularError::message(
1908 ErrorKind::Internal,
1909 "native async open worker stopped before returning a client",
1910 )));
1911 self.result_rx = None;
1912 self.finished = true;
1913 }
1914 }
1915 }
1916}
1917
1918impl NativeCrdtFieldRequest {
1919 fn id(&self) -> CrdtFieldId {
1920 CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1921 }
1922}
1923
1924impl NativeCrdtFieldLogRequest {
1925 fn id(&self) -> CrdtFieldId {
1926 CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1927 }
1928}
1929
1930impl NativeCrdtFieldTextRequest {
1931 fn id(&self) -> CrdtFieldId {
1932 CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1933 }
1934}
1935
1936impl NativeCrdtFieldYjsUpdateRequest {
1937 fn id(&self) -> CrdtFieldId {
1938 CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1939 }
1940}
1941
1942impl NativeCrdtFieldCompactionRequest {
1943 fn id(&self) -> CrdtFieldId {
1944 CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1945 }
1946}
1947
1948impl NativeEventSubscription {
1949 pub fn recv(&self) -> Option<NativeEvent> {
1950 self.next_event()
1951 }
1952
1953 pub fn recv_timeout(&self, timeout: Duration) -> Option<NativeEvent> {
1954 self.next_event_timeout(timeout)
1955 }
1956
1957 pub fn next_event(&self) -> Option<NativeEvent> {
1958 self.queue.next_event()
1959 }
1960
1961 pub fn next_event_timeout(&self, timeout: Duration) -> Option<NativeEvent> {
1962 self.queue.next_event_timeout(timeout)
1963 }
1964
1965 pub fn next_event_json(&self) -> Option<Result<String>> {
1966 self.next_event()
1967 .map(|event| serde_json::to_string(&event).map_err(Into::into))
1968 }
1969
1970 pub fn next_event_json_timeout(&self, timeout: Duration) -> Option<Result<String>> {
1971 self.next_event_timeout(timeout)
1972 .map(|event| serde_json::to_string(&event).map_err(Into::into))
1973 }
1974
1975 pub fn into_json_iter(self) -> NativeEventJsonIterator {
1976 NativeEventJsonIterator { subscription: self }
1977 }
1978
1979 pub fn close(&self) {
1980 if let Ok(mut subscribers) = self.hub.subscribers.lock() {
1981 subscribers.remove(&self.subscriber_id);
1982 }
1983 self.queue.close();
1984 }
1985}
1986
1987impl Iterator for NativeEventSubscription {
1988 type Item = NativeEvent;
1989
1990 fn next(&mut self) -> Option<Self::Item> {
1991 self.next_event()
1992 }
1993}
1994
1995impl Iterator for NativeEventJsonIterator {
1996 type Item = Result<String>;
1997
1998 fn next(&mut self) -> Option<Self::Item> {
1999 self.subscription.next_event_json()
2000 }
2001}
2002
2003impl Drop for NativeEventSubscription {
2004 fn drop(&mut self) {
2005 self.close();
2006 }
2007}
2008
2009impl NativeEventQueue {
2010 fn new(capacity: usize) -> Self {
2011 Self {
2012 capacity: capacity.max(1),
2013 state: Mutex::new(NativeEventQueueState {
2014 events: VecDeque::new(),
2015 closed: false,
2016 }),
2017 ready: Condvar::new(),
2018 }
2019 }
2020
2021 fn push(&self, event: NativeEvent, overflow_event: impl FnOnce(usize) -> NativeEvent) {
2022 let Ok(mut state) = self.state.lock() else {
2023 return;
2024 };
2025 if state.closed {
2026 return;
2027 }
2028 if state.events.len() >= self.capacity {
2029 let dropped_count = state.events.len().saturating_add(1);
2030 state.events.clear();
2031 state.events.push_back(overflow_event(dropped_count));
2032 state.closed = true;
2033 } else {
2034 state.events.push_back(event);
2035 }
2036 self.ready.notify_one();
2037 }
2038
2039 fn next_event(&self) -> Option<NativeEvent> {
2040 let mut state = self.state.lock().ok()?;
2041 loop {
2042 if let Some(event) = state.events.pop_front() {
2043 return Some(event);
2044 }
2045 if state.closed {
2046 return None;
2047 }
2048 state = self.ready.wait(state).ok()?;
2049 }
2050 }
2051
2052 fn next_event_timeout(&self, timeout: Duration) -> Option<NativeEvent> {
2053 let deadline = std::time::Instant::now().checked_add(timeout)?;
2054 let mut state = self.state.lock().ok()?;
2055 loop {
2056 if let Some(event) = state.events.pop_front() {
2057 return Some(event);
2058 }
2059 if state.closed {
2060 return None;
2061 }
2062 let now = std::time::Instant::now();
2063 if now >= deadline {
2064 return None;
2065 }
2066 let wait = deadline.saturating_duration_since(now);
2067 let (next_state, timeout) = self.ready.wait_timeout(state, wait).ok()?;
2068 state = next_state;
2069 if timeout.timed_out() && state.events.is_empty() {
2070 return None;
2071 }
2072 }
2073 }
2074
2075 fn close(&self) {
2076 if let Ok(mut state) = self.state.lock() {
2077 state.closed = true;
2078 state.events.clear();
2079 self.ready.notify_all();
2080 }
2081 }
2082
2083 fn is_closed(&self) -> bool {
2084 self.state.lock().map(|state| state.closed).unwrap_or(true)
2085 }
2086}
2087
2088fn start_worker_event_pump(
2089 events: NativeEventHub,
2090 worker_events: SyncWorkerEventSubscription,
2091) -> JoinHandle<()> {
2092 thread::spawn(move || {
2093 while let Some(event) = worker_events.next_event() {
2094 events.publish_worker_event(event);
2095 }
2096 })
2097}
2098
impl NativeEventHub {
    /// Registers a new subscriber with its own bounded queue of `capacity`.
    ///
    /// If the subscriber registry lock cannot be taken, the returned
    /// subscription is not registered and will not receive published events.
    fn subscribe(&self, capacity: usize) -> NativeEventSubscription {
        let queue = Arc::new(NativeEventQueue::new(capacity));
        let subscriber_id = self.next_subscriber_id();
        if let Ok(mut subscribers) = self.subscribers.lock() {
            subscribers.insert(subscriber_id, queue.clone());
        }
        NativeEventSubscription {
            hub: self.clone(),
            subscriber_id,
            queue,
        }
    }

    /// Stamps `event` with the next sequence number, records it in the recent
    /// event buffer and pushes a copy to every registered subscriber queue.
    fn publish_event(&self, event: NativeEvent) {
        let event = self.stamp_event(event);
        self.record_recent_event(event.clone());
        let Ok(mut subscribers) = self.subscribers.lock() else {
            return;
        };

        // Queues that are closed after the push (including ones that just
        // overflowed) are dropped from the registry.
        subscribers.retain(|_, queue| {
            queue.push(event.clone(), |dropped_count| {
                // The overflow notification gets its own sequence number.
                self.stamp_event(events_overflowed_event(dropped_count))
            });
            !queue.is_closed()
        });
    }

    /// Converts a worker event into native events and publishes each in order.
    fn publish_worker_event(&self, event: SyncWorkerEvent) {
        for event in self.events_from_worker_event(event) {
            self.publish_event(event);
        }
    }

    /// Returns a copy of the recent event buffer (empty if the lock is poisoned).
    fn recent_events(&self) -> Vec<NativeEvent> {
        self.recent_events
            .lock()
            .map(|events| events.iter().cloned().collect())
            .unwrap_or_default()
    }

    /// Returns the number of registered subscribers (0 if the lock is poisoned).
    fn subscriber_count(&self) -> usize {
        self.subscribers
            .lock()
            .map(|subscribers| subscribers.len())
            .unwrap_or_default()
    }

    /// Appends `event` to the recent event buffer, evicting the oldest entries
    /// beyond `DEFAULT_NATIVE_RECENT_EVENT_LIMIT`.
    fn record_recent_event(&self, event: NativeEvent) {
        let Ok(mut events) = self.recent_events.lock() else {
            return;
        };
        // Oversized payloads are replaced by a truncation marker before storing.
        events.push_back(native_event_for_recent_diagnostics(event));
        while events.len() > DEFAULT_NATIVE_RECENT_EVENT_LIMIT {
            events.pop_front();
        }
    }

    /// Publishes rows-changed (and matching queries-changed) events for
    /// `tables` without row details or a source.
    fn push_rows_changed_events<'a>(&self, tables: impl IntoIterator<Item = &'a str>) {
        self.push_rows_changed_events_with_details(tables, Vec::new(), None);
    }

    /// Publishes `diagnostic` wrapped in a diagnostic event.
    fn push_diagnostic(&self, diagnostic: NativeDiagnostic) {
        self.publish_event(diagnostic_event(diagnostic));
    }

    /// Publishes a rows-changed event for `tables`, followed by a
    /// queries-changed event when any observed query should be notified.
    ///
    /// Does nothing when `unique_event_tables` yields no tables.
    fn push_rows_changed_events_with_details<'a>(
        &self,
        tables: impl IntoIterator<Item = &'a str>,
        changed_rows: Vec<SyncChangedRow>,
        source: Option<&str>,
    ) {
        let tables = unique_event_tables(tables);
        if tables.is_empty() {
            return;
        }

        self.publish_event(rows_changed_event_with_details(
            tables.iter().map(String::as_str),
            changed_rows.clone(),
            source,
        ));
        let queries = self.changed_query_ids(&tables, &changed_rows);
        if !queries.is_empty() {
            self.publish_event(queries_changed_event_with_details(
                &tables,
                queries,
                changed_rows,
                source,
            ));
        }
    }

    /// Returns the ids of observed queries that `observed_query_should_notify`
    /// selects for the given changed tables and rows.
    ///
    /// Returns an empty list if the observer registry lock is poisoned.
    fn changed_query_ids(&self, tables: &[String], changed_rows: &[SyncChangedRow]) -> Vec<String> {
        let changed = tables.iter().map(String::as_str).collect::<BTreeSet<_>>();
        let Ok(observers) = self.query_observers.lock() else {
            return Vec::new();
        };

        observers
            .values()
            .filter(|query| observed_query_should_notify(query, &changed, changed_rows))
            .map(|query| query.id.clone())
            .collect()
    }

    /// Assigns the next event sequence number to `event`.
    ///
    /// The event is returned unchanged if the sequence lock is poisoned.
    fn stamp_event(&self, mut event: NativeEvent) -> NativeEvent {
        if let Ok(mut seq) = self.event_seq.lock() {
            *seq = seq.saturating_add(1);
            event.event_seq = *seq;
        }
        event
    }

    /// Returns the next subscriber id, or 0 if the sequence lock is poisoned.
    fn next_subscriber_id(&self) -> u64 {
        if let Ok(mut seq) = self.subscriber_seq.lock() {
            *seq = seq.saturating_add(1);
            *seq
        } else {
            0
        }
    }

    /// Maps one worker event to the ordered list of native events it produces.
    ///
    /// The returned events are not yet stamped with sequence numbers.
    fn events_from_worker_event(&self, event: SyncWorkerEvent) -> Vec<NativeEvent> {
        match event {
            SyncWorkerEvent::SyncStarted { command_id } => {
                vec![sync_started_event(command_id)]
            }
            SyncWorkerEvent::SyncCompleted {
                command_id,
                report,
                bootstrap,
                outbox_count,
                conflict_count,
                duration_ms,
            } => {
                let mut events = vec![sync_completed_event(
                    report.clone(),
                    bootstrap,
                    command_id.clone(),
                    outbox_count,
                    conflict_count,
                    duration_ms,
                )];
                // Row, query and CRDT field notifications are only emitted when
                // the sync report lists changed tables.
                if !report.changed_tables.is_empty() {
                    events.push(rows_changed_event_with_details(
                        report.changed_tables.iter().map(String::as_str),
                        report.changed_rows.clone(),
                        Some("remotePull"),
                    ));
                    let queries =
                        self.changed_query_ids(&report.changed_tables, &report.changed_rows);
                    if !queries.is_empty() {
                        events.push(queries_changed_event_with_details(
                            &report.changed_tables,
                            queries,
                            report.changed_rows.clone(),
                            Some("remotePull"),
                        ));
                    }
                    events.extend(crdt_field_changed_events_from_changed_rows(
                        &report.changed_rows,
                        "remotePull",
                        command_id,
                        Some(duration_ms),
                    ));
                }
                if report.conflicts_changed {
                    events.push(conflicts_changed_event());
                }
                events
            }
            SyncWorkerEvent::SyncFailed {
                command_id,
                error,
                retry_scheduled,
                duration_ms,
            } => vec![sync_failed_event(
                &error,
                command_id,
                retry_scheduled,
                duration_ms,
            )],
            SyncWorkerEvent::LocalWriteCommitted {
                command_id,
                client_commit_id,
                changed_tables,
                changed_rows,
                outbox_count,
                duration_ms,
            } => {
                let mut events = vec![local_write_committed_event(
                    command_id,
                    client_commit_id,
                    changed_tables.clone(),
                    changed_rows.clone(),
                    outbox_count,
                    duration_ms,
                )];
                if !changed_tables.is_empty() {
                    events.push(rows_changed_event_with_details(
                        changed_tables.iter().map(String::as_str),
                        changed_rows.clone(),
                        Some("localWrite"),
                    ));
                    let queries = self.changed_query_ids(&changed_tables, &changed_rows);
                    if !queries.is_empty() {
                        events.push(queries_changed_event_with_details(
                            &changed_tables,
                            queries,
                            changed_rows,
                            Some("localWrite"),
                        ));
                    }
                }
                events
            }
            SyncWorkerEvent::LocalWriteFailed {
                command_id,
                error,
                payload_json,
                duration_ms,
            } => vec![local_write_failed_event(
                &error,
                command_id,
                payload_json,
                duration_ms,
            )],
            SyncWorkerEvent::CrdtFieldChanged {
                command_id,
                client_commit_id,
                table,
                row_id,
                field,
                changed_tables,
                payload_json,
                duration_ms,
            } => vec![crdt_field_changed_event_from_parts(
                CrdtFieldEventParts {
                    table,
                    row_id,
                    field,
                    changed_tables,
                    client_commit_id: Some(client_commit_id),
                    checkpoint_created: None,
                    extra_payload_json: payload_json,
                },
                Some(command_id),
                Some(duration_ms),
            )],
            SyncWorkerEvent::CrdtFieldCompacted {
                command_id,
                client_commit_id,
                table,
                row_id,
                field,
                changed_tables,
                checkpoint_created,
                payload_json,
                duration_ms,
            } => vec![crdt_field_compacted_event_from_parts(
                CrdtFieldEventParts {
                    table,
                    row_id,
                    field,
                    changed_tables,
                    client_commit_id,
                    checkpoint_created: Some(checkpoint_created),
                    extra_payload_json: payload_json,
                },
                Some(command_id),
                Some(duration_ms),
            )],
            // A completed resolution is always followed by a conflicts-changed event.
            SyncWorkerEvent::ConflictResolutionCompleted {
                command_id,
                retry_client_commit_id,
                duration_ms,
            } => vec![
                conflict_resolution_completed_event(
                    command_id,
                    retry_client_commit_id,
                    duration_ms,
                ),
                conflicts_changed_event(),
            ],
            SyncWorkerEvent::ConflictResolutionFailed {
                command_id,
                error,
                duration_ms,
            } => vec![conflict_resolution_failed_event(
                &error,
                command_id,
                duration_ms,
            )],
            SyncWorkerEvent::SnapshotReady {
                command_id,
                payload_json,
                duration_ms,
            } => vec![snapshot_ready_event(command_id, payload_json, duration_ms)],
            SyncWorkerEvent::WorkerCommandCompleted {
                command_id,
                operation,
                payload_json,
                duration_ms,
            } => vec![worker_command_completed_event(
                command_id,
                operation,
                payload_json,
                duration_ms,
            )],
            SyncWorkerEvent::WorkerCommandFailed {
                command_id,
                operation,
                error,
                duration_ms,
            } => vec![worker_command_failed_event(
                &error,
                command_id,
                operation,
                duration_ms,
            )],
            SyncWorkerEvent::BlobUploadsChanged { stats_json } => {
                vec![blob_uploads_changed_event(stats_json)]
            }
            SyncWorkerEvent::EventsOverflowed { dropped_count } => {
                vec![events_overflowed_event(dropped_count)]
            }
        }
    }
}
2430
/// Converts `SyncWorkerEvent`s into stamped `NativeEvent`s.
///
/// It owns a private `NativeEventHub`, used for the observed-query registry
/// and for assigning event sequence numbers.
#[derive(Clone, Default)]
pub struct NativeWorkerEventConverter {
    // Hub that holds the observed queries and the event sequence counter.
    hub: NativeEventHub,
}
2435
2436impl NativeWorkerEventConverter {
2437 pub fn new() -> Self {
2438 Self::default()
2439 }
2440
2441 pub fn set_observed_queries(&self, observed_queries: &[NativeObservedQuery]) {
2442 if let Ok(mut observers) = self.hub.query_observers.lock() {
2443 observers.clear();
2444 for query in observed_queries {
2445 observers.insert(query.id.clone(), query.clone());
2446 }
2447 }
2448 }
2449
2450 pub fn convert(&self, event: SyncWorkerEvent) -> Vec<NativeEvent> {
2451 self.hub
2452 .events_from_worker_event(event)
2453 .into_iter()
2454 .map(|event| self.hub.stamp_event(event))
2455 .collect()
2456 }
2457
2458 pub fn convert_json(&self, event: SyncWorkerEvent) -> Result<Vec<String>> {
2459 self.convert(event)
2460 .into_iter()
2461 .map(|event| serde_json::to_string(&event).map_err(Into::into))
2462 .collect()
2463 }
2464}
2465
2466pub fn native_events_from_worker_event(event: SyncWorkerEvent) -> Vec<NativeEvent> {
2467 NativeWorkerEventConverter::new().convert(event)
2468}
2469
2470pub fn native_events_from_worker_event_with_observed_queries(
2471 event: SyncWorkerEvent,
2472 observed_queries: &[NativeObservedQuery],
2473) -> Vec<NativeEvent> {
2474 let converter = NativeWorkerEventConverter::new();
2475 converter.set_observed_queries(observed_queries);
2476 converter.convert(event)
2477}
2478
2479pub fn native_event_json_from_worker_event(event: SyncWorkerEvent) -> Result<Vec<String>> {
2480 NativeWorkerEventConverter::new().convert_json(event)
2481}
2482
2483impl NativeObservedQueryRegistration {
2484 fn into_observed_query(self, app_schema: AppSchema) -> Result<NativeObservedQuery> {
2485 let id = match self.id {
2486 Some(id) => {
2487 let id = id.trim();
2488 if id.is_empty() {
2489 return Err(SyncularError::config("native observed query id is empty"));
2490 }
2491 id.to_string()
2492 }
2493 None => format!("native-query-{}", Uuid::new_v4()),
2494 };
2495
2496 let tables = normalize_observed_tables(self.tables, app_schema)?;
2497 let dependency_hints =
2498 normalize_observed_query_dependency_hints(self.dependency_hints, &tables, app_schema)?;
2499
2500 Ok(NativeObservedQuery {
2501 id,
2502 tables,
2503 dependency_hints,
2504 label: self.label,
2505 })
2506 }
2507}
2508
2509impl NativeErrorInfo {
2510 pub fn from_error(error: &SyncularError) -> Self {
2511 let classification = error.classification();
2512 Self {
2513 kind: error.kind(),
2514 code: classification.code,
2515 category: classification.category,
2516 retryable: classification.retryable,
2517 recommended_action: classification.recommended_action,
2518 message: error.message_text(),
2519 debug: Some(error.debug_text()),
2520 }
2521 }
2522}
2523
2524impl From<NativeClientConfig> for SyncularClientConfig {
2525 fn from(config: NativeClientConfig) -> Self {
2526 Self {
2527 db_path: config.db_path,
2528 base_url: config.base_url,
2529 client_id: config.client_id,
2530 actor_id: config.actor_id,
2531 project_id: config.project_id,
2532 }
2533 }
2534}
2535
2536impl NativeClientConfig {
2537 fn app_schema(&self) -> Result<AppSchema> {
2538 match self.app_schema_json.as_deref() {
2539 Some(schema_json) => app_schema_from_json(schema_json),
2540 None => Ok(default_app_schema()),
2541 }
2542 }
2543}
2544
2545fn native_diagnostic_subscription_snapshots(
2546 subscriptions: &[SubscriptionSpec],
2547 bootstrap: &BootstrapStatus,
2548) -> Vec<NativeDiagnosticSubscriptionSnapshot> {
2549 subscriptions
2550 .iter()
2551 .map(|subscription| {
2552 let status = bootstrap
2553 .subscriptions
2554 .iter()
2555 .find(|status| status.id == subscription.id);
2556 NativeDiagnosticSubscriptionSnapshot {
2557 id: subscription.id.clone(),
2558 table: subscription.table.clone(),
2559 scope_keys: sorted_json_map_keys(&subscription.scopes),
2560 scope_value_count: count_redacted_values(&subscription.scopes),
2561 params_keys: sorted_json_map_keys(&subscription.params),
2562 params_value_count: count_redacted_values(&subscription.params),
2563 status: status.and_then(|status| status.status.clone()),
2564 ready: status.is_some_and(|status| status.ready),
2565 phase: status
2566 .map(|status| status.phase.clone())
2567 .unwrap_or_else(|| "pending".to_string()),
2568 progress_percent: status.map_or(0, |status| status.progress_percent),
2569 cursor: status.and_then(|status| status.cursor),
2570 bootstrap_phase: subscription.bootstrap_phase,
2571 bootstrap_state: status.and_then(|status| status.bootstrap_state.clone()),
2572 }
2573 })
2574 .collect()
2575}
2576
2577fn sorted_json_map_keys(map: &Map<String, Value>) -> Vec<String> {
2578 let mut keys = map.keys().cloned().collect::<Vec<_>>();
2579 keys.sort();
2580 keys
2581}
2582
2583fn count_redacted_values(map: &Map<String, Value>) -> usize {
2584 map.values()
2585 .map(|value| match value {
2586 Value::Array(values) => values.len(),
2587 Value::Null => 0,
2588 _ => 1,
2589 })
2590 .sum()
2591}
2592
2593fn native_outbox_stats(outbox: &[OutboxSummary]) -> NativeOutboxStats {
2594 let mut stats = NativeOutboxStats {
2595 total: outbox.len(),
2596 ..NativeOutboxStats::default()
2597 };
2598 for item in outbox {
2599 match item.status.as_str() {
2600 "pending" => stats.pending += 1,
2601 "sending" => stats.sending += 1,
2602 "failed" => stats.failed += 1,
2603 "acked" => stats.acked += 1,
2604 _ => {}
2605 }
2606 }
2607 stats
2608}
2609
2610fn native_conflict_stats(conflicts: &[ConflictSummary]) -> NativeConflictStats {
2611 let mut stats = NativeConflictStats {
2612 total: conflicts.len(),
2613 ..NativeConflictStats::default()
2614 };
2615 for conflict in conflicts {
2616 if conflict.resolved_at.is_some() {
2617 stats.resolved += 1;
2618 } else {
2619 stats.unresolved += 1;
2620 }
2621 }
2622 stats
2623}
2624
2625fn native_sync_timing_snapshots(events: &[NativeEvent]) -> Vec<NativeSyncTimingSnapshot> {
2626 events
2627 .iter()
2628 .filter_map(|event| {
2629 let total_ms = event.duration_ms?;
2630 let (kind, success) = match event.kind {
2631 NativeEventKind::SyncCompleted => ("syncCompleted", true),
2632 NativeEventKind::SyncFailed => ("syncFailed", false),
2633 NativeEventKind::AuthExpired => ("authExpired", false),
2634 _ => return None,
2635 };
2636 Some(NativeSyncTimingSnapshot {
2637 event_seq: event.event_seq,
2638 kind: kind.to_string(),
2639 command_id: event.command_id.clone(),
2640 total_ms,
2641 success,
2642 retry_scheduled: event.retry_scheduled,
2643 outbox_count: event.outbox_count,
2644 conflict_count: event.conflict_count,
2645 })
2646 })
2647 .collect()
2648}
2649
2650fn crdt_field_descriptor(field: &CrdtField) -> Value {
2651 json!({
2652 "table": field.table(),
2653 "rowId": field.row_id(),
2654 "field": field.field(),
2655 "stateColumn": field.state_column(),
2656 "containerKey": field.container_key(),
2657 "rowIdField": field.row_id_field(),
2658 "syncMode": field.sync_mode(),
2659 "kind": field.field_metadata().kind,
2660 })
2661}
2662
2663fn crdt_field_write_receipt_json(receipt: CrdtFieldWriteReceipt) -> Result<String> {
2664 Ok(serde_json::to_string(&receipt)?)
2665}
2666
2667fn crdt_field_materialization_json(materialization: CrdtFieldMaterialization) -> Result<String> {
2668 Ok(serde_json::to_string(&materialization)?)
2669}
2670
2671fn crdt_field_compaction_receipt_json(receipt: CrdtFieldCompactionReceipt) -> Result<String> {
2672 Ok(serde_json::to_string(&receipt)?)
2673}
2674
2675fn crdt_field_write_tables(field: &CrdtField) -> Vec<&'static str> {
2676 match field.sync_mode() {
2677 CrdtFieldSyncMode::ServerMerge => vec![field.table()],
2678 CrdtFieldSyncMode::EncryptedUpdateLog => vec![field.table(), CRDT_UPDATES_TABLE],
2679 }
2680}
2681
2682fn crdt_field_compaction_tables(field: &CrdtField) -> Vec<&'static str> {
2683 match field.sync_mode() {
2684 CrdtFieldSyncMode::ServerMerge => vec![field.table()],
2685 CrdtFieldSyncMode::EncryptedUpdateLog => vec![CRDT_CHECKPOINTS_TABLE],
2686 }
2687}
2688
2689fn crdt_field_changed_row(field: &CrdtField, client_commit_id: Option<String>) -> SyncChangedRow {
2690 let crdt_field_changes = vec![sync_changed_crdt_field_from_metadata(
2691 field.field_metadata(),
2692 )];
2693 SyncChangedRow {
2694 table: field.table().to_string(),
2695 row_id: Some(field.row_id().to_string()),
2696 operation: "update".to_string(),
2697 changed_fields: vec![field.field().to_string(), field.state_column().to_string()],
2698 crdt_fields: vec![field.state_column().to_string()],
2699 crdt_field_changes,
2700 commit_id: client_commit_id,
2701 commit_seq: None,
2702 subscription_id: None,
2703 server_version: None,
2704 }
2705}
2706
2707fn crdt_field_compacted_row(field: &CrdtField, client_commit_id: Option<String>) -> SyncChangedRow {
2708 let crdt_field_changes = vec![sync_changed_crdt_field_from_metadata(
2709 field.field_metadata(),
2710 )];
2711 SyncChangedRow {
2712 table: field.table().to_string(),
2713 row_id: Some(field.row_id().to_string()),
2714 operation: "compact".to_string(),
2715 changed_fields: vec![field.state_column().to_string()],
2716 crdt_fields: vec![field.state_column().to_string()],
2717 crdt_field_changes,
2718 commit_id: client_commit_id,
2719 commit_seq: None,
2720 subscription_id: None,
2721 server_version: None,
2722 }
2723}
2724
/// Inputs shared by the CRDT field changed/compacted event builders.
#[derive(Debug)]
struct CrdtFieldEventParts {
    // Table, row id and field name identifying the CRDT field.
    table: String,
    row_id: String,
    field: String,
    // Tables attached to the event and reported as `changedTables`.
    changed_tables: Vec<String>,
    // Added to the payload as `clientCommitId` only when present.
    client_commit_id: Option<String>,
    // Added to the payload as `checkpointCreated` only when present.
    checkpoint_created: Option<bool>,
    // Extra object entries merged into the payload; existing keys win.
    extra_payload_json: Option<Value>,
}
2735
2736fn crdt_field_changed_event(
2737 field: &CrdtField,
2738 client_commit_id: Option<String>,
2739 changed_tables: Vec<String>,
2740 command_id: Option<String>,
2741 duration_ms: Option<u64>,
2742 extra_payload_json: Option<Value>,
2743) -> NativeEvent {
2744 crdt_field_changed_event_from_parts(
2745 CrdtFieldEventParts {
2746 table: field.table().to_string(),
2747 row_id: field.row_id().to_string(),
2748 field: field.field().to_string(),
2749 changed_tables,
2750 client_commit_id,
2751 checkpoint_created: None,
2752 extra_payload_json,
2753 },
2754 command_id,
2755 duration_ms,
2756 )
2757}
2758
2759fn crdt_field_changed_event_from_parts(
2760 parts: CrdtFieldEventParts,
2761 command_id: Option<String>,
2762 duration_ms: Option<u64>,
2763) -> NativeEvent {
2764 let mut event = native_event(
2765 NativeEventKind::CrdtFieldChanged,
2766 parts.changed_tables.clone(),
2767 Some(native_diagnostic(
2768 "info",
2769 "storage",
2770 "crdt.field_changed",
2771 "Native Syncular CRDT field changed",
2772 [
2773 ("commandId", json!(command_id.clone())),
2774 ("clientCommitId", json!(parts.client_commit_id.clone())),
2775 ("table", json!(parts.table.clone())),
2776 ("rowId", json!(parts.row_id.clone())),
2777 ("field", json!(parts.field.clone())),
2778 ("tables", json!(parts.changed_tables.clone())),
2779 ("durationMs", json!(duration_ms)),
2780 ],
2781 )),
2782 );
2783 event.command_id = command_id;
2784 event.client_commit_id = parts.client_commit_id.clone();
2785 event.duration_ms = duration_ms;
2786 event.payload_json = Some(crdt_field_payload(parts));
2787 event
2788}
2789
2790fn crdt_field_compacted_event(
2791 field: &CrdtField,
2792 client_commit_id: Option<String>,
2793 changed_tables: Vec<String>,
2794 command_id: Option<String>,
2795 duration_ms: Option<u64>,
2796 checkpoint_created: bool,
2797 extra_payload_json: Option<Value>,
2798) -> NativeEvent {
2799 crdt_field_compacted_event_from_parts(
2800 CrdtFieldEventParts {
2801 table: field.table().to_string(),
2802 row_id: field.row_id().to_string(),
2803 field: field.field().to_string(),
2804 changed_tables,
2805 client_commit_id,
2806 checkpoint_created: Some(checkpoint_created),
2807 extra_payload_json,
2808 },
2809 command_id,
2810 duration_ms,
2811 )
2812}
2813
2814fn crdt_field_compacted_event_from_parts(
2815 parts: CrdtFieldEventParts,
2816 command_id: Option<String>,
2817 duration_ms: Option<u64>,
2818) -> NativeEvent {
2819 let mut event = native_event(
2820 NativeEventKind::CrdtFieldCompacted,
2821 parts.changed_tables.clone(),
2822 Some(native_diagnostic(
2823 "info",
2824 "storage",
2825 "crdt.field_compacted",
2826 "Native Syncular CRDT field compacted",
2827 [
2828 ("commandId", json!(command_id.clone())),
2829 ("clientCommitId", json!(parts.client_commit_id.clone())),
2830 ("table", json!(parts.table.clone())),
2831 ("rowId", json!(parts.row_id.clone())),
2832 ("field", json!(parts.field.clone())),
2833 (
2834 "checkpointCreated",
2835 json!(parts.checkpoint_created.unwrap_or(true)),
2836 ),
2837 ("tables", json!(parts.changed_tables.clone())),
2838 ("durationMs", json!(duration_ms)),
2839 ],
2840 )),
2841 );
2842 event.command_id = command_id;
2843 event.client_commit_id = parts.client_commit_id.clone();
2844 event.duration_ms = duration_ms;
2845 event.payload_json = Some(crdt_field_payload(parts));
2846 event
2847}
2848
2849fn crdt_field_event_payload(
2850 client: &mut SyncularClient<DieselSqliteStore, HttpSyncTransport>,
2851 field: &CrdtField,
2852) -> Option<Value> {
2853 let mut payload = crdt_field_base_event_payload(field);
2854 match client.materialize_crdt_field(field) {
2855 Ok(materialization) => {
2856 payload.insert("materializationAvailable".to_string(), json!(true));
2857 payload.insert(
2858 "hasState".to_string(),
2859 json!(materialization.state_base64.is_some()),
2860 );
2861 payload.insert(
2862 "stateVectorBase64".to_string(),
2863 json!(materialization.state_vector_base64),
2864 );
2865 }
2866 Err(error) => {
2867 payload.insert("materializationAvailable".to_string(), json!(false));
2868 payload.insert(
2869 "materializationError".to_string(),
2870 json!(error.message_text()),
2871 );
2872 }
2873 }
2874 Some(Value::Object(payload))
2875}
2876
2877fn crdt_field_compaction_event_payload(
2878 client: &mut SyncularClient<DieselSqliteStore, HttpSyncTransport>,
2879 field: &CrdtField,
2880 receipt: &CrdtFieldCompactionReceipt,
2881 checkpoint_created: bool,
2882 min_uncheckpointed_updates: i64,
2883) -> Option<Value> {
2884 let mut payload = crdt_field_event_payload(client, field)
2885 .and_then(|value| value.as_object().cloned())
2886 .unwrap_or_else(|| crdt_field_base_event_payload(field));
2887 payload.insert("checkpointCreated".to_string(), json!(checkpoint_created));
2888 payload.insert(
2889 "minUncheckpointedUpdates".to_string(),
2890 json!(min_uncheckpointed_updates),
2891 );
2892 payload.insert("before".to_string(), json!(&receipt.before));
2893 payload.insert("after".to_string(), json!(&receipt.after));
2894 payload.insert(
2895 "encryptedStreamBefore".to_string(),
2896 json!(&receipt.encrypted_stream_before),
2897 );
2898 payload.insert(
2899 "encryptedStreamAfter".to_string(),
2900 json!(&receipt.encrypted_stream_after),
2901 );
2902 Some(Value::Object(payload))
2903}
2904
2905fn crdt_field_base_event_payload(field: &CrdtField) -> serde_json::Map<String, Value> {
2906 let mut payload = serde_json::Map::new();
2907 payload.insert("syncMode".to_string(), json!(field.sync_mode()));
2908 payload.insert("kind".to_string(), json!(field.field_metadata().kind));
2909 payload.insert("stateColumn".to_string(), json!(field.state_column()));
2910 payload.insert("containerKey".to_string(), json!(field.container_key()));
2911 payload.insert("rowIdField".to_string(), json!(field.row_id_field()));
2912 payload
2913}
2914
2915fn crdt_field_payload(parts: CrdtFieldEventParts) -> Value {
2916 let mut payload = json!({
2917 "table": parts.table,
2918 "rowId": parts.row_id,
2919 "field": parts.field,
2920 "changedTables": parts.changed_tables,
2921 });
2922 if let Value::Object(ref mut object) = payload {
2923 if let Some(client_commit_id) = parts.client_commit_id {
2924 object.insert("clientCommitId".to_string(), json!(client_commit_id));
2925 }
2926 if let Some(checkpoint_created) = parts.checkpoint_created {
2927 object.insert("checkpointCreated".to_string(), json!(checkpoint_created));
2928 }
2929 if let Some(Value::Object(extra)) = parts.extra_payload_json {
2930 for (key, value) in extra {
2931 object.entry(key).or_insert(value);
2932 }
2933 }
2934 }
2935 payload
2936}
2937
2938fn crdt_field_changed_events_from_changed_rows(
2939 changed_rows: &[SyncChangedRow],
2940 source: &str,
2941 command_id: Option<String>,
2942 duration_ms: Option<u64>,
2943) -> Vec<NativeEvent> {
2944 let mut events = Vec::new();
2945 for row in changed_rows {
2946 let Some(row_id) = row.row_id.as_ref() else {
2947 continue;
2948 };
2949 for field in &row.crdt_field_changes {
2950 events.push(crdt_field_changed_event_from_parts(
2951 CrdtFieldEventParts {
2952 table: row.table.clone(),
2953 row_id: row_id.clone(),
2954 field: field.field.clone(),
2955 changed_tables: vec![row.table.clone()],
2956 client_commit_id: None,
2957 checkpoint_created: None,
2958 extra_payload_json: Some(json!({
2959 "source": source,
2960 "operation": row.operation,
2961 "stateColumn": field.state_column,
2962 "containerKey": field.container_key,
2963 "rowIdField": field.row_id_field,
2964 "kind": field.kind,
2965 "syncMode": field.sync_mode,
2966 "commitId": row.commit_id,
2967 "commitSeq": row.commit_seq,
2968 "subscriptionId": row.subscription_id,
2969 "serverVersion": row.server_version,
2970 "changedFields": row.changed_fields,
2971 "crdtFields": row.crdt_fields,
2972 })),
2973 },
2974 command_id.clone(),
2975 duration_ms,
2976 ));
2977 }
2978 }
2979 events
2980}
2981
2982fn native_event_for_recent_diagnostics(mut event: NativeEvent) -> NativeEvent {
2983 if let Some(payload_json) = event.payload_json.as_ref() {
2984 if let Ok(bytes) = serde_json::to_vec(payload_json) {
2985 if bytes.len() > MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES {
2986 event.payload_json = Some(json!({
2987 "truncated": true,
2988 "reason": "diagnosticPayloadLimit",
2989 "originalBytes": bytes.len(),
2990 "maxBytes": MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES,
2991 "limit": "maxNativeDiagnosticEventPayloadJsonBytes"
2992 }));
2993 }
2994 }
2995 }
2996 event
2997}
2998
2999fn rows_changed_event_with_details<'a>(
3000 tables: impl IntoIterator<Item = &'a str>,
3001 changed_rows: Vec<SyncChangedRow>,
3002 source: Option<&str>,
3003) -> NativeEvent {
3004 let tables = tables.into_iter().map(str::to_string).collect::<Vec<_>>();
3005 let mut event = native_event(
3006 NativeEventKind::RowsChanged,
3007 tables.clone(),
3008 Some(native_diagnostic(
3009 "info",
3010 "storage",
3011 "storage.rows_changed",
3012 "Native Syncular rows changed",
3013 [
3014 ("tables", json!(tables)),
3015 ("source", json!(source)),
3016 ("changedRows", json!(changed_rows.clone())),
3017 ],
3018 )),
3019 );
3020 event.changed_rows = changed_rows.clone();
3021 event.payload_json = Some(json!({
3022 "type": "rowsChanged",
3023 "source": source,
3024 "changedRows": changed_rows,
3025 }));
3026 event
3027}
3028
/// Applies a realtime presence event to the per-scope presence map and
/// publishes a presence-changed event with the resulting entries.
///
/// Handled actions:
/// - `"snapshot"`: replaces the scope's entries with the event's entries.
/// - `"leave"`: removes the entry with the event's client id.
/// - `"update"`: replaces the metadata of the matching entry.
/// - `"join"`: adds or replaces the entry for the client, keeping the
///   previous `joined_at` if the client was already present.
///
/// Nothing is changed or published when the presence lock is poisoned, the
/// action is unknown, a required client/actor id is missing, or an `"update"`
/// targets a client that is not present.
fn apply_native_presence_event(
    presence_by_scope: &Arc<Mutex<BTreeMap<String, Vec<NativePresenceEntry>>>>,
    events: &NativeEventHub,
    event: RealtimePresenceEvent,
) {
    let scope_key = event.scope_key.clone();
    let Ok(mut state) = presence_by_scope.lock() else {
        return;
    };
    // Work on a copy so early returns leave the stored entries untouched.
    let current = state.get(&scope_key).cloned().unwrap_or_default();
    let next = match event.action.as_str() {
        "snapshot" => event
            .entries
            .into_iter()
            .map(native_presence_entry_from_realtime)
            .collect::<Vec<_>>(),
        "leave" => {
            let Some(client_id) = event.client_id.as_deref() else {
                return;
            };
            current
                .into_iter()
                .filter(|entry| entry.client_id != client_id)
                .collect()
        }
        "update" => {
            let Some(client_id) = event.client_id.as_deref() else {
                return;
            };
            // Updates for clients that are not present are ignored.
            if !current.iter().any(|entry| entry.client_id == client_id) {
                return;
            }
            current
                .into_iter()
                .map(|entry| {
                    if entry.client_id == client_id {
                        NativePresenceEntry {
                            metadata: event.metadata.clone(),
                            ..entry
                        }
                    } else {
                        entry
                    }
                })
                .collect()
        }
        "join" => {
            let (Some(client_id), Some(actor_id)) = (event.client_id, event.actor_id) else {
                return;
            };
            // A re-join keeps the original join time; a new client gets `now_ms()`.
            let joined_at = current
                .iter()
                .find(|entry| entry.client_id == client_id)
                .map(|entry| entry.joined_at)
                .unwrap_or_else(now_ms);
            let mut next = current
                .into_iter()
                .filter(|entry| entry.client_id != client_id)
                .collect::<Vec<_>>();
            next.push(NativePresenceEntry {
                client_id,
                actor_id,
                joined_at,
                metadata: event.metadata,
            });
            next
        }
        _ => return,
    };
    // Scopes with no remaining entries are removed from the map.
    if next.is_empty() {
        state.remove(&scope_key);
    } else {
        state.insert(scope_key.clone(), next.clone());
    }
    // Release the presence lock before publishing the event.
    drop(state);
    events.publish_event(presence_changed_event(scope_key, next));
}
3106
3107fn native_presence_entry_from_realtime(entry: RealtimePresenceEntry) -> NativePresenceEntry {
3108 NativePresenceEntry {
3109 client_id: entry.client_id,
3110 actor_id: entry.actor_id,
3111 joined_at: entry.joined_at,
3112 metadata: entry.metadata,
3113 }
3114}
3115
3116fn presence_changed_event(scope_key: String, presence: Vec<NativePresenceEntry>) -> NativeEvent {
3117 let mut event = native_event(
3118 NativeEventKind::PresenceChanged,
3119 Vec::new(),
3120 Some(native_diagnostic(
3121 "info",
3122 "realtime",
3123 "realtime.presence_changed",
3124 "Native Syncular presence changed",
3125 [
3126 ("scopeKey", json!(scope_key.clone())),
3127 ("presence", json!(presence.clone())),
3128 ],
3129 )),
3130 );
3131 event.payload_json = Some(json!({
3132 "type": "presenceChanged",
3133 "scopeKey": scope_key,
3134 "presence": presence,
3135 }));
3136 event
3137}
3138
3139fn queries_changed_event_with_details(
3140 tables: &[String],
3141 queries: Vec<String>,
3142 changed_rows: Vec<SyncChangedRow>,
3143 source: Option<&str>,
3144) -> NativeEvent {
3145 let mut event = native_event(
3146 NativeEventKind::QueriesChanged,
3147 tables.to_vec(),
3148 Some(native_diagnostic(
3149 "info",
3150 "storage",
3151 "storage.queries_changed",
3152 "Native Syncular observed queries changed",
3153 [
3154 ("tables", json!(tables)),
3155 ("queries", json!(queries.clone())),
3156 ("source", json!(source)),
3157 ("changedRows", json!(changed_rows.clone())),
3158 ],
3159 )),
3160 );
3161 event.changed_rows = changed_rows;
3162 event.queries = queries;
3163 event
3164}
3165
3166fn conflicts_changed_event() -> NativeEvent {
3167 let diagnostic = native_diagnostic(
3168 "warn",
3169 "sync",
3170 "sync.conflicts_changed",
3171 "Native Syncular conflicts changed",
3172 std::iter::empty::<(&str, Value)>(),
3173 );
3174 let mut event = native_event(
3175 NativeEventKind::ConflictsChanged,
3176 Vec::new(),
3177 Some(diagnostic.clone()),
3178 );
3179 event.lifecycle = Some(native_lifecycle_state(
3180 NativeLifecyclePhase::Degraded,
3181 false,
3182 true,
3183 0,
3184 None,
3185 None,
3186 Some(NativeLifecycleConflicts { unresolved: 1 }),
3187 None,
3188 None,
3189 Some(diagnostic),
3190 ));
3191 event
3192}
3193
3194fn native_lifecycle_blob_uploads(stats_json: &Value) -> Option<NativeLifecycleBlobUploads> {
3195 Some(NativeLifecycleBlobUploads {
3196 pending: stats_json.get("pending")?.as_i64()?,
3197 uploading: stats_json.get("uploading")?.as_i64()?,
3198 failed: stats_json.get("failed")?.as_i64()?,
3199 })
3200}
3201
3202fn native_lifecycle_bootstrap(status: &BootstrapStatus) -> NativeLifecycleBootstrap {
3203 NativeLifecycleBootstrap {
3204 complete: status.complete,
3205 critical_ready: status.critical_ready,
3206 interactive_ready: status.interactive_ready,
3207 is_bootstrapping: status.is_bootstrapping,
3208 progress_percent: status.progress_percent,
3209 }
3210}
3211
3212fn native_lifecycle_state(
3213 phase: NativeLifecyclePhase,
3214 online: bool,
3215 requires_action: bool,
3216 pending_requests: usize,
3217 bootstrap: Option<NativeLifecycleBootstrap>,
3218 outbox: Option<NativeLifecycleOutbox>,
3219 conflicts: Option<NativeLifecycleConflicts>,
3220 blob_uploads: Option<NativeLifecycleBlobUploads>,
3221 last_error: Option<NativeErrorInfo>,
3222 last_diagnostic: Option<NativeDiagnostic>,
3223) -> NativeLifecycleState {
3224 NativeLifecycleState {
3225 phase,
3226 online,
3227 requires_action,
3228 pending_requests,
3229 bootstrap,
3230 outbox,
3231 conflicts,
3232 blob_uploads,
3233 last_error,
3234 last_diagnostic,
3235 }
3236}
3237
3238fn native_lifecycle_for_sync_completed(
3239 bootstrap: &BootstrapStatus,
3240 outbox_count: usize,
3241 conflict_count: usize,
3242 diagnostic: NativeDiagnostic,
3243) -> NativeLifecycleState {
3244 let has_conflicts = conflict_count > 0;
3245 let phase = if has_conflicts {
3246 NativeLifecyclePhase::Degraded
3247 } else if bootstrap.complete {
3248 NativeLifecyclePhase::Complete
3249 } else {
3250 NativeLifecyclePhase::Recovering
3251 };
3252 native_lifecycle_state(
3253 phase,
3254 true,
3255 has_conflicts,
3256 0,
3257 Some(native_lifecycle_bootstrap(bootstrap)),
3258 Some(NativeLifecycleOutbox {
3259 pending: outbox_count,
3260 }),
3261 Some(NativeLifecycleConflicts {
3262 unresolved: conflict_count,
3263 }),
3264 None,
3265 None,
3266 Some(diagnostic),
3267 )
3268}
3269
3270fn native_lifecycle_for_error(
3271 error: &SyncularError,
3272 retry_scheduled: bool,
3273 diagnostic: NativeDiagnostic,
3274) -> NativeLifecycleState {
3275 let error_info = NativeErrorInfo::from_error(error);
3276 let classification = error.classification();
3277 let resync_required = error.requires_full_snapshot_resync();
3278 let phase = if classification.code == "sync.auth_required" {
3279 NativeLifecyclePhase::AuthRequired
3280 } else if resync_required {
3281 NativeLifecyclePhase::Recovering
3282 } else if classification.category == "offline" || retry_scheduled {
3283 NativeLifecyclePhase::Offline
3284 } else {
3285 NativeLifecyclePhase::Degraded
3286 };
3287 let requires_action = matches!(phase, NativeLifecyclePhase::AuthRequired)
3288 || (matches!(phase, NativeLifecyclePhase::Degraded) && !classification.retryable);
3289 native_lifecycle_state(
3290 phase,
3291 false,
3292 requires_action,
3293 0,
3294 None,
3295 None,
3296 None,
3297 None,
3298 Some(error_info),
3299 Some(diagnostic),
3300 )
3301}
3302
3303fn events_overflowed_event(dropped_count: usize) -> NativeEvent {
3304 let diagnostic = native_diagnostic(
3305 "warn",
3306 "events",
3307 "events.overflowed",
3308 "Native Syncular event stream overflowed",
3309 [
3310 ("droppedCount", json!(dropped_count)),
3311 ("resyncRequired", json!(true)),
3312 ],
3313 );
3314 let mut event = native_event(
3315 NativeEventKind::EventsOverflowed,
3316 Vec::new(),
3317 Some(diagnostic.clone()),
3318 );
3319 event.dropped_count = Some(dropped_count);
3320 event.resync_required = Some(true);
3321 event.lifecycle = Some(native_lifecycle_state(
3322 NativeLifecyclePhase::Recovering,
3323 false,
3324 true,
3325 0,
3326 None,
3327 None,
3328 None,
3329 None,
3330 None,
3331 Some(diagnostic),
3332 ));
3333 event.payload_json = Some(json!({
3334 "type": "eventsOverflowed",
3335 "droppedCount": dropped_count,
3336 "resyncRequired": true
3337 }));
3338 event
3339}
3340
3341fn sync_started_event(command_id: Option<String>) -> NativeEvent {
3342 let diagnostic = native_diagnostic(
3343 "info",
3344 "sync",
3345 "sync.started",
3346 "Native Syncular sync started",
3347 std::iter::empty::<(&str, Value)>(),
3348 );
3349 let mut event = native_event(
3350 NativeEventKind::SyncStarted,
3351 Vec::new(),
3352 Some(diagnostic.clone()),
3353 );
3354 event.command_id = command_id;
3355 event.lifecycle = Some(native_lifecycle_state(
3356 NativeLifecyclePhase::Syncing,
3357 false,
3358 false,
3359 1,
3360 None,
3361 None,
3362 None,
3363 None,
3364 None,
3365 Some(diagnostic),
3366 ));
3367 event
3368}
3369
3370fn sync_completed_event(
3371 report: SyncReport,
3372 bootstrap: BootstrapStatus,
3373 command_id: Option<String>,
3374 outbox_count: usize,
3375 conflict_count: usize,
3376 duration_ms: u64,
3377) -> NativeEvent {
3378 let diagnostic = native_diagnostic(
3379 "info",
3380 "sync",
3381 "sync.completed",
3382 "Native Syncular sync completed",
3383 [
3384 ("changedTables", json!(report.changed_tables.clone())),
3385 ("changedTableCount", json!(report.changed_tables.len())),
3386 ("changedRows", json!(report.changed_rows.clone())),
3387 ("conflictsChanged", json!(report.conflicts_changed)),
3388 ("bootstrap", json!(bootstrap.clone())),
3389 ("outboxCount", json!(outbox_count)),
3390 ("conflictCount", json!(conflict_count)),
3391 ("durationMs", json!(duration_ms)),
3392 ],
3393 );
3394 let mut event = native_event(
3395 NativeEventKind::SyncCompleted,
3396 report.changed_tables.clone(),
3397 Some(diagnostic.clone()),
3398 );
3399 event.command_id = command_id;
3400 event.outbox_count = Some(outbox_count);
3401 event.conflict_count = Some(conflict_count);
3402 event.duration_ms = Some(duration_ms);
3403 event.changed_rows = report.changed_rows;
3404 event.lifecycle = Some(native_lifecycle_for_sync_completed(
3405 &bootstrap,
3406 outbox_count,
3407 conflict_count,
3408 diagnostic,
3409 ));
3410 event.bootstrap = Some(bootstrap);
3411 event
3412}
3413
3414fn sync_failed_event(
3415 error: &SyncularError,
3416 command_id: Option<String>,
3417 retry_scheduled: bool,
3418 duration_ms: u64,
3419) -> NativeEvent {
3420 match native_auth_info_from_error(error) {
3421 Some(auth) => {
3422 let operation = auth.operation.clone();
3423 let status = auth.status;
3424 let mut details = vec![
3425 ("operation", json!(operation)),
3426 ("status", json!(status)),
3427 ("retryScheduled", json!(retry_scheduled)),
3428 ("durationMs", json!(duration_ms)),
3429 ];
3430 push_native_error_details(&mut details, error);
3431 let diagnostic = native_diagnostic(
3432 "warn",
3433 "auth",
3434 "auth.expired",
3435 "Native Syncular auth expired",
3436 details,
3437 );
3438 let mut event = native_event(
3439 NativeEventKind::AuthExpired,
3440 Vec::new(),
3441 Some(diagnostic.clone()),
3442 );
3443 event.error = Some(NativeErrorInfo::from_error(error));
3444 event.auth = Some(auth);
3445 event.command_id = command_id;
3446 event.retry_scheduled = Some(retry_scheduled);
3447 event.duration_ms = Some(duration_ms);
3448 event.lifecycle = Some(native_lifecycle_for_error(
3449 error,
3450 retry_scheduled,
3451 diagnostic,
3452 ));
3453 event
3454 }
3455 None => {
3456 let resync_required = error.requires_full_snapshot_resync();
3457 let mut details = vec![
3458 ("retryScheduled", json!(retry_scheduled)),
3459 ("durationMs", json!(duration_ms)),
3460 ("resyncRequired", json!(resync_required)),
3461 ];
3462 push_native_error_details(&mut details, error);
3463 let diagnostic = native_diagnostic(
3464 "error",
3465 "sync",
3466 if resync_required {
3467 "sync.resync_required"
3468 } else {
3469 "sync.failed"
3470 },
3471 if resync_required {
3472 "Native Syncular sync requires full resync"
3473 } else {
3474 "Native Syncular sync failed"
3475 },
3476 details,
3477 );
3478 let mut event = native_event(
3479 NativeEventKind::SyncFailed,
3480 Vec::new(),
3481 Some(diagnostic.clone()),
3482 );
3483 event.error = Some(NativeErrorInfo::from_error(error));
3484 event.command_id = command_id;
3485 event.retry_scheduled = Some(retry_scheduled);
3486 event.duration_ms = Some(duration_ms);
3487 event.lifecycle = Some(native_lifecycle_for_error(
3488 error,
3489 retry_scheduled,
3490 diagnostic,
3491 ));
3492 if resync_required {
3493 event.resync_required = Some(true);
3494 event.payload_json = Some(json!({
3495 "type": "syncResyncRequired",
3496 "resyncRequired": true,
3497 "retryScheduled": retry_scheduled
3498 }));
3499 }
3500 event
3501 }
3502 }
3503}
3504
3505fn local_write_committed_event(
3506 command_id: String,
3507 client_commit_id: String,
3508 changed_tables: Vec<String>,
3509 changed_rows: Vec<SyncChangedRow>,
3510 outbox_count: usize,
3511 duration_ms: u64,
3512) -> NativeEvent {
3513 let diagnostic = native_diagnostic(
3514 "info",
3515 "storage",
3516 "storage.local_write_committed",
3517 "Native Syncular local write committed",
3518 [
3519 ("commandId", json!(command_id.clone())),
3520 ("clientCommitId", json!(client_commit_id.clone())),
3521 ("tables", json!(changed_tables.clone())),
3522 ("changedRows", json!(changed_rows.clone())),
3523 ("outboxCount", json!(outbox_count)),
3524 ("durationMs", json!(duration_ms)),
3525 ],
3526 );
3527 let mut event = native_event(
3528 NativeEventKind::LocalWriteCommitted,
3529 changed_tables.clone(),
3530 Some(diagnostic.clone()),
3531 );
3532 event.command_id = Some(command_id);
3533 event.client_commit_id = Some(client_commit_id);
3534 event.outbox_count = Some(outbox_count);
3535 event.duration_ms = Some(duration_ms);
3536 event.changed_rows = changed_rows;
3537 event.lifecycle = Some(native_lifecycle_state(
3538 NativeLifecyclePhase::Offline,
3539 false,
3540 false,
3541 0,
3542 None,
3543 Some(NativeLifecycleOutbox {
3544 pending: outbox_count,
3545 }),
3546 None,
3547 None,
3548 None,
3549 Some(diagnostic),
3550 ));
3551 event
3552}
3553
3554fn local_write_failed_event(
3555 error: &SyncularError,
3556 command_id: String,
3557 payload_json: Option<Value>,
3558 duration_ms: u64,
3559) -> NativeEvent {
3560 let mut details = vec![
3561 ("commandId", json!(command_id.clone())),
3562 ("durationMs", json!(duration_ms)),
3563 ];
3564 push_native_error_details(&mut details, error);
3565 let diagnostic = native_diagnostic(
3566 "error",
3567 "storage",
3568 "storage.local_write_failed",
3569 "Native Syncular local write failed",
3570 details,
3571 );
3572 let mut event = native_event(
3573 NativeEventKind::LocalWriteFailed,
3574 Vec::new(),
3575 Some(diagnostic.clone()),
3576 );
3577 event.error = Some(NativeErrorInfo::from_error(error));
3578 event.command_id = Some(command_id);
3579 event.payload_json = payload_json;
3580 event.duration_ms = Some(duration_ms);
3581 event.lifecycle = Some(native_lifecycle_for_error(error, false, diagnostic));
3582 event
3583}
3584
3585fn conflict_resolution_completed_event(
3586 command_id: String,
3587 retry_client_commit_id: Option<String>,
3588 duration_ms: u64,
3589) -> NativeEvent {
3590 let mut event = native_event(
3591 NativeEventKind::ConflictResolutionCompleted,
3592 Vec::new(),
3593 Some(native_diagnostic(
3594 "info",
3595 "sync",
3596 "sync.conflict_resolution_completed",
3597 "Native Syncular conflict resolution completed",
3598 [
3599 ("commandId", json!(command_id.clone())),
3600 ("retryClientCommitId", json!(retry_client_commit_id.clone())),
3601 ("durationMs", json!(duration_ms)),
3602 ],
3603 )),
3604 );
3605 event.command_id = Some(command_id);
3606 event.client_commit_id = retry_client_commit_id;
3607 event.duration_ms = Some(duration_ms);
3608 event
3609}
3610
3611fn conflict_resolution_failed_event(
3612 error: &SyncularError,
3613 command_id: String,
3614 duration_ms: u64,
3615) -> NativeEvent {
3616 let mut details = vec![
3617 ("commandId", json!(command_id.clone())),
3618 ("durationMs", json!(duration_ms)),
3619 ];
3620 push_native_error_details(&mut details, error);
3621 let diagnostic = native_diagnostic(
3622 "error",
3623 "sync",
3624 "sync.conflict_resolution_failed",
3625 "Native Syncular conflict resolution failed",
3626 details,
3627 );
3628 let mut event = native_event(
3629 NativeEventKind::ConflictResolutionFailed,
3630 Vec::new(),
3631 Some(diagnostic.clone()),
3632 );
3633 event.error = Some(NativeErrorInfo::from_error(error));
3634 event.command_id = Some(command_id);
3635 event.duration_ms = Some(duration_ms);
3636 event.lifecycle = Some(native_lifecycle_for_error(error, false, diagnostic));
3637 event
3638}
3639
3640fn snapshot_ready_event(command_id: String, payload_json: Value, duration_ms: u64) -> NativeEvent {
3641 let mut event = native_event(
3642 NativeEventKind::SnapshotReady,
3643 Vec::new(),
3644 Some(native_diagnostic(
3645 "info",
3646 "storage",
3647 "storage.snapshot_ready",
3648 "Native Syncular snapshot ready",
3649 [
3650 ("commandId", json!(command_id.clone())),
3651 ("durationMs", json!(duration_ms)),
3652 ],
3653 )),
3654 );
3655 event.command_id = Some(command_id);
3656 event.payload_json = Some(payload_json);
3657 event.duration_ms = Some(duration_ms);
3658 event
3659}
3660
3661fn worker_command_completed_diagnostic(
3662 command_id: &str,
3663 operation: &str,
3664 payload_json: Option<&Value>,
3665 duration_ms: u64,
3666) -> NativeDiagnostic {
3667 let mut details = vec![
3668 ("commandId", json!(command_id)),
3669 ("operation", json!(operation)),
3670 ("durationMs", json!(duration_ms)),
3671 ];
3672 if let Some(payload_json) = payload_json {
3673 details.push(("payload", payload_json.clone()));
3674 }
3675 let (source, code, message, level) = match operation {
3676 "processBlobUploadQueue" => {
3677 let failed = payload_json
3678 .and_then(|payload| payload.get("failed"))
3679 .and_then(Value::as_i64)
3680 .unwrap_or_default();
3681 (
3682 "blob",
3683 "blob.upload_queue_processed",
3684 "Native Syncular blob upload queue processed",
3685 if failed > 0 { "warn" } else { "info" },
3686 )
3687 }
3688 "pruneBlobCache" => (
3689 "blob",
3690 "blob.cache_pruned",
3691 "Native Syncular blob cache pruned",
3692 "info",
3693 ),
3694 "clearBlobCache" => (
3695 "blob",
3696 "blob.cache_cleared",
3697 "Native Syncular blob cache cleared",
3698 "info",
3699 ),
3700 "retrieveBlobFile" => (
3701 "blob",
3702 "blob.download_completed",
3703 "Native Syncular blob file retrieved",
3704 "info",
3705 ),
3706 "storeBlobFile" => (
3707 "blob",
3708 "blob.store_queued",
3709 "Native Syncular blob file stored through worker",
3710 "info",
3711 ),
3712 _ => (
3713 "worker",
3714 "worker.command_completed",
3715 "Native Syncular worker command completed",
3716 "info",
3717 ),
3718 };
3719 native_diagnostic(level, source, code, message, details)
3720}
3721
3722fn worker_command_failed_diagnostic(
3723 operation: &str,
3724 details: Vec<(&'static str, Value)>,
3725) -> NativeDiagnostic {
3726 let (source, code, message) = match operation {
3727 "retrieveBlobFile" => (
3728 "blob",
3729 "blob.download_failed",
3730 "Native Syncular blob file retrieval failed",
3731 ),
3732 "processBlobUploadQueue" => (
3733 "blob",
3734 "blob.upload_failed",
3735 "Native Syncular blob upload queue processing failed",
3736 ),
3737 "pruneBlobCache" => (
3738 "blob",
3739 "blob.cache_prune_failed",
3740 "Native Syncular blob cache pruning failed",
3741 ),
3742 "clearBlobCache" => (
3743 "blob",
3744 "blob.cache_clear_failed",
3745 "Native Syncular blob cache clearing failed",
3746 ),
3747 "storeBlobFile" => (
3748 "blob",
3749 "blob.store_failed",
3750 "Native Syncular blob file storage failed",
3751 ),
3752 _ => (
3753 "worker",
3754 "worker.command_failed",
3755 "Native Syncular worker command failed",
3756 ),
3757 };
3758 native_diagnostic("error", source, code, message, details)
3759}
3760
3761fn worker_command_completed_event(
3762 command_id: String,
3763 operation: &str,
3764 payload_json: Option<Value>,
3765 duration_ms: u64,
3766) -> NativeEvent {
3767 let diagnostic = worker_command_completed_diagnostic(
3768 &command_id,
3769 operation,
3770 payload_json.as_ref(),
3771 duration_ms,
3772 );
3773 let mut event = native_event(
3774 NativeEventKind::WorkerCommandCompleted,
3775 Vec::new(),
3776 Some(diagnostic),
3777 );
3778 event.command_id = Some(command_id);
3779 event.payload_json = payload_json;
3780 event.duration_ms = Some(duration_ms);
3781 event
3782}
3783
3784fn worker_command_failed_event(
3785 error: &SyncularError,
3786 command_id: String,
3787 operation: &str,
3788 duration_ms: u64,
3789) -> NativeEvent {
3790 let mut details = vec![
3791 ("commandId", json!(command_id.clone())),
3792 ("operation", json!(operation)),
3793 ("durationMs", json!(duration_ms)),
3794 ];
3795 push_native_error_details(&mut details, error);
3796 let diagnostic = worker_command_failed_diagnostic(operation, details);
3797 let mut event = native_event(
3798 NativeEventKind::WorkerCommandFailed,
3799 Vec::new(),
3800 Some(diagnostic.clone()),
3801 );
3802 event.error = Some(NativeErrorInfo::from_error(error));
3803 event.command_id = Some(command_id);
3804 event.duration_ms = Some(duration_ms);
3805 event.lifecycle = Some(native_lifecycle_for_error(error, false, diagnostic));
3806 event
3807}
3808
3809fn blob_uploads_changed_event(stats_json: Value) -> NativeEvent {
3810 let blob_uploads = native_lifecycle_blob_uploads(&stats_json);
3811 let failed_count = blob_uploads.as_ref().map_or(0, |stats| stats.failed);
3812 let diagnostic = native_diagnostic(
3813 if failed_count > 0 { "warn" } else { "info" },
3814 "blob",
3815 "blob.uploads_changed",
3816 "Native Syncular blob upload queue changed",
3817 [("blobUploads", stats_json.clone())],
3818 );
3819 let mut event = native_event(
3820 NativeEventKind::BlobUploadsChanged,
3821 Vec::new(),
3822 Some(diagnostic.clone()),
3823 );
3824 event.payload_json = Some(stats_json);
3825 event.lifecycle = Some(native_lifecycle_state(
3826 if failed_count > 0 {
3827 NativeLifecyclePhase::Degraded
3828 } else {
3829 NativeLifecyclePhase::Offline
3830 },
3831 false,
3832 failed_count > 0,
3833 0,
3834 None,
3835 None,
3836 None,
3837 blob_uploads,
3838 None,
3839 Some(diagnostic),
3840 ));
3841 event
3842}
3843
3844fn native_event(
3845 kind: NativeEventKind,
3846 tables: Vec<String>,
3847 diagnostic: Option<NativeDiagnostic>,
3848) -> NativeEvent {
3849 NativeEvent {
3850 event_seq: 0,
3851 kind,
3852 error: None,
3853 command_id: None,
3854 client_commit_id: None,
3855 outbox_count: None,
3856 conflict_count: None,
3857 retry_scheduled: None,
3858 duration_ms: None,
3859 dropped_count: None,
3860 resync_required: None,
3861 auth: None,
3862 diagnostic,
3863 tables,
3864 changed_rows: Vec::new(),
3865 queries: Vec::new(),
3866 bootstrap: None,
3867 lifecycle: None,
3868 payload_json: None,
3869 }
3870}
3871
3872fn diagnostic_event(diagnostic: NativeDiagnostic) -> NativeEvent {
3873 native_event(NativeEventKind::Diagnostic, Vec::new(), Some(diagnostic))
3874}
3875
3876fn blob_store_diagnostic(blob: &BlobRef, immediate: bool, cache_local: bool) -> NativeDiagnostic {
3877 let mut details = blob_ref_details(blob);
3878 details.push(("immediate", json!(immediate)));
3879 details.push(("cacheLocal", json!(cache_local)));
3880 native_diagnostic(
3881 "info",
3882 "blob",
3883 if immediate {
3884 "blob.upload_completed"
3885 } else {
3886 "blob.store_queued"
3887 },
3888 if immediate {
3889 "Native Syncular blob stored and uploaded"
3890 } else {
3891 "Native Syncular blob stored and queued for upload"
3892 },
3893 details,
3894 )
3895}
3896
3897fn blob_cache_lookup_diagnostic(hash: &str, local: bool) -> NativeDiagnostic {
3898 native_diagnostic(
3899 "debug",
3900 "blob",
3901 if local {
3902 "blob.cache_hit"
3903 } else {
3904 "blob.cache_miss"
3905 },
3906 if local {
3907 "Native Syncular blob is available in local cache"
3908 } else {
3909 "Native Syncular blob is not available in local cache"
3910 },
3911 [("hash", json!(hash))],
3912 )
3913}
3914
3915fn blob_cache_retrieve_diagnostic(blob: &BlobRef, was_local: bool) -> NativeDiagnostic {
3916 native_diagnostic(
3917 "info",
3918 "blob",
3919 if was_local {
3920 "blob.cache_hit"
3921 } else {
3922 "blob.cache_miss"
3923 },
3924 if was_local {
3925 "Native Syncular blob served from local cache"
3926 } else {
3927 "Native Syncular blob fetched after local cache miss"
3928 },
3929 blob_ref_details(blob),
3930 )
3931}
3932
3933fn blob_download_failed_diagnostic(blob: &BlobRef, error: &SyncularError) -> NativeDiagnostic {
3934 let mut details = blob_ref_details(blob);
3935 push_native_error_details(&mut details, error);
3936 native_diagnostic(
3937 "warn",
3938 "blob",
3939 "blob.download_failed",
3940 "Native Syncular blob download failed",
3941 details,
3942 )
3943}
3944
3945fn blob_upload_queue_processed_diagnostic(payload_json: Value) -> NativeDiagnostic {
3946 let failed = payload_json
3947 .get("failed")
3948 .and_then(Value::as_i64)
3949 .unwrap_or_default();
3950 native_diagnostic(
3951 if failed > 0 { "warn" } else { "info" },
3952 "blob",
3953 "blob.upload_queue_processed",
3954 "Native Syncular blob upload queue processed",
3955 [("result", payload_json)],
3956 )
3957}
3958
3959fn blob_cache_pruned_diagnostic(pruned_bytes: i64, max_bytes: Option<i64>) -> NativeDiagnostic {
3960 let mut details = vec![("prunedBytes", json!(pruned_bytes))];
3961 if let Some(max_bytes) = max_bytes {
3962 details.push(("maxBytes", json!(max_bytes)));
3963 }
3964 native_diagnostic(
3965 "info",
3966 "blob",
3967 "blob.cache_pruned",
3968 "Native Syncular blob cache pruned",
3969 details,
3970 )
3971}
3972
3973fn blob_cache_cleared_diagnostic() -> NativeDiagnostic {
3974 native_diagnostic(
3975 "info",
3976 "blob",
3977 "blob.cache_cleared",
3978 "Native Syncular blob cache cleared",
3979 std::iter::empty::<(&str, Value)>(),
3980 )
3981}
3982
3983fn blob_ref_details(blob: &BlobRef) -> Vec<(&'static str, Value)> {
3984 let mut details = vec![
3985 ("hash", json!(blob.hash.clone())),
3986 ("size", json!(blob.size)),
3987 ("mimeType", json!(blob.mime_type.clone())),
3988 ("encrypted", json!(blob.encrypted)),
3989 ];
3990 if let Some(key_id) = &blob.key_id {
3991 details.push(("keyId", json!(key_id)));
3992 }
3993 details
3994}
3995
3996fn native_diagnostic<'a>(
3997 level: &str,
3998 source: &str,
3999 code: &str,
4000 message: &str,
4001 details: impl IntoIterator<Item = (&'a str, Value)>,
4002) -> NativeDiagnostic {
4003 NativeDiagnostic {
4004 at: now_ms(),
4005 level: level.to_string(),
4006 source: source.to_string(),
4007 code: code.to_string(),
4008 message: message.to_string(),
4009 details: details
4010 .into_iter()
4011 .map(|(key, value)| (key.to_string(), value))
4012 .collect(),
4013 }
4014}
4015
4016fn push_native_error_details(details: &mut Vec<(&'static str, Value)>, error: &SyncularError) {
4017 let classification = error.classification();
4018 details.push(("errorKind", json!(format!("{:?}", error.kind()))));
4019 details.push(("errorCode", json!(classification.code)));
4020 details.push(("errorCategory", json!(classification.category)));
4021 details.push(("retryable", json!(classification.retryable)));
4022 details.push((
4023 "recommendedAction",
4024 json!(classification.recommended_action),
4025 ));
4026}
4027
4028fn native_auth_info_from_error(error: &SyncularError) -> Option<NativeAuthInfo> {
4029 if error.kind() != ErrorKind::Transport {
4030 return None;
4031 }
4032
4033 let message = error.message_text();
4034 let classification = error.classification();
4035 if classification.code != "sync.auth_required" {
4036 return None;
4037 }
4038 Some(NativeAuthInfo {
4039 operation: auth_operation_from_message(&message).to_string(),
4040 status: 401,
4041 })
4042}
4043
/// Maps an error message to the name of the operation that failed.
///
/// The first marker phrase contained in `message` wins, checked in the order
/// listed below; messages without any marker are attributed to `"sync"`.
fn auth_operation_from_message(message: &str) -> &'static str {
    // (marker phrase, operation name), in priority order.
    const OPERATIONS: [(&'static str, &'static str); 4] = [
        ("snapshot chunk failed", "snapshotChunk"),
        ("blob upload init failed", "blobInitiateUpload"),
        ("blob upload complete failed", "blobCompleteUpload"),
        ("blob download url failed", "blobGetDownloadUrl"),
    ];
    for (marker, operation) in OPERATIONS {
        if message.contains(marker) {
            return operation;
        }
    }
    "sync"
}
4057
4058fn normalize_observed_tables(tables: Vec<String>, app_schema: AppSchema) -> Result<Vec<String>> {
4059 if tables.is_empty() {
4060 return Err(SyncularError::config(
4061 "native observed query must depend on at least one app table",
4062 ));
4063 }
4064
4065 let mut normalized = BTreeSet::new();
4066 for table in tables {
4067 let table = table.trim();
4068 if table.is_empty() {
4069 return Err(SyncularError::config(
4070 "native observed query table is empty",
4071 ));
4072 }
4073 validate_app_table_name(table, app_schema)?;
4074 normalized.insert(table.to_string());
4075 }
4076 Ok(normalized.into_iter().collect())
4077}
4078
4079fn normalize_observed_query_dependency_hints(
4080 hints: Vec<NativeObservedQueryDependencyHint>,
4081 observed_tables: &[String],
4082 app_schema: AppSchema,
4083) -> Result<Vec<NativeObservedQueryDependencyHint>> {
4084 let observed_tables = observed_tables
4085 .iter()
4086 .map(String::as_str)
4087 .collect::<BTreeSet<_>>();
4088 let mut normalized = Vec::new();
4089 for hint in hints {
4090 let table = hint.table.trim();
4091 if table.is_empty() {
4092 return Err(SyncularError::config(
4093 "native observed query dependency hint table is empty",
4094 ));
4095 }
4096 if !observed_tables.contains(table) {
4097 return Err(SyncularError::config(format!(
4098 "native observed query dependency hint table {table} is not one of the observed tables"
4099 )));
4100 }
4101 validate_app_table_name(table, app_schema)?;
4102 let metadata = app_schema.table_metadata(table).ok_or_else(|| {
4103 SyncularError::config(format!("unknown generated app table: {table}"))
4104 })?;
4105
4106 let mut row_ids = BTreeSet::new();
4107 for row_id in hint.row_ids {
4108 if row_id.is_empty() {
4109 return Err(SyncularError::config(
4110 "native observed query dependency hint row id is empty",
4111 ));
4112 }
4113 row_ids.insert(row_id);
4114 }
4115
4116 let mut fields = BTreeSet::new();
4117 for field in hint.fields {
4118 let field = field.trim();
4119 if field.is_empty() {
4120 return Err(SyncularError::config(
4121 "native observed query dependency hint field is empty",
4122 ));
4123 }
4124 validate_app_column_name(metadata, field)?;
4125 fields.insert(field.to_string());
4126 }
4127
4128 normalized.push(NativeObservedQueryDependencyHint {
4129 table: table.to_string(),
4130 row_ids: row_ids.into_iter().collect(),
4131 fields: fields.into_iter().collect(),
4132 });
4133 }
4134 Ok(normalized)
4135}
4136
4137fn observed_query_should_notify(
4138 query: &NativeObservedQuery,
4139 changed_tables: &BTreeSet<&str>,
4140 changed_rows: &[SyncChangedRow],
4141) -> bool {
4142 let affected_tables = query
4143 .tables
4144 .iter()
4145 .filter(|table| changed_tables.contains(table.as_str()))
4146 .collect::<Vec<_>>();
4147 if affected_tables.is_empty() {
4148 return false;
4149 }
4150 if query.dependency_hints.is_empty() || changed_rows.is_empty() {
4151 return true;
4152 }
4153
4154 for table in affected_tables {
4155 let table_rows = changed_rows
4156 .iter()
4157 .filter(|row| row.table == table.as_str())
4158 .collect::<Vec<_>>();
4159 if table_rows.is_empty() {
4160 return true;
4161 }
4162 let table_hints = query
4163 .dependency_hints
4164 .iter()
4165 .filter(|hint| hint.table == table.as_str())
4166 .collect::<Vec<_>>();
4167 if table_hints.is_empty() {
4168 return true;
4169 }
4170 if table_rows.iter().any(|row| {
4171 table_hints
4172 .iter()
4173 .any(|hint| hint_matches_changed_row(hint, row))
4174 }) {
4175 return true;
4176 }
4177 }
4178 false
4179}
4180
4181fn hint_matches_changed_row(
4182 hint: &NativeObservedQueryDependencyHint,
4183 row: &SyncChangedRow,
4184) -> bool {
4185 let Some(row_id) = row.row_id.as_deref() else {
4186 return true;
4187 };
4188 if !hint.row_ids.is_empty() && !hint.row_ids.iter().any(|id| id == row_id) {
4189 return false;
4190 }
4191 if hint.fields.is_empty() || row.changed_fields.is_empty() {
4192 return true;
4193 }
4194 row.changed_fields
4195 .iter()
4196 .any(|field| hint.fields.iter().any(|hint_field| hint_field == field))
4197}
4198
4199fn validate_app_table_name(table: &str, app_schema: AppSchema) -> Result<()> {
4200 if app_schema.table_metadata(table).is_some() {
4201 return Ok(());
4202 }
4203
4204 Err(SyncularError::config(format!(
4205 "unknown generated app table: {table}"
4206 )))
4207}
4208
4209fn validate_app_column_name(metadata: &AppTableMetadata, column: &str) -> Result<()> {
4210 if metadata.primary_key_column == column
4211 || metadata.server_version_column == column
4212 || metadata
4213 .columns
4214 .iter()
4215 .any(|metadata| metadata.name == column)
4216 {
4217 return Ok(());
4218 }
4219
4220 Err(SyncularError::config(format!(
4221 "unknown generated app column {}.{}",
4222 metadata.name, column
4223 )))
4224}
4225
/// Collects table names into owned strings, dropping repeats while keeping the
/// order in which each name first appears.
fn unique_event_tables<'a>(tables: impl IntoIterator<Item = &'a str>) -> Vec<String> {
    let mut already_seen = BTreeSet::new();
    tables
        .into_iter()
        // `insert` returns `false` for a name that was already recorded.
        .filter(|table| already_seen.insert(*table))
        .map(str::to_string)
        .collect()
}
4236
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `event` through `native_event_json_from_worker_event` and parses
    /// the first emitted JSON string. Panics if conversion or parsing fails.
    fn first_native_event_json(event: SyncWorkerEvent) -> Value {
        let encoded_events = native_event_json_from_worker_event(event).unwrap();
        let first_event = &encoded_events[0];
        serde_json::from_str(first_event).unwrap()
    }

    /// A sync failure whose message embeds a server error body surfaces that
    /// body's classification on both `error` and `diagnostic.details`.
    #[test]
    fn native_sync_failed_exposes_server_error_classification() {
        let failure = SyncWorkerEvent::SyncFailed {
            command_id: Some("cmd-1".to_string()),
            error: SyncularError::message(
                ErrorKind::Transport,
                r#"sync failed with HTTP 403: {"error":"sync.forbidden","code":"sync.forbidden","category":"forbidden","retryable":false,"recommendedAction":"checkPermissions","message":"Forbidden"}"#,
            ),
            retry_scheduled: false,
            duration_ms: 12,
        };

        let event = first_native_event_json(failure);
        let error = &event["error"];
        let details = &event["diagnostic"]["details"];

        assert_eq!(event["kind"], "SyncFailed");
        assert_eq!(error["code"], "sync.forbidden");
        assert_eq!(error["category"], "forbidden");
        assert_eq!(error["retryable"], false);
        assert_eq!(error["recommendedAction"], "checkPermissions");
        assert_eq!(details["errorCode"], "sync.forbidden");
        assert_eq!(details["recommendedAction"], "checkPermissions");
    }

    /// A sync failure reporting HTTP 401 is emitted as an `AuthExpired` event
    /// carrying the auth-required classification.
    #[test]
    fn native_sync_failed_maps_auth_required_to_auth_expired_event() {
        let failure = SyncWorkerEvent::SyncFailed {
            command_id: Some("cmd-2".to_string()),
            error: SyncularError::message(ErrorKind::Transport, "sync failed with HTTP 401"),
            retry_scheduled: true,
            duration_ms: 34,
        };

        let event = first_native_event_json(failure);
        let error = &event["error"];

        assert_eq!(event["kind"], "AuthExpired");
        assert_eq!(event["auth"]["status"], 401);
        assert_eq!(error["code"], "sync.auth_required");
        assert_eq!(error["category"], "auth-required");
        assert_eq!(error["retryable"], true);
        assert_eq!(error["recommendedAction"], "refreshAuth");
    }

    /// A payload larger than the diagnostic limit is replaced by a truncation
    /// marker that records the original size.
    #[test]
    fn recent_diagnostic_event_payload_is_redacted_when_too_large() {
        let oversized_body = "x".repeat(MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES + 1);
        let mut oversized = native_event(NativeEventKind::WorkerCommandCompleted, Vec::new(), None);
        oversized.payload_json = Some(json!({ "body": oversized_body }));

        let redacted = native_event_for_recent_diagnostics(oversized);
        let payload = redacted.payload_json.expect("redacted payload");

        assert_eq!(payload["truncated"], true);
        assert_eq!(payload["reason"], "diagnosticPayloadLimit");
        assert_eq!(payload["limit"], "maxNativeDiagnosticEventPayloadJsonBytes");

        let original_bytes = payload["originalBytes"].as_u64().unwrap();
        assert!(original_bytes > MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES as u64);
    }

    /// A local write failure with a storage error exposes the storage
    /// classification on both `error` and `diagnostic.details`.
    #[test]
    fn native_local_write_failed_exposes_storage_classification() {
        let failure = SyncWorkerEvent::LocalWriteFailed {
            command_id: "cmd-3".to_string(),
            error: SyncularError::message(ErrorKind::Storage, "database is locked"),
            payload_json: None,
            duration_ms: 56,
        };

        let event = first_native_event_json(failure);
        let error = &event["error"];

        assert_eq!(event["kind"], "LocalWriteFailed");
        assert_eq!(error["code"], "storage.failed");
        assert_eq!(error["category"], "storage");
        assert_eq!(error["recommendedAction"], "inspectStorage");
        assert_eq!(event["diagnostic"]["details"]["errorCategory"], "storage");
    }
}