Skip to main content

syncular_runtime/core/
worker.rs

1use crate::app_schema::{
2    validate_blob_encryption_against_app_schema, validate_encrypted_crdt_against_app_schema,
3    validate_field_encryption_rules_against_app_schema, AppSchema,
4};
5#[cfg(feature = "native")]
6use crate::client::sync_changed_crdt_field_from_metadata;
7#[cfg(feature = "native")]
8use crate::client::BootstrapStatus;
9#[cfg(feature = "native")]
10use crate::client::CrdtFieldCompactionReceipt;
11use crate::client::{
12    sync_changed_row_for_local_operation, validate_subscription_limits, SubscriptionSpec,
13    SyncChangedRow, SyncReport, SyncularClient,
14};
15#[cfg(feature = "native")]
16use crate::crdt_field::{CrdtField, CrdtFieldId, CrdtFieldSyncMode};
17use crate::crdt_yjs::{
18    validate_crdt_request_json_size, validate_yjs_text_input_size,
19    validate_yjs_update_envelope_size, YjsUpdateEnvelope, YJS_PAYLOAD_KEY,
20};
21#[cfg(feature = "native")]
22use crate::diesel_sqlite::DieselSqliteStore;
23use crate::encrypted_crdt::EncryptedCrdt;
24#[cfg(feature = "native")]
25use crate::encrypted_crdt::{CRDT_CHECKPOINTS_TABLE, CRDT_UPDATES_TABLE};
26use crate::encryption::{BlobEncryption, FieldEncryption};
27use crate::error::{ErrorKind, Result, SyncularError};
28#[cfg(feature = "demo-todo-native-fixture")]
29use crate::fixtures::todo::rusqlite_sqlite::RusqliteStore;
30use crate::limits::{
31    DEFAULT_WORKER_COMMAND_QUEUE_CAPACITY, DEFAULT_WORKER_EVENT_QUEUE_CAPACITY,
32    DEFAULT_YJS_FLUSH_WINDOW_MS,
33};
34#[cfg(feature = "native")]
35use crate::protocol::BlobRef;
36use crate::protocol::{validate_mutation_json_input_size, SyncOperation};
37use crate::store::{now_ms, retry_backoff_delay_ms, SyncStateStore, SyncStore};
38#[cfg(feature = "native")]
39use crate::transport::BlobTransport;
40use crate::transport::{
41    RealtimeEvent, RealtimeTransport, SyncAuthHeaderStore, SyncAuthHeaders, SyncTransport,
42};
43use serde::Deserialize;
44use serde_json::{json, Value};
45use std::collections::{BTreeMap, VecDeque};
46#[cfg(feature = "native")]
47use std::path::Path;
48use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TrySendError};
49use std::sync::{Arc, Condvar, Mutex};
50use std::thread::{self, JoinHandle};
51use std::time::{Duration, Instant};
52
53const YJS_FLUSH_WINDOW: Duration = Duration::from_millis(DEFAULT_YJS_FLUSH_WINDOW_MS);
54
/// Tunable settings for a sync worker.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncWorkerConfig {
    /// Capacity used for the worker's command queue.
    pub command_queue_capacity: usize,
    /// Time window applied when flushing Yjs updates.
    pub yjs_flush_window: Duration,
}
60
61impl Default for SyncWorkerConfig {
62    fn default() -> Self {
63        Self {
64            command_queue_capacity: DEFAULT_WORKER_COMMAND_QUEUE_CAPACITY,
65            yjs_flush_window: YJS_FLUSH_WINDOW,
66        }
67    }
68}
69
70enum WorkerCommand {
71    Trigger {
72        command_id: Option<String>,
73        emit_started: bool,
74        transport: WorkerSyncTransport,
75    },
76    ApplyMutationJson {
77        command_id: String,
78        mutation_json: String,
79        local_row_json: Option<String>,
80        auto_sync: bool,
81        require_auth_lease: bool,
82    },
83    SaveYjsUpdateJson {
84        command_id: String,
85        update_json: String,
86        auto_sync: bool,
87    },
88    ApplyCrdtFieldTextJson {
89        command_id: String,
90        request_json: String,
91        auto_sync: bool,
92    },
93    CompactCrdtFieldJson {
94        command_id: String,
95        request_json: String,
96        auto_sync: bool,
97    },
98    ApplyEncryptedCrdtUpdateJson {
99        command_id: String,
100        request_json: String,
101        auto_sync: bool,
102    },
103    ApplyEncryptedCrdtCheckpointJson {
104        command_id: String,
105        request_json: String,
106        auto_sync: bool,
107    },
108    ResolveConflict {
109        command_id: String,
110        conflict_id: String,
111        resolution: String,
112        auto_sync: bool,
113    },
114    RefreshSnapshotJson {
115        command_id: String,
116        request_json: String,
117    },
118    CompactStorageJson {
119        command_id: String,
120        options_json: Option<String>,
121    },
122    StoreBlobFileJson {
123        command_id: String,
124        path: String,
125        options_json: Option<String>,
126    },
127    RetrieveBlobFileJson {
128        command_id: String,
129        ref_json: String,
130        path: String,
131        options_json: Option<String>,
132    },
133    ProcessBlobUploadQueue {
134        command_id: String,
135    },
136    PruneBlobCache {
137        command_id: String,
138        max_bytes: i64,
139    },
140    ClearBlobCache {
141        command_id: String,
142    },
143    SetSubscriptions(Vec<SubscriptionSpec>),
144    SetAuthHeaders(SyncAuthHeaders),
145    SetFieldEncryption(Option<FieldEncryption>),
146    SetEncryptedCrdt(Option<EncryptedCrdt>),
147    SetBlobEncryption(Option<BlobEncryption>),
148    Stop,
149}
150
/// Transport selected for a worker-triggered sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkerSyncTransport {
    Http,
    WebSocket,
}

