Skip to main content

syncular_runtime/native/
facade.rs

1use crate::app_schema::{app_schema_from_json, default_app_schema, AppSchema, AppTableMetadata};
2use crate::client::{
3    sync_changed_crdt_field_from_metadata, sync_changed_row_for_local_operation, BootstrapStatus,
4    CrdtFieldCompactionReceipt, CrdtFieldMaterialization, CrdtFieldWriteReceipt, SubscriptionSpec,
5    SyncChangedRow, SyncReport, SyncularClient, SyncularClientConfig,
6};
7use crate::crdt_field::{CrdtField, CrdtFieldId, CrdtFieldSyncMode};
8use crate::crdt_yjs::{
9    validate_crdt_request_json_size, validate_yjs_text_input_size,
10    validate_yjs_update_envelope_size, YjsUpdateEnvelope,
11};
12use crate::diesel_sqlite::DieselSqliteStore;
13use crate::encrypted_crdt::{EncryptedCrdt, CRDT_CHECKPOINTS_TABLE, CRDT_UPDATES_TABLE};
14use crate::encryption::{encryption_helpers_json, BlobEncryption, FieldEncryption};
15use crate::error::{ErrorKind, Result, SyncularError};
16use crate::limits::{
17    runtime_default_limits, RuntimeLimits, DEFAULT_CRDT_UPDATE_LOG_LIMIT,
18    DEFAULT_NATIVE_EVENT_STREAM_CAPACITY, DEFAULT_NATIVE_RECENT_EVENT_LIMIT,
19    DEFAULT_READONLY_QUERY_STATEMENT_CACHE_CAPACITY,
20    MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES,
21};
22use crate::protocol::{
23    validate_mutation_json_input_size, AuthLeaseProvenance, BlobRef, BootstrapState,
24};
25use crate::runtime_schema::runtime_schema_version;
26use crate::sqlite_query::ReadonlySqlQueryExecutor;
27use crate::store::{now_ms, AuthLeaseRecord, ConflictSummary, OutboxSummary};
28use crate::transport::{
29    HttpSyncTransport, RealtimeEvent, RealtimePresenceEntry, RealtimePresenceEvent,
30    SyncAuthHeaderStore, SyncAuthHeaders, SyncTransportConfig,
31};
32use crate::worker::{
33    PersistentRealtimeWorker, SyncWorker, SyncWorkerEvent, SyncWorkerEventSubscription,
34};
35use serde::{Deserialize, Serialize};
36use serde_json::{json, Map, Value};
37use std::collections::{BTreeMap, BTreeSet, VecDeque};
38use std::path::Path;
39use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
40use std::sync::{Arc, Condvar, Mutex};
41use std::thread::{self, JoinHandle};
42use std::time::Duration;
43use uuid::Uuid;
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NativeClientConfig {
47    pub db_path: String,
48    pub base_url: String,
49    pub client_id: String,
50    pub actor_id: String,
51    pub project_id: Option<String>,
52    #[serde(default)]
53    pub app_schema_json: Option<String>,
54}
55
/// Behavioral switches applied when a native client is opened.
#[derive(Clone, Debug)]
pub struct NativeClientOptions {
    /// Copied onto the client at open time and forwarded to the worker with
    /// every queued local-write command.
    pub auto_sync_local_writes: bool,
}
60
61impl Default for NativeClientOptions {
62    fn default() -> Self {
63        Self {
64            auto_sync_local_writes: true,
65        }
66    }
67}
68
69pub struct NativeSyncularClient {
70    config: SyncularClientConfig,
71    writer: SyncularClient<DieselSqliteStore, HttpSyncTransport>,
72    worker: Option<SyncWorker>,
73    worker_event_pump: Option<JoinHandle<()>>,
74    realtime_worker: Option<PersistentRealtimeWorker>,
75    presence_by_scope: Arc<Mutex<BTreeMap<String, Vec<NativePresenceEntry>>>>,
76    auth_headers: SyncAuthHeaders,
77    field_encryption: Option<FieldEncryption>,
78    encrypted_crdt: Option<EncryptedCrdt>,
79    blob_encryption: Option<BlobEncryption>,
80    auto_sync_local_writes: bool,
81    shutdown_on_drop: bool,
82    command_seq: Mutex<u64>,
83    events: NativeEventHub,
84    default_events: NativeEventSubscription,
85    read_executor: Mutex<ReadonlySqlQueryExecutor>,
86}
87
88pub struct NativeClientOpenTask {
89    command_id: String,
90    result_rx: Option<Receiver<Result<NativeSyncularClient>>>,
91    completed: Option<Result<NativeSyncularClient>>,
92    finished: bool,
93    taken: bool,
94}
95
96#[derive(Debug, Clone)]
97pub struct NativeSyncularClientBuilder {
98    config: NativeClientConfig,
99    options: NativeClientOptions,
100    realtime: bool,
101    auth_headers: Option<SyncAuthHeaders>,
102    subscriptions: Option<Vec<SubscriptionSpec>>,
103    initial_sync: bool,
104    initial_websocket_sync: bool,
105    process_blob_uploads_on_open: bool,
106    shutdown_on_drop: bool,
107}
108
109pub struct NativePresenceHandle<'a> {
110    client: &'a mut NativeSyncularClient,
111    scope_key: String,
112    active: bool,
113}
114
/// FFI ABI version reported in the runtime manifest (`ffi_abi_version`).
pub const NATIVE_FFI_ABI_VERSION: u32 = 2;
116
117#[derive(Debug, Clone, Serialize)]
118pub struct NativeRuntimeManifest {
119    pub ffi_abi_version: u32,
120    pub crate_name: &'static str,
121    pub crate_version: &'static str,
122    pub schema_version: i32,
123    pub storage_backend: &'static str,
124    pub transport_backends: &'static [&'static str],
125    pub worker_model: &'static str,
126    pub string_encoding: &'static str,
127    pub error_shape: &'static str,
128    pub event_model: &'static str,
129    pub limits: RuntimeLimits,
130    pub capabilities: &'static [&'static str],
131    pub app_tables: &'static [&'static str],
132    pub app_table_metadata: &'static [AppTableMetadata],
133}
134
135#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
136pub enum NativeEventKind {
137    SyncStarted,
138    SyncCompleted,
139    SyncFailed,
140    AuthExpired,
141    LocalWriteCommitted,
142    LocalWriteFailed,
143    ConflictResolutionCompleted,
144    ConflictResolutionFailed,
145    SnapshotReady,
146    CrdtFieldChanged,
147    CrdtFieldCompacted,
148    WorkerCommandCompleted,
149    WorkerCommandFailed,
150    RowsChanged,
151    QueriesChanged,
152    ConflictsChanged,
153    PresenceChanged,
154    BlobUploadsChanged,
155    Diagnostic,
156    EventsOverflowed,
157}
158
159#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
160#[serde(rename_all = "camelCase")]
161pub enum NativeLifecyclePhase {
162    Offline,
163    Syncing,
164    Recovering,
165    AuthRequired,
166    Degraded,
167    Complete,
168    Closed,
169}
170
171#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
172#[serde(rename_all = "camelCase")]
173pub struct NativeLifecycleBootstrap {
174    pub complete: bool,
175    pub critical_ready: bool,
176    pub interactive_ready: bool,
177    pub is_bootstrapping: bool,
178    pub progress_percent: i64,
179}
180
181#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
182#[serde(rename_all = "camelCase")]
183pub struct NativeLifecycleOutbox {
184    pub pending: usize,
185}
186
187#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
188#[serde(rename_all = "camelCase")]
189pub struct NativeLifecycleConflicts {
190    pub unresolved: usize,
191}
192
193#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
194#[serde(rename_all = "camelCase")]
195pub struct NativeLifecycleBlobUploads {
196    pub pending: i64,
197    pub uploading: i64,
198    pub failed: i64,
199}
200
201#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
202#[serde(rename_all = "camelCase")]
203pub struct NativeLifecycleState {
204    pub phase: NativeLifecyclePhase,
205    pub online: bool,
206    pub requires_action: bool,
207    pub pending_requests: usize,
208    #[serde(default, skip_serializing_if = "Option::is_none")]
209    pub bootstrap: Option<NativeLifecycleBootstrap>,
210    #[serde(default, skip_serializing_if = "Option::is_none")]
211    pub outbox: Option<NativeLifecycleOutbox>,
212    #[serde(default, skip_serializing_if = "Option::is_none")]
213    pub conflicts: Option<NativeLifecycleConflicts>,
214    #[serde(default, skip_serializing_if = "Option::is_none")]
215    pub blob_uploads: Option<NativeLifecycleBlobUploads>,
216    #[serde(default, skip_serializing_if = "Option::is_none")]
217    pub last_error: Option<NativeErrorInfo>,
218    #[serde(default, skip_serializing_if = "Option::is_none")]
219    pub last_diagnostic: Option<NativeDiagnostic>,
220}
221
222#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
223pub struct NativeErrorInfo {
224    pub kind: ErrorKind,
225    pub code: String,
226    pub category: String,
227    pub retryable: bool,
228    #[serde(rename = "recommendedAction")]
229    pub recommended_action: String,
230    pub message: String,
231    #[serde(skip_serializing_if = "Option::is_none")]
232    pub debug: Option<String>,
233}
234
235#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
236pub struct NativeEvent {
237    #[serde(default)]
238    pub event_seq: u64,
239    pub kind: NativeEventKind,
240    pub error: Option<NativeErrorInfo>,
241    #[serde(default, skip_serializing_if = "Option::is_none")]
242    pub command_id: Option<String>,
243    #[serde(default, skip_serializing_if = "Option::is_none")]
244    pub client_commit_id: Option<String>,
245    #[serde(default, skip_serializing_if = "Option::is_none")]
246    pub outbox_count: Option<usize>,
247    #[serde(default, skip_serializing_if = "Option::is_none")]
248    pub conflict_count: Option<usize>,
249    #[serde(default, skip_serializing_if = "Option::is_none")]
250    pub retry_scheduled: Option<bool>,
251    #[serde(default, skip_serializing_if = "Option::is_none")]
252    pub duration_ms: Option<u64>,
253    #[serde(
254        default,
255        rename = "droppedCount",
256        skip_serializing_if = "Option::is_none"
257    )]
258    pub dropped_count: Option<usize>,
259    #[serde(
260        default,
261        rename = "resyncRequired",
262        skip_serializing_if = "Option::is_none"
263    )]
264    pub resync_required: Option<bool>,
265    #[serde(default, skip_serializing_if = "Option::is_none")]
266    pub auth: Option<NativeAuthInfo>,
267    #[serde(default, skip_serializing_if = "Option::is_none")]
268    pub diagnostic: Option<NativeDiagnostic>,
269    pub tables: Vec<String>,
270    #[serde(default, rename = "changedRows", skip_serializing_if = "Vec::is_empty")]
271    pub changed_rows: Vec<SyncChangedRow>,
272    #[serde(default)]
273    pub queries: Vec<String>,
274    #[serde(default, skip_serializing_if = "Option::is_none")]
275    pub bootstrap: Option<BootstrapStatus>,
276    #[serde(default, skip_serializing_if = "Option::is_none")]
277    pub lifecycle: Option<NativeLifecycleState>,
278    #[serde(default, skip_serializing_if = "Option::is_none")]
279    pub payload_json: Option<Value>,
280}
281
282#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
283pub struct NativeDiagnostic {
284    pub at: i64,
285    pub level: String,
286    pub source: String,
287    pub code: String,
288    pub message: String,
289    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
290    pub details: BTreeMap<String, Value>,
291}
292
293#[derive(Debug, Clone, Serialize)]
294#[serde(rename_all = "camelCase")]
295pub struct NativeDiagnosticSnapshot {
296    pub generated_at: i64,
297    pub runtime: NativeRuntimeManifest,
298    pub connection: NativeDiagnosticConnectionSnapshot,
299    pub subscriptions: Vec<NativeDiagnosticSubscriptionSnapshot>,
300    pub recent_events: Vec<NativeEvent>,
301    pub recent_diagnostics: Vec<NativeDiagnostic>,
302    pub recent_sync_timings: Vec<NativeSyncTimingSnapshot>,
303    pub limits: RuntimeLimits,
304    pub bootstrap: BootstrapStatus,
305    pub outbox_stats: NativeOutboxStats,
306    pub conflict_stats: NativeConflictStats,
307    pub blob_upload_queue_stats: Value,
308    pub blob_cache_stats: Value,
309    pub observed_queries: Vec<NativeObservedQuery>,
310}
311
312#[derive(Debug, Clone, Serialize)]
313#[serde(rename_all = "camelCase")]
314pub struct NativeDiagnosticConnectionSnapshot {
315    pub sync_worker_running: bool,
316    pub realtime_worker_running: bool,
317    pub auto_sync_local_writes: bool,
318    pub event_subscriber_count: usize,
319    pub observed_query_count: usize,
320}
321
322#[derive(Debug, Clone, Serialize)]
323#[serde(rename_all = "camelCase")]
324pub struct NativeDiagnosticSubscriptionSnapshot {
325    pub id: String,
326    pub table: String,
327    pub scope_keys: Vec<String>,
328    pub scope_value_count: usize,
329    pub params_keys: Vec<String>,
330    pub params_value_count: usize,
331    pub status: Option<String>,
332    pub ready: bool,
333    pub phase: String,
334    pub progress_percent: i64,
335    pub cursor: Option<i64>,
336    pub bootstrap_phase: i64,
337    pub bootstrap_state: Option<BootstrapState>,
338}
339
340#[derive(Debug, Clone, Default, Serialize)]
341#[serde(rename_all = "camelCase")]
342pub struct NativeOutboxStats {
343    pub pending: usize,
344    pub sending: usize,
345    pub failed: usize,
346    pub acked: usize,
347    pub total: usize,
348}
349
350#[derive(Debug, Clone, Default, Serialize)]
351#[serde(rename_all = "camelCase")]
352pub struct NativeConflictStats {
353    pub unresolved: usize,
354    pub resolved: usize,
355    pub total: usize,
356}
357
358#[derive(Debug, Clone, Serialize)]
359#[serde(rename_all = "camelCase")]
360pub struct NativeSyncTimingSnapshot {
361    pub event_seq: u64,
362    pub kind: String,
363    pub command_id: Option<String>,
364    pub total_ms: u64,
365    pub success: bool,
366    pub retry_scheduled: Option<bool>,
367    pub outbox_count: Option<usize>,
368    pub conflict_count: Option<usize>,
369}
370
371#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
372pub struct NativeAuthInfo {
373    pub operation: String,
374    pub status: u16,
375}
376
377#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
378#[serde(rename_all = "camelCase")]
379pub struct NativePresenceEntry {
380    pub client_id: String,
381    pub actor_id: String,
382    pub joined_at: i64,
383    #[serde(default, skip_serializing_if = "Option::is_none")]
384    pub metadata: Option<Value>,
385}
386
387#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
388#[serde(rename_all = "camelCase")]
389pub struct NativeObservedQueryDependencyHint {
390    pub table: String,
391    #[serde(default, skip_serializing_if = "Vec::is_empty")]
392    pub row_ids: Vec<String>,
393    #[serde(default, skip_serializing_if = "Vec::is_empty")]
394    pub fields: Vec<String>,
395}
396
397#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
398pub struct NativeObservedQuery {
399    pub id: String,
400    pub tables: Vec<String>,
401    #[serde(
402        default,
403        rename = "dependencyHints",
404        skip_serializing_if = "Vec::is_empty"
405    )]
406    pub dependency_hints: Vec<NativeObservedQueryDependencyHint>,
407    #[serde(default, skip_serializing_if = "Option::is_none")]
408    pub label: Option<String>,
409}
410
411#[derive(Clone, Default)]
412struct NativeEventHub {
413    event_seq: Arc<Mutex<u64>>,
414    subscriber_seq: Arc<Mutex<u64>>,
415    subscribers: Arc<Mutex<BTreeMap<u64, Arc<NativeEventQueue>>>>,
416    query_observers: Arc<Mutex<BTreeMap<String, NativeObservedQuery>>>,
417    recent_events: Arc<Mutex<VecDeque<NativeEvent>>>,
418}
419
420pub struct NativeEventSubscription {
421    hub: NativeEventHub,
422    subscriber_id: u64,
423    queue: Arc<NativeEventQueue>,
424}
425
426pub struct NativeEventJsonIterator {
427    subscription: NativeEventSubscription,
428}
429
430struct NativeEventQueue {
431    capacity: usize,
432    state: Mutex<NativeEventQueueState>,
433    ready: Condvar,
434}
435
436struct NativeEventQueueState {
437    events: VecDeque<NativeEvent>,
438    closed: bool,
439}
440
441#[derive(Debug, Clone, Deserialize)]
442struct NativeObservedQueryRegistration {
443    #[serde(default)]
444    id: Option<String>,
445    tables: Vec<String>,
446    #[serde(default, rename = "dependencyHints")]
447    dependency_hints: Vec<NativeObservedQueryDependencyHint>,
448    #[serde(default)]
449    label: Option<String>,
450}
451
452#[derive(Debug, Clone, Deserialize)]
453#[serde(rename_all = "camelCase")]
454struct NativeEncryptedCrdtRequest {
455    table: String,
456}
457
458#[derive(Debug, Clone, Deserialize)]
459#[serde(rename_all = "camelCase")]
460struct NativeCrdtFieldRequest {
461    table: String,
462    row_id: String,
463    field: String,
464}
465
466#[derive(Debug, Clone, Deserialize)]
467#[serde(rename_all = "camelCase")]
468struct NativeCrdtFieldTextRequest {
469    table: String,
470    row_id: String,
471    field: String,
472    next_text: String,
473}
474
475#[derive(Debug, Clone, Deserialize)]
476#[serde(rename_all = "camelCase")]
477struct NativeCrdtFieldYjsUpdateRequest {
478    table: String,
479    row_id: String,
480    field: String,
481    update: YjsUpdateEnvelope,
482}
483
484#[derive(Debug, Clone, Deserialize)]
485#[serde(rename_all = "camelCase")]
486struct NativeCrdtFieldCompactionRequest {
487    table: String,
488    row_id: String,
489    field: String,
490    #[serde(default)]
491    min_uncheckpointed_updates: Option<i64>,
492}
493
494#[derive(Debug, Clone, Deserialize)]
495#[serde(rename_all = "camelCase")]
496struct NativeCrdtFieldLogRequest {
497    table: String,
498    row_id: String,
499    field: String,
500    #[serde(default)]
501    limit: Option<i64>,
502}
503
504#[derive(Debug, Clone, Default, Deserialize)]
505#[serde(rename_all = "camelCase")]
506pub struct NativeBlobStoreOptions {
507    pub mime_type: Option<String>,
508    pub immediate: Option<bool>,
509    pub cache_local: Option<bool>,
510}
511
512#[derive(Debug, Clone, Default, Deserialize)]
513#[serde(rename_all = "camelCase")]
514pub struct NativeBlobRetrieveOptions {
515    pub cache_local: Option<bool>,
516}
517
518pub fn native_runtime_manifest() -> NativeRuntimeManifest {
519    NativeRuntimeManifest {
520        ffi_abi_version: NATIVE_FFI_ABI_VERSION,
521        crate_name: env!("CARGO_PKG_NAME"),
522        crate_version: env!("CARGO_PKG_VERSION"),
523        schema_version: runtime_schema_version(),
524        storage_backend: "diesel-sqlite",
525        transport_backends: &["http", "websocket"],
526        worker_model: "background-sync-worker",
527        string_encoding: "utf-8-json",
528        error_shape: "native-error-info-v1",
529        event_model: "native-event-stream-json-v1",
530        limits: runtime_default_limits(),
531        capabilities: &[
532            "dynamic-auth-headers",
533            "dynamic-subscriptions",
534            "auth-expired-events",
535            "generated-app-table-metadata",
536            "generated-app-schema-state",
537            "generated-json-table-reads",
538            "generated-json-local-operations",
539            "generated-json-mutations",
540            "generated-json-leased-mutations",
541            "queued-json-local-operations",
542            "queued-json-leased-mutations",
543            "auth-lease-issue",
544            "queued-yjs-updates",
545            "queued-snapshot-refresh",
546            "queued-storage-compaction",
547            "queued-blob-cache-work",
548            "worker-command-queue",
549            "ordered-native-events",
550            "native-event-stream",
551            "read-only-query-json",
552            "outbox-json",
553            "conflicts-json",
554            "table-level-rows-changed-events",
555            "query-observer-events",
556            "conflicts-changed-events",
557            "realtime-presence",
558            "presence-changed-events",
559            "blob-file-api",
560            "background-worker-lifecycle",
561            "background-resume-recovery",
562            "structured-diagnostics",
563            "diagnostic-snapshot",
564            "runtime-limits",
565            "storage-compaction",
566            "streaming-blob-file-api",
567            "crdt-yjs",
568            "field-encryption",
569            "encrypted-crdt",
570            "queued-encrypted-crdt",
571            "generic-crdt-field-api",
572            "queued-crdt-field-updates",
573            "encryption-key-sharing",
574            "async-native-open",
575        ],
576        app_tables: &[],
577        app_table_metadata: &[],
578    }
579}
580
581pub fn native_runtime_manifest_json() -> Result<String> {
582    Ok(serde_json::to_string(&native_runtime_manifest())?)
583}
584
585impl NativeSyncularClient {
586    pub fn builder(config: NativeClientConfig) -> NativeSyncularClientBuilder {
587        NativeSyncularClientBuilder::new(config)
588    }
589
590    pub fn open(config: SyncularClientConfig) -> Result<Self> {
591        Self::open_with_options(config, NativeClientOptions::default())
592    }
593
594    pub fn open_native(config: NativeClientConfig) -> Result<Self> {
595        Self::open(config.into())
596    }
597
598    pub fn open_native_with_options(
599        config: NativeClientConfig,
600        options: NativeClientOptions,
601    ) -> Result<Self> {
602        let app_schema = config.app_schema()?;
603        Self::open_with_options_and_schema(config.into(), options, app_schema)
604    }
605
606    pub fn open_native_async_with_options(
607        config: NativeClientConfig,
608        options: NativeClientOptions,
609    ) -> NativeClientOpenTask {
610        NativeClientOpenTask::open_native_with_options(config, options)
611    }
612
613    pub fn open_with_options(
614        config: SyncularClientConfig,
615        options: NativeClientOptions,
616    ) -> Result<Self> {
617        Self::open_with_options_and_schema(config, options, default_app_schema())
618    }
619
620    pub fn open_with_options_and_schema(
621        config: SyncularClientConfig,
622        options: NativeClientOptions,
623        app_schema: AppSchema,
624    ) -> Result<Self> {
625        let writer = SyncularClient::open_with_schema(config.clone(), app_schema)?;
626        let worker_client = SyncularClient::open_with_schema(config.clone(), app_schema)?;
627        let events = NativeEventHub::default();
628        let default_events = events.subscribe(DEFAULT_NATIVE_EVENT_STREAM_CAPACITY);
629        let worker = SyncWorker::start(worker_client);
630        let worker_event_pump = Some(start_worker_event_pump(
631            events.clone(),
632            worker.event_source(),
633        ));
634        let read_executor = ReadonlySqlQueryExecutor::open(
635            &config.db_path,
636            app_schema,
637            DEFAULT_READONLY_QUERY_STATEMENT_CACHE_CAPACITY,
638        )?;
639        let presence_by_scope = Arc::new(Mutex::new(BTreeMap::new()));
640
641        Ok(Self {
642            config,
643            writer,
644            worker: Some(worker),
645            worker_event_pump,
646            realtime_worker: None,
647            presence_by_scope,
648            auth_headers: SyncAuthHeaders::new(),
649            field_encryption: None,
650            encrypted_crdt: None,
651            blob_encryption: None,
652            auto_sync_local_writes: options.auto_sync_local_writes,
653            shutdown_on_drop: false,
654            command_seq: Mutex::new(0),
655            events,
656            default_events,
657            read_executor: Mutex::new(read_executor),
658        })
659    }
660
661    pub fn trigger_sync(&self) -> Result<()> {
662        self.worker()?.trigger_sync()
663    }
664
665    pub fn trigger_sync_websocket(&self) -> Result<()> {
666        self.worker()?.trigger_sync_websocket()
667    }
668
669    pub fn enqueue_sync_now(&self) -> Result<String> {
670        let command_id = self.next_command_id("sync")?;
671        self.worker()?.enqueue_sync_now(command_id.clone())?;
672        Ok(command_id)
673    }
674
675    pub fn enqueue_sync_websocket(&self) -> Result<String> {
676        let command_id = self.next_command_id("sync-ws")?;
677        self.worker()?.enqueue_sync_websocket(command_id.clone())?;
678        Ok(command_id)
679    }
680
681    pub fn enqueue_mutation_json(
682        &self,
683        mutation_json: &str,
684        local_row_json: Option<&str>,
685    ) -> Result<String> {
686        let command_id = self.next_command_id("mutation")?;
687        self.worker()?.enqueue_mutation_json(
688            command_id.clone(),
689            mutation_json.to_string(),
690            local_row_json.map(str::to_string),
691            self.auto_sync_local_writes,
692        )?;
693        Ok(command_id)
694    }
695
696    pub fn enqueue_leased_mutation_json(
697        &self,
698        mutation_json: &str,
699        local_row_json: Option<&str>,
700    ) -> Result<String> {
701        let command_id = self.next_command_id("leased-mutation")?;
702        self.worker()?.enqueue_leased_mutation_json(
703            command_id.clone(),
704            mutation_json.to_string(),
705            local_row_json.map(str::to_string),
706            self.auto_sync_local_writes,
707        )?;
708        Ok(command_id)
709    }
710
711    pub fn enqueue_yjs_update_json(&self, update_json: &str) -> Result<String> {
712        validate_crdt_request_json_size(update_json)?;
713        let command_id = self.next_command_id("yjs")?;
714        self.worker()?.enqueue_yjs_update_json(
715            command_id.clone(),
716            update_json.to_string(),
717            self.auto_sync_local_writes,
718        )?;
719        Ok(command_id)
720    }
721
722    pub fn enqueue_crdt_field_yjs_update_json(&self, request_json: &str) -> Result<String> {
723        validate_crdt_request_json_size(request_json)?;
724        let request: NativeCrdtFieldYjsUpdateRequest = serde_json::from_str(request_json)?;
725        validate_yjs_update_envelope_size(&request.update)?;
726        let field = self.writer.open_crdt_field(request.id())?;
727        match field.sync_mode() {
728            CrdtFieldSyncMode::ServerMerge => self.enqueue_yjs_update_json(request_json),
729            CrdtFieldSyncMode::EncryptedUpdateLog => {
730                self.enqueue_encrypted_crdt_update_json(request_json)
731            }
732        }
733    }
734
735    pub fn enqueue_crdt_field_text_json(&self, request_json: &str) -> Result<String> {
736        validate_crdt_request_json_size(request_json)?;
737        let request: NativeCrdtFieldTextRequest = serde_json::from_str(request_json)?;
738        validate_yjs_text_input_size(&request.next_text)?;
739        self.writer.open_crdt_field(request.id())?;
740        let command_id = self.next_command_id("crdt-text")?;
741        self.worker()?.enqueue_crdt_field_text_json(
742            command_id.clone(),
743            request_json.to_string(),
744            self.auto_sync_local_writes,
745        )?;
746        Ok(command_id)
747    }
748
749    pub fn enqueue_crdt_field_compaction_json(&self, request_json: &str) -> Result<String> {
750        validate_crdt_request_json_size(request_json)?;
751        let request: NativeCrdtFieldCompactionRequest = serde_json::from_str(request_json)?;
752        self.writer.open_crdt_field(request.id())?;
753        let command_id = self.next_command_id("crdt-compact")?;
754        self.worker()?.enqueue_crdt_field_compaction_json(
755            command_id.clone(),
756            request_json.to_string(),
757            self.auto_sync_local_writes,
758        )?;
759        Ok(command_id)
760    }
761
762    pub fn enqueue_encrypted_crdt_update_json(&self, request_json: &str) -> Result<String> {
763        validate_crdt_request_json_size(request_json)?;
764        let command_id = self.next_command_id("encrypted-crdt")?;
765        self.worker()?.enqueue_encrypted_crdt_update_json(
766            command_id.clone(),
767            request_json.to_string(),
768            self.auto_sync_local_writes,
769        )?;
770        Ok(command_id)
771    }
772
773    pub fn enqueue_encrypted_crdt_checkpoint_json(&self, request_json: &str) -> Result<String> {
774        validate_crdt_request_json_size(request_json)?;
775        let command_id = self.next_command_id("encrypted-crdt-checkpoint")?;
776        self.worker()?.enqueue_encrypted_crdt_checkpoint_json(
777            command_id.clone(),
778            request_json.to_string(),
779            self.auto_sync_local_writes,
780        )?;
781        Ok(command_id)
782    }
783
784    pub fn enqueue_resolve_conflict(&self, id: &str, resolution: &str) -> Result<String> {
785        let command_id = self.next_command_id("conflict")?;
786        self.worker()?.enqueue_conflict_resolution(
787            command_id.clone(),
788            id.to_string(),
789            resolution.to_string(),
790            self.auto_sync_local_writes,
791        )?;
792        Ok(command_id)
793    }
794
795    pub fn enqueue_refresh_snapshot_json(&self, request_json: &str) -> Result<String> {
796        let command_id = self.next_command_id("snapshot")?;
797        self.worker()?
798            .enqueue_refresh_snapshot_json(command_id.clone(), request_json.to_string())?;
799        Ok(command_id)
800    }
801
802    pub fn enqueue_compact_storage_json(&self, options_json: Option<&str>) -> Result<String> {
803        let command_id = self.next_command_id("compact")?;
804        self.worker()?
805            .enqueue_compact_storage_json(command_id.clone(), options_json.map(str::to_string))?;
806        Ok(command_id)
807    }
808
809    pub fn enqueue_store_blob_file_json(
810        &self,
811        path: &str,
812        options_json: Option<&str>,
813    ) -> Result<String> {
814        let command_id = self.next_command_id("blob-store")?;
815        self.worker()?.enqueue_store_blob_file_json(
816            command_id.clone(),
817            path.to_string(),
818            options_json.map(str::to_string),
819        )?;
820        Ok(command_id)
821    }
822
823    pub fn enqueue_retrieve_blob_file_json(
824        &self,
825        ref_json: &str,
826        path: &str,
827        options_json: Option<&str>,
828    ) -> Result<String> {
829        let command_id = self.next_command_id("blob-retrieve")?;
830        self.worker()?.enqueue_retrieve_blob_file_json(
831            command_id.clone(),
832            ref_json.to_string(),
833            path.to_string(),
834            options_json.map(str::to_string),
835        )?;
836        Ok(command_id)
837    }
838
839    pub fn enqueue_process_blob_upload_queue(&self) -> Result<String> {
840        let command_id = self.next_command_id("blob-upload")?;
841        self.worker()?
842            .enqueue_process_blob_upload_queue(command_id.clone())?;
843        Ok(command_id)
844    }
845
846    pub fn enqueue_prune_blob_cache(&self, max_bytes: i64) -> Result<String> {
847        let command_id = self.next_command_id("blob-prune")?;
848        self.worker()?
849            .enqueue_prune_blob_cache(command_id.clone(), max_bytes)?;
850        Ok(command_id)
851    }
852
853    pub fn enqueue_clear_blob_cache(&self) -> Result<String> {
854        let command_id = self.next_command_id("blob-clear")?;
855        self.worker()?
856            .enqueue_clear_blob_cache(command_id.clone())?;
857        Ok(command_id)
858    }
859
860    pub fn set_auth_headers(&mut self, headers: SyncAuthHeaders) -> Result<()> {
861        self.auth_headers = headers.clone();
862        self.writer.set_auth_headers(headers.clone());
863        if let Some(worker) = &self.worker {
864            worker.set_auth_headers(headers)?;
865        }
866        if let Some(realtime_worker) = &self.realtime_worker {
867            realtime_worker.set_auth_headers(self.auth_headers.clone())?;
868        }
869        Ok(())
870    }
871
872    pub fn set_auth_headers_json(&mut self, headers_json: &str) -> Result<()> {
873        let headers: SyncAuthHeaders = serde_json::from_str(headers_json)?;
874        self.set_auth_headers(headers)
875    }
876
877    pub fn set_subscriptions(&mut self, subscriptions: Vec<SubscriptionSpec>) -> Result<()> {
878        self.writer.set_subscriptions(subscriptions.clone())?;
879        if let Some(worker) = &self.worker {
880            worker.set_subscriptions(subscriptions)?;
881        }
882        Ok(())
883    }
884
885    pub fn set_subscriptions_json(&mut self, subscriptions_json: &str) -> Result<()> {
886        let subscriptions: Vec<SubscriptionSpec> = serde_json::from_str(subscriptions_json)?;
887        self.set_subscriptions(subscriptions)
888    }
889
890    pub fn force_subscriptions_bootstrap(
891        &mut self,
892        subscription_ids: Vec<String>,
893    ) -> Result<usize> {
894        self.writer.force_subscriptions_bootstrap(&subscription_ids)
895    }
896
897    pub fn force_subscriptions_bootstrap_json(
898        &mut self,
899        subscription_ids_json: &str,
900    ) -> Result<String> {
901        let subscription_ids: Vec<String> = serde_json::from_str(subscription_ids_json)?;
902        Ok(serde_json::to_string(
903            &self.force_subscriptions_bootstrap(subscription_ids)?,
904        )?)
905    }
906
907    pub fn set_field_encryption(&mut self, encryption: Option<FieldEncryption>) -> Result<()> {
908        self.writer.set_field_encryption(encryption.clone())?;
909        self.field_encryption = encryption.clone();
910        if let Some(worker) = &self.worker {
911            worker.set_field_encryption(encryption)?;
912        }
913        Ok(())
914    }
915
916    pub fn set_field_encryption_json(&mut self, config_json: &str) -> Result<()> {
917        self.set_field_encryption(FieldEncryption::from_static_config_json(config_json)?)
918    }
919
920    pub fn set_encrypted_crdt(&mut self, encryption: Option<EncryptedCrdt>) -> Result<()> {
921        self.writer.set_encrypted_crdt(encryption.clone())?;
922        self.encrypted_crdt = encryption.clone();
923        if let Some(worker) = &self.worker {
924            worker.set_encrypted_crdt(encryption)?;
925        }
926        Ok(())
927    }
928
929    pub fn set_encrypted_crdt_json(&mut self, config_json: &str) -> Result<()> {
930        self.set_encrypted_crdt(EncryptedCrdt::from_static_config_json(config_json)?)
931    }
932
933    pub fn set_blob_encryption(&mut self, encryption: Option<BlobEncryption>) -> Result<()> {
934        self.writer.set_blob_encryption(encryption.clone())?;
935        self.blob_encryption = encryption.clone();
936        if let Some(worker) = &self.worker {
937            worker.set_blob_encryption(encryption)?;
938        }
939        Ok(())
940    }
941
942    pub fn set_blob_encryption_json(&mut self, config_json: &str) -> Result<()> {
943        self.set_blob_encryption(BlobEncryption::from_static_config_json(config_json)?)
944    }
945
946    pub fn encryption_helper_json(&self, method: &str, args_json: &str) -> Result<String> {
947        encryption_helpers_json(method, args_json)
948    }
949
950    pub fn pause_sync_worker(&mut self) -> Result<()> {
951        self.stop_realtime_worker()?;
952        if let Some(worker) = self.worker.take() {
953            worker.stop()?;
954        }
955        self.join_worker_event_pump()?;
956        Ok(())
957    }
958
959    pub fn resume_sync_worker(&mut self) -> Result<()> {
960        if self.worker.is_some() {
961            return Ok(());
962        }
963        let mut worker_client =
964            SyncularClient::open_with_schema(self.config.clone(), self.writer.app_schema())?;
965        worker_client.set_subscriptions(self.writer.subscriptions().to_vec())?;
966        worker_client.set_auth_headers(self.auth_headers.clone());
967        worker_client.set_field_encryption(self.field_encryption.clone())?;
968        worker_client.set_encrypted_crdt(self.encrypted_crdt.clone())?;
969        worker_client.set_blob_encryption(self.blob_encryption.clone())?;
970        let worker = SyncWorker::start(worker_client);
971        self.worker_event_pump = Some(start_worker_event_pump(
972            self.events.clone(),
973            worker.event_source(),
974        ));
975        self.worker = Some(worker);
976        Ok(())
977    }
978
979    pub fn resume_from_background(&mut self) -> Result<String> {
980        self.resume_sync_worker()?;
981        self.start_realtime_worker()?;
982        self.enqueue_sync_now()
983    }
984
985    pub fn shutdown(&mut self) -> Result<()> {
986        self.close()
987    }
988
989    pub fn start_realtime_worker(&mut self) -> Result<()> {
990        if self.realtime_worker.is_some() {
991            return Ok(());
992        }
993        let trigger = self.worker()?.trigger_handle();
994        let mut transport = HttpSyncTransport::new(SyncTransportConfig::new(
995            self.config.base_url.clone(),
996            self.config.client_id.clone(),
997            self.config.actor_id.clone(),
998        ))
999        .with_schema_version(self.writer.app_schema().current_schema_version());
1000        transport.set_auth_headers(self.auth_headers.clone());
1001        let events = self.events.clone();
1002        let presence_by_scope = self.presence_by_scope.clone();
1003        self.realtime_worker = Some(PersistentRealtimeWorker::start_with_event_handler(
1004            transport,
1005            trigger,
1006            Some(Arc::new(move |event| {
1007                if let RealtimeEvent::Presence(presence) = event {
1008                    apply_native_presence_event(&presence_by_scope, &events, presence);
1009                }
1010            })),
1011        ));
1012        if let Some(realtime_worker) = &self.realtime_worker {
1013            for (scope_key, metadata) in self.local_presence_metadata()? {
1014                realtime_worker.send_presence("join", scope_key, metadata)?;
1015            }
1016        }
1017        Ok(())
1018    }
1019
1020    pub fn stop_realtime_worker(&mut self) -> Result<()> {
1021        if let Some(mut realtime_worker) = self.realtime_worker.take() {
1022            realtime_worker.stop()?;
1023        }
1024        Ok(())
1025    }
1026
1027    pub fn sync_worker_running(&self) -> bool {
1028        self.worker.is_some()
1029    }
1030
1031    pub fn join_presence(&mut self, scope_key: &str, metadata: Option<Value>) -> Result<()> {
1032        let event = RealtimePresenceEvent {
1033            action: "join".to_string(),
1034            scope_key: scope_key.to_string(),
1035            client_id: Some(self.config.client_id.clone()),
1036            actor_id: Some(self.config.actor_id.clone()),
1037            metadata: metadata.clone(),
1038            entries: Vec::new(),
1039        };
1040        apply_native_presence_event(&self.presence_by_scope, &self.events, event);
1041        if let Some(realtime_worker) = &self.realtime_worker {
1042            realtime_worker.send_presence("join", scope_key, metadata)?;
1043        }
1044        Ok(())
1045    }
1046
1047    pub fn join_presence_handle(
1048        &mut self,
1049        scope_key: &str,
1050        metadata: Option<Value>,
1051    ) -> Result<NativePresenceHandle<'_>> {
1052        self.join_presence(scope_key, metadata)?;
1053        Ok(NativePresenceHandle {
1054            client: self,
1055            scope_key: scope_key.to_string(),
1056            active: true,
1057        })
1058    }
1059
1060    pub fn leave_presence(&mut self, scope_key: &str) -> Result<()> {
1061        let event = RealtimePresenceEvent {
1062            action: "leave".to_string(),
1063            scope_key: scope_key.to_string(),
1064            client_id: Some(self.config.client_id.clone()),
1065            actor_id: Some(self.config.actor_id.clone()),
1066            metadata: None,
1067            entries: Vec::new(),
1068        };
1069        apply_native_presence_event(&self.presence_by_scope, &self.events, event);
1070        if let Some(realtime_worker) = &self.realtime_worker {
1071            realtime_worker.send_presence("leave", scope_key, None)?;
1072        }
1073        Ok(())
1074    }
1075
1076    pub fn update_presence_metadata(&mut self, scope_key: &str, metadata: Value) -> Result<()> {
1077        let event = RealtimePresenceEvent {
1078            action: "update".to_string(),
1079            scope_key: scope_key.to_string(),
1080            client_id: Some(self.config.client_id.clone()),
1081            actor_id: Some(self.config.actor_id.clone()),
1082            metadata: Some(metadata.clone()),
1083            entries: Vec::new(),
1084        };
1085        apply_native_presence_event(&self.presence_by_scope, &self.events, event);
1086        if let Some(realtime_worker) = &self.realtime_worker {
1087            realtime_worker.send_presence("update", scope_key, Some(metadata))?;
1088        }
1089        Ok(())
1090    }
1091
1092    pub fn presence_json(&self, scope_key: &str) -> Result<String> {
1093        let presence = self
1094            .presence_by_scope
1095            .lock()
1096            .map_err(|_| {
1097                SyncularError::message(ErrorKind::Internal, "native presence is poisoned")
1098            })?
1099            .get(scope_key)
1100            .cloned()
1101            .unwrap_or_default();
1102        Ok(serde_json::to_string(&presence)?)
1103    }
1104
1105    fn local_presence_metadata(&self) -> Result<Vec<(String, Option<Value>)>> {
1106        let client_id = self.config.client_id.as_str();
1107        let joined = self
1108            .presence_by_scope
1109            .lock()
1110            .map_err(|_| {
1111                SyncularError::message(ErrorKind::Internal, "native presence is poisoned")
1112            })?
1113            .iter()
1114            .filter_map(|(scope_key, entries)| {
1115                entries
1116                    .iter()
1117                    .find(|entry| entry.client_id == client_id)
1118                    .map(|entry| (scope_key.clone(), entry.metadata.clone()))
1119            })
1120            .collect();
1121        Ok(joined)
1122    }
1123
1124    pub fn subscribe_events(&self, capacity: usize) -> NativeEventSubscription {
1125        self.events.subscribe(capacity)
1126    }
1127
1128    pub fn event_receiver(&self, capacity: usize) -> NativeEventSubscription {
1129        self.subscribe_events(capacity)
1130    }
1131
1132    pub fn next_event(&self) -> Option<NativeEvent> {
1133        self.default_events.next_event()
1134    }
1135
1136    pub fn next_event_timeout(&self, timeout: Duration) -> Option<NativeEvent> {
1137        self.default_events.next_event_timeout(timeout)
1138    }
1139
1140    pub fn next_event_json(&self) -> Option<Result<String>> {
1141        self.default_events.next_event_json()
1142    }
1143
1144    pub fn next_event_json_timeout(&self, timeout: Duration) -> Option<Result<String>> {
1145        self.default_events.next_event_json_timeout(timeout)
1146    }
1147
1148    pub fn apply_mutation_json(
1149        &mut self,
1150        mutation_json: &str,
1151        local_row_json: Option<&str>,
1152    ) -> Result<String> {
1153        validate_mutation_json_input_size(mutation_json, local_row_json)?;
1154        let operation: crate::protocol::SyncOperation = serde_json::from_str(mutation_json)?;
1155        let table = operation.table.clone();
1156        let previous_row = self
1157            .writer
1158            .current_row_json(&operation.table, &operation.row_id)?;
1159        let local_row = local_row_json.map(serde_json::from_str).transpose()?;
1160        let client_commit_id = self
1161            .writer
1162            .apply_mutation_json(mutation_json, local_row_json)?;
1163        let changed_rows = sync_changed_row_for_local_operation(
1164            self.writer.app_schema(),
1165            &operation,
1166            previous_row.as_ref(),
1167            local_row.as_ref(),
1168            Some(client_commit_id.clone()),
1169        )
1170        .into_iter()
1171        .collect();
1172        self.events.push_rows_changed_events_with_details(
1173            [table.as_str()],
1174            changed_rows,
1175            Some("localWrite"),
1176        );
1177        self.trigger_after_local_write()?;
1178        Ok(client_commit_id)
1179    }
1180
1181    pub fn apply_leased_mutation_json(
1182        &mut self,
1183        mutation_json: &str,
1184        local_row_json: Option<&str>,
1185    ) -> Result<String> {
1186        validate_mutation_json_input_size(mutation_json, local_row_json)?;
1187        let operation: crate::protocol::SyncOperation = serde_json::from_str(mutation_json)?;
1188        let table = operation.table.clone();
1189        let previous_row = self
1190            .writer
1191            .current_row_json(&operation.table, &operation.row_id)?;
1192        let local_row = local_row_json.map(serde_json::from_str).transpose()?;
1193        let client_commit_id = self
1194            .writer
1195            .apply_leased_mutation_json(mutation_json, local_row_json)?;
1196        let changed_rows = sync_changed_row_for_local_operation(
1197            self.writer.app_schema(),
1198            &operation,
1199            previous_row.as_ref(),
1200            local_row.as_ref(),
1201            Some(client_commit_id.clone()),
1202        )
1203        .into_iter()
1204        .collect();
1205        self.events.push_rows_changed_events_with_details(
1206            [table.as_str()],
1207            changed_rows,
1208            Some("localWrite"),
1209        );
1210        self.trigger_after_local_write()?;
1211        Ok(client_commit_id)
1212    }
1213
1214    pub fn open_crdt_field_json(&self, request_json: &str) -> Result<String> {
1215        validate_crdt_request_json_size(request_json)?;
1216        let request: NativeCrdtFieldRequest = serde_json::from_str(request_json)?;
1217        let field = self.writer.open_crdt_field(request.id())?;
1218        Ok(serde_json::to_string(&crdt_field_descriptor(&field))?)
1219    }
1220
1221    pub fn apply_crdt_field_text_json(&mut self, request_json: &str) -> Result<String> {
1222        validate_crdt_request_json_size(request_json)?;
1223        let request: NativeCrdtFieldTextRequest = serde_json::from_str(request_json)?;
1224        validate_yjs_text_input_size(&request.next_text)?;
1225        let field = self.writer.open_crdt_field(request.id())?;
1226        let receipt = self
1227            .writer
1228            .apply_crdt_field_text(&field, &request.next_text)?;
1229        self.after_crdt_field_write(&field, Some(receipt.client_commit_id.clone()))?;
1230        crdt_field_write_receipt_json(receipt)
1231    }
1232
1233    pub fn apply_crdt_field_yjs_update_json(&mut self, request_json: &str) -> Result<String> {
1234        validate_crdt_request_json_size(request_json)?;
1235        let request: NativeCrdtFieldYjsUpdateRequest = serde_json::from_str(request_json)?;
1236        validate_yjs_update_envelope_size(&request.update)?;
1237        let field = self.writer.open_crdt_field(request.id())?;
1238        let receipt = self
1239            .writer
1240            .apply_crdt_field_yjs_update(&field, request.update)?;
1241        self.after_crdt_field_write(&field, Some(receipt.client_commit_id.clone()))?;
1242        crdt_field_write_receipt_json(receipt)
1243    }
1244
1245    pub fn materialize_crdt_field_json(&mut self, request_json: &str) -> Result<String> {
1246        validate_crdt_request_json_size(request_json)?;
1247        let request: NativeCrdtFieldRequest = serde_json::from_str(request_json)?;
1248        let field = self.writer.open_crdt_field(request.id())?;
1249        crdt_field_materialization_json(self.writer.materialize_crdt_field(&field)?)
1250    }
1251
1252    pub fn crdt_document_snapshot_json(&mut self, request_json: &str) -> Result<String> {
1253        validate_crdt_request_json_size(request_json)?;
1254        let request: NativeCrdtFieldRequest = serde_json::from_str(request_json)?;
1255        let field = self.writer.open_crdt_field(request.id())?;
1256        self.writer.crdt_document_snapshot_json(&field)
1257    }
1258
1259    pub fn crdt_update_log_json(&mut self, request_json: &str) -> Result<String> {
1260        validate_crdt_request_json_size(request_json)?;
1261        let request: NativeCrdtFieldLogRequest = serde_json::from_str(request_json)?;
1262        let field = self.writer.open_crdt_field(request.id())?;
1263        self.writer.crdt_update_log_json(
1264            &field,
1265            request.limit.unwrap_or(DEFAULT_CRDT_UPDATE_LOG_LIMIT),
1266        )
1267    }
1268
1269    pub fn snapshot_crdt_field_state_vector_json(&mut self, request_json: &str) -> Result<String> {
1270        validate_crdt_request_json_size(request_json)?;
1271        let request: NativeCrdtFieldRequest = serde_json::from_str(request_json)?;
1272        let field = self.writer.open_crdt_field(request.id())?;
1273        Ok(serde_json::to_string(&json!({
1274            "stateVectorBase64": self.writer.snapshot_crdt_field_state_vector_base64(&field)?
1275        }))?)
1276    }
1277
1278    pub fn compact_crdt_field_json(&mut self, request_json: &str) -> Result<String> {
1279        validate_crdt_request_json_size(request_json)?;
1280        let request: NativeCrdtFieldCompactionRequest = serde_json::from_str(request_json)?;
1281        let field = self.writer.open_crdt_field(request.id())?;
1282        let receipt = self
1283            .writer
1284            .compact_crdt_field(&field, request.min_uncheckpointed_updates.unwrap_or(1))?;
1285        let should_emit_compaction_event =
1286            receipt.checkpoint_created || field.sync_mode() == CrdtFieldSyncMode::ServerMerge;
1287        if should_emit_compaction_event {
1288            let extra_payload_json = crdt_field_compaction_event_payload(
1289                &mut self.writer,
1290                &field,
1291                &receipt,
1292                receipt.checkpoint_created,
1293                request.min_uncheckpointed_updates.unwrap_or(1),
1294            );
1295            self.events.publish_event(crdt_field_compacted_event(
1296                &field,
1297                receipt.client_commit_id.clone(),
1298                crdt_field_compaction_tables(&field)
1299                    .into_iter()
1300                    .map(str::to_string)
1301                    .collect(),
1302                None,
1303                None,
1304                receipt.checkpoint_created,
1305                extra_payload_json,
1306            ));
1307            self.events.push_rows_changed_events_with_details(
1308                crdt_field_compaction_tables(&field),
1309                vec![crdt_field_compacted_row(
1310                    &field,
1311                    receipt.client_commit_id.clone(),
1312                )],
1313                Some("localWrite"),
1314            );
1315            self.trigger_after_local_write()?;
1316        }
1317        crdt_field_compaction_receipt_json(receipt)
1318    }
1319
1320    pub fn apply_encrypted_crdt_update_json(&mut self, request_json: &str) -> Result<String> {
1321        validate_crdt_request_json_size(request_json)?;
1322        let request: NativeEncryptedCrdtRequest = serde_json::from_str(request_json)?;
1323        let receipt = self.writer.apply_encrypted_crdt_update_json(request_json)?;
1324        self.events
1325            .push_rows_changed_events([request.table.as_str(), CRDT_UPDATES_TABLE]);
1326        self.trigger_after_local_write()?;
1327        Ok(receipt.client_commit_id)
1328    }
1329
1330    pub fn apply_encrypted_crdt_checkpoint_json(&mut self, request_json: &str) -> Result<String> {
1331        validate_crdt_request_json_size(request_json)?;
1332        let _request: NativeEncryptedCrdtRequest = serde_json::from_str(request_json)?;
1333        let receipt = self
1334            .writer
1335            .apply_encrypted_crdt_checkpoint_json(request_json)?;
1336        if let Some(receipt) = receipt {
1337            self.events
1338                .push_rows_changed_events([CRDT_CHECKPOINTS_TABLE]);
1339            self.trigger_after_local_write()?;
1340            Ok(serde_json::to_string(&json!({
1341                "checkpointed": true,
1342                "clientCommitId": receipt.client_commit_id,
1343                "commitId": receipt.commit_id
1344            }))?)
1345        } else {
1346            Ok(serde_json::to_string(&json!({ "checkpointed": false }))?)
1347        }
1348    }
1349
1350    pub fn list_table_json(&mut self, table: &str) -> Result<String> {
1351        self.writer.list_table_json(table)
1352    }
1353
1354    pub fn query_json(&self, request_json: &str) -> Result<String> {
1355        self.read_executor
1356            .lock()
1357            .map_err(|_| {
1358                SyncularError::message(ErrorKind::Internal, "native read executor lock poisoned")
1359            })?
1360            .execute_json(request_json)
1361    }
1362
1363    pub fn store_blob_file_json(
1364        &mut self,
1365        path: &str,
1366        options_json: Option<&str>,
1367    ) -> Result<String> {
1368        let options: NativeBlobStoreOptions = options_json
1369            .filter(|value| !value.trim().is_empty())
1370            .map(serde_json::from_str)
1371            .transpose()?
1372            .unwrap_or_default();
1373        let blob = self.writer.store_blob_file(
1374            Path::new(path),
1375            options
1376                .mime_type
1377                .as_deref()
1378                .unwrap_or("application/octet-stream"),
1379            options.immediate.unwrap_or(false),
1380            options.cache_local.unwrap_or(true),
1381        )?;
1382        self.events.push_diagnostic(blob_store_diagnostic(
1383            &blob,
1384            options.immediate.unwrap_or(false),
1385            options.cache_local.unwrap_or(true),
1386        ));
1387        Ok(serde_json::to_string(&blob)?)
1388    }
1389
1390    pub fn retrieve_blob_file(&mut self, ref_json: &str, path: &str) -> Result<()> {
1391        self.retrieve_blob_file_with_options(ref_json, path, None)
1392    }
1393
1394    pub fn retrieve_blob_file_with_options(
1395        &mut self,
1396        ref_json: &str,
1397        path: &str,
1398        options_json: Option<&str>,
1399    ) -> Result<()> {
1400        let options: NativeBlobRetrieveOptions = options_json
1401            .filter(|value| !value.trim().is_empty())
1402            .map(serde_json::from_str)
1403            .transpose()?
1404            .unwrap_or_default();
1405        let blob: BlobRef = serde_json::from_str(ref_json)?;
1406        let was_local = self.writer.is_blob_local(&blob.hash).unwrap_or(false);
1407        match self.writer.retrieve_blob_file(
1408            &blob,
1409            Path::new(path),
1410            options.cache_local.unwrap_or(true),
1411        ) {
1412            Ok(()) => {
1413                self.events
1414                    .push_diagnostic(blob_cache_retrieve_diagnostic(&blob, was_local));
1415                Ok(())
1416            }
1417            Err(error) => {
1418                self.events
1419                    .push_diagnostic(blob_download_failed_diagnostic(&blob, &error));
1420                Err(error)
1421            }
1422        }
1423    }
1424
1425    pub fn is_blob_local(&mut self, hash: &str) -> Result<bool> {
1426        let local = self.writer.is_blob_local(hash)?;
1427        self.events
1428            .push_diagnostic(blob_cache_lookup_diagnostic(hash, local));
1429        Ok(local)
1430    }
1431
1432    pub fn process_blob_upload_queue_json(&mut self) -> Result<String> {
1433        let result = self.writer.process_blob_upload_queue()?;
1434        self.events
1435            .push_diagnostic(blob_upload_queue_processed_diagnostic(
1436                serde_json::to_value(&result)?,
1437            ));
1438        Ok(serde_json::to_string(&result)?)
1439    }
1440
1441    pub fn blob_upload_queue_stats_json(&mut self) -> Result<String> {
1442        Ok(serde_json::to_string(
1443            &self.writer.blob_upload_queue_stats()?,
1444        )?)
1445    }
1446
1447    pub fn blob_cache_stats_json(&mut self) -> Result<String> {
1448        Ok(serde_json::to_string(&self.writer.blob_cache_stats()?)?)
1449    }
1450
1451    pub fn prune_blob_cache(&mut self, max_bytes: i64) -> Result<i64> {
1452        let pruned_bytes = self.writer.prune_blob_cache(max_bytes)?;
1453        self.events
1454            .push_diagnostic(blob_cache_pruned_diagnostic(pruned_bytes, Some(max_bytes)));
1455        Ok(pruned_bytes)
1456    }
1457
1458    pub fn clear_blob_cache(&mut self) -> Result<()> {
1459        self.writer.clear_blob_cache()?;
1460        self.events.push_diagnostic(blob_cache_cleared_diagnostic());
1461        Ok(())
1462    }
1463
1464    pub fn compact_storage_json(&mut self, options_json: Option<&str>) -> Result<String> {
1465        self.writer.compact_storage_json(options_json)
1466    }
1467
1468    pub fn app_tables(&self) -> Vec<String> {
1469        self.writer
1470            .app_schema()
1471            .app_table_metadata
1472            .iter()
1473            .map(|metadata| metadata.name.to_string())
1474            .collect()
1475    }
1476
1477    pub fn app_tables_json(&self) -> Result<String> {
1478        Ok(serde_json::to_string(self.writer.app_schema().app_tables)?)
1479    }
1480
1481    pub fn app_table_metadata_json(&self) -> Result<String> {
1482        Ok(serde_json::to_string(
1483            self.writer.app_schema().app_table_metadata,
1484        )?)
1485    }
1486
1487    pub fn app_schema_state_json(&mut self) -> Result<String> {
1488        self.writer.app_schema_state_json()
1489    }
1490
1491    pub fn local_health_check_json(&mut self) -> Result<String> {
1492        self.writer.local_health_check_json()
1493    }
1494
1495    pub fn export_local_support_bundle_json(&mut self) -> Result<String> {
1496        self.writer.export_local_support_bundle_json()
1497    }
1498
1499    pub fn import_local_support_bundle_json(&mut self, bundle_json: &str) -> Result<String> {
1500        self.writer.import_local_support_bundle_json(bundle_json)
1501    }
1502
1503    pub fn repair_local_health_json(&mut self, request_json: &str) -> Result<String> {
1504        self.writer.repair_local_health_json(request_json)
1505    }
1506
1507    pub fn reset_local_sync_state_json(&mut self, request_json: &str) -> Result<String> {
1508        self.writer.reset_local_sync_state_json(request_json)
1509    }
1510
1511    pub fn register_query_json(&self, query_json: &str) -> Result<String> {
1512        let registration: NativeObservedQueryRegistration = serde_json::from_str(query_json)?;
1513        let observed_query = registration.into_observed_query(self.writer.app_schema())?;
1514        let id = observed_query.id.clone();
1515        let mut observers = self.events.query_observers.lock().map_err(|_| {
1516            SyncularError::message(
1517                ErrorKind::Internal,
1518                "native query observer registry is poisoned",
1519            )
1520        })?;
1521        observers.insert(id.clone(), observed_query);
1522        Ok(id)
1523    }
1524
1525    pub fn unregister_query(&self, id: &str) -> Result<()> {
1526        let mut observers = self.events.query_observers.lock().map_err(|_| {
1527            SyncularError::message(
1528                ErrorKind::Internal,
1529                "native query observer registry is poisoned",
1530            )
1531        })?;
1532        observers.remove(id);
1533        Ok(())
1534    }
1535
1536    pub fn observed_queries(&self) -> Result<Vec<NativeObservedQuery>> {
1537        let observers = self.events.query_observers.lock().map_err(|_| {
1538            SyncularError::message(
1539                ErrorKind::Internal,
1540                "native query observer registry is poisoned",
1541            )
1542        })?;
1543        Ok(observers.values().cloned().collect())
1544    }
1545
1546    pub fn observed_queries_json(&self) -> Result<String> {
1547        Ok(serde_json::to_string(&self.observed_queries()?)?)
1548    }
1549
1550    pub fn diagnostic_snapshot(&mut self) -> Result<NativeDiagnosticSnapshot> {
1551        let bootstrap = self.writer.bootstrap_status()?;
1552        let outbox = self.writer.outbox_summaries()?;
1553        let conflicts = self.writer.conflict_summaries()?;
1554        let observed_queries = self.observed_queries()?;
1555        let recent_events = self.events.recent_events();
1556        let recent_diagnostics = recent_events
1557            .iter()
1558            .filter_map(|event| event.diagnostic.clone())
1559            .collect();
1560        let recent_sync_timings = native_sync_timing_snapshots(&recent_events);
1561        let connection = NativeDiagnosticConnectionSnapshot {
1562            sync_worker_running: self.sync_worker_running(),
1563            realtime_worker_running: self.realtime_worker.is_some(),
1564            auto_sync_local_writes: self.auto_sync_local_writes,
1565            event_subscriber_count: self.events.subscriber_count(),
1566            observed_query_count: observed_queries.len(),
1567        };
1568        Ok(NativeDiagnosticSnapshot {
1569            generated_at: now_ms(),
1570            runtime: native_runtime_manifest(),
1571            connection,
1572            subscriptions: native_diagnostic_subscription_snapshots(
1573                self.writer.subscriptions(),
1574                &bootstrap,
1575            ),
1576            recent_events,
1577            recent_diagnostics,
1578            recent_sync_timings,
1579            limits: runtime_default_limits(),
1580            bootstrap,
1581            outbox_stats: native_outbox_stats(&outbox),
1582            conflict_stats: native_conflict_stats(&conflicts),
1583            blob_upload_queue_stats: serde_json::to_value(self.writer.blob_upload_queue_stats()?)?,
1584            blob_cache_stats: serde_json::to_value(self.writer.blob_cache_stats()?)?,
1585            observed_queries,
1586        })
1587    }
1588
1589    pub fn diagnostic_snapshot_json(&mut self) -> Result<String> {
1590        Ok(serde_json::to_string(&self.diagnostic_snapshot()?)?)
1591    }
1592
1593    pub fn outbox_summaries(&mut self) -> Result<Vec<OutboxSummary>> {
1594        self.writer.outbox_summaries()
1595    }
1596
1597    pub fn outbox_summaries_json(&mut self) -> Result<String> {
1598        Ok(serde_json::to_string(&self.outbox_summaries()?)?)
1599    }
1600
1601    pub fn upsert_auth_lease_json(&mut self, lease_json: &str) -> Result<()> {
1602        let lease: AuthLeaseRecord = serde_json::from_str(lease_json)?;
1603        self.writer.upsert_auth_lease(&lease)
1604    }
1605
1606    pub fn issue_auth_lease_json(&mut self, request_json: &str) -> Result<String> {
1607        self.writer.issue_auth_lease_json(request_json)
1608    }
1609
1610    pub fn auth_lease_json(&mut self, lease_id: &str) -> Result<String> {
1611        Ok(serde_json::to_string(&self.writer.auth_lease(lease_id)?)?)
1612    }
1613
1614    pub fn active_auth_leases_json(
1615        &mut self,
1616        actor_id: Option<&str>,
1617        now_ms_value: i64,
1618    ) -> Result<String> {
1619        Ok(serde_json::to_string(
1620            &self.writer.active_auth_leases(actor_id, now_ms_value)?,
1621        )?)
1622    }
1623
1624    pub fn set_outbox_auth_lease_json(
1625        &mut self,
1626        client_commit_id: &str,
1627        provenance_json: Option<&str>,
1628    ) -> Result<()> {
1629        let provenance: Option<AuthLeaseProvenance> =
1630            provenance_json.map(serde_json::from_str).transpose()?;
1631        self.writer
1632            .set_outbox_auth_lease(client_commit_id, provenance.as_ref())
1633    }
1634
1635    pub fn conflict_summaries(&mut self) -> Result<Vec<ConflictSummary>> {
1636        self.writer.conflict_summaries()
1637    }
1638
1639    pub fn conflict_summaries_json(&mut self) -> Result<String> {
1640        Ok(serde_json::to_string(&self.conflict_summaries()?)?)
1641    }
1642
1643    pub fn resolve_conflict(&mut self, id: &str, resolution: &str) -> Result<()> {
1644        self.writer.resolve_conflict(id, resolution)?;
1645        self.events.publish_event(conflicts_changed_event());
1646        Ok(())
1647    }
1648
1649    pub fn retry_conflict_keep_local(&mut self, id: &str) -> Result<String> {
1650        let client_commit_id = self.writer.retry_conflict_keep_local(id)?;
1651        self.events.publish_event(conflicts_changed_event());
1652        self.trigger_after_local_write()?;
1653        Ok(client_commit_id)
1654    }
1655
1656    pub fn close(&mut self) -> Result<()> {
1657        self.stop_realtime_worker()?;
1658        if let Some(worker) = self.worker.take() {
1659            worker.stop()?;
1660        }
1661        self.join_worker_event_pump()?;
1662        Ok(())
1663    }
1664
1665    fn join_worker_event_pump(&mut self) -> Result<()> {
1666        if let Some(join) = self.worker_event_pump.take() {
1667            join.join().map_err(|_| {
1668                SyncularError::message(ErrorKind::Internal, "native worker event pump panicked")
1669            })?;
1670        }
1671        Ok(())
1672    }
1673
1674    fn trigger_after_local_write(&self) -> Result<()> {
1675        if self.auto_sync_local_writes && self.worker.is_some() {
1676            self.trigger_sync()
1677                .map_err(|err| err.context("local write succeeded but sync trigger failed"))?;
1678        }
1679        Ok(())
1680    }
1681
1682    fn next_command_id(&self, prefix: &str) -> Result<String> {
1683        let mut seq = self.command_seq.lock().map_err(|_| {
1684            SyncularError::message(ErrorKind::Internal, "native command sequence is poisoned")
1685        })?;
1686        *seq = seq.saturating_add(1);
1687        Ok(format!("native-{prefix}-{seq}"))
1688    }
1689
1690    fn worker(&self) -> Result<&SyncWorker> {
1691        self.worker.as_ref().ok_or_else(|| {
1692            SyncularError::message(ErrorKind::Internal, "native sync client is closed")
1693        })
1694    }
1695
1696    fn after_crdt_field_write(
1697        &mut self,
1698        field: &CrdtField,
1699        client_commit_id: Option<String>,
1700    ) -> Result<()> {
1701        let extra_payload_json = crdt_field_event_payload(&mut self.writer, field);
1702        self.events.publish_event(crdt_field_changed_event(
1703            field,
1704            client_commit_id.clone(),
1705            crdt_field_write_tables(field)
1706                .into_iter()
1707                .map(str::to_string)
1708                .collect(),
1709            None,
1710            None,
1711            extra_payload_json,
1712        ));
1713        self.events.push_rows_changed_events_with_details(
1714            crdt_field_write_tables(field),
1715            vec![crdt_field_changed_row(field, client_commit_id)],
1716            Some("localWrite"),
1717        );
1718        self.trigger_after_local_write()
1719    }
1720}
1721
1722impl NativeSyncularClientBuilder {
1723    pub fn new(config: NativeClientConfig) -> Self {
1724        Self {
1725            config,
1726            options: NativeClientOptions::default(),
1727            realtime: true,
1728            auth_headers: None,
1729            subscriptions: None,
1730            initial_sync: false,
1731            initial_websocket_sync: false,
1732            process_blob_uploads_on_open: false,
1733            shutdown_on_drop: false,
1734        }
1735    }
1736
1737    pub fn auto_sync_local_writes(mut self, enabled: bool) -> Self {
1738        self.options.auto_sync_local_writes = enabled;
1739        self
1740    }
1741
1742    pub fn realtime(mut self, enabled: bool) -> Self {
1743        self.realtime = enabled;
1744        self
1745    }
1746
1747    pub fn auth_headers(mut self, headers: SyncAuthHeaders) -> Self {
1748        self.auth_headers = Some(headers);
1749        self
1750    }
1751
1752    pub fn auth_headers_json(mut self, headers_json: &str) -> Result<Self> {
1753        self.auth_headers = Some(serde_json::from_str(headers_json)?);
1754        Ok(self)
1755    }
1756
1757    pub fn subscriptions(mut self, subscriptions: Vec<SubscriptionSpec>) -> Self {
1758        self.subscriptions = Some(subscriptions);
1759        self
1760    }
1761
1762    pub fn subscriptions_json(mut self, subscriptions_json: &str) -> Result<Self> {
1763        self.subscriptions = Some(serde_json::from_str(subscriptions_json)?);
1764        Ok(self)
1765    }
1766
1767    pub fn initial_sync(mut self, enabled: bool) -> Self {
1768        self.initial_sync = enabled;
1769        self
1770    }
1771
1772    pub fn initial_websocket_sync(mut self, enabled: bool) -> Self {
1773        self.initial_websocket_sync = enabled;
1774        self
1775    }
1776
1777    pub fn process_blob_uploads_on_open(mut self, enabled: bool) -> Self {
1778        self.process_blob_uploads_on_open = enabled;
1779        self
1780    }
1781
1782    pub fn shutdown_on_drop(mut self, enabled: bool) -> Self {
1783        self.shutdown_on_drop = enabled;
1784        self
1785    }
1786
1787    pub fn open(self) -> Result<NativeSyncularClient> {
1788        let mut client = NativeSyncularClient::open_native_with_options(self.config, self.options)?;
1789        client.shutdown_on_drop = self.shutdown_on_drop;
1790        if let Some(headers) = self.auth_headers {
1791            client.set_auth_headers(headers)?;
1792        }
1793        if let Some(subscriptions) = self.subscriptions {
1794            client.set_subscriptions(subscriptions)?;
1795        }
1796        if self.realtime {
1797            client.start_realtime_worker()?;
1798        }
1799        if self.initial_websocket_sync {
1800            client.trigger_sync_websocket()?;
1801        } else if self.initial_sync {
1802            client.trigger_sync()?;
1803        }
1804        if self.process_blob_uploads_on_open {
1805            let _ = client.process_blob_upload_queue_json()?;
1806        }
1807        Ok(client)
1808    }
1809}
1810
1811impl Drop for NativeSyncularClient {
1812    fn drop(&mut self) {
1813        if self.shutdown_on_drop {
1814            let _ = self.close();
1815        }
1816    }
1817}
1818
1819impl NativePresenceHandle<'_> {
1820    pub fn update_metadata(&mut self, metadata: Value) -> Result<()> {
1821        self.client
1822            .update_presence_metadata(&self.scope_key, metadata)
1823    }
1824
1825    pub fn leave(mut self) -> Result<()> {
1826        self.leave_inner()
1827    }
1828
1829    fn leave_inner(&mut self) -> Result<()> {
1830        if !self.active {
1831            return Ok(());
1832        }
1833        self.active = false;
1834        self.client.leave_presence(&self.scope_key)
1835    }
1836}
1837
1838impl Drop for NativePresenceHandle<'_> {
1839    fn drop(&mut self) {
1840        let _ = self.leave_inner();
1841    }
1842}
1843
1844impl NativeClientOpenTask {
1845    pub fn open_native_with_options(
1846        config: NativeClientConfig,
1847        options: NativeClientOptions,
1848    ) -> Self {
1849        let command_id = format!("native-open-{}", Uuid::new_v4());
1850        let (result_tx, result_rx) = mpsc::channel();
1851        thread::spawn(move || {
1852            let _ = result_tx.send(NativeSyncularClient::open_native_with_options(
1853                config, options,
1854            ));
1855        });
1856        Self {
1857            command_id,
1858            result_rx: Some(result_rx),
1859            completed: None,
1860            finished: false,
1861            taken: false,
1862        }
1863    }
1864
1865    pub fn command_id(&self) -> &str {
1866        &self.command_id
1867    }
1868
1869    pub fn is_finished(&mut self) -> bool {
1870        self.fill_completed(Duration::ZERO);
1871        self.finished
1872    }
1873
1874    pub fn take_client_timeout(
1875        &mut self,
1876        timeout: Duration,
1877    ) -> Option<Result<NativeSyncularClient>> {
1878        if self.taken {
1879            return Some(Err(SyncularError::message(
1880                ErrorKind::Internal,
1881                "native async open result was already taken",
1882            )));
1883        }
1884        self.fill_completed(timeout);
1885        let result = self.completed.take();
1886        if result.is_some() {
1887            self.taken = true;
1888        }
1889        result
1890    }
1891
1892    fn fill_completed(&mut self, timeout: Duration) {
1893        if self.completed.is_some() {
1894            return;
1895        }
1896        let Some(result_rx) = &self.result_rx else {
1897            return;
1898        };
1899        match result_rx.recv_timeout(timeout) {
1900            Ok(result) => {
1901                self.completed = Some(result);
1902                self.result_rx = None;
1903                self.finished = true;
1904            }
1905            Err(RecvTimeoutError::Timeout) => {}
1906            Err(RecvTimeoutError::Disconnected) => {
1907                self.completed = Some(Err(SyncularError::message(
1908                    ErrorKind::Internal,
1909                    "native async open worker stopped before returning a client",
1910                )));
1911                self.result_rx = None;
1912                self.finished = true;
1913            }
1914        }
1915    }
1916}
1917
1918impl NativeCrdtFieldRequest {
1919    fn id(&self) -> CrdtFieldId {
1920        CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1921    }
1922}
1923
1924impl NativeCrdtFieldLogRequest {
1925    fn id(&self) -> CrdtFieldId {
1926        CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1927    }
1928}
1929
1930impl NativeCrdtFieldTextRequest {
1931    fn id(&self) -> CrdtFieldId {
1932        CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1933    }
1934}
1935
1936impl NativeCrdtFieldYjsUpdateRequest {
1937    fn id(&self) -> CrdtFieldId {
1938        CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1939    }
1940}
1941
1942impl NativeCrdtFieldCompactionRequest {
1943    fn id(&self) -> CrdtFieldId {
1944        CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
1945    }
1946}
1947
1948impl NativeEventSubscription {
1949    pub fn recv(&self) -> Option<NativeEvent> {
1950        self.next_event()
1951    }
1952
1953    pub fn recv_timeout(&self, timeout: Duration) -> Option<NativeEvent> {
1954        self.next_event_timeout(timeout)
1955    }
1956
1957    pub fn next_event(&self) -> Option<NativeEvent> {
1958        self.queue.next_event()
1959    }
1960
1961    pub fn next_event_timeout(&self, timeout: Duration) -> Option<NativeEvent> {
1962        self.queue.next_event_timeout(timeout)
1963    }
1964
1965    pub fn next_event_json(&self) -> Option<Result<String>> {
1966        self.next_event()
1967            .map(|event| serde_json::to_string(&event).map_err(Into::into))
1968    }
1969
1970    pub fn next_event_json_timeout(&self, timeout: Duration) -> Option<Result<String>> {
1971        self.next_event_timeout(timeout)
1972            .map(|event| serde_json::to_string(&event).map_err(Into::into))
1973    }
1974
1975    pub fn into_json_iter(self) -> NativeEventJsonIterator {
1976        NativeEventJsonIterator { subscription: self }
1977    }
1978
1979    pub fn close(&self) {
1980        if let Ok(mut subscribers) = self.hub.subscribers.lock() {
1981            subscribers.remove(&self.subscriber_id);
1982        }
1983        self.queue.close();
1984    }
1985}
1986
1987impl Iterator for NativeEventSubscription {
1988    type Item = NativeEvent;
1989
1990    fn next(&mut self) -> Option<Self::Item> {
1991        self.next_event()
1992    }
1993}
1994
1995impl Iterator for NativeEventJsonIterator {
1996    type Item = Result<String>;
1997
1998    fn next(&mut self) -> Option<Self::Item> {
1999        self.subscription.next_event_json()
2000    }
2001}
2002
2003impl Drop for NativeEventSubscription {
2004    fn drop(&mut self) {
2005        self.close();
2006    }
2007}
2008
2009impl NativeEventQueue {
2010    fn new(capacity: usize) -> Self {
2011        Self {
2012            capacity: capacity.max(1),
2013            state: Mutex::new(NativeEventQueueState {
2014                events: VecDeque::new(),
2015                closed: false,
2016            }),
2017            ready: Condvar::new(),
2018        }
2019    }
2020
2021    fn push(&self, event: NativeEvent, overflow_event: impl FnOnce(usize) -> NativeEvent) {
2022        let Ok(mut state) = self.state.lock() else {
2023            return;
2024        };
2025        if state.closed {
2026            return;
2027        }
2028        if state.events.len() >= self.capacity {
2029            let dropped_count = state.events.len().saturating_add(1);
2030            state.events.clear();
2031            state.events.push_back(overflow_event(dropped_count));
2032            state.closed = true;
2033        } else {
2034            state.events.push_back(event);
2035        }
2036        self.ready.notify_one();
2037    }
2038
2039    fn next_event(&self) -> Option<NativeEvent> {
2040        let mut state = self.state.lock().ok()?;
2041        loop {
2042            if let Some(event) = state.events.pop_front() {
2043                return Some(event);
2044            }
2045            if state.closed {
2046                return None;
2047            }
2048            state = self.ready.wait(state).ok()?;
2049        }
2050    }
2051
2052    fn next_event_timeout(&self, timeout: Duration) -> Option<NativeEvent> {
2053        let deadline = std::time::Instant::now().checked_add(timeout)?;
2054        let mut state = self.state.lock().ok()?;
2055        loop {
2056            if let Some(event) = state.events.pop_front() {
2057                return Some(event);
2058            }
2059            if state.closed {
2060                return None;
2061            }
2062            let now = std::time::Instant::now();
2063            if now >= deadline {
2064                return None;
2065            }
2066            let wait = deadline.saturating_duration_since(now);
2067            let (next_state, timeout) = self.ready.wait_timeout(state, wait).ok()?;
2068            state = next_state;
2069            if timeout.timed_out() && state.events.is_empty() {
2070                return None;
2071            }
2072        }
2073    }
2074
2075    fn close(&self) {
2076        if let Ok(mut state) = self.state.lock() {
2077            state.closed = true;
2078            state.events.clear();
2079            self.ready.notify_all();
2080        }
2081    }
2082
2083    fn is_closed(&self) -> bool {
2084        self.state.lock().map(|state| state.closed).unwrap_or(true)
2085    }
2086}
2087
2088fn start_worker_event_pump(
2089    events: NativeEventHub,
2090    worker_events: SyncWorkerEventSubscription,
2091) -> JoinHandle<()> {
2092    thread::spawn(move || {
2093        while let Some(event) = worker_events.next_event() {
2094            events.publish_worker_event(event);
2095        }
2096    })
2097}
2098
2099impl NativeEventHub {
2100    fn subscribe(&self, capacity: usize) -> NativeEventSubscription {
2101        let queue = Arc::new(NativeEventQueue::new(capacity));
2102        let subscriber_id = self.next_subscriber_id();
2103        if let Ok(mut subscribers) = self.subscribers.lock() {
2104            subscribers.insert(subscriber_id, queue.clone());
2105        }
2106        NativeEventSubscription {
2107            hub: self.clone(),
2108            subscriber_id,
2109            queue,
2110        }
2111    }
2112
2113    fn publish_event(&self, event: NativeEvent) {
2114        let event = self.stamp_event(event);
2115        self.record_recent_event(event.clone());
2116        let Ok(mut subscribers) = self.subscribers.lock() else {
2117            return;
2118        };
2119
2120        subscribers.retain(|_, queue| {
2121            queue.push(event.clone(), |dropped_count| {
2122                self.stamp_event(events_overflowed_event(dropped_count))
2123            });
2124            !queue.is_closed()
2125        });
2126    }
2127
2128    fn publish_worker_event(&self, event: SyncWorkerEvent) {
2129        for event in self.events_from_worker_event(event) {
2130            self.publish_event(event);
2131        }
2132    }
2133
2134    fn recent_events(&self) -> Vec<NativeEvent> {
2135        self.recent_events
2136            .lock()
2137            .map(|events| events.iter().cloned().collect())
2138            .unwrap_or_default()
2139    }
2140
2141    fn subscriber_count(&self) -> usize {
2142        self.subscribers
2143            .lock()
2144            .map(|subscribers| subscribers.len())
2145            .unwrap_or_default()
2146    }
2147
2148    fn record_recent_event(&self, event: NativeEvent) {
2149        let Ok(mut events) = self.recent_events.lock() else {
2150            return;
2151        };
2152        events.push_back(native_event_for_recent_diagnostics(event));
2153        while events.len() > DEFAULT_NATIVE_RECENT_EVENT_LIMIT {
2154            events.pop_front();
2155        }
2156    }
2157
2158    fn push_rows_changed_events<'a>(&self, tables: impl IntoIterator<Item = &'a str>) {
2159        self.push_rows_changed_events_with_details(tables, Vec::new(), None);
2160    }
2161
2162    fn push_diagnostic(&self, diagnostic: NativeDiagnostic) {
2163        self.publish_event(diagnostic_event(diagnostic));
2164    }
2165
2166    fn push_rows_changed_events_with_details<'a>(
2167        &self,
2168        tables: impl IntoIterator<Item = &'a str>,
2169        changed_rows: Vec<SyncChangedRow>,
2170        source: Option<&str>,
2171    ) {
2172        let tables = unique_event_tables(tables);
2173        if tables.is_empty() {
2174            return;
2175        }
2176
2177        self.publish_event(rows_changed_event_with_details(
2178            tables.iter().map(String::as_str),
2179            changed_rows.clone(),
2180            source,
2181        ));
2182        let queries = self.changed_query_ids(&tables, &changed_rows);
2183        if !queries.is_empty() {
2184            self.publish_event(queries_changed_event_with_details(
2185                &tables,
2186                queries,
2187                changed_rows,
2188                source,
2189            ));
2190        }
2191    }
2192
2193    fn changed_query_ids(&self, tables: &[String], changed_rows: &[SyncChangedRow]) -> Vec<String> {
2194        let changed = tables.iter().map(String::as_str).collect::<BTreeSet<_>>();
2195        let Ok(observers) = self.query_observers.lock() else {
2196            return Vec::new();
2197        };
2198
2199        observers
2200            .values()
2201            .filter(|query| observed_query_should_notify(query, &changed, changed_rows))
2202            .map(|query| query.id.clone())
2203            .collect()
2204    }
2205
2206    fn stamp_event(&self, mut event: NativeEvent) -> NativeEvent {
2207        if let Ok(mut seq) = self.event_seq.lock() {
2208            *seq = seq.saturating_add(1);
2209            event.event_seq = *seq;
2210        }
2211        event
2212    }
2213
2214    fn next_subscriber_id(&self) -> u64 {
2215        if let Ok(mut seq) = self.subscriber_seq.lock() {
2216            *seq = seq.saturating_add(1);
2217            *seq
2218        } else {
2219            0
2220        }
2221    }
2222
2223    fn events_from_worker_event(&self, event: SyncWorkerEvent) -> Vec<NativeEvent> {
2224        match event {
2225            SyncWorkerEvent::SyncStarted { command_id } => {
2226                vec![sync_started_event(command_id)]
2227            }
2228            SyncWorkerEvent::SyncCompleted {
2229                command_id,
2230                report,
2231                bootstrap,
2232                outbox_count,
2233                conflict_count,
2234                duration_ms,
2235            } => {
2236                let mut events = vec![sync_completed_event(
2237                    report.clone(),
2238                    bootstrap,
2239                    command_id.clone(),
2240                    outbox_count,
2241                    conflict_count,
2242                    duration_ms,
2243                )];
2244                if !report.changed_tables.is_empty() {
2245                    events.push(rows_changed_event_with_details(
2246                        report.changed_tables.iter().map(String::as_str),
2247                        report.changed_rows.clone(),
2248                        Some("remotePull"),
2249                    ));
2250                    let queries =
2251                        self.changed_query_ids(&report.changed_tables, &report.changed_rows);
2252                    if !queries.is_empty() {
2253                        events.push(queries_changed_event_with_details(
2254                            &report.changed_tables,
2255                            queries,
2256                            report.changed_rows.clone(),
2257                            Some("remotePull"),
2258                        ));
2259                    }
2260                    events.extend(crdt_field_changed_events_from_changed_rows(
2261                        &report.changed_rows,
2262                        "remotePull",
2263                        command_id,
2264                        Some(duration_ms),
2265                    ));
2266                }
2267                if report.conflicts_changed {
2268                    events.push(conflicts_changed_event());
2269                }
2270                events
2271            }
2272            SyncWorkerEvent::SyncFailed {
2273                command_id,
2274                error,
2275                retry_scheduled,
2276                duration_ms,
2277            } => vec![sync_failed_event(
2278                &error,
2279                command_id,
2280                retry_scheduled,
2281                duration_ms,
2282            )],
2283            SyncWorkerEvent::LocalWriteCommitted {
2284                command_id,
2285                client_commit_id,
2286                changed_tables,
2287                changed_rows,
2288                outbox_count,
2289                duration_ms,
2290            } => {
2291                let mut events = vec![local_write_committed_event(
2292                    command_id,
2293                    client_commit_id,
2294                    changed_tables.clone(),
2295                    changed_rows.clone(),
2296                    outbox_count,
2297                    duration_ms,
2298                )];
2299                if !changed_tables.is_empty() {
2300                    events.push(rows_changed_event_with_details(
2301                        changed_tables.iter().map(String::as_str),
2302                        changed_rows.clone(),
2303                        Some("localWrite"),
2304                    ));
2305                    let queries = self.changed_query_ids(&changed_tables, &changed_rows);
2306                    if !queries.is_empty() {
2307                        events.push(queries_changed_event_with_details(
2308                            &changed_tables,
2309                            queries,
2310                            changed_rows,
2311                            Some("localWrite"),
2312                        ));
2313                    }
2314                }
2315                events
2316            }
2317            SyncWorkerEvent::LocalWriteFailed {
2318                command_id,
2319                error,
2320                payload_json,
2321                duration_ms,
2322            } => vec![local_write_failed_event(
2323                &error,
2324                command_id,
2325                payload_json,
2326                duration_ms,
2327            )],
2328            SyncWorkerEvent::CrdtFieldChanged {
2329                command_id,
2330                client_commit_id,
2331                table,
2332                row_id,
2333                field,
2334                changed_tables,
2335                payload_json,
2336                duration_ms,
2337            } => vec![crdt_field_changed_event_from_parts(
2338                CrdtFieldEventParts {
2339                    table,
2340                    row_id,
2341                    field,
2342                    changed_tables,
2343                    client_commit_id: Some(client_commit_id),
2344                    checkpoint_created: None,
2345                    extra_payload_json: payload_json,
2346                },
2347                Some(command_id),
2348                Some(duration_ms),
2349            )],
2350            SyncWorkerEvent::CrdtFieldCompacted {
2351                command_id,
2352                client_commit_id,
2353                table,
2354                row_id,
2355                field,
2356                changed_tables,
2357                checkpoint_created,
2358                payload_json,
2359                duration_ms,
2360            } => vec![crdt_field_compacted_event_from_parts(
2361                CrdtFieldEventParts {
2362                    table,
2363                    row_id,
2364                    field,
2365                    changed_tables,
2366                    client_commit_id,
2367                    checkpoint_created: Some(checkpoint_created),
2368                    extra_payload_json: payload_json,
2369                },
2370                Some(command_id),
2371                Some(duration_ms),
2372            )],
2373            SyncWorkerEvent::ConflictResolutionCompleted {
2374                command_id,
2375                retry_client_commit_id,
2376                duration_ms,
2377            } => vec![
2378                conflict_resolution_completed_event(
2379                    command_id,
2380                    retry_client_commit_id,
2381                    duration_ms,
2382                ),
2383                conflicts_changed_event(),
2384            ],
2385            SyncWorkerEvent::ConflictResolutionFailed {
2386                command_id,
2387                error,
2388                duration_ms,
2389            } => vec![conflict_resolution_failed_event(
2390                &error,
2391                command_id,
2392                duration_ms,
2393            )],
2394            SyncWorkerEvent::SnapshotReady {
2395                command_id,
2396                payload_json,
2397                duration_ms,
2398            } => vec![snapshot_ready_event(command_id, payload_json, duration_ms)],
2399            SyncWorkerEvent::WorkerCommandCompleted {
2400                command_id,
2401                operation,
2402                payload_json,
2403                duration_ms,
2404            } => vec![worker_command_completed_event(
2405                command_id,
2406                operation,
2407                payload_json,
2408                duration_ms,
2409            )],
2410            SyncWorkerEvent::WorkerCommandFailed {
2411                command_id,
2412                operation,
2413                error,
2414                duration_ms,
2415            } => vec![worker_command_failed_event(
2416                &error,
2417                command_id,
2418                operation,
2419                duration_ms,
2420            )],
2421            SyncWorkerEvent::BlobUploadsChanged { stats_json } => {
2422                vec![blob_uploads_changed_event(stats_json)]
2423            }
2424            SyncWorkerEvent::EventsOverflowed { dropped_count } => {
2425                vec![events_overflowed_event(dropped_count)]
2426            }
2427        }
2428    }
2429}
2430
2431#[derive(Clone, Default)]
2432pub struct NativeWorkerEventConverter {
2433    hub: NativeEventHub,
2434}
2435
2436impl NativeWorkerEventConverter {
2437    pub fn new() -> Self {
2438        Self::default()
2439    }
2440
2441    pub fn set_observed_queries(&self, observed_queries: &[NativeObservedQuery]) {
2442        if let Ok(mut observers) = self.hub.query_observers.lock() {
2443            observers.clear();
2444            for query in observed_queries {
2445                observers.insert(query.id.clone(), query.clone());
2446            }
2447        }
2448    }
2449
2450    pub fn convert(&self, event: SyncWorkerEvent) -> Vec<NativeEvent> {
2451        self.hub
2452            .events_from_worker_event(event)
2453            .into_iter()
2454            .map(|event| self.hub.stamp_event(event))
2455            .collect()
2456    }
2457
2458    pub fn convert_json(&self, event: SyncWorkerEvent) -> Result<Vec<String>> {
2459        self.convert(event)
2460            .into_iter()
2461            .map(|event| serde_json::to_string(&event).map_err(Into::into))
2462            .collect()
2463    }
2464}
2465
2466pub fn native_events_from_worker_event(event: SyncWorkerEvent) -> Vec<NativeEvent> {
2467    NativeWorkerEventConverter::new().convert(event)
2468}
2469
2470pub fn native_events_from_worker_event_with_observed_queries(
2471    event: SyncWorkerEvent,
2472    observed_queries: &[NativeObservedQuery],
2473) -> Vec<NativeEvent> {
2474    let converter = NativeWorkerEventConverter::new();
2475    converter.set_observed_queries(observed_queries);
2476    converter.convert(event)
2477}
2478
2479pub fn native_event_json_from_worker_event(event: SyncWorkerEvent) -> Result<Vec<String>> {
2480    NativeWorkerEventConverter::new().convert_json(event)
2481}
2482
2483impl NativeObservedQueryRegistration {
2484    fn into_observed_query(self, app_schema: AppSchema) -> Result<NativeObservedQuery> {
2485        let id = match self.id {
2486            Some(id) => {
2487                let id = id.trim();
2488                if id.is_empty() {
2489                    return Err(SyncularError::config("native observed query id is empty"));
2490                }
2491                id.to_string()
2492            }
2493            None => format!("native-query-{}", Uuid::new_v4()),
2494        };
2495
2496        let tables = normalize_observed_tables(self.tables, app_schema)?;
2497        let dependency_hints =
2498            normalize_observed_query_dependency_hints(self.dependency_hints, &tables, app_schema)?;
2499
2500        Ok(NativeObservedQuery {
2501            id,
2502            tables,
2503            dependency_hints,
2504            label: self.label,
2505        })
2506    }
2507}
2508
2509impl NativeErrorInfo {
2510    pub fn from_error(error: &SyncularError) -> Self {
2511        let classification = error.classification();
2512        Self {
2513            kind: error.kind(),
2514            code: classification.code,
2515            category: classification.category,
2516            retryable: classification.retryable,
2517            recommended_action: classification.recommended_action,
2518            message: error.message_text(),
2519            debug: Some(error.debug_text()),
2520        }
2521    }
2522}
2523
2524impl From<NativeClientConfig> for SyncularClientConfig {
2525    fn from(config: NativeClientConfig) -> Self {
2526        Self {
2527            db_path: config.db_path,
2528            base_url: config.base_url,
2529            client_id: config.client_id,
2530            actor_id: config.actor_id,
2531            project_id: config.project_id,
2532        }
2533    }
2534}
2535
2536impl NativeClientConfig {
2537    fn app_schema(&self) -> Result<AppSchema> {
2538        match self.app_schema_json.as_deref() {
2539            Some(schema_json) => app_schema_from_json(schema_json),
2540            None => Ok(default_app_schema()),
2541        }
2542    }
2543}
2544
2545fn native_diagnostic_subscription_snapshots(
2546    subscriptions: &[SubscriptionSpec],
2547    bootstrap: &BootstrapStatus,
2548) -> Vec<NativeDiagnosticSubscriptionSnapshot> {
2549    subscriptions
2550        .iter()
2551        .map(|subscription| {
2552            let status = bootstrap
2553                .subscriptions
2554                .iter()
2555                .find(|status| status.id == subscription.id);
2556            NativeDiagnosticSubscriptionSnapshot {
2557                id: subscription.id.clone(),
2558                table: subscription.table.clone(),
2559                scope_keys: sorted_json_map_keys(&subscription.scopes),
2560                scope_value_count: count_redacted_values(&subscription.scopes),
2561                params_keys: sorted_json_map_keys(&subscription.params),
2562                params_value_count: count_redacted_values(&subscription.params),
2563                status: status.and_then(|status| status.status.clone()),
2564                ready: status.is_some_and(|status| status.ready),
2565                phase: status
2566                    .map(|status| status.phase.clone())
2567                    .unwrap_or_else(|| "pending".to_string()),
2568                progress_percent: status.map_or(0, |status| status.progress_percent),
2569                cursor: status.and_then(|status| status.cursor),
2570                bootstrap_phase: subscription.bootstrap_phase,
2571                bootstrap_state: status.and_then(|status| status.bootstrap_state.clone()),
2572            }
2573        })
2574        .collect()
2575}
2576
2577fn sorted_json_map_keys(map: &Map<String, Value>) -> Vec<String> {
2578    let mut keys = map.keys().cloned().collect::<Vec<_>>();
2579    keys.sort();
2580    keys
2581}
2582
2583fn count_redacted_values(map: &Map<String, Value>) -> usize {
2584    map.values()
2585        .map(|value| match value {
2586            Value::Array(values) => values.len(),
2587            Value::Null => 0,
2588            _ => 1,
2589        })
2590        .sum()
2591}
2592
2593fn native_outbox_stats(outbox: &[OutboxSummary]) -> NativeOutboxStats {
2594    let mut stats = NativeOutboxStats {
2595        total: outbox.len(),
2596        ..NativeOutboxStats::default()
2597    };
2598    for item in outbox {
2599        match item.status.as_str() {
2600            "pending" => stats.pending += 1,
2601            "sending" => stats.sending += 1,
2602            "failed" => stats.failed += 1,
2603            "acked" => stats.acked += 1,
2604            _ => {}
2605        }
2606    }
2607    stats
2608}
2609
2610fn native_conflict_stats(conflicts: &[ConflictSummary]) -> NativeConflictStats {
2611    let mut stats = NativeConflictStats {
2612        total: conflicts.len(),
2613        ..NativeConflictStats::default()
2614    };
2615    for conflict in conflicts {
2616        if conflict.resolved_at.is_some() {
2617            stats.resolved += 1;
2618        } else {
2619            stats.unresolved += 1;
2620        }
2621    }
2622    stats
2623}
2624
2625fn native_sync_timing_snapshots(events: &[NativeEvent]) -> Vec<NativeSyncTimingSnapshot> {
2626    events
2627        .iter()
2628        .filter_map(|event| {
2629            let total_ms = event.duration_ms?;
2630            let (kind, success) = match event.kind {
2631                NativeEventKind::SyncCompleted => ("syncCompleted", true),
2632                NativeEventKind::SyncFailed => ("syncFailed", false),
2633                NativeEventKind::AuthExpired => ("authExpired", false),
2634                _ => return None,
2635            };
2636            Some(NativeSyncTimingSnapshot {
2637                event_seq: event.event_seq,
2638                kind: kind.to_string(),
2639                command_id: event.command_id.clone(),
2640                total_ms,
2641                success,
2642                retry_scheduled: event.retry_scheduled,
2643                outbox_count: event.outbox_count,
2644                conflict_count: event.conflict_count,
2645            })
2646        })
2647        .collect()
2648}
2649
2650fn crdt_field_descriptor(field: &CrdtField) -> Value {
2651    json!({
2652        "table": field.table(),
2653        "rowId": field.row_id(),
2654        "field": field.field(),
2655        "stateColumn": field.state_column(),
2656        "containerKey": field.container_key(),
2657        "rowIdField": field.row_id_field(),
2658        "syncMode": field.sync_mode(),
2659        "kind": field.field_metadata().kind,
2660    })
2661}
2662
2663fn crdt_field_write_receipt_json(receipt: CrdtFieldWriteReceipt) -> Result<String> {
2664    Ok(serde_json::to_string(&receipt)?)
2665}
2666
2667fn crdt_field_materialization_json(materialization: CrdtFieldMaterialization) -> Result<String> {
2668    Ok(serde_json::to_string(&materialization)?)
2669}
2670
2671fn crdt_field_compaction_receipt_json(receipt: CrdtFieldCompactionReceipt) -> Result<String> {
2672    Ok(serde_json::to_string(&receipt)?)
2673}
2674
2675fn crdt_field_write_tables(field: &CrdtField) -> Vec<&'static str> {
2676    match field.sync_mode() {
2677        CrdtFieldSyncMode::ServerMerge => vec![field.table()],
2678        CrdtFieldSyncMode::EncryptedUpdateLog => vec![field.table(), CRDT_UPDATES_TABLE],
2679    }
2680}
2681
2682fn crdt_field_compaction_tables(field: &CrdtField) -> Vec<&'static str> {
2683    match field.sync_mode() {
2684        CrdtFieldSyncMode::ServerMerge => vec![field.table()],
2685        CrdtFieldSyncMode::EncryptedUpdateLog => vec![CRDT_CHECKPOINTS_TABLE],
2686    }
2687}
2688
2689fn crdt_field_changed_row(field: &CrdtField, client_commit_id: Option<String>) -> SyncChangedRow {
2690    let crdt_field_changes = vec![sync_changed_crdt_field_from_metadata(
2691        field.field_metadata(),
2692    )];
2693    SyncChangedRow {
2694        table: field.table().to_string(),
2695        row_id: Some(field.row_id().to_string()),
2696        operation: "update".to_string(),
2697        changed_fields: vec![field.field().to_string(), field.state_column().to_string()],
2698        crdt_fields: vec![field.state_column().to_string()],
2699        crdt_field_changes,
2700        commit_id: client_commit_id,
2701        commit_seq: None,
2702        subscription_id: None,
2703        server_version: None,
2704    }
2705}
2706
2707fn crdt_field_compacted_row(field: &CrdtField, client_commit_id: Option<String>) -> SyncChangedRow {
2708    let crdt_field_changes = vec![sync_changed_crdt_field_from_metadata(
2709        field.field_metadata(),
2710    )];
2711    SyncChangedRow {
2712        table: field.table().to_string(),
2713        row_id: Some(field.row_id().to_string()),
2714        operation: "compact".to_string(),
2715        changed_fields: vec![field.state_column().to_string()],
2716        crdt_fields: vec![field.state_column().to_string()],
2717        crdt_field_changes,
2718        commit_id: client_commit_id,
2719        commit_seq: None,
2720        subscription_id: None,
2721        server_version: None,
2722    }
2723}
2724
2725#[derive(Debug)]
2726struct CrdtFieldEventParts {
2727    table: String,
2728    row_id: String,
2729    field: String,
2730    changed_tables: Vec<String>,
2731    client_commit_id: Option<String>,
2732    checkpoint_created: Option<bool>,
2733    extra_payload_json: Option<Value>,
2734}
2735
2736fn crdt_field_changed_event(
2737    field: &CrdtField,
2738    client_commit_id: Option<String>,
2739    changed_tables: Vec<String>,
2740    command_id: Option<String>,
2741    duration_ms: Option<u64>,
2742    extra_payload_json: Option<Value>,
2743) -> NativeEvent {
2744    crdt_field_changed_event_from_parts(
2745        CrdtFieldEventParts {
2746            table: field.table().to_string(),
2747            row_id: field.row_id().to_string(),
2748            field: field.field().to_string(),
2749            changed_tables,
2750            client_commit_id,
2751            checkpoint_created: None,
2752            extra_payload_json,
2753        },
2754        command_id,
2755        duration_ms,
2756    )
2757}
2758
2759fn crdt_field_changed_event_from_parts(
2760    parts: CrdtFieldEventParts,
2761    command_id: Option<String>,
2762    duration_ms: Option<u64>,
2763) -> NativeEvent {
2764    let mut event = native_event(
2765        NativeEventKind::CrdtFieldChanged,
2766        parts.changed_tables.clone(),
2767        Some(native_diagnostic(
2768            "info",
2769            "storage",
2770            "crdt.field_changed",
2771            "Native Syncular CRDT field changed",
2772            [
2773                ("commandId", json!(command_id.clone())),
2774                ("clientCommitId", json!(parts.client_commit_id.clone())),
2775                ("table", json!(parts.table.clone())),
2776                ("rowId", json!(parts.row_id.clone())),
2777                ("field", json!(parts.field.clone())),
2778                ("tables", json!(parts.changed_tables.clone())),
2779                ("durationMs", json!(duration_ms)),
2780            ],
2781        )),
2782    );
2783    event.command_id = command_id;
2784    event.client_commit_id = parts.client_commit_id.clone();
2785    event.duration_ms = duration_ms;
2786    event.payload_json = Some(crdt_field_payload(parts));
2787    event
2788}
2789
2790fn crdt_field_compacted_event(
2791    field: &CrdtField,
2792    client_commit_id: Option<String>,
2793    changed_tables: Vec<String>,
2794    command_id: Option<String>,
2795    duration_ms: Option<u64>,
2796    checkpoint_created: bool,
2797    extra_payload_json: Option<Value>,
2798) -> NativeEvent {
2799    crdt_field_compacted_event_from_parts(
2800        CrdtFieldEventParts {
2801            table: field.table().to_string(),
2802            row_id: field.row_id().to_string(),
2803            field: field.field().to_string(),
2804            changed_tables,
2805            client_commit_id,
2806            checkpoint_created: Some(checkpoint_created),
2807            extra_payload_json,
2808        },
2809        command_id,
2810        duration_ms,
2811    )
2812}
2813
2814fn crdt_field_compacted_event_from_parts(
2815    parts: CrdtFieldEventParts,
2816    command_id: Option<String>,
2817    duration_ms: Option<u64>,
2818) -> NativeEvent {
2819    let mut event = native_event(
2820        NativeEventKind::CrdtFieldCompacted,
2821        parts.changed_tables.clone(),
2822        Some(native_diagnostic(
2823            "info",
2824            "storage",
2825            "crdt.field_compacted",
2826            "Native Syncular CRDT field compacted",
2827            [
2828                ("commandId", json!(command_id.clone())),
2829                ("clientCommitId", json!(parts.client_commit_id.clone())),
2830                ("table", json!(parts.table.clone())),
2831                ("rowId", json!(parts.row_id.clone())),
2832                ("field", json!(parts.field.clone())),
2833                (
2834                    "checkpointCreated",
2835                    json!(parts.checkpoint_created.unwrap_or(true)),
2836                ),
2837                ("tables", json!(parts.changed_tables.clone())),
2838                ("durationMs", json!(duration_ms)),
2839            ],
2840        )),
2841    );
2842    event.command_id = command_id;
2843    event.client_commit_id = parts.client_commit_id.clone();
2844    event.duration_ms = duration_ms;
2845    event.payload_json = Some(crdt_field_payload(parts));
2846    event
2847}
2848
2849fn crdt_field_event_payload(
2850    client: &mut SyncularClient<DieselSqliteStore, HttpSyncTransport>,
2851    field: &CrdtField,
2852) -> Option<Value> {
2853    let mut payload = crdt_field_base_event_payload(field);
2854    match client.materialize_crdt_field(field) {
2855        Ok(materialization) => {
2856            payload.insert("materializationAvailable".to_string(), json!(true));
2857            payload.insert(
2858                "hasState".to_string(),
2859                json!(materialization.state_base64.is_some()),
2860            );
2861            payload.insert(
2862                "stateVectorBase64".to_string(),
2863                json!(materialization.state_vector_base64),
2864            );
2865        }
2866        Err(error) => {
2867            payload.insert("materializationAvailable".to_string(), json!(false));
2868            payload.insert(
2869                "materializationError".to_string(),
2870                json!(error.message_text()),
2871            );
2872        }
2873    }
2874    Some(Value::Object(payload))
2875}
2876
2877fn crdt_field_compaction_event_payload(
2878    client: &mut SyncularClient<DieselSqliteStore, HttpSyncTransport>,
2879    field: &CrdtField,
2880    receipt: &CrdtFieldCompactionReceipt,
2881    checkpoint_created: bool,
2882    min_uncheckpointed_updates: i64,
2883) -> Option<Value> {
2884    let mut payload = crdt_field_event_payload(client, field)
2885        .and_then(|value| value.as_object().cloned())
2886        .unwrap_or_else(|| crdt_field_base_event_payload(field));
2887    payload.insert("checkpointCreated".to_string(), json!(checkpoint_created));
2888    payload.insert(
2889        "minUncheckpointedUpdates".to_string(),
2890        json!(min_uncheckpointed_updates),
2891    );
2892    payload.insert("before".to_string(), json!(&receipt.before));
2893    payload.insert("after".to_string(), json!(&receipt.after));
2894    payload.insert(
2895        "encryptedStreamBefore".to_string(),
2896        json!(&receipt.encrypted_stream_before),
2897    );
2898    payload.insert(
2899        "encryptedStreamAfter".to_string(),
2900        json!(&receipt.encrypted_stream_after),
2901    );
2902    Some(Value::Object(payload))
2903}
2904
2905fn crdt_field_base_event_payload(field: &CrdtField) -> serde_json::Map<String, Value> {
2906    let mut payload = serde_json::Map::new();
2907    payload.insert("syncMode".to_string(), json!(field.sync_mode()));
2908    payload.insert("kind".to_string(), json!(field.field_metadata().kind));
2909    payload.insert("stateColumn".to_string(), json!(field.state_column()));
2910    payload.insert("containerKey".to_string(), json!(field.container_key()));
2911    payload.insert("rowIdField".to_string(), json!(field.row_id_field()));
2912    payload
2913}
2914
2915fn crdt_field_payload(parts: CrdtFieldEventParts) -> Value {
2916    let mut payload = json!({
2917        "table": parts.table,
2918        "rowId": parts.row_id,
2919        "field": parts.field,
2920        "changedTables": parts.changed_tables,
2921    });
2922    if let Value::Object(ref mut object) = payload {
2923        if let Some(client_commit_id) = parts.client_commit_id {
2924            object.insert("clientCommitId".to_string(), json!(client_commit_id));
2925        }
2926        if let Some(checkpoint_created) = parts.checkpoint_created {
2927            object.insert("checkpointCreated".to_string(), json!(checkpoint_created));
2928        }
2929        if let Some(Value::Object(extra)) = parts.extra_payload_json {
2930            for (key, value) in extra {
2931                object.entry(key).or_insert(value);
2932            }
2933        }
2934    }
2935    payload
2936}
2937
2938fn crdt_field_changed_events_from_changed_rows(
2939    changed_rows: &[SyncChangedRow],
2940    source: &str,
2941    command_id: Option<String>,
2942    duration_ms: Option<u64>,
2943) -> Vec<NativeEvent> {
2944    let mut events = Vec::new();
2945    for row in changed_rows {
2946        let Some(row_id) = row.row_id.as_ref() else {
2947            continue;
2948        };
2949        for field in &row.crdt_field_changes {
2950            events.push(crdt_field_changed_event_from_parts(
2951                CrdtFieldEventParts {
2952                    table: row.table.clone(),
2953                    row_id: row_id.clone(),
2954                    field: field.field.clone(),
2955                    changed_tables: vec![row.table.clone()],
2956                    client_commit_id: None,
2957                    checkpoint_created: None,
2958                    extra_payload_json: Some(json!({
2959                        "source": source,
2960                        "operation": row.operation,
2961                        "stateColumn": field.state_column,
2962                        "containerKey": field.container_key,
2963                        "rowIdField": field.row_id_field,
2964                        "kind": field.kind,
2965                        "syncMode": field.sync_mode,
2966                        "commitId": row.commit_id,
2967                        "commitSeq": row.commit_seq,
2968                        "subscriptionId": row.subscription_id,
2969                        "serverVersion": row.server_version,
2970                        "changedFields": row.changed_fields,
2971                        "crdtFields": row.crdt_fields,
2972                    })),
2973                },
2974                command_id.clone(),
2975                duration_ms,
2976            ));
2977        }
2978    }
2979    events
2980}
2981
2982fn native_event_for_recent_diagnostics(mut event: NativeEvent) -> NativeEvent {
2983    if let Some(payload_json) = event.payload_json.as_ref() {
2984        if let Ok(bytes) = serde_json::to_vec(payload_json) {
2985            if bytes.len() > MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES {
2986                event.payload_json = Some(json!({
2987                    "truncated": true,
2988                    "reason": "diagnosticPayloadLimit",
2989                    "originalBytes": bytes.len(),
2990                    "maxBytes": MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES,
2991                    "limit": "maxNativeDiagnosticEventPayloadJsonBytes"
2992                }));
2993            }
2994        }
2995    }
2996    event
2997}
2998
2999fn rows_changed_event_with_details<'a>(
3000    tables: impl IntoIterator<Item = &'a str>,
3001    changed_rows: Vec<SyncChangedRow>,
3002    source: Option<&str>,
3003) -> NativeEvent {
3004    let tables = tables.into_iter().map(str::to_string).collect::<Vec<_>>();
3005    let mut event = native_event(
3006        NativeEventKind::RowsChanged,
3007        tables.clone(),
3008        Some(native_diagnostic(
3009            "info",
3010            "storage",
3011            "storage.rows_changed",
3012            "Native Syncular rows changed",
3013            [
3014                ("tables", json!(tables)),
3015                ("source", json!(source)),
3016                ("changedRows", json!(changed_rows.clone())),
3017            ],
3018        )),
3019    );
3020    event.changed_rows = changed_rows.clone();
3021    event.payload_json = Some(json!({
3022        "type": "rowsChanged",
3023        "source": source,
3024        "changedRows": changed_rows,
3025    }));
3026    event
3027}
3028
3029fn apply_native_presence_event(
3030    presence_by_scope: &Arc<Mutex<BTreeMap<String, Vec<NativePresenceEntry>>>>,
3031    events: &NativeEventHub,
3032    event: RealtimePresenceEvent,
3033) {
3034    let scope_key = event.scope_key.clone();
3035    let Ok(mut state) = presence_by_scope.lock() else {
3036        return;
3037    };
3038    let current = state.get(&scope_key).cloned().unwrap_or_default();
3039    let next = match event.action.as_str() {
3040        "snapshot" => event
3041            .entries
3042            .into_iter()
3043            .map(native_presence_entry_from_realtime)
3044            .collect::<Vec<_>>(),
3045        "leave" => {
3046            let Some(client_id) = event.client_id.as_deref() else {
3047                return;
3048            };
3049            current
3050                .into_iter()
3051                .filter(|entry| entry.client_id != client_id)
3052                .collect()
3053        }
3054        "update" => {
3055            let Some(client_id) = event.client_id.as_deref() else {
3056                return;
3057            };
3058            if !current.iter().any(|entry| entry.client_id == client_id) {
3059                return;
3060            }
3061            current
3062                .into_iter()
3063                .map(|entry| {
3064                    if entry.client_id == client_id {
3065                        NativePresenceEntry {
3066                            metadata: event.metadata.clone(),
3067                            ..entry
3068                        }
3069                    } else {
3070                        entry
3071                    }
3072                })
3073                .collect()
3074        }
3075        "join" => {
3076            let (Some(client_id), Some(actor_id)) = (event.client_id, event.actor_id) else {
3077                return;
3078            };
3079            let joined_at = current
3080                .iter()
3081                .find(|entry| entry.client_id == client_id)
3082                .map(|entry| entry.joined_at)
3083                .unwrap_or_else(now_ms);
3084            let mut next = current
3085                .into_iter()
3086                .filter(|entry| entry.client_id != client_id)
3087                .collect::<Vec<_>>();
3088            next.push(NativePresenceEntry {
3089                client_id,
3090                actor_id,
3091                joined_at,
3092                metadata: event.metadata,
3093            });
3094            next
3095        }
3096        _ => return,
3097    };
3098    if next.is_empty() {
3099        state.remove(&scope_key);
3100    } else {
3101        state.insert(scope_key.clone(), next.clone());
3102    }
3103    drop(state);
3104    events.publish_event(presence_changed_event(scope_key, next));
3105}
3106
3107fn native_presence_entry_from_realtime(entry: RealtimePresenceEntry) -> NativePresenceEntry {
3108    NativePresenceEntry {
3109        client_id: entry.client_id,
3110        actor_id: entry.actor_id,
3111        joined_at: entry.joined_at,
3112        metadata: entry.metadata,
3113    }
3114}
3115
3116fn presence_changed_event(scope_key: String, presence: Vec<NativePresenceEntry>) -> NativeEvent {
3117    let mut event = native_event(
3118        NativeEventKind::PresenceChanged,
3119        Vec::new(),
3120        Some(native_diagnostic(
3121            "info",
3122            "realtime",
3123            "realtime.presence_changed",
3124            "Native Syncular presence changed",
3125            [
3126                ("scopeKey", json!(scope_key.clone())),
3127                ("presence", json!(presence.clone())),
3128            ],
3129        )),
3130    );
3131    event.payload_json = Some(json!({
3132        "type": "presenceChanged",
3133        "scopeKey": scope_key,
3134        "presence": presence,
3135    }));
3136    event
3137}
3138
3139fn queries_changed_event_with_details(
3140    tables: &[String],
3141    queries: Vec<String>,
3142    changed_rows: Vec<SyncChangedRow>,
3143    source: Option<&str>,
3144) -> NativeEvent {
3145    let mut event = native_event(
3146        NativeEventKind::QueriesChanged,
3147        tables.to_vec(),
3148        Some(native_diagnostic(
3149            "info",
3150            "storage",
3151            "storage.queries_changed",
3152            "Native Syncular observed queries changed",
3153            [
3154                ("tables", json!(tables)),
3155                ("queries", json!(queries.clone())),
3156                ("source", json!(source)),
3157                ("changedRows", json!(changed_rows.clone())),
3158            ],
3159        )),
3160    );
3161    event.changed_rows = changed_rows;
3162    event.queries = queries;
3163    event
3164}
3165
3166fn conflicts_changed_event() -> NativeEvent {
3167    let diagnostic = native_diagnostic(
3168        "warn",
3169        "sync",
3170        "sync.conflicts_changed",
3171        "Native Syncular conflicts changed",
3172        std::iter::empty::<(&str, Value)>(),
3173    );
3174    let mut event = native_event(
3175        NativeEventKind::ConflictsChanged,
3176        Vec::new(),
3177        Some(diagnostic.clone()),
3178    );
3179    event.lifecycle = Some(native_lifecycle_state(
3180        NativeLifecyclePhase::Degraded,
3181        false,
3182        true,
3183        0,
3184        None,
3185        None,
3186        Some(NativeLifecycleConflicts { unresolved: 1 }),
3187        None,
3188        None,
3189        Some(diagnostic),
3190    ));
3191    event
3192}
3193
3194fn native_lifecycle_blob_uploads(stats_json: &Value) -> Option<NativeLifecycleBlobUploads> {
3195    Some(NativeLifecycleBlobUploads {
3196        pending: stats_json.get("pending")?.as_i64()?,
3197        uploading: stats_json.get("uploading")?.as_i64()?,
3198        failed: stats_json.get("failed")?.as_i64()?,
3199    })
3200}
3201
3202fn native_lifecycle_bootstrap(status: &BootstrapStatus) -> NativeLifecycleBootstrap {
3203    NativeLifecycleBootstrap {
3204        complete: status.complete,
3205        critical_ready: status.critical_ready,
3206        interactive_ready: status.interactive_ready,
3207        is_bootstrapping: status.is_bootstrapping,
3208        progress_percent: status.progress_percent,
3209    }
3210}
3211
3212fn native_lifecycle_state(
3213    phase: NativeLifecyclePhase,
3214    online: bool,
3215    requires_action: bool,
3216    pending_requests: usize,
3217    bootstrap: Option<NativeLifecycleBootstrap>,
3218    outbox: Option<NativeLifecycleOutbox>,
3219    conflicts: Option<NativeLifecycleConflicts>,
3220    blob_uploads: Option<NativeLifecycleBlobUploads>,
3221    last_error: Option<NativeErrorInfo>,
3222    last_diagnostic: Option<NativeDiagnostic>,
3223) -> NativeLifecycleState {
3224    NativeLifecycleState {
3225        phase,
3226        online,
3227        requires_action,
3228        pending_requests,
3229        bootstrap,
3230        outbox,
3231        conflicts,
3232        blob_uploads,
3233        last_error,
3234        last_diagnostic,
3235    }
3236}
3237
3238fn native_lifecycle_for_sync_completed(
3239    bootstrap: &BootstrapStatus,
3240    outbox_count: usize,
3241    conflict_count: usize,
3242    diagnostic: NativeDiagnostic,
3243) -> NativeLifecycleState {
3244    let has_conflicts = conflict_count > 0;
3245    let phase = if has_conflicts {
3246        NativeLifecyclePhase::Degraded
3247    } else if bootstrap.complete {
3248        NativeLifecyclePhase::Complete
3249    } else {
3250        NativeLifecyclePhase::Recovering
3251    };
3252    native_lifecycle_state(
3253        phase,
3254        true,
3255        has_conflicts,
3256        0,
3257        Some(native_lifecycle_bootstrap(bootstrap)),
3258        Some(NativeLifecycleOutbox {
3259            pending: outbox_count,
3260        }),
3261        Some(NativeLifecycleConflicts {
3262            unresolved: conflict_count,
3263        }),
3264        None,
3265        None,
3266        Some(diagnostic),
3267    )
3268}
3269
3270fn native_lifecycle_for_error(
3271    error: &SyncularError,
3272    retry_scheduled: bool,
3273    diagnostic: NativeDiagnostic,
3274) -> NativeLifecycleState {
3275    let error_info = NativeErrorInfo::from_error(error);
3276    let classification = error.classification();
3277    let resync_required = error.requires_full_snapshot_resync();
3278    let phase = if classification.code == "sync.auth_required" {
3279        NativeLifecyclePhase::AuthRequired
3280    } else if resync_required {
3281        NativeLifecyclePhase::Recovering
3282    } else if classification.category == "offline" || retry_scheduled {
3283        NativeLifecyclePhase::Offline
3284    } else {
3285        NativeLifecyclePhase::Degraded
3286    };
3287    let requires_action = matches!(phase, NativeLifecyclePhase::AuthRequired)
3288        || (matches!(phase, NativeLifecyclePhase::Degraded) && !classification.retryable);
3289    native_lifecycle_state(
3290        phase,
3291        false,
3292        requires_action,
3293        0,
3294        None,
3295        None,
3296        None,
3297        None,
3298        Some(error_info),
3299        Some(diagnostic),
3300    )
3301}
3302
3303fn events_overflowed_event(dropped_count: usize) -> NativeEvent {
3304    let diagnostic = native_diagnostic(
3305        "warn",
3306        "events",
3307        "events.overflowed",
3308        "Native Syncular event stream overflowed",
3309        [
3310            ("droppedCount", json!(dropped_count)),
3311            ("resyncRequired", json!(true)),
3312        ],
3313    );
3314    let mut event = native_event(
3315        NativeEventKind::EventsOverflowed,
3316        Vec::new(),
3317        Some(diagnostic.clone()),
3318    );
3319    event.dropped_count = Some(dropped_count);
3320    event.resync_required = Some(true);
3321    event.lifecycle = Some(native_lifecycle_state(
3322        NativeLifecyclePhase::Recovering,
3323        false,
3324        true,
3325        0,
3326        None,
3327        None,
3328        None,
3329        None,
3330        None,
3331        Some(diagnostic),
3332    ));
3333    event.payload_json = Some(json!({
3334        "type": "eventsOverflowed",
3335        "droppedCount": dropped_count,
3336        "resyncRequired": true
3337    }));
3338    event
3339}
3340
3341fn sync_started_event(command_id: Option<String>) -> NativeEvent {
3342    let diagnostic = native_diagnostic(
3343        "info",
3344        "sync",
3345        "sync.started",
3346        "Native Syncular sync started",
3347        std::iter::empty::<(&str, Value)>(),
3348    );
3349    let mut event = native_event(
3350        NativeEventKind::SyncStarted,
3351        Vec::new(),
3352        Some(diagnostic.clone()),
3353    );
3354    event.command_id = command_id;
3355    event.lifecycle = Some(native_lifecycle_state(
3356        NativeLifecyclePhase::Syncing,
3357        false,
3358        false,
3359        1,
3360        None,
3361        None,
3362        None,
3363        None,
3364        None,
3365        Some(diagnostic),
3366    ));
3367    event
3368}
3369
3370fn sync_completed_event(
3371    report: SyncReport,
3372    bootstrap: BootstrapStatus,
3373    command_id: Option<String>,
3374    outbox_count: usize,
3375    conflict_count: usize,
3376    duration_ms: u64,
3377) -> NativeEvent {
3378    let diagnostic = native_diagnostic(
3379        "info",
3380        "sync",
3381        "sync.completed",
3382        "Native Syncular sync completed",
3383        [
3384            ("changedTables", json!(report.changed_tables.clone())),
3385            ("changedTableCount", json!(report.changed_tables.len())),
3386            ("changedRows", json!(report.changed_rows.clone())),
3387            ("conflictsChanged", json!(report.conflicts_changed)),
3388            ("bootstrap", json!(bootstrap.clone())),
3389            ("outboxCount", json!(outbox_count)),
3390            ("conflictCount", json!(conflict_count)),
3391            ("durationMs", json!(duration_ms)),
3392        ],
3393    );
3394    let mut event = native_event(
3395        NativeEventKind::SyncCompleted,
3396        report.changed_tables.clone(),
3397        Some(diagnostic.clone()),
3398    );
3399    event.command_id = command_id;
3400    event.outbox_count = Some(outbox_count);
3401    event.conflict_count = Some(conflict_count);
3402    event.duration_ms = Some(duration_ms);
3403    event.changed_rows = report.changed_rows;
3404    event.lifecycle = Some(native_lifecycle_for_sync_completed(
3405        &bootstrap,
3406        outbox_count,
3407        conflict_count,
3408        diagnostic,
3409    ));
3410    event.bootstrap = Some(bootstrap);
3411    event
3412}
3413
3414fn sync_failed_event(
3415    error: &SyncularError,
3416    command_id: Option<String>,
3417    retry_scheduled: bool,
3418    duration_ms: u64,
3419) -> NativeEvent {
3420    match native_auth_info_from_error(error) {
3421        Some(auth) => {
3422            let operation = auth.operation.clone();
3423            let status = auth.status;
3424            let mut details = vec![
3425                ("operation", json!(operation)),
3426                ("status", json!(status)),
3427                ("retryScheduled", json!(retry_scheduled)),
3428                ("durationMs", json!(duration_ms)),
3429            ];
3430            push_native_error_details(&mut details, error);
3431            let diagnostic = native_diagnostic(
3432                "warn",
3433                "auth",
3434                "auth.expired",
3435                "Native Syncular auth expired",
3436                details,
3437            );
3438            let mut event = native_event(
3439                NativeEventKind::AuthExpired,
3440                Vec::new(),
3441                Some(diagnostic.clone()),
3442            );
3443            event.error = Some(NativeErrorInfo::from_error(error));
3444            event.auth = Some(auth);
3445            event.command_id = command_id;
3446            event.retry_scheduled = Some(retry_scheduled);
3447            event.duration_ms = Some(duration_ms);
3448            event.lifecycle = Some(native_lifecycle_for_error(
3449                error,
3450                retry_scheduled,
3451                diagnostic,
3452            ));
3453            event
3454        }
3455        None => {
3456            let resync_required = error.requires_full_snapshot_resync();
3457            let mut details = vec![
3458                ("retryScheduled", json!(retry_scheduled)),
3459                ("durationMs", json!(duration_ms)),
3460                ("resyncRequired", json!(resync_required)),
3461            ];
3462            push_native_error_details(&mut details, error);
3463            let diagnostic = native_diagnostic(
3464                "error",
3465                "sync",
3466                if resync_required {
3467                    "sync.resync_required"
3468                } else {
3469                    "sync.failed"
3470                },
3471                if resync_required {
3472                    "Native Syncular sync requires full resync"
3473                } else {
3474                    "Native Syncular sync failed"
3475                },
3476                details,
3477            );
3478            let mut event = native_event(
3479                NativeEventKind::SyncFailed,
3480                Vec::new(),
3481                Some(diagnostic.clone()),
3482            );
3483            event.error = Some(NativeErrorInfo::from_error(error));
3484            event.command_id = command_id;
3485            event.retry_scheduled = Some(retry_scheduled);
3486            event.duration_ms = Some(duration_ms);
3487            event.lifecycle = Some(native_lifecycle_for_error(
3488                error,
3489                retry_scheduled,
3490                diagnostic,
3491            ));
3492            if resync_required {
3493                event.resync_required = Some(true);
3494                event.payload_json = Some(json!({
3495                    "type": "syncResyncRequired",
3496                    "resyncRequired": true,
3497                    "retryScheduled": retry_scheduled
3498                }));
3499            }
3500            event
3501        }
3502    }
3503}
3504
3505fn local_write_committed_event(
3506    command_id: String,
3507    client_commit_id: String,
3508    changed_tables: Vec<String>,
3509    changed_rows: Vec<SyncChangedRow>,
3510    outbox_count: usize,
3511    duration_ms: u64,
3512) -> NativeEvent {
3513    let diagnostic = native_diagnostic(
3514        "info",
3515        "storage",
3516        "storage.local_write_committed",
3517        "Native Syncular local write committed",
3518        [
3519            ("commandId", json!(command_id.clone())),
3520            ("clientCommitId", json!(client_commit_id.clone())),
3521            ("tables", json!(changed_tables.clone())),
3522            ("changedRows", json!(changed_rows.clone())),
3523            ("outboxCount", json!(outbox_count)),
3524            ("durationMs", json!(duration_ms)),
3525        ],
3526    );
3527    let mut event = native_event(
3528        NativeEventKind::LocalWriteCommitted,
3529        changed_tables.clone(),
3530        Some(diagnostic.clone()),
3531    );
3532    event.command_id = Some(command_id);
3533    event.client_commit_id = Some(client_commit_id);
3534    event.outbox_count = Some(outbox_count);
3535    event.duration_ms = Some(duration_ms);
3536    event.changed_rows = changed_rows;
3537    event.lifecycle = Some(native_lifecycle_state(
3538        NativeLifecyclePhase::Offline,
3539        false,
3540        false,
3541        0,
3542        None,
3543        Some(NativeLifecycleOutbox {
3544            pending: outbox_count,
3545        }),
3546        None,
3547        None,
3548        None,
3549        Some(diagnostic),
3550    ));
3551    event
3552}
3553
3554fn local_write_failed_event(
3555    error: &SyncularError,
3556    command_id: String,
3557    payload_json: Option<Value>,
3558    duration_ms: u64,
3559) -> NativeEvent {
3560    let mut details = vec![
3561        ("commandId", json!(command_id.clone())),
3562        ("durationMs", json!(duration_ms)),
3563    ];
3564    push_native_error_details(&mut details, error);
3565    let diagnostic = native_diagnostic(
3566        "error",
3567        "storage",
3568        "storage.local_write_failed",
3569        "Native Syncular local write failed",
3570        details,
3571    );
3572    let mut event = native_event(
3573        NativeEventKind::LocalWriteFailed,
3574        Vec::new(),
3575        Some(diagnostic.clone()),
3576    );
3577    event.error = Some(NativeErrorInfo::from_error(error));
3578    event.command_id = Some(command_id);
3579    event.payload_json = payload_json;
3580    event.duration_ms = Some(duration_ms);
3581    event.lifecycle = Some(native_lifecycle_for_error(error, false, diagnostic));
3582    event
3583}
3584
3585fn conflict_resolution_completed_event(
3586    command_id: String,
3587    retry_client_commit_id: Option<String>,
3588    duration_ms: u64,
3589) -> NativeEvent {
3590    let mut event = native_event(
3591        NativeEventKind::ConflictResolutionCompleted,
3592        Vec::new(),
3593        Some(native_diagnostic(
3594            "info",
3595            "sync",
3596            "sync.conflict_resolution_completed",
3597            "Native Syncular conflict resolution completed",
3598            [
3599                ("commandId", json!(command_id.clone())),
3600                ("retryClientCommitId", json!(retry_client_commit_id.clone())),
3601                ("durationMs", json!(duration_ms)),
3602            ],
3603        )),
3604    );
3605    event.command_id = Some(command_id);
3606    event.client_commit_id = retry_client_commit_id;
3607    event.duration_ms = Some(duration_ms);
3608    event
3609}
3610
3611fn conflict_resolution_failed_event(
3612    error: &SyncularError,
3613    command_id: String,
3614    duration_ms: u64,
3615) -> NativeEvent {
3616    let mut details = vec![
3617        ("commandId", json!(command_id.clone())),
3618        ("durationMs", json!(duration_ms)),
3619    ];
3620    push_native_error_details(&mut details, error);
3621    let diagnostic = native_diagnostic(
3622        "error",
3623        "sync",
3624        "sync.conflict_resolution_failed",
3625        "Native Syncular conflict resolution failed",
3626        details,
3627    );
3628    let mut event = native_event(
3629        NativeEventKind::ConflictResolutionFailed,
3630        Vec::new(),
3631        Some(diagnostic.clone()),
3632    );
3633    event.error = Some(NativeErrorInfo::from_error(error));
3634    event.command_id = Some(command_id);
3635    event.duration_ms = Some(duration_ms);
3636    event.lifecycle = Some(native_lifecycle_for_error(error, false, diagnostic));
3637    event
3638}
3639
3640fn snapshot_ready_event(command_id: String, payload_json: Value, duration_ms: u64) -> NativeEvent {
3641    let mut event = native_event(
3642        NativeEventKind::SnapshotReady,
3643        Vec::new(),
3644        Some(native_diagnostic(
3645            "info",
3646            "storage",
3647            "storage.snapshot_ready",
3648            "Native Syncular snapshot ready",
3649            [
3650                ("commandId", json!(command_id.clone())),
3651                ("durationMs", json!(duration_ms)),
3652            ],
3653        )),
3654    );
3655    event.command_id = Some(command_id);
3656    event.payload_json = Some(payload_json);
3657    event.duration_ms = Some(duration_ms);
3658    event
3659}
3660
3661fn worker_command_completed_diagnostic(
3662    command_id: &str,
3663    operation: &str,
3664    payload_json: Option<&Value>,
3665    duration_ms: u64,
3666) -> NativeDiagnostic {
3667    let mut details = vec![
3668        ("commandId", json!(command_id)),
3669        ("operation", json!(operation)),
3670        ("durationMs", json!(duration_ms)),
3671    ];
3672    if let Some(payload_json) = payload_json {
3673        details.push(("payload", payload_json.clone()));
3674    }
3675    let (source, code, message, level) = match operation {
3676        "processBlobUploadQueue" => {
3677            let failed = payload_json
3678                .and_then(|payload| payload.get("failed"))
3679                .and_then(Value::as_i64)
3680                .unwrap_or_default();
3681            (
3682                "blob",
3683                "blob.upload_queue_processed",
3684                "Native Syncular blob upload queue processed",
3685                if failed > 0 { "warn" } else { "info" },
3686            )
3687        }
3688        "pruneBlobCache" => (
3689            "blob",
3690            "blob.cache_pruned",
3691            "Native Syncular blob cache pruned",
3692            "info",
3693        ),
3694        "clearBlobCache" => (
3695            "blob",
3696            "blob.cache_cleared",
3697            "Native Syncular blob cache cleared",
3698            "info",
3699        ),
3700        "retrieveBlobFile" => (
3701            "blob",
3702            "blob.download_completed",
3703            "Native Syncular blob file retrieved",
3704            "info",
3705        ),
3706        "storeBlobFile" => (
3707            "blob",
3708            "blob.store_queued",
3709            "Native Syncular blob file stored through worker",
3710            "info",
3711        ),
3712        _ => (
3713            "worker",
3714            "worker.command_completed",
3715            "Native Syncular worker command completed",
3716            "info",
3717        ),
3718    };
3719    native_diagnostic(level, source, code, message, details)
3720}
3721
3722fn worker_command_failed_diagnostic(
3723    operation: &str,
3724    details: Vec<(&'static str, Value)>,
3725) -> NativeDiagnostic {
3726    let (source, code, message) = match operation {
3727        "retrieveBlobFile" => (
3728            "blob",
3729            "blob.download_failed",
3730            "Native Syncular blob file retrieval failed",
3731        ),
3732        "processBlobUploadQueue" => (
3733            "blob",
3734            "blob.upload_failed",
3735            "Native Syncular blob upload queue processing failed",
3736        ),
3737        "pruneBlobCache" => (
3738            "blob",
3739            "blob.cache_prune_failed",
3740            "Native Syncular blob cache pruning failed",
3741        ),
3742        "clearBlobCache" => (
3743            "blob",
3744            "blob.cache_clear_failed",
3745            "Native Syncular blob cache clearing failed",
3746        ),
3747        "storeBlobFile" => (
3748            "blob",
3749            "blob.store_failed",
3750            "Native Syncular blob file storage failed",
3751        ),
3752        _ => (
3753            "worker",
3754            "worker.command_failed",
3755            "Native Syncular worker command failed",
3756        ),
3757    };
3758    native_diagnostic("error", source, code, message, details)
3759}
3760
3761fn worker_command_completed_event(
3762    command_id: String,
3763    operation: &str,
3764    payload_json: Option<Value>,
3765    duration_ms: u64,
3766) -> NativeEvent {
3767    let diagnostic = worker_command_completed_diagnostic(
3768        &command_id,
3769        operation,
3770        payload_json.as_ref(),
3771        duration_ms,
3772    );
3773    let mut event = native_event(
3774        NativeEventKind::WorkerCommandCompleted,
3775        Vec::new(),
3776        Some(diagnostic),
3777    );
3778    event.command_id = Some(command_id);
3779    event.payload_json = payload_json;
3780    event.duration_ms = Some(duration_ms);
3781    event
3782}
3783
3784fn worker_command_failed_event(
3785    error: &SyncularError,
3786    command_id: String,
3787    operation: &str,
3788    duration_ms: u64,
3789) -> NativeEvent {
3790    let mut details = vec![
3791        ("commandId", json!(command_id.clone())),
3792        ("operation", json!(operation)),
3793        ("durationMs", json!(duration_ms)),
3794    ];
3795    push_native_error_details(&mut details, error);
3796    let diagnostic = worker_command_failed_diagnostic(operation, details);
3797    let mut event = native_event(
3798        NativeEventKind::WorkerCommandFailed,
3799        Vec::new(),
3800        Some(diagnostic.clone()),
3801    );
3802    event.error = Some(NativeErrorInfo::from_error(error));
3803    event.command_id = Some(command_id);
3804    event.duration_ms = Some(duration_ms);
3805    event.lifecycle = Some(native_lifecycle_for_error(error, false, diagnostic));
3806    event
3807}
3808
3809fn blob_uploads_changed_event(stats_json: Value) -> NativeEvent {
3810    let blob_uploads = native_lifecycle_blob_uploads(&stats_json);
3811    let failed_count = blob_uploads.as_ref().map_or(0, |stats| stats.failed);
3812    let diagnostic = native_diagnostic(
3813        if failed_count > 0 { "warn" } else { "info" },
3814        "blob",
3815        "blob.uploads_changed",
3816        "Native Syncular blob upload queue changed",
3817        [("blobUploads", stats_json.clone())],
3818    );
3819    let mut event = native_event(
3820        NativeEventKind::BlobUploadsChanged,
3821        Vec::new(),
3822        Some(diagnostic.clone()),
3823    );
3824    event.payload_json = Some(stats_json);
3825    event.lifecycle = Some(native_lifecycle_state(
3826        if failed_count > 0 {
3827            NativeLifecyclePhase::Degraded
3828        } else {
3829            NativeLifecyclePhase::Offline
3830        },
3831        false,
3832        failed_count > 0,
3833        0,
3834        None,
3835        None,
3836        None,
3837        blob_uploads,
3838        None,
3839        Some(diagnostic),
3840    ));
3841    event
3842}
3843
3844fn native_event(
3845    kind: NativeEventKind,
3846    tables: Vec<String>,
3847    diagnostic: Option<NativeDiagnostic>,
3848) -> NativeEvent {
3849    NativeEvent {
3850        event_seq: 0,
3851        kind,
3852        error: None,
3853        command_id: None,
3854        client_commit_id: None,
3855        outbox_count: None,
3856        conflict_count: None,
3857        retry_scheduled: None,
3858        duration_ms: None,
3859        dropped_count: None,
3860        resync_required: None,
3861        auth: None,
3862        diagnostic,
3863        tables,
3864        changed_rows: Vec::new(),
3865        queries: Vec::new(),
3866        bootstrap: None,
3867        lifecycle: None,
3868        payload_json: None,
3869    }
3870}
3871
3872fn diagnostic_event(diagnostic: NativeDiagnostic) -> NativeEvent {
3873    native_event(NativeEventKind::Diagnostic, Vec::new(), Some(diagnostic))
3874}
3875
3876fn blob_store_diagnostic(blob: &BlobRef, immediate: bool, cache_local: bool) -> NativeDiagnostic {
3877    let mut details = blob_ref_details(blob);
3878    details.push(("immediate", json!(immediate)));
3879    details.push(("cacheLocal", json!(cache_local)));
3880    native_diagnostic(
3881        "info",
3882        "blob",
3883        if immediate {
3884            "blob.upload_completed"
3885        } else {
3886            "blob.store_queued"
3887        },
3888        if immediate {
3889            "Native Syncular blob stored and uploaded"
3890        } else {
3891            "Native Syncular blob stored and queued for upload"
3892        },
3893        details,
3894    )
3895}
3896
3897fn blob_cache_lookup_diagnostic(hash: &str, local: bool) -> NativeDiagnostic {
3898    native_diagnostic(
3899        "debug",
3900        "blob",
3901        if local {
3902            "blob.cache_hit"
3903        } else {
3904            "blob.cache_miss"
3905        },
3906        if local {
3907            "Native Syncular blob is available in local cache"
3908        } else {
3909            "Native Syncular blob is not available in local cache"
3910        },
3911        [("hash", json!(hash))],
3912    )
3913}
3914
3915fn blob_cache_retrieve_diagnostic(blob: &BlobRef, was_local: bool) -> NativeDiagnostic {
3916    native_diagnostic(
3917        "info",
3918        "blob",
3919        if was_local {
3920            "blob.cache_hit"
3921        } else {
3922            "blob.cache_miss"
3923        },
3924        if was_local {
3925            "Native Syncular blob served from local cache"
3926        } else {
3927            "Native Syncular blob fetched after local cache miss"
3928        },
3929        blob_ref_details(blob),
3930    )
3931}
3932
3933fn blob_download_failed_diagnostic(blob: &BlobRef, error: &SyncularError) -> NativeDiagnostic {
3934    let mut details = blob_ref_details(blob);
3935    push_native_error_details(&mut details, error);
3936    native_diagnostic(
3937        "warn",
3938        "blob",
3939        "blob.download_failed",
3940        "Native Syncular blob download failed",
3941        details,
3942    )
3943}
3944
3945fn blob_upload_queue_processed_diagnostic(payload_json: Value) -> NativeDiagnostic {
3946    let failed = payload_json
3947        .get("failed")
3948        .and_then(Value::as_i64)
3949        .unwrap_or_default();
3950    native_diagnostic(
3951        if failed > 0 { "warn" } else { "info" },
3952        "blob",
3953        "blob.upload_queue_processed",
3954        "Native Syncular blob upload queue processed",
3955        [("result", payload_json)],
3956    )
3957}
3958
3959fn blob_cache_pruned_diagnostic(pruned_bytes: i64, max_bytes: Option<i64>) -> NativeDiagnostic {
3960    let mut details = vec![("prunedBytes", json!(pruned_bytes))];
3961    if let Some(max_bytes) = max_bytes {
3962        details.push(("maxBytes", json!(max_bytes)));
3963    }
3964    native_diagnostic(
3965        "info",
3966        "blob",
3967        "blob.cache_pruned",
3968        "Native Syncular blob cache pruned",
3969        details,
3970    )
3971}
3972
3973fn blob_cache_cleared_diagnostic() -> NativeDiagnostic {
3974    native_diagnostic(
3975        "info",
3976        "blob",
3977        "blob.cache_cleared",
3978        "Native Syncular blob cache cleared",
3979        std::iter::empty::<(&str, Value)>(),
3980    )
3981}
3982
3983fn blob_ref_details(blob: &BlobRef) -> Vec<(&'static str, Value)> {
3984    let mut details = vec![
3985        ("hash", json!(blob.hash.clone())),
3986        ("size", json!(blob.size)),
3987        ("mimeType", json!(blob.mime_type.clone())),
3988        ("encrypted", json!(blob.encrypted)),
3989    ];
3990    if let Some(key_id) = &blob.key_id {
3991        details.push(("keyId", json!(key_id)));
3992    }
3993    details
3994}
3995
3996fn native_diagnostic<'a>(
3997    level: &str,
3998    source: &str,
3999    code: &str,
4000    message: &str,
4001    details: impl IntoIterator<Item = (&'a str, Value)>,
4002) -> NativeDiagnostic {
4003    NativeDiagnostic {
4004        at: now_ms(),
4005        level: level.to_string(),
4006        source: source.to_string(),
4007        code: code.to_string(),
4008        message: message.to_string(),
4009        details: details
4010            .into_iter()
4011            .map(|(key, value)| (key.to_string(), value))
4012            .collect(),
4013    }
4014}
4015
4016fn push_native_error_details(details: &mut Vec<(&'static str, Value)>, error: &SyncularError) {
4017    let classification = error.classification();
4018    details.push(("errorKind", json!(format!("{:?}", error.kind()))));
4019    details.push(("errorCode", json!(classification.code)));
4020    details.push(("errorCategory", json!(classification.category)));
4021    details.push(("retryable", json!(classification.retryable)));
4022    details.push((
4023        "recommendedAction",
4024        json!(classification.recommended_action),
4025    ));
4026}
4027
4028fn native_auth_info_from_error(error: &SyncularError) -> Option<NativeAuthInfo> {
4029    if error.kind() != ErrorKind::Transport {
4030        return None;
4031    }
4032
4033    let message = error.message_text();
4034    let classification = error.classification();
4035    if classification.code != "sync.auth_required" {
4036        return None;
4037    }
4038    Some(NativeAuthInfo {
4039        operation: auth_operation_from_message(&message).to_string(),
4040        status: 401,
4041    })
4042}
4043
/// Maps an error message to the name of the operation that failed.
///
/// The message is checked against known substrings in a fixed order and the first
/// match wins; messages matching none of them map to `"sync"`.
fn auth_operation_from_message(message: &str) -> &'static str {
    // Order matters: earlier entries take precedence when several markers appear.
    const MARKERS: [(&str, &str); 4] = [
        ("snapshot chunk failed", "snapshotChunk"),
        ("blob upload init failed", "blobInitiateUpload"),
        ("blob upload complete failed", "blobCompleteUpload"),
        ("blob download url failed", "blobGetDownloadUrl"),
    ];

    for (marker, operation) in MARKERS {
        if message.contains(marker) {
            return operation;
        }
    }
    "sync"
}
4057
4058fn normalize_observed_tables(tables: Vec<String>, app_schema: AppSchema) -> Result<Vec<String>> {
4059    if tables.is_empty() {
4060        return Err(SyncularError::config(
4061            "native observed query must depend on at least one app table",
4062        ));
4063    }
4064
4065    let mut normalized = BTreeSet::new();
4066    for table in tables {
4067        let table = table.trim();
4068        if table.is_empty() {
4069            return Err(SyncularError::config(
4070                "native observed query table is empty",
4071            ));
4072        }
4073        validate_app_table_name(table, app_schema)?;
4074        normalized.insert(table.to_string());
4075    }
4076    Ok(normalized.into_iter().collect())
4077}
4078
4079fn normalize_observed_query_dependency_hints(
4080    hints: Vec<NativeObservedQueryDependencyHint>,
4081    observed_tables: &[String],
4082    app_schema: AppSchema,
4083) -> Result<Vec<NativeObservedQueryDependencyHint>> {
4084    let observed_tables = observed_tables
4085        .iter()
4086        .map(String::as_str)
4087        .collect::<BTreeSet<_>>();
4088    let mut normalized = Vec::new();
4089    for hint in hints {
4090        let table = hint.table.trim();
4091        if table.is_empty() {
4092            return Err(SyncularError::config(
4093                "native observed query dependency hint table is empty",
4094            ));
4095        }
4096        if !observed_tables.contains(table) {
4097            return Err(SyncularError::config(format!(
4098                "native observed query dependency hint table {table} is not one of the observed tables"
4099            )));
4100        }
4101        validate_app_table_name(table, app_schema)?;
4102        let metadata = app_schema.table_metadata(table).ok_or_else(|| {
4103            SyncularError::config(format!("unknown generated app table: {table}"))
4104        })?;
4105
4106        let mut row_ids = BTreeSet::new();
4107        for row_id in hint.row_ids {
4108            if row_id.is_empty() {
4109                return Err(SyncularError::config(
4110                    "native observed query dependency hint row id is empty",
4111                ));
4112            }
4113            row_ids.insert(row_id);
4114        }
4115
4116        let mut fields = BTreeSet::new();
4117        for field in hint.fields {
4118            let field = field.trim();
4119            if field.is_empty() {
4120                return Err(SyncularError::config(
4121                    "native observed query dependency hint field is empty",
4122                ));
4123            }
4124            validate_app_column_name(metadata, field)?;
4125            fields.insert(field.to_string());
4126        }
4127
4128        normalized.push(NativeObservedQueryDependencyHint {
4129            table: table.to_string(),
4130            row_ids: row_ids.into_iter().collect(),
4131            fields: fields.into_iter().collect(),
4132        });
4133    }
4134    Ok(normalized)
4135}
4136
4137fn observed_query_should_notify(
4138    query: &NativeObservedQuery,
4139    changed_tables: &BTreeSet<&str>,
4140    changed_rows: &[SyncChangedRow],
4141) -> bool {
4142    let affected_tables = query
4143        .tables
4144        .iter()
4145        .filter(|table| changed_tables.contains(table.as_str()))
4146        .collect::<Vec<_>>();
4147    if affected_tables.is_empty() {
4148        return false;
4149    }
4150    if query.dependency_hints.is_empty() || changed_rows.is_empty() {
4151        return true;
4152    }
4153
4154    for table in affected_tables {
4155        let table_rows = changed_rows
4156            .iter()
4157            .filter(|row| row.table == table.as_str())
4158            .collect::<Vec<_>>();
4159        if table_rows.is_empty() {
4160            return true;
4161        }
4162        let table_hints = query
4163            .dependency_hints
4164            .iter()
4165            .filter(|hint| hint.table == table.as_str())
4166            .collect::<Vec<_>>();
4167        if table_hints.is_empty() {
4168            return true;
4169        }
4170        if table_rows.iter().any(|row| {
4171            table_hints
4172                .iter()
4173                .any(|hint| hint_matches_changed_row(hint, row))
4174        }) {
4175            return true;
4176        }
4177    }
4178    false
4179}
4180
4181fn hint_matches_changed_row(
4182    hint: &NativeObservedQueryDependencyHint,
4183    row: &SyncChangedRow,
4184) -> bool {
4185    let Some(row_id) = row.row_id.as_deref() else {
4186        return true;
4187    };
4188    if !hint.row_ids.is_empty() && !hint.row_ids.iter().any(|id| id == row_id) {
4189        return false;
4190    }
4191    if hint.fields.is_empty() || row.changed_fields.is_empty() {
4192        return true;
4193    }
4194    row.changed_fields
4195        .iter()
4196        .any(|field| hint.fields.iter().any(|hint_field| hint_field == field))
4197}
4198
4199fn validate_app_table_name(table: &str, app_schema: AppSchema) -> Result<()> {
4200    if app_schema.table_metadata(table).is_some() {
4201        return Ok(());
4202    }
4203
4204    Err(SyncularError::config(format!(
4205        "unknown generated app table: {table}"
4206    )))
4207}
4208
4209fn validate_app_column_name(metadata: &AppTableMetadata, column: &str) -> Result<()> {
4210    if metadata.primary_key_column == column
4211        || metadata.server_version_column == column
4212        || metadata
4213            .columns
4214            .iter()
4215            .any(|metadata| metadata.name == column)
4216    {
4217        return Ok(());
4218    }
4219
4220    Err(SyncularError::config(format!(
4221        "unknown generated app column {}.{}",
4222        metadata.name, column
4223    )))
4224}
4225
/// Returns the given table names as owned strings with duplicates removed, keeping
/// the first occurrence of each name in its original position.
fn unique_event_tables<'a>(tables: impl IntoIterator<Item = &'a str>) -> Vec<String> {
    let mut seen = BTreeSet::new();
    tables
        .into_iter()
        // `insert` returns false for names already seen, which drops repeats.
        .filter(|table| seen.insert(*table))
        .map(String::from)
        .collect()
}
4236
// Unit tests for the JSON shape of native events built from worker events and for
// the redaction of oversized payloads kept for recent diagnostics.
#[cfg(test)]
mod tests {
    use super::*;