impl WorkerSyncTransport {
    /// Merges two transport choices into one.
    ///
    /// The result is `Http` only when both inputs are `Http`; if either side
    /// is `WebSocket`, the result is `WebSocket`.
    fn coalesce(self, next: Self) -> Self {
        match (self, next) {
            (Self::Http, Self::Http) => Self::Http,
            _ => Self::WebSocket,
        }
    }
}
166
167#[derive(Debug)]
168pub enum SyncWorkerEvent {
169    SyncStarted {
170        command_id: Option<String>,
171    },
172    SyncCompleted {
173        command_id: Option<String>,
174        report: SyncReport,
175        #[cfg(feature = "native")]
176        bootstrap: BootstrapStatus,
177        outbox_count: usize,
178        conflict_count: usize,
179        duration_ms: u64,
180    },
181    SyncFailed {
182        command_id: Option<String>,
183        error: SyncularError,
184        retry_scheduled: bool,
185        duration_ms: u64,
186    },
187    LocalWriteCommitted {
188        command_id: String,
189        client_commit_id: String,
190        changed_tables: Vec<String>,
191        changed_rows: Vec<SyncChangedRow>,
192        outbox_count: usize,
193        duration_ms: u64,
194    },
195    CrdtFieldChanged {
196        command_id: String,
197        client_commit_id: String,
198        table: String,
199        row_id: String,
200        field: String,
201        changed_tables: Vec<String>,
202        payload_json: Option<Value>,
203        duration_ms: u64,
204    },
205    CrdtFieldCompacted {
206        command_id: String,
207        client_commit_id: Option<String>,
208        table: String,
209        row_id: String,
210        field: String,
211        changed_tables: Vec<String>,
212        checkpoint_created: bool,
213        payload_json: Option<Value>,
214        duration_ms: u64,
215    },
216    LocalWriteFailed {
217        command_id: String,
218        error: SyncularError,
219        payload_json: Option<Value>,
220        duration_ms: u64,
221    },
222    ConflictResolutionCompleted {
223        command_id: String,
224        retry_client_commit_id: Option<String>,
225        duration_ms: u64,
226    },
227    ConflictResolutionFailed {
228        command_id: String,
229        error: SyncularError,
230        duration_ms: u64,
231    },
232    SnapshotReady {
233        command_id: String,
234        payload_json: Value,
235        duration_ms: u64,
236    },
237    WorkerCommandCompleted {
238        command_id: String,
239        operation: &'static str,
240        payload_json: Option<Value>,
241        duration_ms: u64,
242    },
243    WorkerCommandFailed {
244        command_id: String,
245        operation: &'static str,
246        error: SyncularError,
247        duration_ms: u64,
248    },
249    BlobUploadsChanged {
250        stats_json: Value,
251    },
252    EventsOverflowed {
253        dropped_count: usize,
254    },
255}
256
257impl Clone for SyncWorkerEvent {
258    fn clone(&self) -> Self {
259        match self {
260            Self::SyncStarted { command_id } => Self::SyncStarted {
261                command_id: command_id.clone(),
262            },
263            Self::SyncCompleted {
264                command_id,
265                report,
266                #[cfg(feature = "native")]
267                bootstrap,
268                outbox_count,
269                conflict_count,
270                duration_ms,
271            } => Self::SyncCompleted {
272                command_id: command_id.clone(),
273                report: report.clone(),
274                #[cfg(feature = "native")]
275                bootstrap: bootstrap.clone(),
276                outbox_count: *outbox_count,
277                conflict_count: *conflict_count,
278                duration_ms: *duration_ms,
279            },
280            Self::SyncFailed {
281                command_id,
282                error,
283                retry_scheduled,
284                duration_ms,
285            } => Self::SyncFailed {
286                command_id: command_id.clone(),
287                error: clone_worker_error(error),
288                retry_scheduled: *retry_scheduled,
289                duration_ms: *duration_ms,
290            },
291            Self::LocalWriteCommitted {
292                command_id,
293                client_commit_id,
294                changed_tables,
295                changed_rows,
296                outbox_count,
297                duration_ms,
298            } => Self::LocalWriteCommitted {
299                command_id: command_id.clone(),
300                client_commit_id: client_commit_id.clone(),
301                changed_tables: changed_tables.clone(),
302                changed_rows: changed_rows.clone(),
303                outbox_count: *outbox_count,
304                duration_ms: *duration_ms,
305            },
306            Self::CrdtFieldChanged {
307                command_id,
308                client_commit_id,
309                table,
310                row_id,
311                field,
312                changed_tables,
313                payload_json,
314                duration_ms,
315            } => Self::CrdtFieldChanged {
316                command_id: command_id.clone(),
317                client_commit_id: client_commit_id.clone(),
318                table: table.clone(),
319                row_id: row_id.clone(),
320                field: field.clone(),
321                changed_tables: changed_tables.clone(),
322                payload_json: payload_json.clone(),
323                duration_ms: *duration_ms,
324            },
325            Self::CrdtFieldCompacted {
326                command_id,
327                client_commit_id,
328                table,
329                row_id,
330                field,
331                changed_tables,
332                checkpoint_created,
333                payload_json,
334                duration_ms,
335            } => Self::CrdtFieldCompacted {
336                command_id: command_id.clone(),
337                client_commit_id: client_commit_id.clone(),
338                table: table.clone(),
339                row_id: row_id.clone(),
340                field: field.clone(),
341                changed_tables: changed_tables.clone(),
342                checkpoint_created: *checkpoint_created,
343                payload_json: payload_json.clone(),
344                duration_ms: *duration_ms,
345            },
346            Self::LocalWriteFailed {
347                command_id,
348                error,
349                payload_json,
350                duration_ms,
351            } => Self::LocalWriteFailed {
352                command_id: command_id.clone(),
353                error: clone_worker_error(error),
354                payload_json: payload_json.clone(),
355                duration_ms: *duration_ms,
356            },
357            Self::ConflictResolutionCompleted {
358                command_id,
359                retry_client_commit_id,
360                duration_ms,
361            } => Self::ConflictResolutionCompleted {
362                command_id: command_id.clone(),
363                retry_client_commit_id: retry_client_commit_id.clone(),
364                duration_ms: *duration_ms,
365            },
366            Self::ConflictResolutionFailed {
367                command_id,
368                error,
369                duration_ms,
370            } => Self::ConflictResolutionFailed {
371                command_id: command_id.clone(),
372                error: clone_worker_error(error),
373                duration_ms: *duration_ms,
374            },
375            Self::SnapshotReady {
376                command_id,
377                payload_json,
378                duration_ms,
379            } => Self::SnapshotReady {
380                command_id: command_id.clone(),
381                payload_json: payload_json.clone(),
382                duration_ms: *duration_ms,
383            },
384            Self::WorkerCommandCompleted {
385                command_id,
386                operation,
387                payload_json,
388                duration_ms,
389            } => Self::WorkerCommandCompleted {
390                command_id: command_id.clone(),
391                operation: *operation,
392                payload_json: payload_json.clone(),
393                duration_ms: *duration_ms,
394            },
395            Self::WorkerCommandFailed {
396                command_id,
397                operation,
398                error,
399                duration_ms,
400            } => Self::WorkerCommandFailed {
401                command_id: command_id.clone(),
402                operation: *operation,
403                error: clone_worker_error(error),
404                duration_ms: *duration_ms,
405            },
406            Self::BlobUploadsChanged { stats_json } => Self::BlobUploadsChanged {
407                stats_json: stats_json.clone(),
408            },
409            Self::EventsOverflowed { dropped_count } => Self::EventsOverflowed {
410                dropped_count: *dropped_count,
411            },
412        }
413    }
414}
415
416impl SyncWorkerEvent {
417    pub fn requires_full_refresh(&self) -> bool {
418        match self {
419            Self::SyncFailed { error, .. } => error.requires_full_snapshot_resync(),
420            Self::EventsOverflowed { .. } => true,
421            _ => false,
422        }
423    }
424}
425
426fn clone_worker_error(error: &SyncularError) -> SyncularError {
427    SyncularError::message(error.kind(), error.to_string())
428}
429
430pub trait SyncWorkerClientExt {
431    fn apply_worker_mutation_json(
432        &mut self,
433        mutation_json: &str,
434        local_row_json: Option<&str>,
435    ) -> Result<String>;
436
437    fn apply_worker_leased_mutation_json(
438        &mut self,
439        _mutation_json: &str,
440        _local_row_json: Option<&str>,
441    ) -> Result<String> {
442        Err(SyncularError::config(
443            "worker-owned leased mutations are not available for this client",
444        ))
445    }
446
447    fn apply_worker_mutation(
448        &mut self,
449        mutation: SyncOperation,
450        local_row: Option<Value>,
451    ) -> Result<String>;
452
453    fn worker_current_row_json(&mut self, _table: &str, _row_id: &str) -> Result<Option<Value>> {
454        Ok(None)
455    }
456
457    fn apply_worker_encrypted_crdt_update_json(
458        &mut self,
459        _request_json: &str,
460    ) -> Result<WorkerLocalWriteReceipt> {
461        Err(SyncularError::config(
462            "worker-owned encrypted CRDT updates are not available for this client",
463        ))
464    }
465
466    fn apply_worker_encrypted_crdt_checkpoint_json(
467        &mut self,
468        _request_json: &str,
469    ) -> Result<Option<WorkerLocalWriteReceipt>> {
470        Err(SyncularError::config(
471            "worker-owned encrypted CRDT checkpoints are not available for this client",
472        ))
473    }
474
475    fn apply_worker_crdt_field_text_json(
476        &mut self,
477        _request_json: &str,
478    ) -> Result<WorkerLocalWriteReceipt> {
479        Err(SyncularError::config(
480            "worker-owned CRDT field text updates are not available for this client",
481        ))
482    }
483
484    fn compact_worker_crdt_field_json(
485        &mut self,
486        _request_json: &str,
487    ) -> Result<Option<WorkerLocalWriteReceipt>> {
488        Err(SyncularError::config(
489            "worker-owned CRDT field compaction is not available for this client",
490        ))
491    }
492
493    fn worker_crdt_field_event_payload_json(
494        &mut self,
495        _table: &str,
496        _row_id: &str,
497        _field: &str,
498    ) -> Result<Option<Value>> {
499        Ok(None)
500    }
501
502    fn worker_crdt_field_changed_row(
503        &mut self,
504        _table: &str,
505        _row_id: &str,
506        _field: &str,
507        _client_commit_id: &str,
508    ) -> Result<Option<SyncChangedRow>> {
509        Ok(None)
510    }
511
512    fn worker_query_json(&mut self, _request_json: &str) -> Result<String> {
513        Err(SyncularError::config(
514            "worker-owned snapshot refresh is not available for this client",
515        ))
516    }
517
518    fn worker_compact_storage_json(&mut self, _options_json: Option<&str>) -> Result<String> {
519        Err(SyncularError::config(
520            "worker-owned storage compaction is not available for this client",
521        ))
522    }
523
524    fn worker_store_blob_file_json(
525        &mut self,
526        _path: &str,
527        _options_json: Option<&str>,
528    ) -> Result<String> {
529        Err(SyncularError::config(
530            "worker-owned blob file storage is not available for this client",
531        ))
532    }
533
534    fn worker_retrieve_blob_file_json(
535        &mut self,
536        _ref_json: &str,
537        _path: &str,
538        _options_json: Option<&str>,
539    ) -> Result<String> {
540        Err(SyncularError::config(
541            "worker-owned blob file retrieval is not available for this client",
542        ))
543    }
544
545    fn worker_prune_blob_cache_json(&mut self, _max_bytes: i64) -> Result<String> {
546        Err(SyncularError::config(
547            "worker-owned blob cache pruning is not available for this client",
548        ))
549    }
550
551    fn worker_clear_blob_cache_json(&mut self) -> Result<String> {
552        Err(SyncularError::config(
553            "worker-owned blob cache clearing is not available for this client",
554        ))
555    }
556
557    fn worker_process_blob_upload_queue_json(&mut self) -> Result<Option<String>> {
558        Ok(None)
559    }
560
561    fn worker_blob_upload_queue_stats_json(&mut self) -> Result<Option<String>> {
562        Ok(None)
563    }
564
565    fn worker_next_outbox_retry_at_ms(&mut self) -> Result<Option<i64>> {
566        Ok(None)
567    }
568
569    fn worker_next_blob_upload_retry_at_ms(&mut self) -> Result<Option<i64>> {
570        Ok(None)
571    }
572}
573
574#[derive(Debug, Clone, PartialEq, Eq)]
575pub struct WorkerLocalWriteReceipt {
576    pub client_commit_id: String,
577    pub changed_tables: Vec<String>,
578    pub changed_rows: Vec<SyncChangedRow>,
579    pub crdt_event_payload_json: Option<Value>,
580}
581
/// Worker bindings for the Diesel/SQLite-backed client (feature `native`).
#[cfg(feature = "native")]
impl<T> SyncWorkerClientExt for SyncularClient<DieselSqliteStore, T>
where
    T: SyncTransport + BlobTransport,
{
    // Delegates directly to the client's JSON mutation entry point.
    fn apply_worker_mutation_json(
        &mut self,
        mutation_json: &str,
        local_row_json: Option<&str>,
    ) -> Result<String> {
        self.apply_mutation_json(mutation_json, local_row_json)
    }

    // Delegates directly to the client's leased JSON mutation entry point.
    fn apply_worker_leased_mutation_json(
        &mut self,
        mutation_json: &str,
        local_row_json: Option<&str>,
    ) -> Result<String> {
        self.apply_leased_mutation_json(mutation_json, local_row_json)
    }

    // Serializes the typed inputs to JSON and reuses the JSON mutation path.
    fn apply_worker_mutation(
        &mut self,
        mutation: SyncOperation,
        local_row: Option<Value>,
    ) -> Result<String> {
        let encoded_mutation = serde_json::to_string(&mutation)?;
        let encoded_row = match &local_row {
            Some(row) => Some(serde_json::to_string(row)?),
            None => None,
        };
        self.apply_mutation_json(&encoded_mutation, encoded_row.as_deref())
    }

    fn worker_current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
        self.current_row_json(table, row_id)
    }

    // Applies the encrypted update, then decorates the receipt with CRDT field
    // details when the request names a field that can be opened.
    fn apply_worker_encrypted_crdt_update_json(
        &mut self,
        request_json: &str,
    ) -> Result<WorkerLocalWriteReceipt> {
        let request: WorkerEncryptedCrdtRequest = serde_json::from_str(request_json)?;
        // Best effort: a missing identity or a failed open both yield `None`.
        let field = match request.field_identity_ref() {
            Some(identity) => self.open_crdt_field(identity.id()).ok(),
            None => None,
        };
        let receipt = self.apply_encrypted_crdt_update_json(request_json)?;
        let crdt_event_payload_json = match field.as_ref() {
            Some(opened) => crdt_field_event_payload_for_worker(self, opened),
            None => None,
        };
        // Zero or one changed row, depending on whether the field was opened.
        let changed_rows: Vec<SyncChangedRow> = field
            .iter()
            .map(|opened| crdt_field_changed_row_for_worker(opened, &receipt.client_commit_id))
            .collect();
        Ok(WorkerLocalWriteReceipt {
            changed_rows,
            client_commit_id: receipt.client_commit_id,
            changed_tables: vec![request.table, CRDT_UPDATES_TABLE.to_string()],
            crdt_event_payload_json,
        })
    }

    // Applies the encrypted checkpoint; returns `None` when the client
    // reports no receipt for it.
    fn apply_worker_encrypted_crdt_checkpoint_json(
        &mut self,
        request_json: &str,
    ) -> Result<Option<WorkerLocalWriteReceipt>> {
        let request: WorkerEncryptedCrdtRequest = serde_json::from_str(request_json)?;
        let field = match request.field_identity_ref() {
            Some(identity) => self.open_crdt_field(identity.id()).ok(),
            None => None,
        };
        let receipt = self.apply_encrypted_crdt_checkpoint_json(request_json)?;
        // The payload is computed before the receipt is inspected, exactly
        // once per call when a field is available.
        let crdt_event_payload_json = match field.as_ref() {
            Some(opened) => crdt_field_compaction_payload_for_worker_current(
                self,
                opened,
                true,
                encrypted_crdt_min_uncheckpointed_updates(request_json),
            ),
            None => None,
        };
        let Some(receipt) = receipt else {
            return Ok(None);
        };
        let changed_rows: Vec<SyncChangedRow> = field
            .iter()
            .map(|opened| crdt_field_compacted_row_for_worker(opened, &receipt.client_commit_id))
            .collect();
        Ok(Some(WorkerLocalWriteReceipt {
            changed_rows,
            client_commit_id: receipt.client_commit_id,
            changed_tables: vec![CRDT_CHECKPOINTS_TABLE.to_string()],
            crdt_event_payload_json,
        }))
    }

    // Opens the requested CRDT field, applies the new text, and reports the
    // single changed row.
    fn apply_worker_crdt_field_text_json(
        &mut self,
        request_json: &str,
    ) -> Result<WorkerLocalWriteReceipt> {
        let request: WorkerCrdtFieldTextRequest = serde_json::from_str(request_json)?;
        let field = self.open_crdt_field(request.id())?;
        let receipt = self.apply_crdt_field_text(&field, &request.next_text)?;
        let crdt_event_payload_json = crdt_field_event_payload_for_worker(self, &field);
        let changed_rows = vec![crdt_field_changed_row_for_worker(
            &field,
            &receipt.client_commit_id,
        )];
        let changed_tables = crdt_field_write_tables_for_worker(&field);
        Ok(WorkerLocalWriteReceipt {
            changed_rows,
            client_commit_id: receipt.client_commit_id,
            changed_tables,
            crdt_event_payload_json,
        })
    }

    // Compacts the requested CRDT field; a receipt is produced only when the
    // compaction yields a client commit id.
    fn compact_worker_crdt_field_json(
        &mut self,
        request_json: &str,
    ) -> Result<Option<WorkerLocalWriteReceipt>> {
        let request: WorkerCrdtFieldCompactionRequest = serde_json::from_str(request_json)?;
        let field = self.open_crdt_field(request.id())?;
        // A missing threshold falls back to 1.
        let min_updates = request.min_uncheckpointed_updates.unwrap_or(1);
        let receipt = self.compact_crdt_field(&field, min_updates)?;
        let crdt_event_payload_json = crdt_field_compaction_payload_for_worker(
            self,
            &field,
            &receipt,
            receipt.checkpoint_created,
            min_updates,
        );
        let Some(client_commit_id) = receipt.client_commit_id else {
            return Ok(None);
        };
        let changed_rows = vec![crdt_field_compacted_row_for_worker(
            &field,
            &client_commit_id,
        )];
        Ok(Some(WorkerLocalWriteReceipt {
            changed_rows,
            client_commit_id,
            changed_tables: crdt_field_compaction_tables_for_worker(&field),
            crdt_event_payload_json,
        }))
    }

    fn worker_crdt_field_event_payload_json(
        &mut self,
        table: &str,
        row_id: &str,
        field: &str,
    ) -> Result<Option<Value>> {
        let field_id = CrdtFieldId::new(table, row_id, field);
        let opened = self.open_crdt_field(field_id)?;
        Ok(crdt_field_event_payload_for_worker(self, &opened))
    }

    fn worker_crdt_field_changed_row(
        &mut self,
        table: &str,
        row_id: &str,
        field: &str,
        client_commit_id: &str,
    ) -> Result<Option<SyncChangedRow>> {
        let field_id = CrdtFieldId::new(table, row_id, field);
        let opened = self.open_crdt_field(field_id)?;
        let changed_row = crdt_field_changed_row_for_worker(&opened, client_commit_id);
        Ok(Some(changed_row))
    }

    fn worker_query_json(&mut self, request_json: &str) -> Result<String> {
        self.readonly_query_json(request_json)
    }

    fn worker_compact_storage_json(&mut self, options_json: Option<&str>) -> Result<String> {
        self.compact_storage_json(options_json)
    }

    // Stores a blob file locally. Only the queued path with a local cache is
    // accepted here: `immediate=true` and `cacheLocal=false` are rejected.
    fn worker_store_blob_file_json(
        &mut self,
        path: &str,
        options_json: Option<&str>,
    ) -> Result<String> {
        // Absent or whitespace-only options fall back to the defaults.
        let options: WorkerBlobStoreOptions = match options_json {
            Some(raw) if !raw.trim().is_empty() => serde_json::from_str(raw)?,
            _ => WorkerBlobStoreOptions::default(),
        };
        if matches!(options.immediate, Some(true)) {
            return Err(SyncularError::config(
                "queued blob file storage currently supports immediate=false",
            ));
        }
        if matches!(options.cache_local, Some(false)) {
            return Err(SyncularError::config(
                "queued blob file storage with cacheLocal=false requires immediate=true",
            ));
        }
        let mime_type = match options.mime_type.as_deref() {
            Some(explicit) => explicit,
            None => "application/octet-stream",
        };
        self.store_blob_file_local_json(Path::new(path), mime_type, true)
    }

    // Copies a cached blob to `path` and reports the effective `cacheLocal`
    // option (defaulting to `true`).
    fn worker_retrieve_blob_file_json(
        &mut self,
        ref_json: &str,
        path: &str,
        options_json: Option<&str>,
    ) -> Result<String> {
        // Options are parsed before the blob reference, so an options error
        // takes precedence over a reference error.
        let options: WorkerBlobRetrieveOptions = match options_json {
            Some(raw) if !raw.trim().is_empty() => serde_json::from_str(raw)?,
            _ => WorkerBlobRetrieveOptions::default(),
        };
        let blob: BlobRef = serde_json::from_str(ref_json)?;
        self.retrieve_cached_blob_file_json(&blob, Path::new(path))?;
        let cache_local = options.cache_local.unwrap_or(true);
        let payload = json!({
            "ok": true,
            "cacheLocal": cache_local
        });
        Ok(serde_json::to_string(&payload)?)
    }

    fn worker_prune_blob_cache_json(&mut self, max_bytes: i64) -> Result<String> {
        let bytes_pruned = self.prune_blob_cache(max_bytes)?;
        let payload = json!({ "bytesPruned": bytes_pruned });
        Ok(serde_json::to_string(&payload)?)
    }

    fn worker_clear_blob_cache_json(&mut self) -> Result<String> {
        self.clear_blob_cache()?;
        let payload = json!({ "ok": true });
        Ok(serde_json::to_string(&payload)?)
    }

    fn worker_process_blob_upload_queue_json(&mut self) -> Result<Option<String>> {
        let outcome = self.process_blob_upload_queue()?;
        let encoded = serde_json::to_string(&outcome)?;
        Ok(Some(encoded))
    }

    fn worker_blob_upload_queue_stats_json(&mut self) -> Result<Option<String>> {
        let stats = self.blob_upload_queue_stats()?;
        let encoded = serde_json::to_string(&stats)?;
        Ok(Some(encoded))
    }

    fn worker_next_outbox_retry_at_ms(&mut self) -> Result<Option<i64>> {
        self.next_outbox_retry_at_ms()
    }

    fn worker_next_blob_upload_retry_at_ms(&mut self) -> Result<Option<i64>> {
        self.next_blob_upload_retry_at_ms()
    }
}
827
/// Worker bindings for the rusqlite-backed todo fixture client
/// (feature `demo-todo-native-fixture`). Only mutations, row lookup and the
/// outbox retry timestamp are overridden; everything else uses the trait
/// defaults.
#[cfg(feature = "demo-todo-native-fixture")]
impl<T> SyncWorkerClientExt for SyncularClient<RusqliteStore, T>
where
    T: SyncTransport,
{
    // Delegates directly to the client's JSON mutation entry point.
    fn apply_worker_mutation_json(
        &mut self,
        mutation_json: &str,
        local_row_json: Option<&str>,
    ) -> Result<String> {
        self.apply_mutation_json(mutation_json, local_row_json)
    }

    // Serializes the typed inputs to JSON and reuses the JSON mutation path.
    fn apply_worker_mutation(
        &mut self,
        mutation: SyncOperation,
        local_row: Option<Value>,
    ) -> Result<String> {
        let encoded_mutation = serde_json::to_string(&mutation)?;
        let encoded_row = match &local_row {
            Some(row) => Some(serde_json::to_string(row)?),
            None => None,
        };
        self.apply_mutation_json(&encoded_mutation, encoded_row.as_deref())
    }

    fn worker_current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
        self.current_row_json(table, row_id)
    }

    fn worker_next_outbox_retry_at_ms(&mut self) -> Result<Option<i64>> {
        self.next_outbox_retry_at_ms()
    }
}
859
860pub struct SyncWorker {
861    app_schema: AppSchema,
862    command_tx: SyncSender<WorkerCommand>,
863    events: SyncWorkerEventHub,
864    default_events: SyncWorkerEventSubscription,
865    join: Option<JoinHandle<()>>,
866}
867
868#[derive(Clone)]
869struct SyncWorkerEventHub {
870    subscriber_seq: Arc<Mutex<u64>>,
871    subscribers: Arc<Mutex<BTreeMap<u64, Arc<WorkerEventQueue>>>>,
872}
873
874pub struct SyncWorkerEventSubscription {
875    hub: SyncWorkerEventHub,
876    subscriber_id: u64,
877    queue: Arc<WorkerEventQueue>,
878}
879
880struct WorkerEventQueue {
881    capacity: usize,
882    state: Mutex<WorkerEventQueueState>,
883    ready: Condvar,
884}
885
886struct WorkerEventQueueState {
887    events: VecDeque<SyncWorkerEvent>,
888    closed: bool,
889}
890
891impl Default for SyncWorkerEventHub {
892    fn default() -> Self {
893        Self {
894            subscriber_seq: Arc::new(Mutex::new(0)),
895            subscribers: Arc::new(Mutex::new(BTreeMap::new())),
896        }
897    }
898}
899
900impl SyncWorkerEventSubscription {
901    pub fn next_event(&self) -> Option<SyncWorkerEvent> {
902        self.queue.next_event()
903    }
904
905    pub fn next_event_timeout(&self, timeout: Duration) -> Option<SyncWorkerEvent> {
906        self.queue.next_event_timeout(timeout)
907    }
908
909    pub fn close(&self) {
910        if let Ok(mut subscribers) = self.hub.subscribers.lock() {
911            subscribers.remove(&self.subscriber_id);
912        }
913        self.queue.close();
914    }
915}
916
917impl Drop for SyncWorkerEventSubscription {
918    fn drop(&mut self) {
919        self.close();
920    }
921}
922
923impl SyncWorkerEventHub {
924    fn subscribe(&self, capacity: usize) -> SyncWorkerEventSubscription {
925        let queue = Arc::new(WorkerEventQueue::new(capacity));
926        let subscriber_id = self.next_subscriber_id();
927        if let Ok(mut subscribers) = self.subscribers.lock() {
928            subscribers.insert(subscriber_id, queue.clone());
929        }
930        SyncWorkerEventSubscription {
931            hub: self.clone(),
932            subscriber_id,
933            queue,
934        }
935    }
936
937    fn publish_event(&self, event: SyncWorkerEvent) {
938        let Ok(mut subscribers) = self.subscribers.lock() else {
939            return;
940        };
941
942        subscribers.retain(|_, queue| {
943            queue.push(event.clone());
944            !queue.is_closed()
945        });
946    }
947
948    fn close_all(&self) {
949        if let Ok(mut subscribers) = self.subscribers.lock() {
950            for queue in subscribers.values() {
951                queue.close_after_drain();
952            }
953            subscribers.clear();
954        }
955    }
956
957    fn next_subscriber_id(&self) -> u64 {
958        if let Ok(mut seq) = self.subscriber_seq.lock() {
959            *seq = seq.saturating_add(1);
960            *seq
961        } else {
962            0
963        }
964    }
965}
966
967impl WorkerEventQueue {
968    fn new(capacity: usize) -> Self {
969        Self {
970            capacity: capacity.max(1),
971            state: Mutex::new(WorkerEventQueueState {
972                events: VecDeque::new(),
973                closed: false,
974            }),
975            ready: Condvar::new(),
976        }
977    }
978
979    fn push(&self, event: SyncWorkerEvent) {
980        let Ok(mut state) = self.state.lock() else {
981            return;
982        };
983        if state.closed {
984            return;
985        }
986        if state.events.len() >= self.capacity {
987            let dropped_count = state.events.len().saturating_add(1);
988            state.events.clear();
989            state
990                .events
991                .push_back(SyncWorkerEvent::EventsOverflowed { dropped_count });
992            state.closed = true;
993        } else {
994            state.events.push_back(event);
995        }
996        self.ready.notify_one();
997    }
998
999    fn next_event(&self) -> Option<SyncWorkerEvent> {
1000        let mut state = self.state.lock().ok()?;
1001        loop {
1002            if let Some(event) = state.events.pop_front() {
1003                return Some(event);
1004            }
1005            if state.closed {
1006                return None;
1007            }
1008            state = self.ready.wait(state).ok()?;
1009        }
1010    }
1011
1012    fn next_event_timeout(&self, timeout: Duration) -> Option<SyncWorkerEvent> {
1013        let deadline = Instant::now().checked_add(timeout)?;
1014        let mut state = self.state.lock().ok()?;
1015        loop {
1016            if let Some(event) = state.events.pop_front() {
1017                return Some(event);
1018            }
1019            if state.closed {
1020                return None;
1021            }
1022            let now = Instant::now();
1023            if now >= deadline {
1024                return None;
1025            }
1026            let wait = deadline.saturating_duration_since(now);
1027            let (next_state, timeout) = self.ready.wait_timeout(state, wait).ok()?;
1028            state = next_state;
1029            if timeout.timed_out() && state.events.is_empty() {
1030                return None;
1031            }
1032        }
1033    }
1034
1035    fn close(&self) {
1036        if let Ok(mut state) = self.state.lock() {
1037            state.closed = true;
1038            state.events.clear();
1039            self.ready.notify_all();
1040        }
1041    }
1042
1043    fn close_after_drain(&self) {
1044        if let Ok(mut state) = self.state.lock() {
1045            state.closed = true;
1046            self.ready.notify_all();
1047        }
1048    }
1049
1050    fn is_closed(&self) -> bool {
1051        self.state.lock().map(|state| state.closed).unwrap_or(true)
1052    }
1053}
1054
/// Guard that closes every subscriber queue of the wrapped hub when dropped.
struct CloseWorkerEventsOnDrop(SyncWorkerEventHub);