    // Converts a worker event to its native JSON events and parses the first one.
    // Panics if conversion fails, no event is produced, or the JSON is invalid.
    fn first_native_event_json(event: SyncWorkerEvent) -> Value {
        let json_events = native_event_json_from_worker_event(event).unwrap();
        serde_json::from_str(&json_events[0]).unwrap()
    }

    // A transport error whose message embeds a server error body should surface that
    // body's classification on both the event error and the diagnostic details.
    #[test]
    fn native_sync_failed_exposes_server_error_classification() {
        let value = first_native_event_json(SyncWorkerEvent::SyncFailed {
            command_id: Some("cmd-1".to_string()),
            error: SyncularError::message(
                ErrorKind::Transport,
                r#"sync failed with HTTP 403: {"error":"sync.forbidden","code":"sync.forbidden","category":"forbidden","retryable":false,"recommendedAction":"checkPermissions","message":"Forbidden"}"#,
            ),
            retry_scheduled: false,
            duration_ms: 12,
        });

        assert_eq!(value["kind"], "SyncFailed");
        assert_eq!(value["error"]["code"], "sync.forbidden");
        assert_eq!(value["error"]["category"], "forbidden");
        assert_eq!(value["error"]["retryable"], false);
        assert_eq!(value["error"]["recommendedAction"], "checkPermissions");
        assert_eq!(
            value["diagnostic"]["details"]["errorCode"],
            "sync.forbidden"
        );
        assert_eq!(
            value["diagnostic"]["details"]["recommendedAction"],
            "checkPermissions"
        );
    }

    // An HTTP 401 transport failure is expected to be reported as `AuthExpired`
    // rather than `SyncFailed`, with the auth-required classification.
    #[test]
    fn native_sync_failed_maps_auth_required_to_auth_expired_event() {
        let value = first_native_event_json(SyncWorkerEvent::SyncFailed {
            command_id: Some("cmd-2".to_string()),
            error: SyncularError::message(ErrorKind::Transport, "sync failed with HTTP 401"),
            retry_scheduled: true,
            duration_ms: 34,
        });

        assert_eq!(value["kind"], "AuthExpired");
        assert_eq!(value["auth"]["status"], 401);
        assert_eq!(value["error"]["code"], "sync.auth_required");
        assert_eq!(value["error"]["category"], "auth-required");
        assert_eq!(value["error"]["retryable"], true);
        assert_eq!(value["error"]["recommendedAction"], "refreshAuth");
    }

    // A payload larger than MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES should be
    // replaced by a truncation marker that records the original size.
    #[test]
    fn recent_diagnostic_event_payload_is_redacted_when_too_large() {
        let mut event = native_event(NativeEventKind::WorkerCommandCompleted, Vec::new(), None);
        event.payload_json = Some(json!({
            "body": "x".repeat(MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES + 1)
        }));

        let event = native_event_for_recent_diagnostics(event);
        let payload = event.payload_json.expect("redacted payload");
        assert_eq!(payload["truncated"], true);
        assert_eq!(payload["reason"], "diagnosticPayloadLimit");
        assert_eq!(payload["limit"], "maxNativeDiagnosticEventPayloadJsonBytes");
        assert!(
            payload["originalBytes"].as_u64().unwrap()
                > MAX_NATIVE_DIAGNOSTIC_EVENT_PAYLOAD_JSON_BYTES as u64
        );
    }

    // A storage error on a local write should carry the storage classification on
    // both the event error and the diagnostic details.
    #[test]
    fn native_local_write_failed_exposes_storage_classification() {
        let value = first_native_event_json(SyncWorkerEvent::LocalWriteFailed {
            command_id: "cmd-3".to_string(),
            error: SyncularError::message(ErrorKind::Storage, "database is locked"),
            payload_json: None,
            duration_ms: 56,
        });

        assert_eq!(value["kind"], "LocalWriteFailed");
        assert_eq!(value["error"]["code"], "storage.failed");
        assert_eq!(value["error"]["category"], "storage");
        assert_eq!(value["error"]["recommendedAction"], "inspectStorage");
        assert_eq!(value["diagnostic"]["details"]["errorCategory"], "storage");
    }
}