impl Drop for CloseWorkerEventsOnDrop {
    /// Calls `close_all` on the wrapped hub.
    fn drop(&mut self) {
        self.0.close_all();
    }
}
1062
/// Reason the sync worker loop woke up.
enum WorkerWake {
    /// A command was received from the command channel.
    Command(WorkerCommand),
    /// The Yjs flush window elapsed while Yjs updates were pending.
    FlushYjs,
    /// The retry timeout elapsed with no command received.
    Retry,
}
1068
/// Cloneable handle that can only ask a `SyncWorker` to run a sync.
#[derive(Clone)]
pub struct SyncWorkerTrigger {
    /// Clone of the worker's command sender.
    command_tx: SyncSender<WorkerCommand>,
}
1073
/// Handle to a background thread that keeps a realtime connection open and
/// reconnects when it is lost.
pub struct PersistentRealtimeWorker {
    /// Sending side of the realtime thread's command channel.
    command_tx: SyncSender<RealtimeWorkerCommand>,
    /// Join handle of the realtime thread; `None` once it has been joined.
    join: Option<JoinHandle<()>>,
}
1078
/// Commands accepted by the persistent realtime worker thread.
enum RealtimeWorkerCommand {
    /// Shut the thread down.
    Stop,
    /// Replace the transport's auth headers.
    SetAuthHeaders(SyncAuthHeaders),
    /// Record a presence change and, when connected, send it over the socket.
    SendPresence {
        /// Presence action; "join", "update" and "leave" are tracked locally.
        action: String,
        /// Scope the presence change applies to.
        scope_key: String,
        /// Optional metadata sent with the presence change.
        metadata: Option<Value>,
    },
}
1088
/// Shared callback the realtime worker invokes with presence events.
type RealtimeEventHandler = Arc<dyn Fn(RealtimeEvent) + Send + Sync>;
1090
1091impl SyncWorkerTrigger {
1092    pub fn trigger_sync(&self) -> Result<()> {
1093        self.command_tx
1094            .try_send(WorkerCommand::Trigger {
1095                command_id: None,
1096                emit_started: false,
1097                transport: WorkerSyncTransport::Http,
1098            })
1099            .map_err(|err| match err {
1100                TrySendError::Full(_) => SyncularError::busy("sync worker command queue is full"),
1101                TrySendError::Disconnected(_) => {
1102                    SyncularError::message(ErrorKind::Internal, "sync worker is not running")
1103                }
1104            })
1105    }
1106}
1107
1108impl PersistentRealtimeWorker {
1109    pub fn start<T>(transport: T, trigger: SyncWorkerTrigger) -> Self
1110    where
1111        T: SyncTransport + SyncAuthHeaderStore + Send + 'static,
1112    {
1113        Self::start_with_event_handler(transport, trigger, None)
1114    }
1115
1116    pub fn start_with_event_handler<T>(
1117        transport: T,
1118        trigger: SyncWorkerTrigger,
1119        event_handler: Option<RealtimeEventHandler>,
1120    ) -> Self
1121    where
1122        T: SyncTransport + SyncAuthHeaderStore + Send + 'static,
1123    {
1124        let (command_tx, command_rx) = mpsc::sync_channel(32);
1125        let join = thread::spawn(move || {
1126            run_persistent_realtime_worker(transport, trigger, command_rx, event_handler)
1127        });
1128        Self {
1129            command_tx,
1130            join: Some(join),
1131        }
1132    }
1133
1134    pub fn set_auth_headers(&self, headers: SyncAuthHeaders) -> Result<()> {
1135        self.command_tx
1136            .try_send(RealtimeWorkerCommand::SetAuthHeaders(headers))
1137            .map_err(|err| match err {
1138                TrySendError::Full(_) => {
1139                    SyncularError::busy("realtime worker command queue is full")
1140                }
1141                TrySendError::Disconnected(_) => {
1142                    SyncularError::message(ErrorKind::Internal, "realtime worker is not running")
1143                }
1144            })
1145    }
1146
1147    pub fn send_presence(
1148        &self,
1149        action: impl Into<String>,
1150        scope_key: impl Into<String>,
1151        metadata: Option<Value>,
1152    ) -> Result<()> {
1153        self.command_tx
1154            .try_send(RealtimeWorkerCommand::SendPresence {
1155                action: action.into(),
1156                scope_key: scope_key.into(),
1157                metadata,
1158            })
1159            .map_err(|err| match err {
1160                TrySendError::Full(_) => {
1161                    SyncularError::busy("realtime worker command queue is full")
1162                }
1163                TrySendError::Disconnected(_) => {
1164                    SyncularError::message(ErrorKind::Internal, "realtime worker is not running")
1165                }
1166            })
1167    }
1168
1169    pub fn stop(&mut self) -> Result<()> {
1170        let _ = self.command_tx.send(RealtimeWorkerCommand::Stop);
1171        if let Some(join) = self.join.take() {
1172            join.join().map_err(|_| {
1173                SyncularError::message(ErrorKind::Internal, "realtime worker panicked")
1174            })?;
1175        }
1176        Ok(())
1177    }
1178}
1179
impl Drop for PersistentRealtimeWorker {
    /// Stops and joins the realtime thread, ignoring any error from `stop`.
    fn drop(&mut self) {
        let _ = self.stop();
    }
}
1185
/// Body of the persistent realtime thread: connect, serve the socket, and
/// reconnect with a delay until told to stop.
///
/// Returns when a `Stop` command is received or the command channel is
/// disconnected.
fn run_persistent_realtime_worker<T>(
    mut transport: T,
    trigger: SyncWorkerTrigger,
    command_rx: Receiver<RealtimeWorkerCommand>,
    event_handler: Option<RealtimeEventHandler>,
) where
    T: SyncTransport + SyncAuthHeaderStore,
{
    // Consecutive failed connection attempts; feeds the backoff delay.
    let mut reconnect_attempt: i32 = 0;
    // Presence entries to re-send as "join" after every successful connect.
    let mut active_presence: BTreeMap<String, Option<Value>> = BTreeMap::new();
    loop {
        // Apply commands queued while disconnected (no socket to send on).
        if drain_realtime_commands(&mut transport, None, &command_rx, &mut active_presence)
            .is_none()
        {
            return;
        }

        match transport.connect_realtime() {
            Ok(mut socket) => {
                reconnect_attempt = 0;
                rejoin_realtime_presence(&mut socket, &active_presence);
                // `false` means stop; `true` means fall through and reconnect.
                if !run_connected_realtime_socket(
                    &mut transport,
                    &mut socket,
                    &trigger,
                    &command_rx,
                    &mut active_presence,
                    event_handler.as_deref(),
                ) {
                    return;
                }
            }
            Err(_) => {
                reconnect_attempt = reconnect_attempt.saturating_add(1);
            }
        }

        // Wait at least 250 ms before the next attempt, but stay responsive
        // to commands: any command received ends the wait early.
        let delay =
            Duration::from_millis(retry_backoff_delay_ms(reconnect_attempt).max(250) as u64);
        match command_rx.recv_timeout(delay) {
            Ok(RealtimeWorkerCommand::Stop) | Err(RecvTimeoutError::Disconnected) => return,
            Ok(RealtimeWorkerCommand::SetAuthHeaders(headers)) => {
                transport.set_auth_headers(headers);
                reconnect_attempt = 0;
            }
            Ok(RealtimeWorkerCommand::SendPresence {
                action,
                scope_key,
                metadata,
            }) => {
                // Not connected: only record the change for the next rejoin.
                apply_active_presence_command(&mut active_presence, &action, &scope_key, metadata);
                reconnect_attempt = 0;
            }
            Err(RecvTimeoutError::Timeout) => {}
        }
    }
}
1243
/// Serves one connected realtime socket until it must be closed.
///
/// The socket is closed before every return. Returns `true` when the caller
/// should reconnect (auth headers changed or reading failed) and `false` when
/// the worker should stop (`Stop` received or command channel disconnected).
fn run_connected_realtime_socket<T>(
    transport: &mut T,
    socket: &mut T::Realtime,
    trigger: &SyncWorkerTrigger,
    command_rx: &Receiver<RealtimeWorkerCommand>,
    active_presence: &mut BTreeMap<String, Option<Value>>,
    event_handler: Option<&(dyn Fn(RealtimeEvent) + Send + Sync)>,
) -> bool
where
    T: SyncTransport + SyncAuthHeaderStore,
{
    loop {
        // Handle everything already queued before reading from the socket.
        match drain_realtime_commands(transport, Some(&mut *socket), command_rx, active_presence) {
            Some(true) => {
                socket.close();
                return true;
            }
            Some(false) => {}
            None => {
                socket.close();
                return false;
            }
        }

        match socket.read_event() {
            Ok(Some(RealtimeEvent::Sync)) => {
                // Best effort: a full or stopped sync worker is ignored here.
                let _ = trigger.trigger_sync();
            }
            Ok(Some(event @ RealtimeEvent::Presence(_))) => {
                if let Some(handler) = event_handler {
                    handler(event);
                }
            }
            Ok(Some(RealtimeEvent::Other(_))) => {}
            // No event read: wait up to 250 ms for a command instead.
            Ok(None) => match command_rx.recv_timeout(Duration::from_millis(250)) {
                Ok(RealtimeWorkerCommand::Stop) | Err(RecvTimeoutError::Disconnected) => {
                    socket.close();
                    return false;
                }
                Ok(RealtimeWorkerCommand::SetAuthHeaders(headers)) => {
                    transport.set_auth_headers(headers);
                    socket.close();
                    return true;
                }
                Ok(RealtimeWorkerCommand::SendPresence {
                    action,
                    scope_key,
                    metadata,
                }) => {
                    apply_active_presence_command(
                        active_presence,
                        &action,
                        &scope_key,
                        metadata.clone(),
                    );
                    // Send failures are ignored.
                    let _ = socket.send_presence(&action, &scope_key, metadata.as_ref());
                }
                Err(RecvTimeoutError::Timeout) => {}
            },
            Err(_) => {
                socket.close();
                return true;
            }
        }
    }
}
1310
1311fn drain_realtime_commands<T>(
1312    transport: &mut T,
1313    mut socket: Option<&mut T::Realtime>,
1314    command_rx: &Receiver<RealtimeWorkerCommand>,
1315    active_presence: &mut BTreeMap<String, Option<Value>>,
1316) -> Option<bool>
1317where
1318    T: SyncAuthHeaderStore + SyncTransport,
1319{
1320    let mut reconnect = false;
1321    loop {
1322        match command_rx.try_recv() {
1323            Ok(RealtimeWorkerCommand::Stop) | Err(mpsc::TryRecvError::Disconnected) => {
1324                return None;
1325            }
1326            Ok(RealtimeWorkerCommand::SetAuthHeaders(headers)) => {
1327                transport.set_auth_headers(headers);
1328                reconnect = true;
1329            }
1330            Ok(RealtimeWorkerCommand::SendPresence {
1331                action,
1332                scope_key,
1333                metadata,
1334            }) => {
1335                apply_active_presence_command(
1336                    active_presence,
1337                    &action,
1338                    &scope_key,
1339                    metadata.clone(),
1340                );
1341                if let Some(socket) = socket.as_deref_mut() {
1342                    let _ = socket.send_presence(&action, &scope_key, metadata.as_ref());
1343                }
1344            }
1345            Err(mpsc::TryRecvError::Empty) => return Some(reconnect),
1346        }
1347    }
1348}
1349
1350fn apply_active_presence_command(
1351    active_presence: &mut BTreeMap<String, Option<Value>>,
1352    action: &str,
1353    scope_key: &str,
1354    metadata: Option<Value>,
1355) {
1356    match action {
1357        "leave" => {
1358            active_presence.remove(scope_key);
1359        }
1360        "join" | "update" => {
1361            active_presence.insert(scope_key.to_string(), metadata);
1362        }
1363        _ => {}
1364    }
1365}
1366
1367fn rejoin_realtime_presence<T>(socket: &mut T, active_presence: &BTreeMap<String, Option<Value>>)
1368where
1369    T: RealtimeTransport,
1370{
1371    for (scope_key, metadata) in active_presence {
1372        let _ = socket.send_presence("join", scope_key, metadata.as_ref());
1373    }
1374}
1375
1376impl SyncWorker {
1377    pub fn start<S, T>(client: SyncularClient<S, T>) -> Self
1378    where
1379        S: SyncStore + SyncStateStore + Send + 'static,
1380        T: SyncTransport + SyncAuthHeaderStore + Send + 'static,
1381        SyncularClient<S, T>: SyncWorkerClientExt,
1382    {
1383        Self::start_with_config(client, SyncWorkerConfig::default())
1384    }
1385
1386    pub fn start_with_config<S, T>(
1387        mut client: SyncularClient<S, T>,
1388        config: SyncWorkerConfig,
1389    ) -> Self
1390    where
1391        S: SyncStore + SyncStateStore + Send + 'static,
1392        T: SyncTransport + SyncAuthHeaderStore + Send + 'static,
1393        SyncularClient<S, T>: SyncWorkerClientExt,
1394    {
1395        let (command_tx, command_rx) = mpsc::sync_channel(config.command_queue_capacity);
1396        let app_schema = client.app_schema();
1397        let events = SyncWorkerEventHub::default();
1398        let default_events = events.subscribe(DEFAULT_WORKER_EVENT_QUEUE_CAPACITY);
1399        let worker_events = events.clone();
1400        let join = thread::spawn(move || {
1401            let _close_events = CloseWorkerEventsOnDrop(worker_events.clone());
1402            let mut pending_yjs = BTreeMap::new();
1403            loop {
1404                let wake = if pending_yjs.is_empty() {
1405                    match next_retry_timeout(&mut client) {
1406                        Some(timeout) => match command_rx.recv_timeout(timeout) {
1407                            Ok(command) => WorkerWake::Command(command),
1408                            Err(RecvTimeoutError::Timeout) => WorkerWake::Retry,
1409                            Err(RecvTimeoutError::Disconnected) => return,
1410                        },
1411                        None => match command_rx.recv() {
1412                            Ok(command) => WorkerWake::Command(command),
1413                            Err(_) => return,
1414                        },
1415                    }
1416                } else {
1417                    match command_rx.recv_timeout(config.yjs_flush_window) {
1418                        Ok(command) => WorkerWake::Command(command),
1419                        Err(RecvTimeoutError::Timeout) => WorkerWake::FlushYjs,
1420                        Err(RecvTimeoutError::Disconnected) => return,
1421                    }
1422                };
1423
1424                match wake {
1425                    WorkerWake::FlushYjs => {
1426                        if flush_pending_yjs(&mut client, &mut pending_yjs, &worker_events) {
1427                            if !run_until_settled(
1428                                &mut client,
1429                                &command_rx,
1430                                &worker_events,
1431                                &mut pending_yjs,
1432                                None,
1433                                false,
1434                                WorkerSyncTransport::Http,
1435                            ) {
1436                                return;
1437                            }
1438                        }
1439                    }
1440                    WorkerWake::Retry => {
1441                        if !run_due_retry_work(
1442                            &mut client,
1443                            &command_rx,
1444                            &worker_events,
1445                            &mut pending_yjs,
1446                        ) {
1447                            return;
1448                        }
1449                    }
1450                    WorkerWake::Command(command) => {
1451                        if !handle_command(
1452                            &mut client,
1453                            &command_rx,
1454                            &worker_events,
1455                            &mut pending_yjs,
1456                            command,
1457                        ) {
1458                            return;
1459                        }
1460                    }
1461                }
1462            }
1463        });
1464
1465        Self {
1466            app_schema,
1467            command_tx,
1468            events,
1469            default_events,
1470            join: Some(join),
1471        }
1472    }
1473
1474    pub fn trigger_sync(&self) -> Result<()> {
1475        self.trigger_sync_inner(None, false, WorkerSyncTransport::Http)
1476    }
1477
1478    pub fn trigger_sync_websocket(&self) -> Result<()> {
1479        self.trigger_sync_inner(None, false, WorkerSyncTransport::WebSocket)
1480    }
1481
1482    pub fn trigger_handle(&self) -> SyncWorkerTrigger {
1483        SyncWorkerTrigger {
1484            command_tx: self.command_tx.clone(),
1485        }
1486    }
1487
1488    pub fn subscribe_events(&self, capacity: usize) -> SyncWorkerEventSubscription {
1489        self.events.subscribe(capacity)
1490    }
1491
1492    pub fn event_source(&self) -> SyncWorkerEventSubscription {
1493        self.subscribe_events(DEFAULT_WORKER_EVENT_QUEUE_CAPACITY)
1494    }
1495
1496    pub fn enqueue_sync_now(&self, command_id: String) -> Result<()> {
1497        self.trigger_sync_inner(Some(command_id), true, WorkerSyncTransport::Http)
1498    }
1499
1500    pub fn enqueue_sync_websocket(&self, command_id: String) -> Result<()> {
1501        self.trigger_sync_inner(Some(command_id), true, WorkerSyncTransport::WebSocket)
1502    }
1503
1504    pub fn enqueue_mutation_json(
1505        &self,
1506        command_id: String,
1507        mutation_json: String,
1508        local_row_json: Option<String>,
1509        auto_sync: bool,
1510    ) -> Result<()> {
1511        validate_mutation_json_input_size(&mutation_json, local_row_json.as_deref())?;
1512        self.try_send(WorkerCommand::ApplyMutationJson {
1513            command_id,
1514            mutation_json,
1515            local_row_json,
1516            auto_sync,
1517            require_auth_lease: false,
1518        })
1519    }
1520
1521    pub fn enqueue_leased_mutation_json(
1522        &self,
1523        command_id: String,
1524        mutation_json: String,
1525        local_row_json: Option<String>,
1526        auto_sync: bool,
1527    ) -> Result<()> {
1528        validate_mutation_json_input_size(&mutation_json, local_row_json.as_deref())?;
1529        self.try_send(WorkerCommand::ApplyMutationJson {
1530            command_id,
1531            mutation_json,
1532            local_row_json,
1533            auto_sync,
1534            require_auth_lease: true,
1535        })
1536    }
1537
1538    pub fn enqueue_yjs_update_json(
1539        &self,
1540        command_id: String,
1541        update_json: String,
1542        auto_sync: bool,
1543    ) -> Result<()> {
1544        validate_save_yjs_update_json(&update_json)?;
1545        self.try_send(WorkerCommand::SaveYjsUpdateJson {
1546            command_id,
1547            update_json,
1548            auto_sync,
1549        })
1550    }
1551
1552    pub fn enqueue_crdt_field_text_json(
1553        &self,
1554        command_id: String,
1555        request_json: String,
1556        auto_sync: bool,
1557    ) -> Result<()> {
1558        validate_worker_crdt_field_text_request_json(&request_json)?;
1559        self.try_send(WorkerCommand::ApplyCrdtFieldTextJson {
1560            command_id,
1561            request_json,
1562            auto_sync,
1563        })
1564    }
1565
1566    pub fn enqueue_crdt_field_compaction_json(
1567        &self,
1568        command_id: String,
1569        request_json: String,
1570        auto_sync: bool,
1571    ) -> Result<()> {
1572        validate_crdt_request_json_size(&request_json)?;
1573        self.try_send(WorkerCommand::CompactCrdtFieldJson {
1574            command_id,
1575            request_json,
1576            auto_sync,
1577        })
1578    }
1579
1580    pub fn enqueue_encrypted_crdt_update_json(
1581        &self,
1582        command_id: String,
1583        request_json: String,
1584        auto_sync: bool,
1585    ) -> Result<()> {
1586        validate_crdt_request_json_size(&request_json)?;
1587        self.try_send(WorkerCommand::ApplyEncryptedCrdtUpdateJson {
1588            command_id,
1589            request_json,
1590            auto_sync,
1591        })
1592    }
1593
1594    pub fn enqueue_encrypted_crdt_checkpoint_json(
1595        &self,
1596        command_id: String,
1597        request_json: String,
1598        auto_sync: bool,
1599    ) -> Result<()> {
1600        validate_crdt_request_json_size(&request_json)?;
1601        self.try_send(WorkerCommand::ApplyEncryptedCrdtCheckpointJson {
1602            command_id,
1603            request_json,
1604            auto_sync,
1605        })
1606    }
1607
1608    pub fn enqueue_conflict_resolution(
1609        &self,
1610        command_id: String,
1611        conflict_id: String,
1612        resolution: String,
1613        auto_sync: bool,
1614    ) -> Result<()> {
1615        self.try_send(WorkerCommand::ResolveConflict {
1616            command_id,
1617            conflict_id,
1618            resolution,
1619            auto_sync,
1620        })
1621    }
1622
1623    pub fn enqueue_refresh_snapshot_json(
1624        &self,
1625        command_id: String,
1626        request_json: String,
1627    ) -> Result<()> {
1628        self.try_send(WorkerCommand::RefreshSnapshotJson {
1629            command_id,
1630            request_json,
1631        })
1632    }
1633
1634    pub fn enqueue_compact_storage_json(
1635        &self,
1636        command_id: String,
1637        options_json: Option<String>,
1638    ) -> Result<()> {
1639        self.try_send(WorkerCommand::CompactStorageJson {
1640            command_id,
1641            options_json,
1642        })
1643    }
1644
1645    pub fn enqueue_store_blob_file_json(
1646        &self,
1647        command_id: String,
1648        path: String,
1649        options_json: Option<String>,
1650    ) -> Result<()> {
1651        self.try_send(WorkerCommand::StoreBlobFileJson {
1652            command_id,
1653            path,
1654            options_json,
1655        })
1656    }
1657
1658    pub fn enqueue_retrieve_blob_file_json(
1659        &self,
1660        command_id: String,
1661        ref_json: String,
1662        path: String,
1663        options_json: Option<String>,
1664    ) -> Result<()> {
1665        self.try_send(WorkerCommand::RetrieveBlobFileJson {
1666            command_id,
1667            ref_json,
1668            path,
1669            options_json,
1670        })
1671    }
1672
1673    pub fn enqueue_process_blob_upload_queue(&self, command_id: String) -> Result<()> {
1674        self.try_send(WorkerCommand::ProcessBlobUploadQueue { command_id })
1675    }
1676
1677    pub fn enqueue_prune_blob_cache(&self, command_id: String, max_bytes: i64) -> Result<()> {
1678        self.try_send(WorkerCommand::PruneBlobCache {
1679            command_id,
1680            max_bytes,
1681        })
1682    }
1683
1684    pub fn enqueue_clear_blob_cache(&self, command_id: String) -> Result<()> {
1685        self.try_send(WorkerCommand::ClearBlobCache { command_id })
1686    }
1687
1688    pub fn set_auth_headers(&self, headers: SyncAuthHeaders) -> Result<()> {
1689        self.try_send(WorkerCommand::SetAuthHeaders(headers))
1690    }
1691
1692    pub fn set_subscriptions(&self, subscriptions: Vec<SubscriptionSpec>) -> Result<()> {
1693        validate_subscription_limits(&subscriptions)?;
1694        self.try_send(WorkerCommand::SetSubscriptions(subscriptions))
1695    }
1696
1697    pub fn set_field_encryption(&self, encryption: Option<FieldEncryption>) -> Result<()> {
1698        if let Some(encryption) = &encryption {
1699            validate_field_encryption_rules_against_app_schema(
1700                self.app_schema,
1701                encryption.rules(),
1702            )?;
1703        }
1704        self.try_send(WorkerCommand::SetFieldEncryption(encryption))
1705    }
1706
1707    pub fn set_encrypted_crdt(&self, encryption: Option<EncryptedCrdt>) -> Result<()> {
1708        if encryption.is_some() {
1709            validate_encrypted_crdt_against_app_schema(self.app_schema)?;
1710        }
1711        self.try_send(WorkerCommand::SetEncryptedCrdt(encryption))
1712    }
1713
1714    pub fn set_blob_encryption(&self, encryption: Option<BlobEncryption>) -> Result<()> {
1715        if encryption.is_some() {
1716            validate_blob_encryption_against_app_schema(self.app_schema)?;
1717        }
1718        self.try_send(WorkerCommand::SetBlobEncryption(encryption))
1719    }
1720
1721    pub fn recv_event_timeout(&self, timeout: Duration) -> Option<SyncWorkerEvent> {
1722        self.default_events.next_event_timeout(timeout)
1723    }
1724
1725    pub fn recv_result_timeout(&self, timeout: Duration) -> Option<Result<SyncReport>> {
1726        let deadline = Instant::now().checked_add(timeout)?;
1727        loop {
1728            let now = Instant::now();
1729            if now >= deadline {
1730                return None;
1731            }
1732            let remaining = deadline.saturating_duration_since(now);
1733            match self.recv_event_timeout(remaining)? {
1734                SyncWorkerEvent::SyncCompleted { report, .. } => return Some(Ok(report)),
1735                SyncWorkerEvent::SyncFailed { error, .. } => return Some(Err(error)),
1736                _ => continue,
1737            }
1738        }
1739    }
1740
1741    pub fn request_stop(&self) -> Result<()> {
1742        self.command_tx
1743            .send(WorkerCommand::Stop)
1744            .map_err(|_| SyncularError::message(ErrorKind::Internal, "sync worker is not running"))
1745    }
1746
1747    pub fn join(&mut self) -> Result<()> {
1748        if let Some(join) = self.join.take() {
1749            join.join()
1750                .map_err(|_| SyncularError::message(ErrorKind::Internal, "sync worker panicked"))?;
1751        }
1752        Ok(())
1753    }
1754
1755    pub fn stop(mut self) -> Result<()> {
1756        let _ = self.request_stop();
1757        self.join()
1758    }
1759
1760    fn trigger_sync_inner(
1761        &self,
1762        command_id: Option<String>,
1763        emit_started: bool,
1764        transport: WorkerSyncTransport,
1765    ) -> Result<()> {
1766        self.try_send(WorkerCommand::Trigger {
1767            command_id,
1768            emit_started,
1769            transport,
1770        })
1771    }
1772
1773    fn try_send(&self, command: WorkerCommand) -> Result<()> {
1774        self.command_tx.try_send(command).map_err(|err| match err {
1775            TrySendError::Full(_) => SyncularError::busy("sync worker command queue is full"),
1776            TrySendError::Disconnected(_) => {
1777                SyncularError::message(ErrorKind::Internal, "sync worker is not running")
1778            }
1779        })
1780    }
1781}
1782
impl Drop for SyncWorker {
    /// Sends `Stop` to the worker and joins its thread if that has not
    /// happened yet; failures of either step are ignored.
    fn drop(&mut self) {
        let _ = self.command_tx.send(WorkerCommand::Stop);
        if let Some(join) = self.join.take() {
            let _ = join.join();
        }
    }
}
1791
1792fn handle_command<S, T>(
1793    client: &mut SyncularClient<S, T>,
1794    command_rx: &Receiver<WorkerCommand>,
1795    event_tx: &SyncWorkerEventHub,
1796    pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
1797    command: WorkerCommand,
1798) -> bool
1799where
1800    S: SyncStore + SyncStateStore,
1801    T: SyncTransport + SyncAuthHeaderStore,
1802    SyncularClient<S, T>: SyncWorkerClientExt,
1803{
1804    match command {
1805        WorkerCommand::Trigger {
1806            command_id,
1807            emit_started,
1808            transport,
1809        } => run_until_settled(
1810            client,
1811            command_rx,
1812            event_tx,
1813            pending_yjs,
1814            command_id,
1815            emit_started,
1816            transport,
1817        ),
1818        WorkerCommand::ApplyMutationJson {
1819            command_id,
1820            mutation_json,
1821            local_row_json,
1822            auto_sync,
1823            require_auth_lease,
1824        } => {
1825            if flush_pending_yjs(client, pending_yjs, event_tx) {
1826                if !run_until_settled(
1827                    client,
1828                    command_rx,
1829                    event_tx,
1830                    pending_yjs,
1831                    None,
1832                    false,
1833                    WorkerSyncTransport::Http,
1834                ) {
1835                    return false;
1836                }
1837            }
1838            let should_sync = apply_mutation_json(
1839                client,
1840                event_tx,
1841                command_id,
1842                &mutation_json,
1843                local_row_json.as_deref(),
1844                auto_sync,
1845                require_auth_lease,
1846            );
1847            if should_sync {
1848                run_until_settled(
1849                    client,
1850                    command_rx,
1851                    event_tx,
1852                    pending_yjs,
1853                    None,
1854                    false,
1855                    WorkerSyncTransport::Http,
1856                )
1857            } else {
1858                true
1859            }
1860        }
1861        WorkerCommand::SaveYjsUpdateJson {
1862            command_id,
1863            update_json,
1864            auto_sync,
1865        } => {
1866            queue_yjs_update_json(pending_yjs, event_tx, command_id, &update_json, auto_sync);
1867            true
1868        }
1869        WorkerCommand::ApplyCrdtFieldTextJson {
1870            command_id,
1871            request_json,
1872            auto_sync,
1873        } => {
1874            if flush_pending_yjs(client, pending_yjs, event_tx) {
1875                if !run_until_settled(
1876                    client,
1877                    command_rx,
1878                    event_tx,
1879                    pending_yjs,
1880                    None,
1881                    false,
1882                    WorkerSyncTransport::Http,
1883                ) {
1884                    return false;
1885                }
1886            }
1887            let should_sync =
1888                apply_crdt_field_text_json(client, event_tx, command_id, &request_json, auto_sync);
1889            if should_sync {
1890                run_until_settled(
1891                    client,
1892                    command_rx,
1893                    event_tx,
1894                    pending_yjs,
1895                    None,
1896                    false,
1897                    WorkerSyncTransport::Http,
1898                )
1899            } else {
1900                true
1901            }
1902        }
1903        WorkerCommand::CompactCrdtFieldJson {
1904            command_id,
1905            request_json,
1906            auto_sync,
1907        } => {
1908            if flush_pending_yjs(client, pending_yjs, event_tx) {
1909                if !run_until_settled(
1910                    client,
1911                    command_rx,
1912                    event_tx,
1913                    pending_yjs,
1914                    None,
1915                    false,
1916                    WorkerSyncTransport::Http,
1917                ) {
1918                    return false;
1919                }
1920            }
1921            let should_sync =
1922                compact_crdt_field_json(client, event_tx, command_id, &request_json, auto_sync);
1923            if should_sync {
1924                run_until_settled(
1925                    client,
1926                    command_rx,
1927                    event_tx,
1928                    pending_yjs,
1929                    None,
1930                    false,
1931                    WorkerSyncTransport::Http,
1932                )
1933            } else {
1934                true
1935            }
1936        }
1937        WorkerCommand::ApplyEncryptedCrdtUpdateJson {
1938            command_id,
1939            request_json,
1940            auto_sync,
1941        } => {
1942            if flush_pending_yjs(client, pending_yjs, event_tx) {
1943                if !run_until_settled(
1944                    client,
1945                    command_rx,
1946                    event_tx,
1947                    pending_yjs,
1948                    None,
1949                    false,
1950                    WorkerSyncTransport::Http,
1951                ) {
1952                    return false;
1953                }
1954            }
1955            let should_sync = apply_encrypted_crdt_update_json(
1956                client,
1957                event_tx,
1958                command_id,
1959                &request_json,
1960                auto_sync,
1961            );
1962            if should_sync {
1963                run_until_settled(
1964                    client,
1965                    command_rx,
1966                    event_tx,
1967                    pending_yjs,
1968                    None,
1969                    false,
1970                    WorkerSyncTransport::Http,
1971                )
1972            } else {
1973                true
1974            }
1975        }
1976        WorkerCommand::ApplyEncryptedCrdtCheckpointJson {
1977            command_id,
1978            request_json,
1979            auto_sync,
1980        } => {
1981            if flush_pending_yjs(client, pending_yjs, event_tx) {
1982                if !run_until_settled(
1983                    client,
1984                    command_rx,
1985                    event_tx,
1986                    pending_yjs,
1987                    None,
1988                    false,
1989                    WorkerSyncTransport::Http,
1990                ) {
1991                    return false;
1992                }
1993            }
1994            let should_sync = apply_encrypted_crdt_checkpoint_json(
1995                client,
1996                event_tx,
1997                command_id,
1998                &request_json,
1999                auto_sync,
2000            );
2001            if should_sync {
2002                run_until_settled(
2003                    client,
2004                    command_rx,
2005                    event_tx,
2006                    pending_yjs,
2007                    None,
2008                    false,
2009                    WorkerSyncTransport::Http,
2010                )
2011            } else {
2012                true
2013            }
2014        }
2015        WorkerCommand::ResolveConflict {
2016            command_id,
2017            conflict_id,
2018            resolution,
2019            auto_sync,
2020        } => {
2021            if flush_pending_yjs(client, pending_yjs, event_tx) {
2022                if !run_until_settled(
2023                    client,
2024                    command_rx,
2025                    event_tx,
2026                    pending_yjs,
2027                    None,
2028                    false,
2029                    WorkerSyncTransport::Http,
2030                ) {
2031                    return false;
2032                }
2033            }
2034            let should_sync = resolve_conflict(
2035                client,
2036                event_tx,
2037                command_id,
2038                &conflict_id,
2039                &resolution,
2040                auto_sync,
2041            );
2042            if should_sync {
2043                run_until_settled(
2044                    client,
2045                    command_rx,
2046                    event_tx,
2047                    pending_yjs,
2048                    None,
2049                    false,
2050                    WorkerSyncTransport::Http,
2051                )
2052            } else {
2053                true
2054            }
2055        }
2056        WorkerCommand::RefreshSnapshotJson {
2057            command_id,
2058            request_json,
2059        } => {
2060            if flush_pending_yjs(client, pending_yjs, event_tx) {
2061                if !run_until_settled(
2062                    client,
2063                    command_rx,
2064                    event_tx,
2065                    pending_yjs,
2066                    None,
2067                    false,
2068                    WorkerSyncTransport::Http,
2069                ) {
2070                    return false;
2071                }
2072            }
2073            refresh_snapshot_json(client, event_tx, command_id, &request_json);
2074            true
2075        }
2076        WorkerCommand::CompactStorageJson {
2077            command_id,
2078            options_json,
2079        } => {
2080            if flush_pending_yjs(client, pending_yjs, event_tx) {
2081                if !run_until_settled(
2082                    client,
2083                    command_rx,
2084                    event_tx,
2085                    pending_yjs,
2086                    None,
2087                    false,
2088                    WorkerSyncTransport::Http,
2089                ) {
2090                    return false;
2091                }
2092            }
2093            run_worker_json_command(client, event_tx, command_id, "compactStorage", |client| {
2094                client.worker_compact_storage_json(options_json.as_deref())
2095            });
2096            true
2097        }
2098        WorkerCommand::StoreBlobFileJson {
2099            command_id,
2100            path,
2101            options_json,
2102        } => {
2103            if run_worker_json_command(client, event_tx, command_id, "storeBlobFile", |client| {
2104                client.worker_store_blob_file_json(&path, options_json.as_deref())
2105            }) {
2106                publish_blob_uploads_changed(client, event_tx);
2107            }
2108            true
2109        }
2110        WorkerCommand::RetrieveBlobFileJson {
2111            command_id,
2112            ref_json,
2113            path,
2114            options_json,
2115        } => {
2116            run_worker_json_command(client, event_tx, command_id, "retrieveBlobFile", |client| {
2117                client.worker_retrieve_blob_file_json(&ref_json, &path, options_json.as_deref())
2118            });
2119            true
2120        }
2121        WorkerCommand::ProcessBlobUploadQueue { command_id } => {
2122            if run_worker_json_command(
2123                client,
2124                event_tx,
2125                command_id,
2126                "processBlobUploadQueue",
2127                |client| {
2128                    client
2129                        .worker_process_blob_upload_queue_json()?
2130                        .ok_or_else(|| {
2131                            SyncularError::config(
2132                                "worker-owned blob upload queue processing is not available for this client",
2133                            )
2134                        })
2135                },
2136            ) {
2137                publish_blob_uploads_changed(client, event_tx);
2138            }
2139            true
2140        }
2141        WorkerCommand::PruneBlobCache {
2142            command_id,
2143            max_bytes,
2144        } => {
2145            run_worker_json_command(client, event_tx, command_id, "pruneBlobCache", |client| {
2146                client.worker_prune_blob_cache_json(max_bytes)
2147            });
2148            true
2149        }
2150        WorkerCommand::ClearBlobCache { command_id } => {
2151            run_worker_json_command(client, event_tx, command_id, "clearBlobCache", |client| {
2152                client.worker_clear_blob_cache_json()
2153            });
2154            true
2155        }
2156        WorkerCommand::SetAuthHeaders(headers) => {
2157            client.set_auth_headers(headers);
2158            true
2159        }
2160        WorkerCommand::SetSubscriptions(subscriptions) => {
2161            let _ = client.set_subscriptions(subscriptions);
2162            true
2163        }
2164        WorkerCommand::SetFieldEncryption(encryption) => {
2165            let _ = client.set_field_encryption(encryption);
2166            true
2167        }
2168        WorkerCommand::SetEncryptedCrdt(encryption) => {
2169            let _ = client.set_encrypted_crdt(encryption);
2170            true
2171        }
2172        WorkerCommand::SetBlobEncryption(encryption) => {
2173            let _ = client.set_blob_encryption(encryption);
2174            true
2175        }
2176        WorkerCommand::Stop => {
2177            let _ = flush_pending_yjs(client, pending_yjs, event_tx);
2178            false
2179        }
2180    }
2181}
2182
2183fn run_due_retry_work<S, T>(
2184    client: &mut SyncularClient<S, T>,
2185    command_rx: &Receiver<WorkerCommand>,
2186    event_tx: &SyncWorkerEventHub,
2187    pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
2188) -> bool
2189where
2190    S: SyncStore + SyncStateStore,
2191    T: SyncTransport + SyncAuthHeaderStore,
2192    SyncularClient<S, T>: SyncWorkerClientExt,
2193{
2194    if due_now(client.worker_next_blob_upload_retry_at_ms()) {
2195        process_due_blob_upload_queue(client, event_tx);
2196    }
2197
2198    if due_now(client.worker_next_outbox_retry_at_ms()) {
2199        run_until_settled(
2200            client,
2201            command_rx,
2202            event_tx,
2203            pending_yjs,
2204            None,
2205            false,
2206            WorkerSyncTransport::Http,
2207        )
2208    } else {
2209        true
2210    }
2211}
2212
2213fn process_due_blob_upload_queue<S, T>(
2214    client: &mut SyncularClient<S, T>,
2215    event_tx: &SyncWorkerEventHub,
2216) where
2217    S: SyncStore + SyncStateStore,
2218    T: SyncTransport,
2219    SyncularClient<S, T>: SyncWorkerClientExt,
2220{
2221    let command_id = retry_wakeup_command_id("blob-retry");
2222    let started = Instant::now();
2223    match client.worker_process_blob_upload_queue_json() {
2224        Ok(Some(payload_json)) => {
2225            let payload_json = serde_json::from_str(&payload_json).ok();
2226            let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandCompleted {
2227                command_id,
2228                operation: "processBlobUploadQueue",
2229                payload_json,
2230                duration_ms: duration_ms(started),
2231            });
2232            publish_blob_uploads_changed(client, event_tx);
2233        }
2234        Ok(None) => {}
2235        Err(error) => {
2236            let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandFailed {
2237                command_id,
2238                operation: "processBlobUploadQueue",
2239                error,
2240                duration_ms: duration_ms(started),
2241            });
2242        }
2243    }
2244}
2245
2246fn run_until_settled<S, T>(
2247    client: &mut SyncularClient<S, T>,
2248    command_rx: &Receiver<WorkerCommand>,
2249    event_tx: &SyncWorkerEventHub,
2250    pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
2251    initial_command_id: Option<String>,
2252    initial_emit_started: bool,
2253    initial_transport: WorkerSyncTransport,
2254) -> bool
2255where
2256    S: SyncStore + SyncStateStore,
2257    T: SyncTransport + SyncAuthHeaderStore,
2258    SyncularClient<S, T>: SyncWorkerClientExt,
2259{
2260    let mut should_sync = Some((initial_command_id, initial_emit_started, initial_transport));
2261    while let Some((command_id, emit_started, transport)) = should_sync.take() {
2262        run_sync(client, event_tx, command_id, emit_started, transport);
2263
2264        let mut next_sync: Option<(Option<String>, bool, WorkerSyncTransport)> = None;
2265        loop {
2266            match command_rx.try_recv() {
2267                Ok(WorkerCommand::Trigger {
2268                    command_id,
2269                    emit_started,
2270                    transport,
2271                }) => {
2272                    next_sync = Some(match next_sync {
2273                        Some((existing_id, existing_emit_started, existing_transport)) => (
2274                            existing_id.or(command_id),
2275                            existing_emit_started || emit_started,
2276                            existing_transport.coalesce(transport),
2277                        ),
2278                        None => (command_id, emit_started, transport),
2279                    });
2280                }
2281                Ok(WorkerCommand::SetAuthHeaders(headers)) => {
2282                    client.set_auth_headers(headers);
2283                }
2284                Ok(WorkerCommand::SetSubscriptions(subscriptions)) => {
2285                    let _ = client.set_subscriptions(subscriptions);
2286                }
2287                Ok(WorkerCommand::SetFieldEncryption(encryption)) => {
2288                    let _ = client.set_field_encryption(encryption);
2289                }
2290                Ok(WorkerCommand::SetEncryptedCrdt(encryption)) => {
2291                    let _ = client.set_encrypted_crdt(encryption);
2292                }
2293                Ok(WorkerCommand::SetBlobEncryption(encryption)) => {
2294                    let _ = client.set_blob_encryption(encryption);
2295                }
2296                Ok(WorkerCommand::ApplyMutationJson {
2297                    command_id,
2298                    mutation_json,
2299                    local_row_json,
2300                    auto_sync,
2301                    require_auth_lease,
2302                }) => {
2303                    if flush_pending_yjs(client, pending_yjs, event_tx) {
2304                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2305                    }
2306                    if apply_mutation_json(
2307                        client,
2308                        event_tx,
2309                        command_id,
2310                        &mutation_json,
2311                        local_row_json.as_deref(),
2312                        auto_sync,
2313                        require_auth_lease,
2314                    ) {
2315                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2316                    }
2317                }
2318                Ok(WorkerCommand::SaveYjsUpdateJson {
2319                    command_id,
2320                    update_json,
2321                    auto_sync,
2322                }) => {
2323                    queue_yjs_update_json(
2324                        pending_yjs,
2325                        event_tx,
2326                        command_id,
2327                        &update_json,
2328                        auto_sync,
2329                    );
2330                }
2331                Ok(WorkerCommand::ApplyCrdtFieldTextJson {
2332                    command_id,
2333                    request_json,
2334                    auto_sync,
2335                }) => {
2336                    if flush_pending_yjs(client, pending_yjs, event_tx) {
2337                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2338                    }
2339                    if apply_crdt_field_text_json(
2340                        client,
2341                        event_tx,
2342                        command_id,
2343                        &request_json,
2344                        auto_sync,
2345                    ) {
2346                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2347                    }
2348                }
2349                Ok(WorkerCommand::CompactCrdtFieldJson {
2350                    command_id,
2351                    request_json,
2352                    auto_sync,
2353                }) => {
2354                    if flush_pending_yjs(client, pending_yjs, event_tx) {
2355                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2356                    }
2357                    if compact_crdt_field_json(
2358                        client,
2359                        event_tx,
2360                        command_id,
2361                        &request_json,
2362                        auto_sync,
2363                    ) {
2364                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2365                    }
2366                }
2367                Ok(WorkerCommand::ApplyEncryptedCrdtUpdateJson {
2368                    command_id,
2369                    request_json,
2370                    auto_sync,
2371                }) => {
2372                    if flush_pending_yjs(client, pending_yjs, event_tx) {
2373                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2374                    }
2375                    if apply_encrypted_crdt_update_json(
2376                        client,
2377                        event_tx,
2378                        command_id,
2379                        &request_json,
2380                        auto_sync,
2381                    ) {
2382                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2383                    }
2384                }
2385                Ok(WorkerCommand::ApplyEncryptedCrdtCheckpointJson {
2386                    command_id,
2387                    request_json,
2388                    auto_sync,
2389                }) => {
2390                    if flush_pending_yjs(client, pending_yjs, event_tx) {
2391                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2392                    }
2393                    if apply_encrypted_crdt_checkpoint_json(
2394                        client,
2395                        event_tx,
2396                        command_id,
2397                        &request_json,
2398                        auto_sync,
2399                    ) {
2400                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2401                    }
2402                }
2403                Ok(WorkerCommand::ResolveConflict {
2404                    command_id,
2405                    conflict_id,
2406                    resolution,
2407                    auto_sync,
2408                }) => {
2409                    if flush_pending_yjs(client, pending_yjs, event_tx) {
2410                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2411                    }
2412                    if resolve_conflict(
2413                        client,
2414                        event_tx,
2415                        command_id,
2416                        &conflict_id,
2417                        &resolution,
2418                        auto_sync,
2419                    ) {
2420                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2421                    }
2422                }
2423                Ok(WorkerCommand::RefreshSnapshotJson {
2424                    command_id,
2425                    request_json,
2426                }) => {
2427                    if flush_pending_yjs(client, pending_yjs, event_tx) {
2428                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2429                    }
2430                    refresh_snapshot_json(client, event_tx, command_id, &request_json);
2431                }
2432                Ok(WorkerCommand::CompactStorageJson {
2433                    command_id,
2434                    options_json,
2435                }) => {
2436                    if flush_pending_yjs(client, pending_yjs, event_tx) {
2437                        next_sync = Some((None, false, WorkerSyncTransport::Http));
2438                    }
2439                    run_worker_json_command(
2440                        client,
2441                        event_tx,
2442                        command_id,
2443                        "compactStorage",
2444                        |client| client.worker_compact_storage_json(options_json.as_deref()),
2445                    );
2446                }
2447                Ok(WorkerCommand::StoreBlobFileJson {
2448                    command_id,
2449                    path,
2450                    options_json,
2451                }) => {
2452                    if run_worker_json_command(
2453                        client,
2454                        event_tx,
2455                        command_id,
2456                        "storeBlobFile",
2457                        |client| client.worker_store_blob_file_json(&path, options_json.as_deref()),
2458                    ) {
2459                        publish_blob_uploads_changed(client, event_tx);
2460                    }
2461                }
2462                Ok(WorkerCommand::RetrieveBlobFileJson {
2463                    command_id,
2464                    ref_json,
2465                    path,
2466                    options_json,
2467                }) => {
2468                    run_worker_json_command(
2469                        client,
2470                        event_tx,
2471                        command_id,
2472                        "retrieveBlobFile",
2473                        |client| {
2474                            client.worker_retrieve_blob_file_json(
2475                                &ref_json,
2476                                &path,
2477                                options_json.as_deref(),
2478                            )
2479                        },
2480                    );
2481                }
2482                Ok(WorkerCommand::ProcessBlobUploadQueue { command_id }) => {
2483                    if run_worker_json_command(
2484                        client,
2485                        event_tx,
2486                        command_id,
2487                        "processBlobUploadQueue",
2488                        |client| {
2489                            client
2490                                .worker_process_blob_upload_queue_json()?
2491                                .ok_or_else(|| {
2492                                    SyncularError::config(
2493                                        "worker-owned blob upload queue processing is not available for this client",
2494                                    )
2495                                })
2496                        },
2497                    ) {
2498                        publish_blob_uploads_changed(client, event_tx);
2499                    }
2500                }
2501                Ok(WorkerCommand::PruneBlobCache {
2502                    command_id,
2503                    max_bytes,
2504                }) => {
2505                    run_worker_json_command(
2506                        client,
2507                        event_tx,
2508                        command_id,
2509                        "pruneBlobCache",
2510                        |client| client.worker_prune_blob_cache_json(max_bytes),
2511                    );
2512                }
2513                Ok(WorkerCommand::ClearBlobCache { command_id }) => {
2514                    run_worker_json_command(
2515                        client,
2516                        event_tx,
2517                        command_id,
2518                        "clearBlobCache",
2519                        |client| client.worker_clear_blob_cache_json(),
2520                    );
2521                }
2522                Ok(WorkerCommand::Stop) | Err(mpsc::TryRecvError::Disconnected) => {
2523                    let _ = flush_pending_yjs(client, pending_yjs, event_tx);
2524                    return false;
2525                }
2526                Err(mpsc::TryRecvError::Empty) => break,
2527            }
2528        }
2529
2530        if flush_pending_yjs(client, pending_yjs, event_tx) {
2531            next_sync = Some((None, false, WorkerSyncTransport::Http));
2532        }
2533        should_sync = next_sync;
2534    }
2535    true
2536}
2537
2538fn run_sync<S, T>(
2539    client: &mut SyncularClient<S, T>,
2540    event_tx: &SyncWorkerEventHub,
2541    command_id: Option<String>,
2542    emit_started: bool,
2543    transport: WorkerSyncTransport,
2544) where
2545    S: SyncStore + SyncStateStore,
2546    T: SyncTransport,
2547{
2548    if emit_started {
2549        let _ = event_tx.publish_event(SyncWorkerEvent::SyncStarted {
2550            command_id: command_id.clone(),
2551        });
2552    }
2553
2554    let started = Instant::now();
2555    let result = match transport {
2556        WorkerSyncTransport::Http => client.sync_http(),
2557        WorkerSyncTransport::WebSocket => client.sync_ws(),
2558    };
2559    #[cfg(feature = "native")]
2560    let result = result.and_then(|report| {
2561        let bootstrap = client.bootstrap_status()?;
2562        Ok((report, bootstrap))
2563    });
2564
2565    match result {
2566        #[cfg(feature = "native")]
2567        Ok((report, bootstrap)) => {
2568            let (outbox_count, conflict_count) = worker_counts(client).unwrap_or((0, 0));
2569            let _ = event_tx.publish_event(SyncWorkerEvent::SyncCompleted {
2570                command_id,
2571                report,
2572                bootstrap,
2573                outbox_count,
2574                conflict_count,
2575                duration_ms: duration_ms(started),
2576            });
2577        }
2578        #[cfg(not(feature = "native"))]
2579        Ok(report) => {
2580            let (outbox_count, conflict_count) = worker_counts(client).unwrap_or((0, 0));
2581            let _ = event_tx.publish_event(SyncWorkerEvent::SyncCompleted {
2582                command_id,
2583                report,
2584                outbox_count,
2585                conflict_count,
2586                duration_ms: duration_ms(started),
2587            });
2588        }
2589        Err(error) => {
2590            let retry_scheduled = retry_scheduled_after_error(client);
2591            let _ = event_tx.publish_event(SyncWorkerEvent::SyncFailed {
2592                command_id,
2593                error,
2594                retry_scheduled,
2595                duration_ms: duration_ms(started),
2596            });
2597        }
2598    }
2599}
2600
2601fn apply_mutation_json<S, T>(
2602    client: &mut SyncularClient<S, T>,
2603    event_tx: &SyncWorkerEventHub,
2604    command_id: String,
2605    mutation_json: &str,
2606    local_row_json: Option<&str>,
2607    auto_sync: bool,
2608    require_auth_lease: bool,
2609) -> bool
2610where
2611    S: SyncStore + SyncStateStore,
2612    T: SyncTransport,
2613    SyncularClient<S, T>: SyncWorkerClientExt,
2614{
2615    let started = Instant::now();
2616    let mutation = serde_json::from_str::<SyncOperation>(mutation_json).ok();
2617    let table = mutation
2618        .as_ref()
2619        .map(|mutation| mutation.table.clone())
2620        .unwrap_or_else(|| "unknown".to_string());
2621    let previous_row = mutation.as_ref().and_then(|mutation| {
2622        client
2623            .worker_current_row_json(&mutation.table, &mutation.row_id)
2624            .ok()
2625            .flatten()
2626    });
2627    let local_row = local_row_json
2628        .map(serde_json::from_str::<Value>)
2629        .transpose()
2630        .ok()
2631        .flatten();
2632    let applied = if require_auth_lease {
2633        client.apply_worker_leased_mutation_json(mutation_json, local_row_json)
2634    } else {
2635        client.apply_worker_mutation_json(mutation_json, local_row_json)
2636    };
2637    match applied {
2638        Ok(client_commit_id) => {
2639            let changed_rows = mutation
2640                .as_ref()
2641                .and_then(|mutation| {
2642                    sync_changed_row_for_local_operation(
2643                        client.app_schema(),
2644                        mutation,
2645                        previous_row.as_ref(),
2646                        local_row.as_ref(),
2647                        Some(client_commit_id.clone()),
2648                    )
2649                })
2650                .into_iter()
2651                .collect();
2652            let outbox_count = worker_outbox_count(client);
2653            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2654                command_id,
2655                client_commit_id,
2656                changed_tables: vec![table],
2657                changed_rows,
2658                outbox_count,
2659                duration_ms: duration_ms(started),
2660            });
2661            auto_sync
2662        }
2663        Err(error) => {
2664            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2665                command_id,
2666                error,
2667                payload_json: None,
2668                duration_ms: duration_ms(started),
2669            });
2670            false
2671        }
2672    }
2673}
2674
2675fn apply_encrypted_crdt_update_json<S, T>(
2676    client: &mut SyncularClient<S, T>,
2677    event_tx: &SyncWorkerEventHub,
2678    command_id: String,
2679    request_json: &str,
2680    auto_sync: bool,
2681) -> bool
2682where
2683    S: SyncStore + SyncStateStore,
2684    T: SyncTransport,
2685    SyncularClient<S, T>: SyncWorkerClientExt,
2686{
2687    let started = Instant::now();
2688    match client.apply_worker_encrypted_crdt_update_json(request_json) {
2689        Ok(receipt) => {
2690            let crdt_event = WorkerEncryptedCrdtRequest::from_json(request_json).ok();
2691            let outbox_count = worker_outbox_count(client);
2692            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2693                command_id: command_id.clone(),
2694                client_commit_id: receipt.client_commit_id.clone(),
2695                changed_tables: receipt.changed_tables.clone(),
2696                changed_rows: receipt.changed_rows.clone(),
2697                outbox_count,
2698                duration_ms: duration_ms(started),
2699            });
2700            if let Some(request) = crdt_event.and_then(WorkerEncryptedCrdtRequest::field_identity) {
2701                let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldChanged {
2702                    command_id,
2703                    client_commit_id: receipt.client_commit_id,
2704                    table: request.table,
2705                    row_id: request.row_id,
2706                    field: request.field,
2707                    changed_tables: receipt.changed_tables,
2708                    payload_json: receipt.crdt_event_payload_json,
2709                    duration_ms: duration_ms(started),
2710                });
2711            }
2712            auto_sync
2713        }
2714        Err(error) => {
2715            let payload_json = crdt_field_failure_payload_json("encryptedCrdtUpdate", request_json);
2716            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2717                command_id,
2718                error,
2719                payload_json: Some(payload_json),
2720                duration_ms: duration_ms(started),
2721            });
2722            false
2723        }
2724    }
2725}
2726
2727fn apply_encrypted_crdt_checkpoint_json<S, T>(
2728    client: &mut SyncularClient<S, T>,
2729    event_tx: &SyncWorkerEventHub,
2730    command_id: String,
2731    request_json: &str,
2732    auto_sync: bool,
2733) -> bool
2734where
2735    S: SyncStore + SyncStateStore,
2736    T: SyncTransport,
2737    SyncularClient<S, T>: SyncWorkerClientExt,
2738{
2739    let started = Instant::now();
2740    match client.apply_worker_encrypted_crdt_checkpoint_json(request_json) {
2741        Ok(Some(receipt)) => {
2742            let crdt_event = WorkerEncryptedCrdtRequest::from_json(request_json).ok();
2743            let outbox_count = worker_outbox_count(client);
2744            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2745                command_id: command_id.clone(),
2746                client_commit_id: receipt.client_commit_id.clone(),
2747                changed_tables: receipt.changed_tables.clone(),
2748                changed_rows: receipt.changed_rows.clone(),
2749                outbox_count,
2750                duration_ms: duration_ms(started),
2751            });
2752            if let Some(request) = crdt_event.and_then(WorkerEncryptedCrdtRequest::field_identity) {
2753                let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldCompacted {
2754                    command_id,
2755                    client_commit_id: Some(receipt.client_commit_id),
2756                    table: request.table,
2757                    row_id: request.row_id,
2758                    field: request.field,
2759                    changed_tables: receipt.changed_tables,
2760                    checkpoint_created: true,
2761                    payload_json: receipt.crdt_event_payload_json,
2762                    duration_ms: duration_ms(started),
2763                });
2764            }
2765            auto_sync
2766        }
2767        Ok(None) => {
2768            let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandCompleted {
2769                command_id,
2770                operation: "encryptedCrdtCheckpoint",
2771                payload_json: Some(json!({ "checkpointed": false })),
2772                duration_ms: duration_ms(started),
2773            });
2774            false
2775        }
2776        Err(error) => {
2777            let payload_json =
2778                crdt_field_failure_payload_json("encryptedCrdtCheckpoint", request_json);
2779            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2780                command_id,
2781                error,
2782                payload_json: Some(payload_json),
2783                duration_ms: duration_ms(started),
2784            });
2785            false
2786        }
2787    }
2788}
2789
2790fn apply_crdt_field_text_json<S, T>(
2791    client: &mut SyncularClient<S, T>,
2792    event_tx: &SyncWorkerEventHub,
2793    command_id: String,
2794    request_json: &str,
2795    auto_sync: bool,
2796) -> bool
2797where
2798    S: SyncStore + SyncStateStore,
2799    T: SyncTransport,
2800    SyncularClient<S, T>: SyncWorkerClientExt,
2801{
2802    let started = Instant::now();
2803    match client.apply_worker_crdt_field_text_json(request_json) {
2804        Ok(receipt) => {
2805            let crdt_event = WorkerCrdtFieldTextRequest::from_json(request_json).ok();
2806            let outbox_count = worker_outbox_count(client);
2807            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2808                command_id: command_id.clone(),
2809                client_commit_id: receipt.client_commit_id.clone(),
2810                changed_tables: receipt.changed_tables.clone(),
2811                changed_rows: receipt.changed_rows.clone(),
2812                outbox_count,
2813                duration_ms: duration_ms(started),
2814            });
2815            if let Some(request) = crdt_event {
2816                let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldChanged {
2817                    command_id,
2818                    client_commit_id: receipt.client_commit_id,
2819                    table: request.table,
2820                    row_id: request.row_id,
2821                    field: request.field,
2822                    changed_tables: receipt.changed_tables,
2823                    payload_json: receipt.crdt_event_payload_json,
2824                    duration_ms: duration_ms(started),
2825                });
2826            }
2827            auto_sync
2828        }
2829        Err(error) => {
2830            let payload_json = crdt_field_failure_payload_json("crdtFieldText", request_json);
2831            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2832                command_id,
2833                error,
2834                payload_json: Some(payload_json),
2835                duration_ms: duration_ms(started),
2836            });
2837            false
2838        }
2839    }
2840}
2841
2842fn compact_crdt_field_json<S, T>(
2843    client: &mut SyncularClient<S, T>,
2844    event_tx: &SyncWorkerEventHub,
2845    command_id: String,
2846    request_json: &str,
2847    auto_sync: bool,
2848) -> bool
2849where
2850    S: SyncStore + SyncStateStore,
2851    T: SyncTransport,
2852    SyncularClient<S, T>: SyncWorkerClientExt,
2853{
2854    let started = Instant::now();
2855    match client.compact_worker_crdt_field_json(request_json) {
2856        Ok(Some(receipt)) => {
2857            let crdt_event = WorkerCrdtFieldCompactionRequest::from_json(request_json).ok();
2858            let outbox_count = worker_outbox_count(client);
2859            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2860                command_id: command_id.clone(),
2861                client_commit_id: receipt.client_commit_id.clone(),
2862                changed_tables: receipt.changed_tables.clone(),
2863                changed_rows: receipt.changed_rows.clone(),
2864                outbox_count,
2865                duration_ms: duration_ms(started),
2866            });
2867            if let Some(request) = crdt_event {
2868                let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldCompacted {
2869                    command_id,
2870                    client_commit_id: Some(receipt.client_commit_id),
2871                    table: request.table,
2872                    row_id: request.row_id,
2873                    field: request.field,
2874                    changed_tables: receipt.changed_tables,
2875                    checkpoint_created: true,
2876                    payload_json: receipt.crdt_event_payload_json,
2877                    duration_ms: duration_ms(started),
2878                });
2879            }
2880            auto_sync
2881        }
2882        Ok(None) => {
2883            let payload_json = compact_crdt_field_skipped_payload(client, request_json);
2884            let request = WorkerCrdtFieldCompactionRequest::from_json(request_json).ok();
2885            let compacted_server_merge_document = payload_json
2886                .get("syncMode")
2887                .and_then(Value::as_str)
2888                .is_some_and(|sync_mode| sync_mode == "server-merge");
2889            if compacted_server_merge_document {
2890                if let Some(request) = request {
2891                    let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldCompacted {
2892                        command_id: command_id.clone(),
2893                        client_commit_id: None,
2894                        table: request.table.clone(),
2895                        row_id: request.row_id.clone(),
2896                        field: request.field.clone(),
2897                        changed_tables: vec![request.table],
2898                        checkpoint_created: false,
2899                        payload_json: Some(payload_json.clone()),
2900                        duration_ms: duration_ms(started),
2901                    });
2902                }
2903            }
2904            let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandCompleted {
2905                command_id,
2906                operation: "compactCrdtField",
2907                payload_json: Some(payload_json),
2908                duration_ms: duration_ms(started),
2909            });
2910            false
2911        }
2912        Err(error) => {
2913            let payload_json = crdt_field_failure_payload_json("compactCrdtField", request_json);
2914            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2915                command_id,
2916                error,
2917                payload_json: Some(payload_json),
2918                duration_ms: duration_ms(started),
2919            });
2920            false
2921        }
2922    }
2923}
2924
2925fn resolve_conflict<S, T>(
2926    client: &mut SyncularClient<S, T>,
2927    event_tx: &SyncWorkerEventHub,
2928    command_id: String,
2929    conflict_id: &str,
2930    resolution: &str,
2931    auto_sync: bool,
2932) -> bool
2933where
2934    S: SyncStore + SyncStateStore,
2935    T: SyncTransport,
2936{
2937    let started = Instant::now();
2938    let result = if resolution == "keep-local" {
2939        client.retry_conflict_keep_local(conflict_id).map(Some)
2940    } else {
2941        client
2942            .resolve_conflict(conflict_id, resolution)
2943            .map(|_| None)
2944    };
2945
2946    match result {
2947        Ok(retry_client_commit_id) => {
2948            let should_sync = retry_client_commit_id.is_some() && auto_sync;
2949            let _ = event_tx.publish_event(SyncWorkerEvent::ConflictResolutionCompleted {
2950                command_id,
2951                retry_client_commit_id,
2952                duration_ms: duration_ms(started),
2953            });
2954            should_sync
2955        }
2956        Err(error) => {
2957            let _ = event_tx.publish_event(SyncWorkerEvent::ConflictResolutionFailed {
2958                command_id,
2959                error,
2960                duration_ms: duration_ms(started),
2961            });
2962            false
2963        }
2964    }
2965}
2966
2967fn refresh_snapshot_json<S, T>(
2968    client: &mut SyncularClient<S, T>,
2969    event_tx: &SyncWorkerEventHub,
2970    command_id: String,
2971    request_json: &str,
2972) where
2973    S: SyncStore + SyncStateStore,
2974    T: SyncTransport,
2975    SyncularClient<S, T>: SyncWorkerClientExt,
2976{
2977    let started = Instant::now();
2978    match client
2979        .worker_query_json(request_json)
2980        .and_then(|json| serde_json::from_str::<Value>(&json).map_err(Into::into))
2981    {
2982        Ok(payload_json) => {
2983            let _ = event_tx.publish_event(SyncWorkerEvent::SnapshotReady {
2984                command_id,
2985                payload_json,
2986                duration_ms: duration_ms(started),
2987            });
2988        }
2989        Err(error) => {
2990            let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandFailed {
2991                command_id,
2992                operation: "refreshSnapshot",
2993                error,
2994                duration_ms: duration_ms(started),
2995            });
2996        }
2997    }
2998}
2999
3000fn compact_crdt_field_skipped_payload<S, T>(
3001    client: &mut SyncularClient<S, T>,
3002    request_json: &str,
3003) -> Value
3004where
3005    S: SyncStore + SyncStateStore,
3006    T: SyncTransport,
3007    SyncularClient<S, T>: SyncWorkerClientExt,
3008{
3009    let request = WorkerCrdtFieldCompactionRequest::from_json(request_json).ok();
3010    let mut payload = request
3011        .as_ref()
3012        .and_then(|request| {
3013            client
3014                .worker_crdt_field_event_payload_json(
3015                    &request.table,
3016                    &request.row_id,
3017                    &request.field,
3018                )
3019                .ok()
3020                .flatten()
3021        })
3022        .and_then(|value| value.as_object().cloned())
3023        .unwrap_or_default();
3024
3025    if let Some(request) = request {
3026        payload.insert("table".to_string(), json!(request.table));
3027        payload.insert("rowId".to_string(), json!(request.row_id));
3028        payload.insert("field".to_string(), json!(request.field));
3029        payload.insert(
3030            "minUncheckpointedUpdates".to_string(),
3031            json!(request.min_uncheckpointed_updates.unwrap_or(1)),
3032        );
3033    }
3034    payload.insert("checkpointCreated".to_string(), json!(false));
3035    Value::Object(payload)
3036}
3037
3038fn crdt_field_failure_payload_json(operation: &'static str, request_json: &str) -> Value {
3039    let mut payload = serde_json::Map::new();
3040    payload.insert("operation".to_string(), json!(operation));
3041    payload.insert("failedBeforeCommit".to_string(), json!(true));
3042    payload.insert("retryScheduled".to_string(), json!(false));
3043
3044    match serde_json::from_str::<Value>(request_json) {
3045        Ok(Value::Object(request)) => {
3046            copy_crdt_request_field(&mut payload, &request, "table", "table");
3047            copy_crdt_request_field(&mut payload, &request, "rowId", "rowId");
3048            copy_crdt_request_field(&mut payload, &request, "field", "field");
3049            copy_crdt_request_field(
3050                &mut payload,
3051                &request,
3052                "minUncheckpointedUpdates",
3053                "minUncheckpointedUpdates",
3054            );
3055        }
3056        Ok(_) => {
3057            payload.insert("requestShape".to_string(), json!("non-object"));
3058        }
3059        Err(error) => {
3060            payload.insert("requestParseError".to_string(), json!(error.to_string()));
3061        }
3062    }
3063
3064    Value::Object(payload)
3065}
3066
3067fn crdt_field_failure_payload_from_parts(
3068    operation: &'static str,
3069    table: &str,
3070    row_id: &str,
3071    field: &str,
3072) -> Value {
3073    json!({
3074        "operation": operation,
3075        "table": table,
3076        "rowId": row_id,
3077        "field": field,
3078        "failedBeforeCommit": true,
3079        "retryScheduled": false,
3080    })
3081}
3082
3083fn copy_crdt_request_field(
3084    payload: &mut serde_json::Map<String, Value>,
3085    request: &serde_json::Map<String, Value>,
3086    from: &str,
3087    to: &str,
3088) {
3089    if !payload.contains_key(to) {
3090        if let Some(value) = request.get(from) {
3091            payload.insert(to.to_string(), value.clone());
3092        }
3093    }
3094}
3095
3096fn run_worker_json_command<S, T>(
3097    client: &mut SyncularClient<S, T>,
3098    event_tx: &SyncWorkerEventHub,
3099    command_id: String,
3100    operation: &'static str,
3101    f: impl FnOnce(&mut SyncularClient<S, T>) -> Result<String>,
3102) -> bool
3103where
3104    S: SyncStore + SyncStateStore,
3105    T: SyncTransport,
3106{
3107    let started = Instant::now();
3108    match f(client).and_then(|json| {
3109        if json.trim().is_empty() {
3110            Ok(None)
3111        } else {
3112            serde_json::from_str::<Value>(&json)
3113                .map(Some)
3114                .map_err(Into::into)
3115        }
3116    }) {
3117        Ok(payload_json) => {
3118            let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandCompleted {
3119                command_id,
3120                operation,
3121                payload_json,
3122                duration_ms: duration_ms(started),
3123            });
3124            true
3125        }
3126        Err(error) => {
3127            let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandFailed {
3128                command_id,
3129                operation,
3130                error,
3131                duration_ms: duration_ms(started),
3132            });
3133            false
3134        }
3135    }
3136}
3137
3138fn publish_blob_uploads_changed<S, T>(
3139    client: &mut SyncularClient<S, T>,
3140    event_tx: &SyncWorkerEventHub,
3141) where
3142    S: SyncStore + SyncStateStore,
3143    T: SyncTransport,
3144    SyncularClient<S, T>: SyncWorkerClientExt,
3145{
3146    let Ok(Some(stats_json)) = client.worker_blob_upload_queue_stats_json() else {
3147        return;
3148    };
3149    let Ok(stats_json) = serde_json::from_str::<Value>(&stats_json) else {
3150        return;
3151    };
3152    let _ = event_tx.publish_event(SyncWorkerEvent::BlobUploadsChanged { stats_json });
3153}
3154
3155fn queue_yjs_update_json(
3156    pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
3157    event_tx: &SyncWorkerEventHub,
3158    command_id: String,
3159    update_json: &str,
3160    auto_sync: bool,
3161) {
3162    let started = Instant::now();
3163    let update: Result<SaveYjsUpdate> = serde_json::from_str(update_json).map_err(Into::into);
3164    match update {
3165        Ok(update) => {
3166            let key = YjsBatchKey {
3167                table: update.table,
3168                row_id: update.row_id,
3169                field: update.field,
3170            };
3171            let batch = pending_yjs.entry(key).or_default();
3172            batch.command_ids.push(command_id);
3173            batch.updates.push(update.update);
3174            if update.materialized.is_some() {
3175                batch.materialized = update.materialized;
3176            }
3177            if update.server_payload.is_some() {
3178                batch.server_payload = update.server_payload;
3179            }
3180            batch.auto_sync |= auto_sync;
3181        }
3182        Err(error) => {
3183            let payload_json = crdt_field_failure_payload_json("crdtFieldYjsUpdate", update_json);
3184            let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
3185                command_id,
3186                error,
3187                payload_json: Some(payload_json),
3188                duration_ms: duration_ms(started),
3189            });
3190        }
3191    }
3192}
3193
3194fn flush_pending_yjs<S, T>(
3195    client: &mut SyncularClient<S, T>,
3196    pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
3197    event_tx: &SyncWorkerEventHub,
3198) -> bool
3199where
3200    S: SyncStore + SyncStateStore,
3201    T: SyncTransport,
3202    SyncularClient<S, T>: SyncWorkerClientExt,
3203{
3204    let mut should_sync = false;
3205    let batches = std::mem::take(pending_yjs);
3206    for (key, batch) in batches {
3207        let started = Instant::now();
3208        let payload = {
3209            let mut payload = match batch.server_payload {
3210                Some(Value::Object(payload)) => payload,
3211                Some(_) => {
3212                    let error = SyncularError::config(
3213                        "queued Yjs serverPayload must be a JSON object when provided",
3214                    );
3215                    let message = error.message_text();
3216                    let kind = error.kind();
3217                    for command_id in batch.command_ids {
3218                        let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
3219                            command_id,
3220                            error: SyncularError::message(kind, &message),
3221                            payload_json: Some(crdt_field_failure_payload_from_parts(
3222                                "crdtFieldYjsUpdate",
3223                                &key.table,
3224                                &key.row_id,
3225                                &key.field,
3226                            )),
3227                            duration_ms: duration_ms(started),
3228                        });
3229                    }
3230                    continue;
3231                }
3232                None => serde_json::Map::new(),
3233            };
3234            let mut envelope = serde_json::Map::new();
3235            envelope.insert(key.field.clone(), json!(batch.updates));
3236            payload.insert(YJS_PAYLOAD_KEY.to_string(), Value::Object(envelope));
3237            Value::Object(payload)
3238        };
3239        let mutation = SyncOperation {
3240            table: key.table.clone(),
3241            row_id: key.row_id.clone(),
3242            op: "upsert".to_string(),
3243            payload: Some(payload),
3244            base_version: None,
3245        };
3246        match client.apply_worker_mutation(mutation, batch.materialized) {
3247            Ok(client_commit_id) => {
3248                should_sync |= batch.auto_sync;
3249                let crdt_event_payload_json = client
3250                    .worker_crdt_field_event_payload_json(&key.table, &key.row_id, &key.field)
3251                    .ok()
3252                    .flatten();
3253                let outbox_count = worker_outbox_count(client);
3254                let changed_rows = client
3255                    .worker_crdt_field_changed_row(
3256                        &key.table,
3257                        &key.row_id,
3258                        &key.field,
3259                        &client_commit_id,
3260                    )
3261                    .ok()
3262                    .flatten()
3263                    .map(|row| vec![row])
3264                    .unwrap_or_else(|| {
3265                        vec![SyncChangedRow {
3266                            table: key.table.clone(),
3267                            row_id: Some(key.row_id.clone()),
3268                            operation: "update".to_string(),
3269                            changed_fields: vec![key.field.clone()],
3270                            crdt_fields: vec![key.field.clone()],
3271                            crdt_field_changes: Vec::new(),
3272                            commit_id: Some(client_commit_id.clone()),
3273                            commit_seq: None,
3274                            subscription_id: None,
3275                            server_version: None,
3276                        }]
3277                    });
3278                for command_id in batch.command_ids {
3279                    let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
3280                        command_id: command_id.clone(),
3281                        client_commit_id: client_commit_id.clone(),
3282                        changed_tables: vec![key.table.clone()],
3283                        changed_rows: changed_rows.clone(),
3284                        outbox_count,
3285                        duration_ms: duration_ms(started),
3286                    });
3287                    let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldChanged {
3288                        command_id,
3289                        client_commit_id: client_commit_id.clone(),
3290                        table: key.table.clone(),
3291                        row_id: key.row_id.clone(),
3292                        field: key.field.clone(),
3293                        changed_tables: vec![key.table.clone()],
3294                        payload_json: crdt_event_payload_json.clone(),
3295                        duration_ms: duration_ms(started),
3296                    });
3297                }
3298            }
3299            Err(error) => {
3300                let message = error.message_text();
3301                let kind = error.kind();
3302                for command_id in batch.command_ids {
3303                    let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
3304                        command_id,
3305                        error: SyncularError::message(kind, &message),
3306                        payload_json: Some(crdt_field_failure_payload_from_parts(
3307                            "crdtFieldYjsUpdate",
3308                            &key.table,
3309                            &key.row_id,
3310                            &key.field,
3311                        )),
3312                        duration_ms: duration_ms(started),
3313                    });
3314                }
3315            }
3316        }
3317    }
3318    should_sync
3319}
3320
3321fn worker_counts<S, T>(client: &mut SyncularClient<S, T>) -> Result<(usize, usize)>
3322where
3323    S: SyncStore + SyncStateStore,
3324    T: SyncTransport,
3325{
3326    let outbox_count = client
3327        .outbox_summaries()?
3328        .into_iter()
3329        .filter(|item| item.status != "acked")
3330        .count();
3331    let conflict_count = client.conflict_summaries()?.len();
3332    Ok((outbox_count, conflict_count))
3333}
3334
3335fn worker_outbox_count<S, T>(client: &mut SyncularClient<S, T>) -> usize
3336where
3337    S: SyncStore + SyncStateStore,
3338    T: SyncTransport,
3339{
3340    worker_counts(client)
3341        .map(|(outbox_count, _)| outbox_count)
3342        .unwrap_or(0)
3343}
3344
3345fn retry_scheduled_after_error<S, T>(client: &mut SyncularClient<S, T>) -> bool
3346where
3347    S: SyncStore + SyncStateStore,
3348    T: SyncTransport,
3349{
3350    client
3351        .outbox_summaries()
3352        .map(|items| {
3353            items
3354                .into_iter()
3355                .any(|item| item.status == "pending" || item.status == "sending")
3356        })
3357        .unwrap_or(false)
3358}
3359
3360fn next_retry_timeout<S, T>(client: &mut SyncularClient<S, T>) -> Option<Duration>
3361where
3362    S: SyncStore + SyncStateStore,
3363    T: SyncTransport,
3364    SyncularClient<S, T>: SyncWorkerClientExt,
3365{
3366    let next = [
3367        client.worker_next_outbox_retry_at_ms().ok().flatten(),
3368        client.worker_next_blob_upload_retry_at_ms().ok().flatten(),
3369    ]
3370    .into_iter()
3371    .flatten()
3372    .min()?;
3373    Some(duration_until_ms(next))
3374}
3375
3376fn due_now(next: Result<Option<i64>>) -> bool {
3377    next.ok()
3378        .flatten()
3379        .is_some_and(|next_attempt_at| next_attempt_at <= now_ms())
3380}
3381
3382fn duration_until_ms(timestamp_ms: i64) -> Duration {
3383    let now = now_ms();
3384    if timestamp_ms <= now {
3385        Duration::ZERO
3386    } else {
3387        Duration::from_millis((timestamp_ms - now) as u64)
3388    }
3389}
3390
3391fn retry_wakeup_command_id(prefix: &str) -> String {
3392    format!("{prefix}-{}", now_ms())
3393}
3394
/// Returns the whole milliseconds elapsed since `started`, saturating at `u64::MAX`.
fn duration_ms(started: Instant) -> u64 {
    let elapsed_ms = started.elapsed().as_millis();
    u64::try_from(elapsed_ms).unwrap_or(u64::MAX)
}
3398
/// Key of one pending Yjs batch: the table, row and field the queued updates target.
///
/// Used as the `BTreeMap` key for pending batches, so the derived `Ord` (table,
/// then row id, then field) determines the order in which batches are flushed.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct YjsBatchKey {
    /// Table containing the row.
    table: String,
    /// Id of the row that owns the field.
    row_id: String,
    /// Name of the field the updates apply to.
    field: String,
}
3405
/// Yjs updates queued for a single `YjsBatchKey` until the next flush.
#[derive(Debug, Default)]
struct PendingYjsBatch {
    /// Ids of the commands folded into this batch; each receives its own events on flush.
    command_ids: Vec<String>,
    /// Queued update envelopes, in the order they were queued.
    updates: Vec<YjsUpdateEnvelope>,
    /// Most recently supplied `materialized` value, handed to the mutation on flush.
    materialized: Option<Value>,
    /// Most recently supplied server payload; must be a JSON object when present,
    /// otherwise the whole batch fails on flush.
    server_payload: Option<Value>,
    /// `true` once any queued update requested auto sync.
    auto_sync: bool,
}
3414
/// Request shape for queueing a Yjs update, deserialized from camelCase JSON
/// (`rowId`, `serverPayload`, ...).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SaveYjsUpdate {
    /// Table containing the row.
    table: String,
    /// Id of the row that owns the field.
    row_id: String,
    /// Name of the field the update applies to.
    field: String,
    /// The Yjs update envelope to queue.
    update: YjsUpdateEnvelope,
    /// Optional value passed alongside the mutation when the batch is flushed;
    /// `None` when the key is absent.
    #[serde(default)]
    materialized: Option<Value>,
    /// Optional base payload for the flushed mutation; `None` when the key is absent.
    #[serde(default)]
    server_payload: Option<Value>,
}
3427
3428fn validate_save_yjs_update_json(update_json: &str) -> Result<()> {
3429    validate_crdt_request_json_size(update_json)?;
3430    let update: SaveYjsUpdate = serde_json::from_str(update_json)?;
3431    validate_yjs_update_envelope_size(&update.update)
3432}
3433
/// The parts of an encrypted CRDT request that the worker reads for event
/// reporting, deserialized from camelCase JSON.
///
/// Only `table` is required; `rowId` and `field` default to `None` when absent.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct WorkerEncryptedCrdtRequest {
    /// Table named by the request.
    table: String,
    /// Row id, when the request names one.
    #[serde(default)]
    row_id: Option<String>,
    /// Field name, when the request names one.
    #[serde(default)]
    field: Option<String>,
}
3443
/// Request to set the text of a CRDT field, deserialized from camelCase JSON.
///
/// `dead_code` is allowed when the `native` feature is off — presumably because some
/// fields are only read by native-only code (confirm against the non-native build).
#[derive(Debug, Deserialize)]
#[cfg_attr(not(feature = "native"), allow(dead_code))]
#[serde(rename_all = "camelCase")]
struct WorkerCrdtFieldTextRequest {
    /// Table containing the row.
    table: String,
    /// Id of the row that owns the field.
    row_id: String,
    /// Name of the CRDT field.
    field: String,
    /// The text carried by the request (JSON key `nextText`).
    next_text: String,
}
3453
3454fn validate_worker_crdt_field_text_request_json(request_json: &str) -> Result<()> {
3455    validate_crdt_request_json_size(request_json)?;
3456    let request: WorkerCrdtFieldTextRequest = serde_json::from_str(request_json)?;
3457    validate_yjs_text_input_size(&request.next_text)
3458}
3459
/// Request to compact a CRDT field, deserialized from camelCase JSON.
///
/// `dead_code` is allowed when the `native` feature is off — presumably because some
/// fields are only read by native-only code (confirm against the non-native build).
#[derive(Debug, Deserialize)]
#[cfg_attr(not(feature = "native"), allow(dead_code))]
#[serde(rename_all = "camelCase")]
struct WorkerCrdtFieldCompactionRequest {
    /// Table containing the row.
    table: String,
    /// Id of the row that owns the field.
    row_id: String,
    /// Name of the CRDT field.
    field: String,
    /// Optional threshold from the request; `None` when the key is absent. The
    /// skipped-compaction payload reports `1` in that case.
    #[serde(default)]
    min_uncheckpointed_updates: Option<i64>,
}
3470
/// Owned table / row id / field triple naming one CRDT field, produced by
/// `WorkerEncryptedCrdtRequest::field_identity` when both optional parts are present.
struct WorkerCrdtFieldIdentity {
    /// Table containing the row.
    table: String,
    /// Id of the row that owns the field.
    row_id: String,
    /// Name of the CRDT field.
    field: String,
}
3476
/// Borrowed counterpart of `WorkerCrdtFieldIdentity`, produced by
/// `WorkerEncryptedCrdtRequest::field_identity_ref`; only compiled with the
/// `native` feature.
#[cfg(feature = "native")]
struct WorkerCrdtFieldIdentityRef<'a> {
    /// Table containing the row.
    table: &'a str,
    /// Id of the row that owns the field.
    row_id: &'a str,
    /// Name of the CRDT field.
    field: &'a str,
}
3483
3484impl WorkerEncryptedCrdtRequest {
3485    fn from_json(request_json: &str) -> Result<Self> {
3486        serde_json::from_str(request_json).map_err(Into::into)
3487    }
3488
3489    #[cfg(feature = "native")]
3490    fn field_identity_ref(&self) -> Option<WorkerCrdtFieldIdentityRef<'_>> {
3491        Some(WorkerCrdtFieldIdentityRef {
3492            table: &self.table,
3493            row_id: self.row_id.as_deref()?,
3494            field: self.field.as_deref()?,
3495        })
3496    }
3497
3498    fn field_identity(self) -> Option<WorkerCrdtFieldIdentity> {
3499        Some(WorkerCrdtFieldIdentity {
3500            table: self.table,
3501            row_id: self.row_id?,
3502            field: self.field?,
3503        })
3504    }
3505}
3506
#[cfg(feature = "native")]
impl WorkerCrdtFieldIdentityRef<'_> {
    /// Builds a `CrdtFieldId` from the borrowed table, row id and field name.
    fn id(&self) -> CrdtFieldId {
        let (table, row_id, field) = (self.table, self.row_id, self.field);
        CrdtFieldId::new(table, row_id, field)
    }
}
3513
3514impl WorkerCrdtFieldTextRequest {
3515    fn from_json(request_json: &str) -> Result<Self> {
3516        serde_json::from_str(request_json).map_err(Into::into)
3517    }
3518
3519    #[cfg(feature = "native")]
3520    fn id(&self) -> CrdtFieldId {
3521        CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
3522    }
3523}
3524
3525impl WorkerCrdtFieldCompactionRequest {
3526    fn from_json(request_json: &str) -> Result<Self> {
3527        serde_json::from_str(request_json).map_err(Into::into)
3528    }
3529
3530    #[cfg(feature = "native")]
3531    fn id(&self) -> CrdtFieldId {
3532        CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
3533    }
3534}
3535
/// Returns the table names associated with a write to `field`.
///
/// The field's own table is always listed; in `EncryptedUpdateLog` mode
/// `CRDT_UPDATES_TABLE` is appended after it.
#[cfg(feature = "native")]
fn crdt_field_write_tables_for_worker(field: &CrdtField) -> Vec<String> {
    let mode = field.sync_mode();
    let mut tables = vec![field.table().to_string()];
    match mode {
        CrdtFieldSyncMode::ServerMerge => {}
        CrdtFieldSyncMode::EncryptedUpdateLog => tables.push(CRDT_UPDATES_TABLE.to_string()),
    }
    tables
}
3545
/// Returns the table names associated with compacting `field`.
///
/// `ServerMerge` fields yield an empty list; `EncryptedUpdateLog` fields yield only
/// `CRDT_CHECKPOINTS_TABLE`.
#[cfg(feature = "native")]
fn crdt_field_compaction_tables_for_worker(field: &CrdtField) -> Vec<String> {
    let mut tables = Vec::new();
    match field.sync_mode() {
        CrdtFieldSyncMode::ServerMerge => {}
        CrdtFieldSyncMode::EncryptedUpdateLog => tables.push(CRDT_CHECKPOINTS_TABLE.to_string()),
    }
    tables
}
3553
/// Builds the `SyncChangedRow` describing a local update to a CRDT field.
///
/// The row is reported with operation `"update"`, lists both the field name and its state
/// column as changed, and carries `client_commit_id` as its commit id. Sequence,
/// subscription and server version are left unset.
#[cfg(feature = "native")]
fn crdt_field_changed_row_for_worker(field: &CrdtField, client_commit_id: &str) -> SyncChangedRow {
    let change = sync_changed_crdt_field_from_metadata(field.field_metadata());
    let table = field.table().to_string();
    let row_id = Some(field.row_id().to_string());
    let changed_fields = vec![field.field().to_string(), field.state_column().to_string()];
    // Mirrors the state column of the single CRDT change reported for this row.
    let crdt_fields = std::iter::once(change.state_column.clone()).collect();
    SyncChangedRow {
        table,
        row_id,
        operation: "update".to_string(),
        changed_fields,
        crdt_fields,
        crdt_field_changes: vec![change],
        commit_id: Some(client_commit_id.to_string()),
        commit_seq: None,
        subscription_id: None,
        server_version: None,
    }
}
3575
/// Builds the `SyncChangedRow` describing a compaction of a CRDT field.
///
/// The row is reported with operation `"compact"`, lists only the state column as
/// changed, and carries `client_commit_id` as its commit id. Sequence, subscription and
/// server version are left unset.
#[cfg(feature = "native")]
fn crdt_field_compacted_row_for_worker(
    field: &CrdtField,
    client_commit_id: &str,
) -> SyncChangedRow {
    let change = sync_changed_crdt_field_from_metadata(field.field_metadata());
    let table = field.table().to_string();
    let row_id = Some(field.row_id().to_string());
    let changed_fields = vec![field.state_column().to_string()];
    // Mirrors the state column of the single CRDT change reported for this row.
    let crdt_fields = std::iter::once(change.state_column.clone()).collect();
    SyncChangedRow {
        table,
        row_id,
        operation: "compact".to_string(),
        changed_fields,
        crdt_fields,
        crdt_field_changes: vec![change],
        commit_id: Some(client_commit_id.to_string()),
        commit_seq: None,
        subscription_id: None,
        server_version: None,
    }
}
3600
/// Builds the JSON event payload for a CRDT field.
///
/// Starts from `crdt_field_base_payload_for_worker` and then records the outcome of
/// `client.materialize_crdt_field(field)`:
///
/// * on success: `materializationAvailable = true`, `hasState`, `stateVectorBase64`;
/// * on failure: `materializationAvailable = false` and `materializationError` holding
///   the error's `message_text()`.
///
/// As written this always returns `Some`; a materialization failure is reported inside
/// the payload rather than propagated.
#[cfg(feature = "native")]
fn crdt_field_event_payload_for_worker<T>(
    client: &mut SyncularClient<DieselSqliteStore, T>,
    field: &CrdtField,
) -> Option<Value>
where
    T: SyncTransport,
{
    let mut payload = crdt_field_base_payload_for_worker(field);
    let outcome = client.materialize_crdt_field(field);
    payload.insert(
        "materializationAvailable".to_string(),
        Value::Bool(outcome.is_ok()),
    );
    match outcome {
        Ok(materialized) => {
            let has_state = materialized.state_base64.is_some();
            payload.insert("hasState".to_string(), Value::Bool(has_state));
            payload.insert(
                "stateVectorBase64".to_string(),
                json!(materialized.state_vector_base64),
            );
        }
        Err(failure) => {
            payload.insert(
                "materializationError".to_string(),
                json!(failure.message_text()),
            );
        }
    }
    Some(Value::Object(payload))
}
3632
/// Builds the JSON payload for a CRDT field compaction that produced a receipt.
///
/// Extends the field event payload (falling back to the base payload if that is not a
/// JSON object) with `checkpointCreated`, `minUncheckpointedUpdates`, and the receipt's
/// `before`, `after`, `encryptedStreamBefore` and `encryptedStreamAfter` values.
///
/// As written this always returns `Some`.
#[cfg(feature = "native")]
fn crdt_field_compaction_payload_for_worker<T>(
    client: &mut SyncularClient<DieselSqliteStore, T>,
    field: &CrdtField,
    receipt: &CrdtFieldCompactionReceipt,
    checkpoint_created: bool,
    min_uncheckpointed_updates: i64,
) -> Option<Value>
where
    T: SyncTransport,
{
    let mut payload = match crdt_field_event_payload_for_worker(client, field) {
        Some(Value::Object(map)) => map,
        _ => crdt_field_base_payload_for_worker(field),
    };
    let mut put = |key: &str, value: Value| {
        payload.insert(key.to_string(), value);
    };
    put("checkpointCreated", json!(checkpoint_created));
    put("minUncheckpointedUpdates", json!(min_uncheckpointed_updates));
    put("before", json!(&receipt.before));
    put("after", json!(&receipt.after));
    put(
        "encryptedStreamBefore",
        json!(&receipt.encrypted_stream_before),
    );
    put(
        "encryptedStreamAfter",
        json!(&receipt.encrypted_stream_after),
    );
    Some(Value::Object(payload))
}
3664
/// Builds the JSON compaction payload for a CRDT field when no receipt is supplied.
///
/// Extends the field event payload (falling back to the base payload if that is not a
/// JSON object) with `checkpointCreated` and `minUncheckpointedUpdates` only.
///
/// As written this always returns `Some`.
#[cfg(feature = "native")]
fn crdt_field_compaction_payload_for_worker_current<T>(
    client: &mut SyncularClient<DieselSqliteStore, T>,
    field: &CrdtField,
    checkpoint_created: bool,
    min_uncheckpointed_updates: i64,
) -> Option<Value>
where
    T: SyncTransport,
{
    let event_payload = crdt_field_event_payload_for_worker(client, field);
    let mut fields = match event_payload {
        Some(Value::Object(map)) => map,
        _ => crdt_field_base_payload_for_worker(field),
    };
    fields.insert("checkpointCreated".to_string(), json!(checkpoint_created));
    fields.insert(
        "minUncheckpointedUpdates".to_string(),
        json!(min_uncheckpointed_updates),
    );
    Some(Value::Object(fields))
}
3685
/// Builds the JSON object shared by all CRDT field worker payloads.
///
/// The object holds `syncMode`, `kind`, `stateColumn`, `containerKey` and `rowIdField`,
/// each serialized from the matching accessor on `field`.
#[cfg(feature = "native")]
fn crdt_field_base_payload_for_worker(field: &CrdtField) -> serde_json::Map<String, Value> {
    let entries = [
        ("syncMode", json!(field.sync_mode())),
        ("kind", json!(field.field_metadata().kind)),
        ("stateColumn", json!(field.state_column())),
        ("containerKey", json!(field.container_key())),
        ("rowIdField", json!(field.row_id_field())),
    ];
    let mut payload = serde_json::Map::new();
    for (key, value) in entries {
        payload.insert(key.to_string(), value);
    }
    payload
}
3696
/// Extracts the `minUncheckpointedUpdates` threshold from a worker CRDT request.
///
/// Only that single key is deserialized. Parsing the whole
/// `WorkerCrdtFieldCompactionRequest` here would also require `table`, `rowId` and
/// `field`, so a request that omits `rowId`/`field` (which `WorkerEncryptedCrdtRequest`
/// permits) would fail to parse and silently lose a threshold the caller supplied.
///
/// Returns the default of `1` when the key is absent or `null`, or when `request_json`
/// is not a JSON object with a compatible value for the key. This lookup is deliberately
/// best-effort and never fails.
#[cfg(feature = "native")]
fn encrypted_crdt_min_uncheckpointed_updates(request_json: &str) -> i64 {
    /// Threshold used when the request does not provide a usable one.
    const DEFAULT_MIN_UNCHECKPOINTED_UPDATES: i64 = 1;

    /// Minimal view of the request: just the optional threshold; other keys are ignored.
    #[derive(Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct ThresholdOnly {
        #[serde(default)]
        min_uncheckpointed_updates: Option<i64>,
    }

    serde_json::from_str::<ThresholdOnly>(request_json)
        .ok()
        .and_then(|request| request.min_uncheckpointed_updates)
        .unwrap_or(DEFAULT_MIN_UNCHECKPOINTED_UPDATES)
}
3704
/// Optional settings accepted with a worker blob store request, read from camelCase JSON.
///
/// Every key may be omitted; `Default` yields all fields as `None`.
#[cfg(feature = "native")]
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
struct WorkerBlobStoreOptions {
    /// Value of the optional `mimeType` key.
    mime_type: Option<String>,
    /// Value of the optional `immediate` key.
    immediate: Option<bool>,
    /// Value of the optional `cacheLocal` key.
    cache_local: Option<bool>,
}
3713
/// Optional settings accepted with a worker blob retrieve request, read from camelCase
/// JSON.
///
/// The single key may be omitted; `Default` yields `None`.
#[cfg(feature = "native")]
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
struct WorkerBlobRetrieveOptions {
    /// Value of the optional `cacheLocal` key.
    cache_local: Option<bool>,
}