1use crate::app_schema::{
2 validate_blob_encryption_against_app_schema, validate_encrypted_crdt_against_app_schema,
3 validate_field_encryption_rules_against_app_schema, AppSchema,
4};
5#[cfg(feature = "native")]
6use crate::client::sync_changed_crdt_field_from_metadata;
7#[cfg(feature = "native")]
8use crate::client::BootstrapStatus;
9#[cfg(feature = "native")]
10use crate::client::CrdtFieldCompactionReceipt;
11use crate::client::{
12 sync_changed_row_for_local_operation, validate_subscription_limits, SubscriptionSpec,
13 SyncChangedRow, SyncReport, SyncularClient,
14};
15#[cfg(feature = "native")]
16use crate::crdt_field::{CrdtField, CrdtFieldId, CrdtFieldSyncMode};
17use crate::crdt_yjs::{
18 validate_crdt_request_json_size, validate_yjs_text_input_size,
19 validate_yjs_update_envelope_size, YjsUpdateEnvelope, YJS_PAYLOAD_KEY,
20};
21#[cfg(feature = "native")]
22use crate::diesel_sqlite::DieselSqliteStore;
23use crate::encrypted_crdt::EncryptedCrdt;
24#[cfg(feature = "native")]
25use crate::encrypted_crdt::{CRDT_CHECKPOINTS_TABLE, CRDT_UPDATES_TABLE};
26use crate::encryption::{BlobEncryption, FieldEncryption};
27use crate::error::{ErrorKind, Result, SyncularError};
28#[cfg(feature = "demo-todo-native-fixture")]
29use crate::fixtures::todo::rusqlite_sqlite::RusqliteStore;
30use crate::limits::{
31 DEFAULT_WORKER_COMMAND_QUEUE_CAPACITY, DEFAULT_WORKER_EVENT_QUEUE_CAPACITY,
32 DEFAULT_YJS_FLUSH_WINDOW_MS,
33};
34#[cfg(feature = "native")]
35use crate::protocol::BlobRef;
36use crate::protocol::{validate_mutation_json_input_size, SyncOperation};
37use crate::store::{now_ms, retry_backoff_delay_ms, SyncStateStore, SyncStore};
38#[cfg(feature = "native")]
39use crate::transport::BlobTransport;
40use crate::transport::{
41 RealtimeEvent, RealtimeTransport, SyncAuthHeaderStore, SyncAuthHeaders, SyncTransport,
42};
43use serde::Deserialize;
44use serde_json::{json, Value};
45use std::collections::{BTreeMap, VecDeque};
46#[cfg(feature = "native")]
47use std::path::Path;
48use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TrySendError};
49use std::sync::{Arc, Condvar, Mutex};
50use std::thread::{self, JoinHandle};
51use std::time::{Duration, Instant};
52
53const YJS_FLUSH_WINDOW: Duration = Duration::from_millis(DEFAULT_YJS_FLUSH_WINDOW_MS);
54
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct SyncWorkerConfig {
57 pub command_queue_capacity: usize,
58 pub yjs_flush_window: Duration,
59}
60
61impl Default for SyncWorkerConfig {
62 fn default() -> Self {
63 Self {
64 command_queue_capacity: DEFAULT_WORKER_COMMAND_QUEUE_CAPACITY,
65 yjs_flush_window: YJS_FLUSH_WINDOW,
66 }
67 }
68}
69
70enum WorkerCommand {
71 Trigger {
72 command_id: Option<String>,
73 emit_started: bool,
74 transport: WorkerSyncTransport,
75 },
76 ApplyMutationJson {
77 command_id: String,
78 mutation_json: String,
79 local_row_json: Option<String>,
80 auto_sync: bool,
81 require_auth_lease: bool,
82 },
83 SaveYjsUpdateJson {
84 command_id: String,
85 update_json: String,
86 auto_sync: bool,
87 },
88 ApplyCrdtFieldTextJson {
89 command_id: String,
90 request_json: String,
91 auto_sync: bool,
92 },
93 CompactCrdtFieldJson {
94 command_id: String,
95 request_json: String,
96 auto_sync: bool,
97 },
98 ApplyEncryptedCrdtUpdateJson {
99 command_id: String,
100 request_json: String,
101 auto_sync: bool,
102 },
103 ApplyEncryptedCrdtCheckpointJson {
104 command_id: String,
105 request_json: String,
106 auto_sync: bool,
107 },
108 ResolveConflict {
109 command_id: String,
110 conflict_id: String,
111 resolution: String,
112 auto_sync: bool,
113 },
114 RefreshSnapshotJson {
115 command_id: String,
116 request_json: String,
117 },
118 CompactStorageJson {
119 command_id: String,
120 options_json: Option<String>,
121 },
122 StoreBlobFileJson {
123 command_id: String,
124 path: String,
125 options_json: Option<String>,
126 },
127 RetrieveBlobFileJson {
128 command_id: String,
129 ref_json: String,
130 path: String,
131 options_json: Option<String>,
132 },
133 ProcessBlobUploadQueue {
134 command_id: String,
135 },
136 PruneBlobCache {
137 command_id: String,
138 max_bytes: i64,
139 },
140 ClearBlobCache {
141 command_id: String,
142 },
143 SetSubscriptions(Vec<SubscriptionSpec>),
144 SetAuthHeaders(SyncAuthHeaders),
145 SetFieldEncryption(Option<FieldEncryption>),
146 SetEncryptedCrdt(Option<EncryptedCrdt>),
147 SetBlobEncryption(Option<BlobEncryption>),
148 Stop,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152enum WorkerSyncTransport {
153 Http,
154 WebSocket,
155}
156
157impl WorkerSyncTransport {
158 fn coalesce(self, next: Self) -> Self {
159 if matches!(self, Self::WebSocket) || matches!(next, Self::WebSocket) {
160 Self::WebSocket
161 } else {
162 Self::Http
163 }
164 }
165}
166
167#[derive(Debug)]
168pub enum SyncWorkerEvent {
169 SyncStarted {
170 command_id: Option<String>,
171 },
172 SyncCompleted {
173 command_id: Option<String>,
174 report: SyncReport,
175 #[cfg(feature = "native")]
176 bootstrap: BootstrapStatus,
177 outbox_count: usize,
178 conflict_count: usize,
179 duration_ms: u64,
180 },
181 SyncFailed {
182 command_id: Option<String>,
183 error: SyncularError,
184 retry_scheduled: bool,
185 duration_ms: u64,
186 },
187 LocalWriteCommitted {
188 command_id: String,
189 client_commit_id: String,
190 changed_tables: Vec<String>,
191 changed_rows: Vec<SyncChangedRow>,
192 outbox_count: usize,
193 duration_ms: u64,
194 },
195 CrdtFieldChanged {
196 command_id: String,
197 client_commit_id: String,
198 table: String,
199 row_id: String,
200 field: String,
201 changed_tables: Vec<String>,
202 payload_json: Option<Value>,
203 duration_ms: u64,
204 },
205 CrdtFieldCompacted {
206 command_id: String,
207 client_commit_id: Option<String>,
208 table: String,
209 row_id: String,
210 field: String,
211 changed_tables: Vec<String>,
212 checkpoint_created: bool,
213 payload_json: Option<Value>,
214 duration_ms: u64,
215 },
216 LocalWriteFailed {
217 command_id: String,
218 error: SyncularError,
219 payload_json: Option<Value>,
220 duration_ms: u64,
221 },
222 ConflictResolutionCompleted {
223 command_id: String,
224 retry_client_commit_id: Option<String>,
225 duration_ms: u64,
226 },
227 ConflictResolutionFailed {
228 command_id: String,
229 error: SyncularError,
230 duration_ms: u64,
231 },
232 SnapshotReady {
233 command_id: String,
234 payload_json: Value,
235 duration_ms: u64,
236 },
237 WorkerCommandCompleted {
238 command_id: String,
239 operation: &'static str,
240 payload_json: Option<Value>,
241 duration_ms: u64,
242 },
243 WorkerCommandFailed {
244 command_id: String,
245 operation: &'static str,
246 error: SyncularError,
247 duration_ms: u64,
248 },
249 BlobUploadsChanged {
250 stats_json: Value,
251 },
252 EventsOverflowed {
253 dropped_count: usize,
254 },
255}
256
257impl Clone for SyncWorkerEvent {
258 fn clone(&self) -> Self {
259 match self {
260 Self::SyncStarted { command_id } => Self::SyncStarted {
261 command_id: command_id.clone(),
262 },
263 Self::SyncCompleted {
264 command_id,
265 report,
266 #[cfg(feature = "native")]
267 bootstrap,
268 outbox_count,
269 conflict_count,
270 duration_ms,
271 } => Self::SyncCompleted {
272 command_id: command_id.clone(),
273 report: report.clone(),
274 #[cfg(feature = "native")]
275 bootstrap: bootstrap.clone(),
276 outbox_count: *outbox_count,
277 conflict_count: *conflict_count,
278 duration_ms: *duration_ms,
279 },
280 Self::SyncFailed {
281 command_id,
282 error,
283 retry_scheduled,
284 duration_ms,
285 } => Self::SyncFailed {
286 command_id: command_id.clone(),
287 error: clone_worker_error(error),
288 retry_scheduled: *retry_scheduled,
289 duration_ms: *duration_ms,
290 },
291 Self::LocalWriteCommitted {
292 command_id,
293 client_commit_id,
294 changed_tables,
295 changed_rows,
296 outbox_count,
297 duration_ms,
298 } => Self::LocalWriteCommitted {
299 command_id: command_id.clone(),
300 client_commit_id: client_commit_id.clone(),
301 changed_tables: changed_tables.clone(),
302 changed_rows: changed_rows.clone(),
303 outbox_count: *outbox_count,
304 duration_ms: *duration_ms,
305 },
306 Self::CrdtFieldChanged {
307 command_id,
308 client_commit_id,
309 table,
310 row_id,
311 field,
312 changed_tables,
313 payload_json,
314 duration_ms,
315 } => Self::CrdtFieldChanged {
316 command_id: command_id.clone(),
317 client_commit_id: client_commit_id.clone(),
318 table: table.clone(),
319 row_id: row_id.clone(),
320 field: field.clone(),
321 changed_tables: changed_tables.clone(),
322 payload_json: payload_json.clone(),
323 duration_ms: *duration_ms,
324 },
325 Self::CrdtFieldCompacted {
326 command_id,
327 client_commit_id,
328 table,
329 row_id,
330 field,
331 changed_tables,
332 checkpoint_created,
333 payload_json,
334 duration_ms,
335 } => Self::CrdtFieldCompacted {
336 command_id: command_id.clone(),
337 client_commit_id: client_commit_id.clone(),
338 table: table.clone(),
339 row_id: row_id.clone(),
340 field: field.clone(),
341 changed_tables: changed_tables.clone(),
342 checkpoint_created: *checkpoint_created,
343 payload_json: payload_json.clone(),
344 duration_ms: *duration_ms,
345 },
346 Self::LocalWriteFailed {
347 command_id,
348 error,
349 payload_json,
350 duration_ms,
351 } => Self::LocalWriteFailed {
352 command_id: command_id.clone(),
353 error: clone_worker_error(error),
354 payload_json: payload_json.clone(),
355 duration_ms: *duration_ms,
356 },
357 Self::ConflictResolutionCompleted {
358 command_id,
359 retry_client_commit_id,
360 duration_ms,
361 } => Self::ConflictResolutionCompleted {
362 command_id: command_id.clone(),
363 retry_client_commit_id: retry_client_commit_id.clone(),
364 duration_ms: *duration_ms,
365 },
366 Self::ConflictResolutionFailed {
367 command_id,
368 error,
369 duration_ms,
370 } => Self::ConflictResolutionFailed {
371 command_id: command_id.clone(),
372 error: clone_worker_error(error),
373 duration_ms: *duration_ms,
374 },
375 Self::SnapshotReady {
376 command_id,
377 payload_json,
378 duration_ms,
379 } => Self::SnapshotReady {
380 command_id: command_id.clone(),
381 payload_json: payload_json.clone(),
382 duration_ms: *duration_ms,
383 },
384 Self::WorkerCommandCompleted {
385 command_id,
386 operation,
387 payload_json,
388 duration_ms,
389 } => Self::WorkerCommandCompleted {
390 command_id: command_id.clone(),
391 operation: *operation,
392 payload_json: payload_json.clone(),
393 duration_ms: *duration_ms,
394 },
395 Self::WorkerCommandFailed {
396 command_id,
397 operation,
398 error,
399 duration_ms,
400 } => Self::WorkerCommandFailed {
401 command_id: command_id.clone(),
402 operation: *operation,
403 error: clone_worker_error(error),
404 duration_ms: *duration_ms,
405 },
406 Self::BlobUploadsChanged { stats_json } => Self::BlobUploadsChanged {
407 stats_json: stats_json.clone(),
408 },
409 Self::EventsOverflowed { dropped_count } => Self::EventsOverflowed {
410 dropped_count: *dropped_count,
411 },
412 }
413 }
414}
415
416impl SyncWorkerEvent {
417 pub fn requires_full_refresh(&self) -> bool {
418 match self {
419 Self::SyncFailed { error, .. } => error.requires_full_snapshot_resync(),
420 Self::EventsOverflowed { .. } => true,
421 _ => false,
422 }
423 }
424}
425
426fn clone_worker_error(error: &SyncularError) -> SyncularError {
427 SyncularError::message(error.kind(), error.to_string())
428}
429
430pub trait SyncWorkerClientExt {
431 fn apply_worker_mutation_json(
432 &mut self,
433 mutation_json: &str,
434 local_row_json: Option<&str>,
435 ) -> Result<String>;
436
437 fn apply_worker_leased_mutation_json(
438 &mut self,
439 _mutation_json: &str,
440 _local_row_json: Option<&str>,
441 ) -> Result<String> {
442 Err(SyncularError::config(
443 "worker-owned leased mutations are not available for this client",
444 ))
445 }
446
447 fn apply_worker_mutation(
448 &mut self,
449 mutation: SyncOperation,
450 local_row: Option<Value>,
451 ) -> Result<String>;
452
453 fn worker_current_row_json(&mut self, _table: &str, _row_id: &str) -> Result<Option<Value>> {
454 Ok(None)
455 }
456
457 fn apply_worker_encrypted_crdt_update_json(
458 &mut self,
459 _request_json: &str,
460 ) -> Result<WorkerLocalWriteReceipt> {
461 Err(SyncularError::config(
462 "worker-owned encrypted CRDT updates are not available for this client",
463 ))
464 }
465
466 fn apply_worker_encrypted_crdt_checkpoint_json(
467 &mut self,
468 _request_json: &str,
469 ) -> Result<Option<WorkerLocalWriteReceipt>> {
470 Err(SyncularError::config(
471 "worker-owned encrypted CRDT checkpoints are not available for this client",
472 ))
473 }
474
475 fn apply_worker_crdt_field_text_json(
476 &mut self,
477 _request_json: &str,
478 ) -> Result<WorkerLocalWriteReceipt> {
479 Err(SyncularError::config(
480 "worker-owned CRDT field text updates are not available for this client",
481 ))
482 }
483
484 fn compact_worker_crdt_field_json(
485 &mut self,
486 _request_json: &str,
487 ) -> Result<Option<WorkerLocalWriteReceipt>> {
488 Err(SyncularError::config(
489 "worker-owned CRDT field compaction is not available for this client",
490 ))
491 }
492
493 fn worker_crdt_field_event_payload_json(
494 &mut self,
495 _table: &str,
496 _row_id: &str,
497 _field: &str,
498 ) -> Result<Option<Value>> {
499 Ok(None)
500 }
501
502 fn worker_crdt_field_changed_row(
503 &mut self,
504 _table: &str,
505 _row_id: &str,
506 _field: &str,
507 _client_commit_id: &str,
508 ) -> Result<Option<SyncChangedRow>> {
509 Ok(None)
510 }
511
512 fn worker_query_json(&mut self, _request_json: &str) -> Result<String> {
513 Err(SyncularError::config(
514 "worker-owned snapshot refresh is not available for this client",
515 ))
516 }
517
518 fn worker_compact_storage_json(&mut self, _options_json: Option<&str>) -> Result<String> {
519 Err(SyncularError::config(
520 "worker-owned storage compaction is not available for this client",
521 ))
522 }
523
524 fn worker_store_blob_file_json(
525 &mut self,
526 _path: &str,
527 _options_json: Option<&str>,
528 ) -> Result<String> {
529 Err(SyncularError::config(
530 "worker-owned blob file storage is not available for this client",
531 ))
532 }
533
534 fn worker_retrieve_blob_file_json(
535 &mut self,
536 _ref_json: &str,
537 _path: &str,
538 _options_json: Option<&str>,
539 ) -> Result<String> {
540 Err(SyncularError::config(
541 "worker-owned blob file retrieval is not available for this client",
542 ))
543 }
544
545 fn worker_prune_blob_cache_json(&mut self, _max_bytes: i64) -> Result<String> {
546 Err(SyncularError::config(
547 "worker-owned blob cache pruning is not available for this client",
548 ))
549 }
550
551 fn worker_clear_blob_cache_json(&mut self) -> Result<String> {
552 Err(SyncularError::config(
553 "worker-owned blob cache clearing is not available for this client",
554 ))
555 }
556
557 fn worker_process_blob_upload_queue_json(&mut self) -> Result<Option<String>> {
558 Ok(None)
559 }
560
561 fn worker_blob_upload_queue_stats_json(&mut self) -> Result<Option<String>> {
562 Ok(None)
563 }
564
565 fn worker_next_outbox_retry_at_ms(&mut self) -> Result<Option<i64>> {
566 Ok(None)
567 }
568
569 fn worker_next_blob_upload_retry_at_ms(&mut self) -> Result<Option<i64>> {
570 Ok(None)
571 }
572}
573
574#[derive(Debug, Clone, PartialEq, Eq)]
575pub struct WorkerLocalWriteReceipt {
576 pub client_commit_id: String,
577 pub changed_tables: Vec<String>,
578 pub changed_rows: Vec<SyncChangedRow>,
579 pub crdt_event_payload_json: Option<Value>,
580}
581
582#[cfg(feature = "native")]
583impl<T> SyncWorkerClientExt for SyncularClient<DieselSqliteStore, T>
584where
585 T: SyncTransport + BlobTransport,
586{
587 fn apply_worker_mutation_json(
588 &mut self,
589 mutation_json: &str,
590 local_row_json: Option<&str>,
591 ) -> Result<String> {
592 self.apply_mutation_json(mutation_json, local_row_json)
593 }
594
595 fn apply_worker_leased_mutation_json(
596 &mut self,
597 mutation_json: &str,
598 local_row_json: Option<&str>,
599 ) -> Result<String> {
600 self.apply_leased_mutation_json(mutation_json, local_row_json)
601 }
602
603 fn apply_worker_mutation(
604 &mut self,
605 mutation: SyncOperation,
606 local_row: Option<Value>,
607 ) -> Result<String> {
608 let mutation_json = serde_json::to_string(&mutation)?;
609 let local_row_json = local_row.as_ref().map(serde_json::to_string).transpose()?;
610 self.apply_mutation_json(&mutation_json, local_row_json.as_deref())
611 }
612
613 fn worker_current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
614 self.current_row_json(table, row_id)
615 }
616
617 fn apply_worker_encrypted_crdt_update_json(
618 &mut self,
619 request_json: &str,
620 ) -> Result<WorkerLocalWriteReceipt> {
621 let request: WorkerEncryptedCrdtRequest = serde_json::from_str(request_json)?;
622 let field = request
623 .field_identity_ref()
624 .and_then(|identity| self.open_crdt_field(identity.id()).ok());
625 let receipt = self.apply_encrypted_crdt_update_json(request_json)?;
626 let crdt_event_payload_json = field
627 .as_ref()
628 .and_then(|field| crdt_field_event_payload_for_worker(self, field));
629 Ok(WorkerLocalWriteReceipt {
630 changed_rows: field
631 .as_ref()
632 .map(|field| crdt_field_changed_row_for_worker(field, &receipt.client_commit_id))
633 .into_iter()
634 .collect(),
635 client_commit_id: receipt.client_commit_id,
636 changed_tables: vec![request.table, CRDT_UPDATES_TABLE.to_string()],
637 crdt_event_payload_json,
638 })
639 }
640
641 fn apply_worker_encrypted_crdt_checkpoint_json(
642 &mut self,
643 request_json: &str,
644 ) -> Result<Option<WorkerLocalWriteReceipt>> {
645 let request: WorkerEncryptedCrdtRequest = serde_json::from_str(request_json)?;
646 let field = request
647 .field_identity_ref()
648 .and_then(|identity| self.open_crdt_field(identity.id()).ok());
649 let receipt = self.apply_encrypted_crdt_checkpoint_json(request_json)?;
650 let crdt_event_payload_json = field.as_ref().and_then(|field| {
651 crdt_field_compaction_payload_for_worker_current(
652 self,
653 field,
654 true,
655 encrypted_crdt_min_uncheckpointed_updates(request_json),
656 )
657 });
658 Ok(receipt.map(|receipt| WorkerLocalWriteReceipt {
659 changed_rows: field
660 .as_ref()
661 .map(|field| crdt_field_compacted_row_for_worker(field, &receipt.client_commit_id))
662 .into_iter()
663 .collect(),
664 client_commit_id: receipt.client_commit_id,
665 changed_tables: vec![CRDT_CHECKPOINTS_TABLE.to_string()],
666 crdt_event_payload_json,
667 }))
668 }
669
670 fn apply_worker_crdt_field_text_json(
671 &mut self,
672 request_json: &str,
673 ) -> Result<WorkerLocalWriteReceipt> {
674 let request: WorkerCrdtFieldTextRequest = serde_json::from_str(request_json)?;
675 let field = self.open_crdt_field(request.id())?;
676 let receipt = self.apply_crdt_field_text(&field, &request.next_text)?;
677 let crdt_event_payload_json = crdt_field_event_payload_for_worker(self, &field);
678 Ok(WorkerLocalWriteReceipt {
679 changed_rows: vec![crdt_field_changed_row_for_worker(
680 &field,
681 &receipt.client_commit_id,
682 )],
683 client_commit_id: receipt.client_commit_id,
684 changed_tables: crdt_field_write_tables_for_worker(&field),
685 crdt_event_payload_json,
686 })
687 }
688
689 fn compact_worker_crdt_field_json(
690 &mut self,
691 request_json: &str,
692 ) -> Result<Option<WorkerLocalWriteReceipt>> {
693 let request: WorkerCrdtFieldCompactionRequest = serde_json::from_str(request_json)?;
694 let field = self.open_crdt_field(request.id())?;
695 let receipt =
696 self.compact_crdt_field(&field, request.min_uncheckpointed_updates.unwrap_or(1))?;
697 let crdt_event_payload_json = crdt_field_compaction_payload_for_worker(
698 self,
699 &field,
700 &receipt,
701 receipt.checkpoint_created,
702 request.min_uncheckpointed_updates.unwrap_or(1),
703 );
704 Ok(receipt
705 .client_commit_id
706 .map(|client_commit_id| WorkerLocalWriteReceipt {
707 changed_rows: vec![crdt_field_compacted_row_for_worker(
708 &field,
709 &client_commit_id,
710 )],
711 client_commit_id,
712 changed_tables: crdt_field_compaction_tables_for_worker(&field),
713 crdt_event_payload_json,
714 }))
715 }
716
717 fn worker_crdt_field_event_payload_json(
718 &mut self,
719 table: &str,
720 row_id: &str,
721 field: &str,
722 ) -> Result<Option<Value>> {
723 let field = self.open_crdt_field(CrdtFieldId::new(table, row_id, field))?;
724 Ok(crdt_field_event_payload_for_worker(self, &field))
725 }
726
727 fn worker_crdt_field_changed_row(
728 &mut self,
729 table: &str,
730 row_id: &str,
731 field: &str,
732 client_commit_id: &str,
733 ) -> Result<Option<SyncChangedRow>> {
734 let field = self.open_crdt_field(CrdtFieldId::new(table, row_id, field))?;
735 Ok(Some(crdt_field_changed_row_for_worker(
736 &field,
737 client_commit_id,
738 )))
739 }
740
741 fn worker_query_json(&mut self, request_json: &str) -> Result<String> {
742 self.readonly_query_json(request_json)
743 }
744
745 fn worker_compact_storage_json(&mut self, options_json: Option<&str>) -> Result<String> {
746 self.compact_storage_json(options_json)
747 }
748
749 fn worker_store_blob_file_json(
750 &mut self,
751 path: &str,
752 options_json: Option<&str>,
753 ) -> Result<String> {
754 let options: WorkerBlobStoreOptions = options_json
755 .filter(|value| !value.trim().is_empty())
756 .map(serde_json::from_str)
757 .transpose()?
758 .unwrap_or_default();
759 if options.immediate.unwrap_or(false) {
760 return Err(SyncularError::config(
761 "queued blob file storage currently supports immediate=false",
762 ));
763 }
764 if !options.cache_local.unwrap_or(true) {
765 return Err(SyncularError::config(
766 "queued blob file storage with cacheLocal=false requires immediate=true",
767 ));
768 }
769 let mime_type = options
770 .mime_type
771 .as_deref()
772 .unwrap_or("application/octet-stream");
773 self.store_blob_file_local_json(Path::new(path), mime_type, true)
774 }
775
776 fn worker_retrieve_blob_file_json(
777 &mut self,
778 ref_json: &str,
779 path: &str,
780 options_json: Option<&str>,
781 ) -> Result<String> {
782 let options: WorkerBlobRetrieveOptions = options_json
783 .filter(|value| !value.trim().is_empty())
784 .map(serde_json::from_str)
785 .transpose()?
786 .unwrap_or_default();
787 let blob: BlobRef = serde_json::from_str(ref_json)?;
788 self.retrieve_cached_blob_file_json(&blob, Path::new(path))?;
789 let payload = json!({
790 "ok": true,
791 "cacheLocal": options.cache_local.unwrap_or(true)
792 });
793 Ok(serde_json::to_string(&payload)?)
794 }
795
796 fn worker_prune_blob_cache_json(&mut self, max_bytes: i64) -> Result<String> {
797 Ok(serde_json::to_string(&json!({
798 "bytesPruned": self.prune_blob_cache(max_bytes)?
799 }))?)
800 }
801
802 fn worker_clear_blob_cache_json(&mut self) -> Result<String> {
803 self.clear_blob_cache()?;
804 Ok(serde_json::to_string(&json!({ "ok": true }))?)
805 }
806
807 fn worker_process_blob_upload_queue_json(&mut self) -> Result<Option<String>> {
808 Ok(Some(serde_json::to_string(
809 &self.process_blob_upload_queue()?,
810 )?))
811 }
812
813 fn worker_blob_upload_queue_stats_json(&mut self) -> Result<Option<String>> {
814 Ok(Some(serde_json::to_string(
815 &self.blob_upload_queue_stats()?,
816 )?))
817 }
818
819 fn worker_next_outbox_retry_at_ms(&mut self) -> Result<Option<i64>> {
820 self.next_outbox_retry_at_ms()
821 }
822
823 fn worker_next_blob_upload_retry_at_ms(&mut self) -> Result<Option<i64>> {
824 self.next_blob_upload_retry_at_ms()
825 }
826}
827
828#[cfg(feature = "demo-todo-native-fixture")]
829impl<T> SyncWorkerClientExt for SyncularClient<RusqliteStore, T>
830where
831 T: SyncTransport,
832{
833 fn apply_worker_mutation_json(
834 &mut self,
835 mutation_json: &str,
836 local_row_json: Option<&str>,
837 ) -> Result<String> {
838 self.apply_mutation_json(mutation_json, local_row_json)
839 }
840
841 fn apply_worker_mutation(
842 &mut self,
843 mutation: SyncOperation,
844 local_row: Option<Value>,
845 ) -> Result<String> {
846 let mutation_json = serde_json::to_string(&mutation)?;
847 let local_row_json = local_row.as_ref().map(serde_json::to_string).transpose()?;
848 self.apply_mutation_json(&mutation_json, local_row_json.as_deref())
849 }
850
851 fn worker_current_row_json(&mut self, table: &str, row_id: &str) -> Result<Option<Value>> {
852 self.current_row_json(table, row_id)
853 }
854
855 fn worker_next_outbox_retry_at_ms(&mut self) -> Result<Option<i64>> {
856 self.next_outbox_retry_at_ms()
857 }
858}
859
860pub struct SyncWorker {
861 app_schema: AppSchema,
862 command_tx: SyncSender<WorkerCommand>,
863 events: SyncWorkerEventHub,
864 default_events: SyncWorkerEventSubscription,
865 join: Option<JoinHandle<()>>,
866}
867
868#[derive(Clone)]
869struct SyncWorkerEventHub {
870 subscriber_seq: Arc<Mutex<u64>>,
871 subscribers: Arc<Mutex<BTreeMap<u64, Arc<WorkerEventQueue>>>>,
872}
873
874pub struct SyncWorkerEventSubscription {
875 hub: SyncWorkerEventHub,
876 subscriber_id: u64,
877 queue: Arc<WorkerEventQueue>,
878}
879
880struct WorkerEventQueue {
881 capacity: usize,
882 state: Mutex<WorkerEventQueueState>,
883 ready: Condvar,
884}
885
886struct WorkerEventQueueState {
887 events: VecDeque<SyncWorkerEvent>,
888 closed: bool,
889}
890
891impl Default for SyncWorkerEventHub {
892 fn default() -> Self {
893 Self {
894 subscriber_seq: Arc::new(Mutex::new(0)),
895 subscribers: Arc::new(Mutex::new(BTreeMap::new())),
896 }
897 }
898}
899
900impl SyncWorkerEventSubscription {
901 pub fn next_event(&self) -> Option<SyncWorkerEvent> {
902 self.queue.next_event()
903 }
904
905 pub fn next_event_timeout(&self, timeout: Duration) -> Option<SyncWorkerEvent> {
906 self.queue.next_event_timeout(timeout)
907 }
908
909 pub fn close(&self) {
910 if let Ok(mut subscribers) = self.hub.subscribers.lock() {
911 subscribers.remove(&self.subscriber_id);
912 }
913 self.queue.close();
914 }
915}
916
917impl Drop for SyncWorkerEventSubscription {
918 fn drop(&mut self) {
919 self.close();
920 }
921}
922
923impl SyncWorkerEventHub {
924 fn subscribe(&self, capacity: usize) -> SyncWorkerEventSubscription {
925 let queue = Arc::new(WorkerEventQueue::new(capacity));
926 let subscriber_id = self.next_subscriber_id();
927 if let Ok(mut subscribers) = self.subscribers.lock() {
928 subscribers.insert(subscriber_id, queue.clone());
929 }
930 SyncWorkerEventSubscription {
931 hub: self.clone(),
932 subscriber_id,
933 queue,
934 }
935 }
936
937 fn publish_event(&self, event: SyncWorkerEvent) {
938 let Ok(mut subscribers) = self.subscribers.lock() else {
939 return;
940 };
941
942 subscribers.retain(|_, queue| {
943 queue.push(event.clone());
944 !queue.is_closed()
945 });
946 }
947
948 fn close_all(&self) {
949 if let Ok(mut subscribers) = self.subscribers.lock() {
950 for queue in subscribers.values() {
951 queue.close_after_drain();
952 }
953 subscribers.clear();
954 }
955 }
956
957 fn next_subscriber_id(&self) -> u64 {
958 if let Ok(mut seq) = self.subscriber_seq.lock() {
959 *seq = seq.saturating_add(1);
960 *seq
961 } else {
962 0
963 }
964 }
965}
966
967impl WorkerEventQueue {
968 fn new(capacity: usize) -> Self {
969 Self {
970 capacity: capacity.max(1),
971 state: Mutex::new(WorkerEventQueueState {
972 events: VecDeque::new(),
973 closed: false,
974 }),
975 ready: Condvar::new(),
976 }
977 }
978
979 fn push(&self, event: SyncWorkerEvent) {
980 let Ok(mut state) = self.state.lock() else {
981 return;
982 };
983 if state.closed {
984 return;
985 }
986 if state.events.len() >= self.capacity {
987 let dropped_count = state.events.len().saturating_add(1);
988 state.events.clear();
989 state
990 .events
991 .push_back(SyncWorkerEvent::EventsOverflowed { dropped_count });
992 state.closed = true;
993 } else {
994 state.events.push_back(event);
995 }
996 self.ready.notify_one();
997 }
998
999 fn next_event(&self) -> Option<SyncWorkerEvent> {
1000 let mut state = self.state.lock().ok()?;
1001 loop {
1002 if let Some(event) = state.events.pop_front() {
1003 return Some(event);
1004 }
1005 if state.closed {
1006 return None;
1007 }
1008 state = self.ready.wait(state).ok()?;
1009 }
1010 }
1011
1012 fn next_event_timeout(&self, timeout: Duration) -> Option<SyncWorkerEvent> {
1013 let deadline = Instant::now().checked_add(timeout)?;
1014 let mut state = self.state.lock().ok()?;
1015 loop {
1016 if let Some(event) = state.events.pop_front() {
1017 return Some(event);
1018 }
1019 if state.closed {
1020 return None;
1021 }
1022 let now = Instant::now();
1023 if now >= deadline {
1024 return None;
1025 }
1026 let wait = deadline.saturating_duration_since(now);
1027 let (next_state, timeout) = self.ready.wait_timeout(state, wait).ok()?;
1028 state = next_state;
1029 if timeout.timed_out() && state.events.is_empty() {
1030 return None;
1031 }
1032 }
1033 }
1034
1035 fn close(&self) {
1036 if let Ok(mut state) = self.state.lock() {
1037 state.closed = true;
1038 state.events.clear();
1039 self.ready.notify_all();
1040 }
1041 }
1042
1043 fn close_after_drain(&self) {
1044 if let Ok(mut state) = self.state.lock() {
1045 state.closed = true;
1046 self.ready.notify_all();
1047 }
1048 }
1049
1050 fn is_closed(&self) -> bool {
1051 self.state.lock().map(|state| state.closed).unwrap_or(true)
1052 }
1053}
1054
1055struct CloseWorkerEventsOnDrop(SyncWorkerEventHub);
1056
1057impl Drop for CloseWorkerEventsOnDrop {
1058 fn drop(&mut self) {
1059 self.0.close_all();
1060 }
1061}
1062
1063enum WorkerWake {
1064 Command(WorkerCommand),
1065 FlushYjs,
1066 Retry,
1067}
1068
1069#[derive(Clone)]
1070pub struct SyncWorkerTrigger {
1071 command_tx: SyncSender<WorkerCommand>,
1072}
1073
1074pub struct PersistentRealtimeWorker {
1075 command_tx: SyncSender<RealtimeWorkerCommand>,
1076 join: Option<JoinHandle<()>>,
1077}
1078
1079enum RealtimeWorkerCommand {
1080 Stop,
1081 SetAuthHeaders(SyncAuthHeaders),
1082 SendPresence {
1083 action: String,
1084 scope_key: String,
1085 metadata: Option<Value>,
1086 },
1087}
1088
1089type RealtimeEventHandler = Arc<dyn Fn(RealtimeEvent) + Send + Sync>;
1090
1091impl SyncWorkerTrigger {
1092 pub fn trigger_sync(&self) -> Result<()> {
1093 self.command_tx
1094 .try_send(WorkerCommand::Trigger {
1095 command_id: None,
1096 emit_started: false,
1097 transport: WorkerSyncTransport::Http,
1098 })
1099 .map_err(|err| match err {
1100 TrySendError::Full(_) => SyncularError::busy("sync worker command queue is full"),
1101 TrySendError::Disconnected(_) => {
1102 SyncularError::message(ErrorKind::Internal, "sync worker is not running")
1103 }
1104 })
1105 }
1106}
1107
1108impl PersistentRealtimeWorker {
1109 pub fn start<T>(transport: T, trigger: SyncWorkerTrigger) -> Self
1110 where
1111 T: SyncTransport + SyncAuthHeaderStore + Send + 'static,
1112 {
1113 Self::start_with_event_handler(transport, trigger, None)
1114 }
1115
1116 pub fn start_with_event_handler<T>(
1117 transport: T,
1118 trigger: SyncWorkerTrigger,
1119 event_handler: Option<RealtimeEventHandler>,
1120 ) -> Self
1121 where
1122 T: SyncTransport + SyncAuthHeaderStore + Send + 'static,
1123 {
1124 let (command_tx, command_rx) = mpsc::sync_channel(32);
1125 let join = thread::spawn(move || {
1126 run_persistent_realtime_worker(transport, trigger, command_rx, event_handler)
1127 });
1128 Self {
1129 command_tx,
1130 join: Some(join),
1131 }
1132 }
1133
1134 pub fn set_auth_headers(&self, headers: SyncAuthHeaders) -> Result<()> {
1135 self.command_tx
1136 .try_send(RealtimeWorkerCommand::SetAuthHeaders(headers))
1137 .map_err(|err| match err {
1138 TrySendError::Full(_) => {
1139 SyncularError::busy("realtime worker command queue is full")
1140 }
1141 TrySendError::Disconnected(_) => {
1142 SyncularError::message(ErrorKind::Internal, "realtime worker is not running")
1143 }
1144 })
1145 }
1146
1147 pub fn send_presence(
1148 &self,
1149 action: impl Into<String>,
1150 scope_key: impl Into<String>,
1151 metadata: Option<Value>,
1152 ) -> Result<()> {
1153 self.command_tx
1154 .try_send(RealtimeWorkerCommand::SendPresence {
1155 action: action.into(),
1156 scope_key: scope_key.into(),
1157 metadata,
1158 })
1159 .map_err(|err| match err {
1160 TrySendError::Full(_) => {
1161 SyncularError::busy("realtime worker command queue is full")
1162 }
1163 TrySendError::Disconnected(_) => {
1164 SyncularError::message(ErrorKind::Internal, "realtime worker is not running")
1165 }
1166 })
1167 }
1168
1169 pub fn stop(&mut self) -> Result<()> {
1170 let _ = self.command_tx.send(RealtimeWorkerCommand::Stop);
1171 if let Some(join) = self.join.take() {
1172 join.join().map_err(|_| {
1173 SyncularError::message(ErrorKind::Internal, "realtime worker panicked")
1174 })?;
1175 }
1176 Ok(())
1177 }
1178}
1179
1180impl Drop for PersistentRealtimeWorker {
1181 fn drop(&mut self) {
1182 let _ = self.stop();
1183 }
1184}
1185
1186fn run_persistent_realtime_worker<T>(
1187 mut transport: T,
1188 trigger: SyncWorkerTrigger,
1189 command_rx: Receiver<RealtimeWorkerCommand>,
1190 event_handler: Option<RealtimeEventHandler>,
1191) where
1192 T: SyncTransport + SyncAuthHeaderStore,
1193{
1194 let mut reconnect_attempt: i32 = 0;
1195 let mut active_presence: BTreeMap<String, Option<Value>> = BTreeMap::new();
1196 loop {
1197 if drain_realtime_commands(&mut transport, None, &command_rx, &mut active_presence)
1198 .is_none()
1199 {
1200 return;
1201 }
1202
1203 match transport.connect_realtime() {
1204 Ok(mut socket) => {
1205 reconnect_attempt = 0;
1206 rejoin_realtime_presence(&mut socket, &active_presence);
1207 if !run_connected_realtime_socket(
1208 &mut transport,
1209 &mut socket,
1210 &trigger,
1211 &command_rx,
1212 &mut active_presence,
1213 event_handler.as_deref(),
1214 ) {
1215 return;
1216 }
1217 }
1218 Err(_) => {
1219 reconnect_attempt = reconnect_attempt.saturating_add(1);
1220 }
1221 }
1222
1223 let delay =
1224 Duration::from_millis(retry_backoff_delay_ms(reconnect_attempt).max(250) as u64);
1225 match command_rx.recv_timeout(delay) {
1226 Ok(RealtimeWorkerCommand::Stop) | Err(RecvTimeoutError::Disconnected) => return,
1227 Ok(RealtimeWorkerCommand::SetAuthHeaders(headers)) => {
1228 transport.set_auth_headers(headers);
1229 reconnect_attempt = 0;
1230 }
1231 Ok(RealtimeWorkerCommand::SendPresence {
1232 action,
1233 scope_key,
1234 metadata,
1235 }) => {
1236 apply_active_presence_command(&mut active_presence, &action, &scope_key, metadata);
1237 reconnect_attempt = 0;
1238 }
1239 Err(RecvTimeoutError::Timeout) => {}
1240 }
1241 }
1242}
1243
1244fn run_connected_realtime_socket<T>(
1245 transport: &mut T,
1246 socket: &mut T::Realtime,
1247 trigger: &SyncWorkerTrigger,
1248 command_rx: &Receiver<RealtimeWorkerCommand>,
1249 active_presence: &mut BTreeMap<String, Option<Value>>,
1250 event_handler: Option<&(dyn Fn(RealtimeEvent) + Send + Sync)>,
1251) -> bool
1252where
1253 T: SyncTransport + SyncAuthHeaderStore,
1254{
1255 loop {
1256 match drain_realtime_commands(transport, Some(&mut *socket), command_rx, active_presence) {
1257 Some(true) => {
1258 socket.close();
1259 return true;
1260 }
1261 Some(false) => {}
1262 None => {
1263 socket.close();
1264 return false;
1265 }
1266 }
1267
1268 match socket.read_event() {
1269 Ok(Some(RealtimeEvent::Sync)) => {
1270 let _ = trigger.trigger_sync();
1271 }
1272 Ok(Some(event @ RealtimeEvent::Presence(_))) => {
1273 if let Some(handler) = event_handler {
1274 handler(event);
1275 }
1276 }
1277 Ok(Some(RealtimeEvent::Other(_))) => {}
1278 Ok(None) => match command_rx.recv_timeout(Duration::from_millis(250)) {
1279 Ok(RealtimeWorkerCommand::Stop) | Err(RecvTimeoutError::Disconnected) => {
1280 socket.close();
1281 return false;
1282 }
1283 Ok(RealtimeWorkerCommand::SetAuthHeaders(headers)) => {
1284 transport.set_auth_headers(headers);
1285 socket.close();
1286 return true;
1287 }
1288 Ok(RealtimeWorkerCommand::SendPresence {
1289 action,
1290 scope_key,
1291 metadata,
1292 }) => {
1293 apply_active_presence_command(
1294 active_presence,
1295 &action,
1296 &scope_key,
1297 metadata.clone(),
1298 );
1299 let _ = socket.send_presence(&action, &scope_key, metadata.as_ref());
1300 }
1301 Err(RecvTimeoutError::Timeout) => {}
1302 },
1303 Err(_) => {
1304 socket.close();
1305 return true;
1306 }
1307 }
1308 }
1309}
1310
1311fn drain_realtime_commands<T>(
1312 transport: &mut T,
1313 mut socket: Option<&mut T::Realtime>,
1314 command_rx: &Receiver<RealtimeWorkerCommand>,
1315 active_presence: &mut BTreeMap<String, Option<Value>>,
1316) -> Option<bool>
1317where
1318 T: SyncAuthHeaderStore + SyncTransport,
1319{
1320 let mut reconnect = false;
1321 loop {
1322 match command_rx.try_recv() {
1323 Ok(RealtimeWorkerCommand::Stop) | Err(mpsc::TryRecvError::Disconnected) => {
1324 return None;
1325 }
1326 Ok(RealtimeWorkerCommand::SetAuthHeaders(headers)) => {
1327 transport.set_auth_headers(headers);
1328 reconnect = true;
1329 }
1330 Ok(RealtimeWorkerCommand::SendPresence {
1331 action,
1332 scope_key,
1333 metadata,
1334 }) => {
1335 apply_active_presence_command(
1336 active_presence,
1337 &action,
1338 &scope_key,
1339 metadata.clone(),
1340 );
1341 if let Some(socket) = socket.as_deref_mut() {
1342 let _ = socket.send_presence(&action, &scope_key, metadata.as_ref());
1343 }
1344 }
1345 Err(mpsc::TryRecvError::Empty) => return Some(reconnect),
1346 }
1347 }
1348}
1349
1350fn apply_active_presence_command(
1351 active_presence: &mut BTreeMap<String, Option<Value>>,
1352 action: &str,
1353 scope_key: &str,
1354 metadata: Option<Value>,
1355) {
1356 match action {
1357 "leave" => {
1358 active_presence.remove(scope_key);
1359 }
1360 "join" | "update" => {
1361 active_presence.insert(scope_key.to_string(), metadata);
1362 }
1363 _ => {}
1364 }
1365}
1366
1367fn rejoin_realtime_presence<T>(socket: &mut T, active_presence: &BTreeMap<String, Option<Value>>)
1368where
1369 T: RealtimeTransport,
1370{
1371 for (scope_key, metadata) in active_presence {
1372 let _ = socket.send_presence("join", scope_key, metadata.as_ref());
1373 }
1374}
1375
1376impl SyncWorker {
1377 pub fn start<S, T>(client: SyncularClient<S, T>) -> Self
1378 where
1379 S: SyncStore + SyncStateStore + Send + 'static,
1380 T: SyncTransport + SyncAuthHeaderStore + Send + 'static,
1381 SyncularClient<S, T>: SyncWorkerClientExt,
1382 {
1383 Self::start_with_config(client, SyncWorkerConfig::default())
1384 }
1385
1386 pub fn start_with_config<S, T>(
1387 mut client: SyncularClient<S, T>,
1388 config: SyncWorkerConfig,
1389 ) -> Self
1390 where
1391 S: SyncStore + SyncStateStore + Send + 'static,
1392 T: SyncTransport + SyncAuthHeaderStore + Send + 'static,
1393 SyncularClient<S, T>: SyncWorkerClientExt,
1394 {
1395 let (command_tx, command_rx) = mpsc::sync_channel(config.command_queue_capacity);
1396 let app_schema = client.app_schema();
1397 let events = SyncWorkerEventHub::default();
1398 let default_events = events.subscribe(DEFAULT_WORKER_EVENT_QUEUE_CAPACITY);
1399 let worker_events = events.clone();
1400 let join = thread::spawn(move || {
1401 let _close_events = CloseWorkerEventsOnDrop(worker_events.clone());
1402 let mut pending_yjs = BTreeMap::new();
1403 loop {
1404 let wake = if pending_yjs.is_empty() {
1405 match next_retry_timeout(&mut client) {
1406 Some(timeout) => match command_rx.recv_timeout(timeout) {
1407 Ok(command) => WorkerWake::Command(command),
1408 Err(RecvTimeoutError::Timeout) => WorkerWake::Retry,
1409 Err(RecvTimeoutError::Disconnected) => return,
1410 },
1411 None => match command_rx.recv() {
1412 Ok(command) => WorkerWake::Command(command),
1413 Err(_) => return,
1414 },
1415 }
1416 } else {
1417 match command_rx.recv_timeout(config.yjs_flush_window) {
1418 Ok(command) => WorkerWake::Command(command),
1419 Err(RecvTimeoutError::Timeout) => WorkerWake::FlushYjs,
1420 Err(RecvTimeoutError::Disconnected) => return,
1421 }
1422 };
1423
1424 match wake {
1425 WorkerWake::FlushYjs => {
1426 if flush_pending_yjs(&mut client, &mut pending_yjs, &worker_events) {
1427 if !run_until_settled(
1428 &mut client,
1429 &command_rx,
1430 &worker_events,
1431 &mut pending_yjs,
1432 None,
1433 false,
1434 WorkerSyncTransport::Http,
1435 ) {
1436 return;
1437 }
1438 }
1439 }
1440 WorkerWake::Retry => {
1441 if !run_due_retry_work(
1442 &mut client,
1443 &command_rx,
1444 &worker_events,
1445 &mut pending_yjs,
1446 ) {
1447 return;
1448 }
1449 }
1450 WorkerWake::Command(command) => {
1451 if !handle_command(
1452 &mut client,
1453 &command_rx,
1454 &worker_events,
1455 &mut pending_yjs,
1456 command,
1457 ) {
1458 return;
1459 }
1460 }
1461 }
1462 }
1463 });
1464
1465 Self {
1466 app_schema,
1467 command_tx,
1468 events,
1469 default_events,
1470 join: Some(join),
1471 }
1472 }
1473
1474 pub fn trigger_sync(&self) -> Result<()> {
1475 self.trigger_sync_inner(None, false, WorkerSyncTransport::Http)
1476 }
1477
1478 pub fn trigger_sync_websocket(&self) -> Result<()> {
1479 self.trigger_sync_inner(None, false, WorkerSyncTransport::WebSocket)
1480 }
1481
1482 pub fn trigger_handle(&self) -> SyncWorkerTrigger {
1483 SyncWorkerTrigger {
1484 command_tx: self.command_tx.clone(),
1485 }
1486 }
1487
1488 pub fn subscribe_events(&self, capacity: usize) -> SyncWorkerEventSubscription {
1489 self.events.subscribe(capacity)
1490 }
1491
1492 pub fn event_source(&self) -> SyncWorkerEventSubscription {
1493 self.subscribe_events(DEFAULT_WORKER_EVENT_QUEUE_CAPACITY)
1494 }
1495
1496 pub fn enqueue_sync_now(&self, command_id: String) -> Result<()> {
1497 self.trigger_sync_inner(Some(command_id), true, WorkerSyncTransport::Http)
1498 }
1499
1500 pub fn enqueue_sync_websocket(&self, command_id: String) -> Result<()> {
1501 self.trigger_sync_inner(Some(command_id), true, WorkerSyncTransport::WebSocket)
1502 }
1503
1504 pub fn enqueue_mutation_json(
1505 &self,
1506 command_id: String,
1507 mutation_json: String,
1508 local_row_json: Option<String>,
1509 auto_sync: bool,
1510 ) -> Result<()> {
1511 validate_mutation_json_input_size(&mutation_json, local_row_json.as_deref())?;
1512 self.try_send(WorkerCommand::ApplyMutationJson {
1513 command_id,
1514 mutation_json,
1515 local_row_json,
1516 auto_sync,
1517 require_auth_lease: false,
1518 })
1519 }
1520
1521 pub fn enqueue_leased_mutation_json(
1522 &self,
1523 command_id: String,
1524 mutation_json: String,
1525 local_row_json: Option<String>,
1526 auto_sync: bool,
1527 ) -> Result<()> {
1528 validate_mutation_json_input_size(&mutation_json, local_row_json.as_deref())?;
1529 self.try_send(WorkerCommand::ApplyMutationJson {
1530 command_id,
1531 mutation_json,
1532 local_row_json,
1533 auto_sync,
1534 require_auth_lease: true,
1535 })
1536 }
1537
1538 pub fn enqueue_yjs_update_json(
1539 &self,
1540 command_id: String,
1541 update_json: String,
1542 auto_sync: bool,
1543 ) -> Result<()> {
1544 validate_save_yjs_update_json(&update_json)?;
1545 self.try_send(WorkerCommand::SaveYjsUpdateJson {
1546 command_id,
1547 update_json,
1548 auto_sync,
1549 })
1550 }
1551
1552 pub fn enqueue_crdt_field_text_json(
1553 &self,
1554 command_id: String,
1555 request_json: String,
1556 auto_sync: bool,
1557 ) -> Result<()> {
1558 validate_worker_crdt_field_text_request_json(&request_json)?;
1559 self.try_send(WorkerCommand::ApplyCrdtFieldTextJson {
1560 command_id,
1561 request_json,
1562 auto_sync,
1563 })
1564 }
1565
1566 pub fn enqueue_crdt_field_compaction_json(
1567 &self,
1568 command_id: String,
1569 request_json: String,
1570 auto_sync: bool,
1571 ) -> Result<()> {
1572 validate_crdt_request_json_size(&request_json)?;
1573 self.try_send(WorkerCommand::CompactCrdtFieldJson {
1574 command_id,
1575 request_json,
1576 auto_sync,
1577 })
1578 }
1579
1580 pub fn enqueue_encrypted_crdt_update_json(
1581 &self,
1582 command_id: String,
1583 request_json: String,
1584 auto_sync: bool,
1585 ) -> Result<()> {
1586 validate_crdt_request_json_size(&request_json)?;
1587 self.try_send(WorkerCommand::ApplyEncryptedCrdtUpdateJson {
1588 command_id,
1589 request_json,
1590 auto_sync,
1591 })
1592 }
1593
1594 pub fn enqueue_encrypted_crdt_checkpoint_json(
1595 &self,
1596 command_id: String,
1597 request_json: String,
1598 auto_sync: bool,
1599 ) -> Result<()> {
1600 validate_crdt_request_json_size(&request_json)?;
1601 self.try_send(WorkerCommand::ApplyEncryptedCrdtCheckpointJson {
1602 command_id,
1603 request_json,
1604 auto_sync,
1605 })
1606 }
1607
1608 pub fn enqueue_conflict_resolution(
1609 &self,
1610 command_id: String,
1611 conflict_id: String,
1612 resolution: String,
1613 auto_sync: bool,
1614 ) -> Result<()> {
1615 self.try_send(WorkerCommand::ResolveConflict {
1616 command_id,
1617 conflict_id,
1618 resolution,
1619 auto_sync,
1620 })
1621 }
1622
1623 pub fn enqueue_refresh_snapshot_json(
1624 &self,
1625 command_id: String,
1626 request_json: String,
1627 ) -> Result<()> {
1628 self.try_send(WorkerCommand::RefreshSnapshotJson {
1629 command_id,
1630 request_json,
1631 })
1632 }
1633
1634 pub fn enqueue_compact_storage_json(
1635 &self,
1636 command_id: String,
1637 options_json: Option<String>,
1638 ) -> Result<()> {
1639 self.try_send(WorkerCommand::CompactStorageJson {
1640 command_id,
1641 options_json,
1642 })
1643 }
1644
1645 pub fn enqueue_store_blob_file_json(
1646 &self,
1647 command_id: String,
1648 path: String,
1649 options_json: Option<String>,
1650 ) -> Result<()> {
1651 self.try_send(WorkerCommand::StoreBlobFileJson {
1652 command_id,
1653 path,
1654 options_json,
1655 })
1656 }
1657
1658 pub fn enqueue_retrieve_blob_file_json(
1659 &self,
1660 command_id: String,
1661 ref_json: String,
1662 path: String,
1663 options_json: Option<String>,
1664 ) -> Result<()> {
1665 self.try_send(WorkerCommand::RetrieveBlobFileJson {
1666 command_id,
1667 ref_json,
1668 path,
1669 options_json,
1670 })
1671 }
1672
1673 pub fn enqueue_process_blob_upload_queue(&self, command_id: String) -> Result<()> {
1674 self.try_send(WorkerCommand::ProcessBlobUploadQueue { command_id })
1675 }
1676
1677 pub fn enqueue_prune_blob_cache(&self, command_id: String, max_bytes: i64) -> Result<()> {
1678 self.try_send(WorkerCommand::PruneBlobCache {
1679 command_id,
1680 max_bytes,
1681 })
1682 }
1683
1684 pub fn enqueue_clear_blob_cache(&self, command_id: String) -> Result<()> {
1685 self.try_send(WorkerCommand::ClearBlobCache { command_id })
1686 }
1687
1688 pub fn set_auth_headers(&self, headers: SyncAuthHeaders) -> Result<()> {
1689 self.try_send(WorkerCommand::SetAuthHeaders(headers))
1690 }
1691
1692 pub fn set_subscriptions(&self, subscriptions: Vec<SubscriptionSpec>) -> Result<()> {
1693 validate_subscription_limits(&subscriptions)?;
1694 self.try_send(WorkerCommand::SetSubscriptions(subscriptions))
1695 }
1696
1697 pub fn set_field_encryption(&self, encryption: Option<FieldEncryption>) -> Result<()> {
1698 if let Some(encryption) = &encryption {
1699 validate_field_encryption_rules_against_app_schema(
1700 self.app_schema,
1701 encryption.rules(),
1702 )?;
1703 }
1704 self.try_send(WorkerCommand::SetFieldEncryption(encryption))
1705 }
1706
1707 pub fn set_encrypted_crdt(&self, encryption: Option<EncryptedCrdt>) -> Result<()> {
1708 if encryption.is_some() {
1709 validate_encrypted_crdt_against_app_schema(self.app_schema)?;
1710 }
1711 self.try_send(WorkerCommand::SetEncryptedCrdt(encryption))
1712 }
1713
1714 pub fn set_blob_encryption(&self, encryption: Option<BlobEncryption>) -> Result<()> {
1715 if encryption.is_some() {
1716 validate_blob_encryption_against_app_schema(self.app_schema)?;
1717 }
1718 self.try_send(WorkerCommand::SetBlobEncryption(encryption))
1719 }
1720
1721 pub fn recv_event_timeout(&self, timeout: Duration) -> Option<SyncWorkerEvent> {
1722 self.default_events.next_event_timeout(timeout)
1723 }
1724
1725 pub fn recv_result_timeout(&self, timeout: Duration) -> Option<Result<SyncReport>> {
1726 let deadline = Instant::now().checked_add(timeout)?;
1727 loop {
1728 let now = Instant::now();
1729 if now >= deadline {
1730 return None;
1731 }
1732 let remaining = deadline.saturating_duration_since(now);
1733 match self.recv_event_timeout(remaining)? {
1734 SyncWorkerEvent::SyncCompleted { report, .. } => return Some(Ok(report)),
1735 SyncWorkerEvent::SyncFailed { error, .. } => return Some(Err(error)),
1736 _ => continue,
1737 }
1738 }
1739 }
1740
1741 pub fn request_stop(&self) -> Result<()> {
1742 self.command_tx
1743 .send(WorkerCommand::Stop)
1744 .map_err(|_| SyncularError::message(ErrorKind::Internal, "sync worker is not running"))
1745 }
1746
1747 pub fn join(&mut self) -> Result<()> {
1748 if let Some(join) = self.join.take() {
1749 join.join()
1750 .map_err(|_| SyncularError::message(ErrorKind::Internal, "sync worker panicked"))?;
1751 }
1752 Ok(())
1753 }
1754
1755 pub fn stop(mut self) -> Result<()> {
1756 let _ = self.request_stop();
1757 self.join()
1758 }
1759
1760 fn trigger_sync_inner(
1761 &self,
1762 command_id: Option<String>,
1763 emit_started: bool,
1764 transport: WorkerSyncTransport,
1765 ) -> Result<()> {
1766 self.try_send(WorkerCommand::Trigger {
1767 command_id,
1768 emit_started,
1769 transport,
1770 })
1771 }
1772
1773 fn try_send(&self, command: WorkerCommand) -> Result<()> {
1774 self.command_tx.try_send(command).map_err(|err| match err {
1775 TrySendError::Full(_) => SyncularError::busy("sync worker command queue is full"),
1776 TrySendError::Disconnected(_) => {
1777 SyncularError::message(ErrorKind::Internal, "sync worker is not running")
1778 }
1779 })
1780 }
1781}
1782
1783impl Drop for SyncWorker {
1784 fn drop(&mut self) {
1785 let _ = self.command_tx.send(WorkerCommand::Stop);
1786 if let Some(join) = self.join.take() {
1787 let _ = join.join();
1788 }
1789 }
1790}
1791
1792fn handle_command<S, T>(
1793 client: &mut SyncularClient<S, T>,
1794 command_rx: &Receiver<WorkerCommand>,
1795 event_tx: &SyncWorkerEventHub,
1796 pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
1797 command: WorkerCommand,
1798) -> bool
1799where
1800 S: SyncStore + SyncStateStore,
1801 T: SyncTransport + SyncAuthHeaderStore,
1802 SyncularClient<S, T>: SyncWorkerClientExt,
1803{
1804 match command {
1805 WorkerCommand::Trigger {
1806 command_id,
1807 emit_started,
1808 transport,
1809 } => run_until_settled(
1810 client,
1811 command_rx,
1812 event_tx,
1813 pending_yjs,
1814 command_id,
1815 emit_started,
1816 transport,
1817 ),
1818 WorkerCommand::ApplyMutationJson {
1819 command_id,
1820 mutation_json,
1821 local_row_json,
1822 auto_sync,
1823 require_auth_lease,
1824 } => {
1825 if flush_pending_yjs(client, pending_yjs, event_tx) {
1826 if !run_until_settled(
1827 client,
1828 command_rx,
1829 event_tx,
1830 pending_yjs,
1831 None,
1832 false,
1833 WorkerSyncTransport::Http,
1834 ) {
1835 return false;
1836 }
1837 }
1838 let should_sync = apply_mutation_json(
1839 client,
1840 event_tx,
1841 command_id,
1842 &mutation_json,
1843 local_row_json.as_deref(),
1844 auto_sync,
1845 require_auth_lease,
1846 );
1847 if should_sync {
1848 run_until_settled(
1849 client,
1850 command_rx,
1851 event_tx,
1852 pending_yjs,
1853 None,
1854 false,
1855 WorkerSyncTransport::Http,
1856 )
1857 } else {
1858 true
1859 }
1860 }
1861 WorkerCommand::SaveYjsUpdateJson {
1862 command_id,
1863 update_json,
1864 auto_sync,
1865 } => {
1866 queue_yjs_update_json(pending_yjs, event_tx, command_id, &update_json, auto_sync);
1867 true
1868 }
1869 WorkerCommand::ApplyCrdtFieldTextJson {
1870 command_id,
1871 request_json,
1872 auto_sync,
1873 } => {
1874 if flush_pending_yjs(client, pending_yjs, event_tx) {
1875 if !run_until_settled(
1876 client,
1877 command_rx,
1878 event_tx,
1879 pending_yjs,
1880 None,
1881 false,
1882 WorkerSyncTransport::Http,
1883 ) {
1884 return false;
1885 }
1886 }
1887 let should_sync =
1888 apply_crdt_field_text_json(client, event_tx, command_id, &request_json, auto_sync);
1889 if should_sync {
1890 run_until_settled(
1891 client,
1892 command_rx,
1893 event_tx,
1894 pending_yjs,
1895 None,
1896 false,
1897 WorkerSyncTransport::Http,
1898 )
1899 } else {
1900 true
1901 }
1902 }
1903 WorkerCommand::CompactCrdtFieldJson {
1904 command_id,
1905 request_json,
1906 auto_sync,
1907 } => {
1908 if flush_pending_yjs(client, pending_yjs, event_tx) {
1909 if !run_until_settled(
1910 client,
1911 command_rx,
1912 event_tx,
1913 pending_yjs,
1914 None,
1915 false,
1916 WorkerSyncTransport::Http,
1917 ) {
1918 return false;
1919 }
1920 }
1921 let should_sync =
1922 compact_crdt_field_json(client, event_tx, command_id, &request_json, auto_sync);
1923 if should_sync {
1924 run_until_settled(
1925 client,
1926 command_rx,
1927 event_tx,
1928 pending_yjs,
1929 None,
1930 false,
1931 WorkerSyncTransport::Http,
1932 )
1933 } else {
1934 true
1935 }
1936 }
1937 WorkerCommand::ApplyEncryptedCrdtUpdateJson {
1938 command_id,
1939 request_json,
1940 auto_sync,
1941 } => {
1942 if flush_pending_yjs(client, pending_yjs, event_tx) {
1943 if !run_until_settled(
1944 client,
1945 command_rx,
1946 event_tx,
1947 pending_yjs,
1948 None,
1949 false,
1950 WorkerSyncTransport::Http,
1951 ) {
1952 return false;
1953 }
1954 }
1955 let should_sync = apply_encrypted_crdt_update_json(
1956 client,
1957 event_tx,
1958 command_id,
1959 &request_json,
1960 auto_sync,
1961 );
1962 if should_sync {
1963 run_until_settled(
1964 client,
1965 command_rx,
1966 event_tx,
1967 pending_yjs,
1968 None,
1969 false,
1970 WorkerSyncTransport::Http,
1971 )
1972 } else {
1973 true
1974 }
1975 }
1976 WorkerCommand::ApplyEncryptedCrdtCheckpointJson {
1977 command_id,
1978 request_json,
1979 auto_sync,
1980 } => {
1981 if flush_pending_yjs(client, pending_yjs, event_tx) {
1982 if !run_until_settled(
1983 client,
1984 command_rx,
1985 event_tx,
1986 pending_yjs,
1987 None,
1988 false,
1989 WorkerSyncTransport::Http,
1990 ) {
1991 return false;
1992 }
1993 }
1994 let should_sync = apply_encrypted_crdt_checkpoint_json(
1995 client,
1996 event_tx,
1997 command_id,
1998 &request_json,
1999 auto_sync,
2000 );
2001 if should_sync {
2002 run_until_settled(
2003 client,
2004 command_rx,
2005 event_tx,
2006 pending_yjs,
2007 None,
2008 false,
2009 WorkerSyncTransport::Http,
2010 )
2011 } else {
2012 true
2013 }
2014 }
2015 WorkerCommand::ResolveConflict {
2016 command_id,
2017 conflict_id,
2018 resolution,
2019 auto_sync,
2020 } => {
2021 if flush_pending_yjs(client, pending_yjs, event_tx) {
2022 if !run_until_settled(
2023 client,
2024 command_rx,
2025 event_tx,
2026 pending_yjs,
2027 None,
2028 false,
2029 WorkerSyncTransport::Http,
2030 ) {
2031 return false;
2032 }
2033 }
2034 let should_sync = resolve_conflict(
2035 client,
2036 event_tx,
2037 command_id,
2038 &conflict_id,
2039 &resolution,
2040 auto_sync,
2041 );
2042 if should_sync {
2043 run_until_settled(
2044 client,
2045 command_rx,
2046 event_tx,
2047 pending_yjs,
2048 None,
2049 false,
2050 WorkerSyncTransport::Http,
2051 )
2052 } else {
2053 true
2054 }
2055 }
2056 WorkerCommand::RefreshSnapshotJson {
2057 command_id,
2058 request_json,
2059 } => {
2060 if flush_pending_yjs(client, pending_yjs, event_tx) {
2061 if !run_until_settled(
2062 client,
2063 command_rx,
2064 event_tx,
2065 pending_yjs,
2066 None,
2067 false,
2068 WorkerSyncTransport::Http,
2069 ) {
2070 return false;
2071 }
2072 }
2073 refresh_snapshot_json(client, event_tx, command_id, &request_json);
2074 true
2075 }
2076 WorkerCommand::CompactStorageJson {
2077 command_id,
2078 options_json,
2079 } => {
2080 if flush_pending_yjs(client, pending_yjs, event_tx) {
2081 if !run_until_settled(
2082 client,
2083 command_rx,
2084 event_tx,
2085 pending_yjs,
2086 None,
2087 false,
2088 WorkerSyncTransport::Http,
2089 ) {
2090 return false;
2091 }
2092 }
2093 run_worker_json_command(client, event_tx, command_id, "compactStorage", |client| {
2094 client.worker_compact_storage_json(options_json.as_deref())
2095 });
2096 true
2097 }
2098 WorkerCommand::StoreBlobFileJson {
2099 command_id,
2100 path,
2101 options_json,
2102 } => {
2103 if run_worker_json_command(client, event_tx, command_id, "storeBlobFile", |client| {
2104 client.worker_store_blob_file_json(&path, options_json.as_deref())
2105 }) {
2106 publish_blob_uploads_changed(client, event_tx);
2107 }
2108 true
2109 }
2110 WorkerCommand::RetrieveBlobFileJson {
2111 command_id,
2112 ref_json,
2113 path,
2114 options_json,
2115 } => {
2116 run_worker_json_command(client, event_tx, command_id, "retrieveBlobFile", |client| {
2117 client.worker_retrieve_blob_file_json(&ref_json, &path, options_json.as_deref())
2118 });
2119 true
2120 }
2121 WorkerCommand::ProcessBlobUploadQueue { command_id } => {
2122 if run_worker_json_command(
2123 client,
2124 event_tx,
2125 command_id,
2126 "processBlobUploadQueue",
2127 |client| {
2128 client
2129 .worker_process_blob_upload_queue_json()?
2130 .ok_or_else(|| {
2131 SyncularError::config(
2132 "worker-owned blob upload queue processing is not available for this client",
2133 )
2134 })
2135 },
2136 ) {
2137 publish_blob_uploads_changed(client, event_tx);
2138 }
2139 true
2140 }
2141 WorkerCommand::PruneBlobCache {
2142 command_id,
2143 max_bytes,
2144 } => {
2145 run_worker_json_command(client, event_tx, command_id, "pruneBlobCache", |client| {
2146 client.worker_prune_blob_cache_json(max_bytes)
2147 });
2148 true
2149 }
2150 WorkerCommand::ClearBlobCache { command_id } => {
2151 run_worker_json_command(client, event_tx, command_id, "clearBlobCache", |client| {
2152 client.worker_clear_blob_cache_json()
2153 });
2154 true
2155 }
2156 WorkerCommand::SetAuthHeaders(headers) => {
2157 client.set_auth_headers(headers);
2158 true
2159 }
2160 WorkerCommand::SetSubscriptions(subscriptions) => {
2161 let _ = client.set_subscriptions(subscriptions);
2162 true
2163 }
2164 WorkerCommand::SetFieldEncryption(encryption) => {
2165 let _ = client.set_field_encryption(encryption);
2166 true
2167 }
2168 WorkerCommand::SetEncryptedCrdt(encryption) => {
2169 let _ = client.set_encrypted_crdt(encryption);
2170 true
2171 }
2172 WorkerCommand::SetBlobEncryption(encryption) => {
2173 let _ = client.set_blob_encryption(encryption);
2174 true
2175 }
2176 WorkerCommand::Stop => {
2177 let _ = flush_pending_yjs(client, pending_yjs, event_tx);
2178 false
2179 }
2180 }
2181}
2182
2183fn run_due_retry_work<S, T>(
2184 client: &mut SyncularClient<S, T>,
2185 command_rx: &Receiver<WorkerCommand>,
2186 event_tx: &SyncWorkerEventHub,
2187 pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
2188) -> bool
2189where
2190 S: SyncStore + SyncStateStore,
2191 T: SyncTransport + SyncAuthHeaderStore,
2192 SyncularClient<S, T>: SyncWorkerClientExt,
2193{
2194 if due_now(client.worker_next_blob_upload_retry_at_ms()) {
2195 process_due_blob_upload_queue(client, event_tx);
2196 }
2197
2198 if due_now(client.worker_next_outbox_retry_at_ms()) {
2199 run_until_settled(
2200 client,
2201 command_rx,
2202 event_tx,
2203 pending_yjs,
2204 None,
2205 false,
2206 WorkerSyncTransport::Http,
2207 )
2208 } else {
2209 true
2210 }
2211}
2212
2213fn process_due_blob_upload_queue<S, T>(
2214 client: &mut SyncularClient<S, T>,
2215 event_tx: &SyncWorkerEventHub,
2216) where
2217 S: SyncStore + SyncStateStore,
2218 T: SyncTransport,
2219 SyncularClient<S, T>: SyncWorkerClientExt,
2220{
2221 let command_id = retry_wakeup_command_id("blob-retry");
2222 let started = Instant::now();
2223 match client.worker_process_blob_upload_queue_json() {
2224 Ok(Some(payload_json)) => {
2225 let payload_json = serde_json::from_str(&payload_json).ok();
2226 let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandCompleted {
2227 command_id,
2228 operation: "processBlobUploadQueue",
2229 payload_json,
2230 duration_ms: duration_ms(started),
2231 });
2232 publish_blob_uploads_changed(client, event_tx);
2233 }
2234 Ok(None) => {}
2235 Err(error) => {
2236 let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandFailed {
2237 command_id,
2238 operation: "processBlobUploadQueue",
2239 error,
2240 duration_ms: duration_ms(started),
2241 });
2242 }
2243 }
2244}
2245
2246fn run_until_settled<S, T>(
2247 client: &mut SyncularClient<S, T>,
2248 command_rx: &Receiver<WorkerCommand>,
2249 event_tx: &SyncWorkerEventHub,
2250 pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
2251 initial_command_id: Option<String>,
2252 initial_emit_started: bool,
2253 initial_transport: WorkerSyncTransport,
2254) -> bool
2255where
2256 S: SyncStore + SyncStateStore,
2257 T: SyncTransport + SyncAuthHeaderStore,
2258 SyncularClient<S, T>: SyncWorkerClientExt,
2259{
2260 let mut should_sync = Some((initial_command_id, initial_emit_started, initial_transport));
2261 while let Some((command_id, emit_started, transport)) = should_sync.take() {
2262 run_sync(client, event_tx, command_id, emit_started, transport);
2263
2264 let mut next_sync: Option<(Option<String>, bool, WorkerSyncTransport)> = None;
2265 loop {
2266 match command_rx.try_recv() {
2267 Ok(WorkerCommand::Trigger {
2268 command_id,
2269 emit_started,
2270 transport,
2271 }) => {
2272 next_sync = Some(match next_sync {
2273 Some((existing_id, existing_emit_started, existing_transport)) => (
2274 existing_id.or(command_id),
2275 existing_emit_started || emit_started,
2276 existing_transport.coalesce(transport),
2277 ),
2278 None => (command_id, emit_started, transport),
2279 });
2280 }
2281 Ok(WorkerCommand::SetAuthHeaders(headers)) => {
2282 client.set_auth_headers(headers);
2283 }
2284 Ok(WorkerCommand::SetSubscriptions(subscriptions)) => {
2285 let _ = client.set_subscriptions(subscriptions);
2286 }
2287 Ok(WorkerCommand::SetFieldEncryption(encryption)) => {
2288 let _ = client.set_field_encryption(encryption);
2289 }
2290 Ok(WorkerCommand::SetEncryptedCrdt(encryption)) => {
2291 let _ = client.set_encrypted_crdt(encryption);
2292 }
2293 Ok(WorkerCommand::SetBlobEncryption(encryption)) => {
2294 let _ = client.set_blob_encryption(encryption);
2295 }
2296 Ok(WorkerCommand::ApplyMutationJson {
2297 command_id,
2298 mutation_json,
2299 local_row_json,
2300 auto_sync,
2301 require_auth_lease,
2302 }) => {
2303 if flush_pending_yjs(client, pending_yjs, event_tx) {
2304 next_sync = Some((None, false, WorkerSyncTransport::Http));
2305 }
2306 if apply_mutation_json(
2307 client,
2308 event_tx,
2309 command_id,
2310 &mutation_json,
2311 local_row_json.as_deref(),
2312 auto_sync,
2313 require_auth_lease,
2314 ) {
2315 next_sync = Some((None, false, WorkerSyncTransport::Http));
2316 }
2317 }
2318 Ok(WorkerCommand::SaveYjsUpdateJson {
2319 command_id,
2320 update_json,
2321 auto_sync,
2322 }) => {
2323 queue_yjs_update_json(
2324 pending_yjs,
2325 event_tx,
2326 command_id,
2327 &update_json,
2328 auto_sync,
2329 );
2330 }
2331 Ok(WorkerCommand::ApplyCrdtFieldTextJson {
2332 command_id,
2333 request_json,
2334 auto_sync,
2335 }) => {
2336 if flush_pending_yjs(client, pending_yjs, event_tx) {
2337 next_sync = Some((None, false, WorkerSyncTransport::Http));
2338 }
2339 if apply_crdt_field_text_json(
2340 client,
2341 event_tx,
2342 command_id,
2343 &request_json,
2344 auto_sync,
2345 ) {
2346 next_sync = Some((None, false, WorkerSyncTransport::Http));
2347 }
2348 }
2349 Ok(WorkerCommand::CompactCrdtFieldJson {
2350 command_id,
2351 request_json,
2352 auto_sync,
2353 }) => {
2354 if flush_pending_yjs(client, pending_yjs, event_tx) {
2355 next_sync = Some((None, false, WorkerSyncTransport::Http));
2356 }
2357 if compact_crdt_field_json(
2358 client,
2359 event_tx,
2360 command_id,
2361 &request_json,
2362 auto_sync,
2363 ) {
2364 next_sync = Some((None, false, WorkerSyncTransport::Http));
2365 }
2366 }
2367 Ok(WorkerCommand::ApplyEncryptedCrdtUpdateJson {
2368 command_id,
2369 request_json,
2370 auto_sync,
2371 }) => {
2372 if flush_pending_yjs(client, pending_yjs, event_tx) {
2373 next_sync = Some((None, false, WorkerSyncTransport::Http));
2374 }
2375 if apply_encrypted_crdt_update_json(
2376 client,
2377 event_tx,
2378 command_id,
2379 &request_json,
2380 auto_sync,
2381 ) {
2382 next_sync = Some((None, false, WorkerSyncTransport::Http));
2383 }
2384 }
2385 Ok(WorkerCommand::ApplyEncryptedCrdtCheckpointJson {
2386 command_id,
2387 request_json,
2388 auto_sync,
2389 }) => {
2390 if flush_pending_yjs(client, pending_yjs, event_tx) {
2391 next_sync = Some((None, false, WorkerSyncTransport::Http));
2392 }
2393 if apply_encrypted_crdt_checkpoint_json(
2394 client,
2395 event_tx,
2396 command_id,
2397 &request_json,
2398 auto_sync,
2399 ) {
2400 next_sync = Some((None, false, WorkerSyncTransport::Http));
2401 }
2402 }
2403 Ok(WorkerCommand::ResolveConflict {
2404 command_id,
2405 conflict_id,
2406 resolution,
2407 auto_sync,
2408 }) => {
2409 if flush_pending_yjs(client, pending_yjs, event_tx) {
2410 next_sync = Some((None, false, WorkerSyncTransport::Http));
2411 }
2412 if resolve_conflict(
2413 client,
2414 event_tx,
2415 command_id,
2416 &conflict_id,
2417 &resolution,
2418 auto_sync,
2419 ) {
2420 next_sync = Some((None, false, WorkerSyncTransport::Http));
2421 }
2422 }
2423 Ok(WorkerCommand::RefreshSnapshotJson {
2424 command_id,
2425 request_json,
2426 }) => {
2427 if flush_pending_yjs(client, pending_yjs, event_tx) {
2428 next_sync = Some((None, false, WorkerSyncTransport::Http));
2429 }
2430 refresh_snapshot_json(client, event_tx, command_id, &request_json);
2431 }
2432 Ok(WorkerCommand::CompactStorageJson {
2433 command_id,
2434 options_json,
2435 }) => {
2436 if flush_pending_yjs(client, pending_yjs, event_tx) {
2437 next_sync = Some((None, false, WorkerSyncTransport::Http));
2438 }
2439 run_worker_json_command(
2440 client,
2441 event_tx,
2442 command_id,
2443 "compactStorage",
2444 |client| client.worker_compact_storage_json(options_json.as_deref()),
2445 );
2446 }
2447 Ok(WorkerCommand::StoreBlobFileJson {
2448 command_id,
2449 path,
2450 options_json,
2451 }) => {
2452 if run_worker_json_command(
2453 client,
2454 event_tx,
2455 command_id,
2456 "storeBlobFile",
2457 |client| client.worker_store_blob_file_json(&path, options_json.as_deref()),
2458 ) {
2459 publish_blob_uploads_changed(client, event_tx);
2460 }
2461 }
2462 Ok(WorkerCommand::RetrieveBlobFileJson {
2463 command_id,
2464 ref_json,
2465 path,
2466 options_json,
2467 }) => {
2468 run_worker_json_command(
2469 client,
2470 event_tx,
2471 command_id,
2472 "retrieveBlobFile",
2473 |client| {
2474 client.worker_retrieve_blob_file_json(
2475 &ref_json,
2476 &path,
2477 options_json.as_deref(),
2478 )
2479 },
2480 );
2481 }
2482 Ok(WorkerCommand::ProcessBlobUploadQueue { command_id }) => {
2483 if run_worker_json_command(
2484 client,
2485 event_tx,
2486 command_id,
2487 "processBlobUploadQueue",
2488 |client| {
2489 client
2490 .worker_process_blob_upload_queue_json()?
2491 .ok_or_else(|| {
2492 SyncularError::config(
2493 "worker-owned blob upload queue processing is not available for this client",
2494 )
2495 })
2496 },
2497 ) {
2498 publish_blob_uploads_changed(client, event_tx);
2499 }
2500 }
2501 Ok(WorkerCommand::PruneBlobCache {
2502 command_id,
2503 max_bytes,
2504 }) => {
2505 run_worker_json_command(
2506 client,
2507 event_tx,
2508 command_id,
2509 "pruneBlobCache",
2510 |client| client.worker_prune_blob_cache_json(max_bytes),
2511 );
2512 }
2513 Ok(WorkerCommand::ClearBlobCache { command_id }) => {
2514 run_worker_json_command(
2515 client,
2516 event_tx,
2517 command_id,
2518 "clearBlobCache",
2519 |client| client.worker_clear_blob_cache_json(),
2520 );
2521 }
2522 Ok(WorkerCommand::Stop) | Err(mpsc::TryRecvError::Disconnected) => {
2523 let _ = flush_pending_yjs(client, pending_yjs, event_tx);
2524 return false;
2525 }
2526 Err(mpsc::TryRecvError::Empty) => break,
2527 }
2528 }
2529
2530 if flush_pending_yjs(client, pending_yjs, event_tx) {
2531 next_sync = Some((None, false, WorkerSyncTransport::Http));
2532 }
2533 should_sync = next_sync;
2534 }
2535 true
2536}
2537
2538fn run_sync<S, T>(
2539 client: &mut SyncularClient<S, T>,
2540 event_tx: &SyncWorkerEventHub,
2541 command_id: Option<String>,
2542 emit_started: bool,
2543 transport: WorkerSyncTransport,
2544) where
2545 S: SyncStore + SyncStateStore,
2546 T: SyncTransport,
2547{
2548 if emit_started {
2549 let _ = event_tx.publish_event(SyncWorkerEvent::SyncStarted {
2550 command_id: command_id.clone(),
2551 });
2552 }
2553
2554 let started = Instant::now();
2555 let result = match transport {
2556 WorkerSyncTransport::Http => client.sync_http(),
2557 WorkerSyncTransport::WebSocket => client.sync_ws(),
2558 };
2559 #[cfg(feature = "native")]
2560 let result = result.and_then(|report| {
2561 let bootstrap = client.bootstrap_status()?;
2562 Ok((report, bootstrap))
2563 });
2564
2565 match result {
2566 #[cfg(feature = "native")]
2567 Ok((report, bootstrap)) => {
2568 let (outbox_count, conflict_count) = worker_counts(client).unwrap_or((0, 0));
2569 let _ = event_tx.publish_event(SyncWorkerEvent::SyncCompleted {
2570 command_id,
2571 report,
2572 bootstrap,
2573 outbox_count,
2574 conflict_count,
2575 duration_ms: duration_ms(started),
2576 });
2577 }
2578 #[cfg(not(feature = "native"))]
2579 Ok(report) => {
2580 let (outbox_count, conflict_count) = worker_counts(client).unwrap_or((0, 0));
2581 let _ = event_tx.publish_event(SyncWorkerEvent::SyncCompleted {
2582 command_id,
2583 report,
2584 outbox_count,
2585 conflict_count,
2586 duration_ms: duration_ms(started),
2587 });
2588 }
2589 Err(error) => {
2590 let retry_scheduled = retry_scheduled_after_error(client);
2591 let _ = event_tx.publish_event(SyncWorkerEvent::SyncFailed {
2592 command_id,
2593 error,
2594 retry_scheduled,
2595 duration_ms: duration_ms(started),
2596 });
2597 }
2598 }
2599}
2600
2601fn apply_mutation_json<S, T>(
2602 client: &mut SyncularClient<S, T>,
2603 event_tx: &SyncWorkerEventHub,
2604 command_id: String,
2605 mutation_json: &str,
2606 local_row_json: Option<&str>,
2607 auto_sync: bool,
2608 require_auth_lease: bool,
2609) -> bool
2610where
2611 S: SyncStore + SyncStateStore,
2612 T: SyncTransport,
2613 SyncularClient<S, T>: SyncWorkerClientExt,
2614{
2615 let started = Instant::now();
2616 let mutation = serde_json::from_str::<SyncOperation>(mutation_json).ok();
2617 let table = mutation
2618 .as_ref()
2619 .map(|mutation| mutation.table.clone())
2620 .unwrap_or_else(|| "unknown".to_string());
2621 let previous_row = mutation.as_ref().and_then(|mutation| {
2622 client
2623 .worker_current_row_json(&mutation.table, &mutation.row_id)
2624 .ok()
2625 .flatten()
2626 });
2627 let local_row = local_row_json
2628 .map(serde_json::from_str::<Value>)
2629 .transpose()
2630 .ok()
2631 .flatten();
2632 let applied = if require_auth_lease {
2633 client.apply_worker_leased_mutation_json(mutation_json, local_row_json)
2634 } else {
2635 client.apply_worker_mutation_json(mutation_json, local_row_json)
2636 };
2637 match applied {
2638 Ok(client_commit_id) => {
2639 let changed_rows = mutation
2640 .as_ref()
2641 .and_then(|mutation| {
2642 sync_changed_row_for_local_operation(
2643 client.app_schema(),
2644 mutation,
2645 previous_row.as_ref(),
2646 local_row.as_ref(),
2647 Some(client_commit_id.clone()),
2648 )
2649 })
2650 .into_iter()
2651 .collect();
2652 let outbox_count = worker_outbox_count(client);
2653 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2654 command_id,
2655 client_commit_id,
2656 changed_tables: vec![table],
2657 changed_rows,
2658 outbox_count,
2659 duration_ms: duration_ms(started),
2660 });
2661 auto_sync
2662 }
2663 Err(error) => {
2664 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2665 command_id,
2666 error,
2667 payload_json: None,
2668 duration_ms: duration_ms(started),
2669 });
2670 false
2671 }
2672 }
2673}
2674
2675fn apply_encrypted_crdt_update_json<S, T>(
2676 client: &mut SyncularClient<S, T>,
2677 event_tx: &SyncWorkerEventHub,
2678 command_id: String,
2679 request_json: &str,
2680 auto_sync: bool,
2681) -> bool
2682where
2683 S: SyncStore + SyncStateStore,
2684 T: SyncTransport,
2685 SyncularClient<S, T>: SyncWorkerClientExt,
2686{
2687 let started = Instant::now();
2688 match client.apply_worker_encrypted_crdt_update_json(request_json) {
2689 Ok(receipt) => {
2690 let crdt_event = WorkerEncryptedCrdtRequest::from_json(request_json).ok();
2691 let outbox_count = worker_outbox_count(client);
2692 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2693 command_id: command_id.clone(),
2694 client_commit_id: receipt.client_commit_id.clone(),
2695 changed_tables: receipt.changed_tables.clone(),
2696 changed_rows: receipt.changed_rows.clone(),
2697 outbox_count,
2698 duration_ms: duration_ms(started),
2699 });
2700 if let Some(request) = crdt_event.and_then(WorkerEncryptedCrdtRequest::field_identity) {
2701 let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldChanged {
2702 command_id,
2703 client_commit_id: receipt.client_commit_id,
2704 table: request.table,
2705 row_id: request.row_id,
2706 field: request.field,
2707 changed_tables: receipt.changed_tables,
2708 payload_json: receipt.crdt_event_payload_json,
2709 duration_ms: duration_ms(started),
2710 });
2711 }
2712 auto_sync
2713 }
2714 Err(error) => {
2715 let payload_json = crdt_field_failure_payload_json("encryptedCrdtUpdate", request_json);
2716 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2717 command_id,
2718 error,
2719 payload_json: Some(payload_json),
2720 duration_ms: duration_ms(started),
2721 });
2722 false
2723 }
2724 }
2725}
2726
2727fn apply_encrypted_crdt_checkpoint_json<S, T>(
2728 client: &mut SyncularClient<S, T>,
2729 event_tx: &SyncWorkerEventHub,
2730 command_id: String,
2731 request_json: &str,
2732 auto_sync: bool,
2733) -> bool
2734where
2735 S: SyncStore + SyncStateStore,
2736 T: SyncTransport,
2737 SyncularClient<S, T>: SyncWorkerClientExt,
2738{
2739 let started = Instant::now();
2740 match client.apply_worker_encrypted_crdt_checkpoint_json(request_json) {
2741 Ok(Some(receipt)) => {
2742 let crdt_event = WorkerEncryptedCrdtRequest::from_json(request_json).ok();
2743 let outbox_count = worker_outbox_count(client);
2744 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2745 command_id: command_id.clone(),
2746 client_commit_id: receipt.client_commit_id.clone(),
2747 changed_tables: receipt.changed_tables.clone(),
2748 changed_rows: receipt.changed_rows.clone(),
2749 outbox_count,
2750 duration_ms: duration_ms(started),
2751 });
2752 if let Some(request) = crdt_event.and_then(WorkerEncryptedCrdtRequest::field_identity) {
2753 let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldCompacted {
2754 command_id,
2755 client_commit_id: Some(receipt.client_commit_id),
2756 table: request.table,
2757 row_id: request.row_id,
2758 field: request.field,
2759 changed_tables: receipt.changed_tables,
2760 checkpoint_created: true,
2761 payload_json: receipt.crdt_event_payload_json,
2762 duration_ms: duration_ms(started),
2763 });
2764 }
2765 auto_sync
2766 }
2767 Ok(None) => {
2768 let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandCompleted {
2769 command_id,
2770 operation: "encryptedCrdtCheckpoint",
2771 payload_json: Some(json!({ "checkpointed": false })),
2772 duration_ms: duration_ms(started),
2773 });
2774 false
2775 }
2776 Err(error) => {
2777 let payload_json =
2778 crdt_field_failure_payload_json("encryptedCrdtCheckpoint", request_json);
2779 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2780 command_id,
2781 error,
2782 payload_json: Some(payload_json),
2783 duration_ms: duration_ms(started),
2784 });
2785 false
2786 }
2787 }
2788}
2789
2790fn apply_crdt_field_text_json<S, T>(
2791 client: &mut SyncularClient<S, T>,
2792 event_tx: &SyncWorkerEventHub,
2793 command_id: String,
2794 request_json: &str,
2795 auto_sync: bool,
2796) -> bool
2797where
2798 S: SyncStore + SyncStateStore,
2799 T: SyncTransport,
2800 SyncularClient<S, T>: SyncWorkerClientExt,
2801{
2802 let started = Instant::now();
2803 match client.apply_worker_crdt_field_text_json(request_json) {
2804 Ok(receipt) => {
2805 let crdt_event = WorkerCrdtFieldTextRequest::from_json(request_json).ok();
2806 let outbox_count = worker_outbox_count(client);
2807 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2808 command_id: command_id.clone(),
2809 client_commit_id: receipt.client_commit_id.clone(),
2810 changed_tables: receipt.changed_tables.clone(),
2811 changed_rows: receipt.changed_rows.clone(),
2812 outbox_count,
2813 duration_ms: duration_ms(started),
2814 });
2815 if let Some(request) = crdt_event {
2816 let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldChanged {
2817 command_id,
2818 client_commit_id: receipt.client_commit_id,
2819 table: request.table,
2820 row_id: request.row_id,
2821 field: request.field,
2822 changed_tables: receipt.changed_tables,
2823 payload_json: receipt.crdt_event_payload_json,
2824 duration_ms: duration_ms(started),
2825 });
2826 }
2827 auto_sync
2828 }
2829 Err(error) => {
2830 let payload_json = crdt_field_failure_payload_json("crdtFieldText", request_json);
2831 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2832 command_id,
2833 error,
2834 payload_json: Some(payload_json),
2835 duration_ms: duration_ms(started),
2836 });
2837 false
2838 }
2839 }
2840}
2841
2842fn compact_crdt_field_json<S, T>(
2843 client: &mut SyncularClient<S, T>,
2844 event_tx: &SyncWorkerEventHub,
2845 command_id: String,
2846 request_json: &str,
2847 auto_sync: bool,
2848) -> bool
2849where
2850 S: SyncStore + SyncStateStore,
2851 T: SyncTransport,
2852 SyncularClient<S, T>: SyncWorkerClientExt,
2853{
2854 let started = Instant::now();
2855 match client.compact_worker_crdt_field_json(request_json) {
2856 Ok(Some(receipt)) => {
2857 let crdt_event = WorkerCrdtFieldCompactionRequest::from_json(request_json).ok();
2858 let outbox_count = worker_outbox_count(client);
2859 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
2860 command_id: command_id.clone(),
2861 client_commit_id: receipt.client_commit_id.clone(),
2862 changed_tables: receipt.changed_tables.clone(),
2863 changed_rows: receipt.changed_rows.clone(),
2864 outbox_count,
2865 duration_ms: duration_ms(started),
2866 });
2867 if let Some(request) = crdt_event {
2868 let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldCompacted {
2869 command_id,
2870 client_commit_id: Some(receipt.client_commit_id),
2871 table: request.table,
2872 row_id: request.row_id,
2873 field: request.field,
2874 changed_tables: receipt.changed_tables,
2875 checkpoint_created: true,
2876 payload_json: receipt.crdt_event_payload_json,
2877 duration_ms: duration_ms(started),
2878 });
2879 }
2880 auto_sync
2881 }
2882 Ok(None) => {
2883 let payload_json = compact_crdt_field_skipped_payload(client, request_json);
2884 let request = WorkerCrdtFieldCompactionRequest::from_json(request_json).ok();
2885 let compacted_server_merge_document = payload_json
2886 .get("syncMode")
2887 .and_then(Value::as_str)
2888 .is_some_and(|sync_mode| sync_mode == "server-merge");
2889 if compacted_server_merge_document {
2890 if let Some(request) = request {
2891 let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldCompacted {
2892 command_id: command_id.clone(),
2893 client_commit_id: None,
2894 table: request.table.clone(),
2895 row_id: request.row_id.clone(),
2896 field: request.field.clone(),
2897 changed_tables: vec![request.table],
2898 checkpoint_created: false,
2899 payload_json: Some(payload_json.clone()),
2900 duration_ms: duration_ms(started),
2901 });
2902 }
2903 }
2904 let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandCompleted {
2905 command_id,
2906 operation: "compactCrdtField",
2907 payload_json: Some(payload_json),
2908 duration_ms: duration_ms(started),
2909 });
2910 false
2911 }
2912 Err(error) => {
2913 let payload_json = crdt_field_failure_payload_json("compactCrdtField", request_json);
2914 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
2915 command_id,
2916 error,
2917 payload_json: Some(payload_json),
2918 duration_ms: duration_ms(started),
2919 });
2920 false
2921 }
2922 }
2923}
2924
2925fn resolve_conflict<S, T>(
2926 client: &mut SyncularClient<S, T>,
2927 event_tx: &SyncWorkerEventHub,
2928 command_id: String,
2929 conflict_id: &str,
2930 resolution: &str,
2931 auto_sync: bool,
2932) -> bool
2933where
2934 S: SyncStore + SyncStateStore,
2935 T: SyncTransport,
2936{
2937 let started = Instant::now();
2938 let result = if resolution == "keep-local" {
2939 client.retry_conflict_keep_local(conflict_id).map(Some)
2940 } else {
2941 client
2942 .resolve_conflict(conflict_id, resolution)
2943 .map(|_| None)
2944 };
2945
2946 match result {
2947 Ok(retry_client_commit_id) => {
2948 let should_sync = retry_client_commit_id.is_some() && auto_sync;
2949 let _ = event_tx.publish_event(SyncWorkerEvent::ConflictResolutionCompleted {
2950 command_id,
2951 retry_client_commit_id,
2952 duration_ms: duration_ms(started),
2953 });
2954 should_sync
2955 }
2956 Err(error) => {
2957 let _ = event_tx.publish_event(SyncWorkerEvent::ConflictResolutionFailed {
2958 command_id,
2959 error,
2960 duration_ms: duration_ms(started),
2961 });
2962 false
2963 }
2964 }
2965}
2966
2967fn refresh_snapshot_json<S, T>(
2968 client: &mut SyncularClient<S, T>,
2969 event_tx: &SyncWorkerEventHub,
2970 command_id: String,
2971 request_json: &str,
2972) where
2973 S: SyncStore + SyncStateStore,
2974 T: SyncTransport,
2975 SyncularClient<S, T>: SyncWorkerClientExt,
2976{
2977 let started = Instant::now();
2978 match client
2979 .worker_query_json(request_json)
2980 .and_then(|json| serde_json::from_str::<Value>(&json).map_err(Into::into))
2981 {
2982 Ok(payload_json) => {
2983 let _ = event_tx.publish_event(SyncWorkerEvent::SnapshotReady {
2984 command_id,
2985 payload_json,
2986 duration_ms: duration_ms(started),
2987 });
2988 }
2989 Err(error) => {
2990 let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandFailed {
2991 command_id,
2992 operation: "refreshSnapshot",
2993 error,
2994 duration_ms: duration_ms(started),
2995 });
2996 }
2997 }
2998}
2999
3000fn compact_crdt_field_skipped_payload<S, T>(
3001 client: &mut SyncularClient<S, T>,
3002 request_json: &str,
3003) -> Value
3004where
3005 S: SyncStore + SyncStateStore,
3006 T: SyncTransport,
3007 SyncularClient<S, T>: SyncWorkerClientExt,
3008{
3009 let request = WorkerCrdtFieldCompactionRequest::from_json(request_json).ok();
3010 let mut payload = request
3011 .as_ref()
3012 .and_then(|request| {
3013 client
3014 .worker_crdt_field_event_payload_json(
3015 &request.table,
3016 &request.row_id,
3017 &request.field,
3018 )
3019 .ok()
3020 .flatten()
3021 })
3022 .and_then(|value| value.as_object().cloned())
3023 .unwrap_or_default();
3024
3025 if let Some(request) = request {
3026 payload.insert("table".to_string(), json!(request.table));
3027 payload.insert("rowId".to_string(), json!(request.row_id));
3028 payload.insert("field".to_string(), json!(request.field));
3029 payload.insert(
3030 "minUncheckpointedUpdates".to_string(),
3031 json!(request.min_uncheckpointed_updates.unwrap_or(1)),
3032 );
3033 }
3034 payload.insert("checkpointCreated".to_string(), json!(false));
3035 Value::Object(payload)
3036}
3037
3038fn crdt_field_failure_payload_json(operation: &'static str, request_json: &str) -> Value {
3039 let mut payload = serde_json::Map::new();
3040 payload.insert("operation".to_string(), json!(operation));
3041 payload.insert("failedBeforeCommit".to_string(), json!(true));
3042 payload.insert("retryScheduled".to_string(), json!(false));
3043
3044 match serde_json::from_str::<Value>(request_json) {
3045 Ok(Value::Object(request)) => {
3046 copy_crdt_request_field(&mut payload, &request, "table", "table");
3047 copy_crdt_request_field(&mut payload, &request, "rowId", "rowId");
3048 copy_crdt_request_field(&mut payload, &request, "field", "field");
3049 copy_crdt_request_field(
3050 &mut payload,
3051 &request,
3052 "minUncheckpointedUpdates",
3053 "minUncheckpointedUpdates",
3054 );
3055 }
3056 Ok(_) => {
3057 payload.insert("requestShape".to_string(), json!("non-object"));
3058 }
3059 Err(error) => {
3060 payload.insert("requestParseError".to_string(), json!(error.to_string()));
3061 }
3062 }
3063
3064 Value::Object(payload)
3065}
3066
3067fn crdt_field_failure_payload_from_parts(
3068 operation: &'static str,
3069 table: &str,
3070 row_id: &str,
3071 field: &str,
3072) -> Value {
3073 json!({
3074 "operation": operation,
3075 "table": table,
3076 "rowId": row_id,
3077 "field": field,
3078 "failedBeforeCommit": true,
3079 "retryScheduled": false,
3080 })
3081}
3082
3083fn copy_crdt_request_field(
3084 payload: &mut serde_json::Map<String, Value>,
3085 request: &serde_json::Map<String, Value>,
3086 from: &str,
3087 to: &str,
3088) {
3089 if !payload.contains_key(to) {
3090 if let Some(value) = request.get(from) {
3091 payload.insert(to.to_string(), value.clone());
3092 }
3093 }
3094}
3095
3096fn run_worker_json_command<S, T>(
3097 client: &mut SyncularClient<S, T>,
3098 event_tx: &SyncWorkerEventHub,
3099 command_id: String,
3100 operation: &'static str,
3101 f: impl FnOnce(&mut SyncularClient<S, T>) -> Result<String>,
3102) -> bool
3103where
3104 S: SyncStore + SyncStateStore,
3105 T: SyncTransport,
3106{
3107 let started = Instant::now();
3108 match f(client).and_then(|json| {
3109 if json.trim().is_empty() {
3110 Ok(None)
3111 } else {
3112 serde_json::from_str::<Value>(&json)
3113 .map(Some)
3114 .map_err(Into::into)
3115 }
3116 }) {
3117 Ok(payload_json) => {
3118 let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandCompleted {
3119 command_id,
3120 operation,
3121 payload_json,
3122 duration_ms: duration_ms(started),
3123 });
3124 true
3125 }
3126 Err(error) => {
3127 let _ = event_tx.publish_event(SyncWorkerEvent::WorkerCommandFailed {
3128 command_id,
3129 operation,
3130 error,
3131 duration_ms: duration_ms(started),
3132 });
3133 false
3134 }
3135 }
3136}
3137
3138fn publish_blob_uploads_changed<S, T>(
3139 client: &mut SyncularClient<S, T>,
3140 event_tx: &SyncWorkerEventHub,
3141) where
3142 S: SyncStore + SyncStateStore,
3143 T: SyncTransport,
3144 SyncularClient<S, T>: SyncWorkerClientExt,
3145{
3146 let Ok(Some(stats_json)) = client.worker_blob_upload_queue_stats_json() else {
3147 return;
3148 };
3149 let Ok(stats_json) = serde_json::from_str::<Value>(&stats_json) else {
3150 return;
3151 };
3152 let _ = event_tx.publish_event(SyncWorkerEvent::BlobUploadsChanged { stats_json });
3153}
3154
3155fn queue_yjs_update_json(
3156 pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
3157 event_tx: &SyncWorkerEventHub,
3158 command_id: String,
3159 update_json: &str,
3160 auto_sync: bool,
3161) {
3162 let started = Instant::now();
3163 let update: Result<SaveYjsUpdate> = serde_json::from_str(update_json).map_err(Into::into);
3164 match update {
3165 Ok(update) => {
3166 let key = YjsBatchKey {
3167 table: update.table,
3168 row_id: update.row_id,
3169 field: update.field,
3170 };
3171 let batch = pending_yjs.entry(key).or_default();
3172 batch.command_ids.push(command_id);
3173 batch.updates.push(update.update);
3174 if update.materialized.is_some() {
3175 batch.materialized = update.materialized;
3176 }
3177 if update.server_payload.is_some() {
3178 batch.server_payload = update.server_payload;
3179 }
3180 batch.auto_sync |= auto_sync;
3181 }
3182 Err(error) => {
3183 let payload_json = crdt_field_failure_payload_json("crdtFieldYjsUpdate", update_json);
3184 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
3185 command_id,
3186 error,
3187 payload_json: Some(payload_json),
3188 duration_ms: duration_ms(started),
3189 });
3190 }
3191 }
3192}
3193
3194fn flush_pending_yjs<S, T>(
3195 client: &mut SyncularClient<S, T>,
3196 pending_yjs: &mut BTreeMap<YjsBatchKey, PendingYjsBatch>,
3197 event_tx: &SyncWorkerEventHub,
3198) -> bool
3199where
3200 S: SyncStore + SyncStateStore,
3201 T: SyncTransport,
3202 SyncularClient<S, T>: SyncWorkerClientExt,
3203{
3204 let mut should_sync = false;
3205 let batches = std::mem::take(pending_yjs);
3206 for (key, batch) in batches {
3207 let started = Instant::now();
3208 let payload = {
3209 let mut payload = match batch.server_payload {
3210 Some(Value::Object(payload)) => payload,
3211 Some(_) => {
3212 let error = SyncularError::config(
3213 "queued Yjs serverPayload must be a JSON object when provided",
3214 );
3215 let message = error.message_text();
3216 let kind = error.kind();
3217 for command_id in batch.command_ids {
3218 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
3219 command_id,
3220 error: SyncularError::message(kind, &message),
3221 payload_json: Some(crdt_field_failure_payload_from_parts(
3222 "crdtFieldYjsUpdate",
3223 &key.table,
3224 &key.row_id,
3225 &key.field,
3226 )),
3227 duration_ms: duration_ms(started),
3228 });
3229 }
3230 continue;
3231 }
3232 None => serde_json::Map::new(),
3233 };
3234 let mut envelope = serde_json::Map::new();
3235 envelope.insert(key.field.clone(), json!(batch.updates));
3236 payload.insert(YJS_PAYLOAD_KEY.to_string(), Value::Object(envelope));
3237 Value::Object(payload)
3238 };
3239 let mutation = SyncOperation {
3240 table: key.table.clone(),
3241 row_id: key.row_id.clone(),
3242 op: "upsert".to_string(),
3243 payload: Some(payload),
3244 base_version: None,
3245 };
3246 match client.apply_worker_mutation(mutation, batch.materialized) {
3247 Ok(client_commit_id) => {
3248 should_sync |= batch.auto_sync;
3249 let crdt_event_payload_json = client
3250 .worker_crdt_field_event_payload_json(&key.table, &key.row_id, &key.field)
3251 .ok()
3252 .flatten();
3253 let outbox_count = worker_outbox_count(client);
3254 let changed_rows = client
3255 .worker_crdt_field_changed_row(
3256 &key.table,
3257 &key.row_id,
3258 &key.field,
3259 &client_commit_id,
3260 )
3261 .ok()
3262 .flatten()
3263 .map(|row| vec![row])
3264 .unwrap_or_else(|| {
3265 vec![SyncChangedRow {
3266 table: key.table.clone(),
3267 row_id: Some(key.row_id.clone()),
3268 operation: "update".to_string(),
3269 changed_fields: vec![key.field.clone()],
3270 crdt_fields: vec![key.field.clone()],
3271 crdt_field_changes: Vec::new(),
3272 commit_id: Some(client_commit_id.clone()),
3273 commit_seq: None,
3274 subscription_id: None,
3275 server_version: None,
3276 }]
3277 });
3278 for command_id in batch.command_ids {
3279 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteCommitted {
3280 command_id: command_id.clone(),
3281 client_commit_id: client_commit_id.clone(),
3282 changed_tables: vec![key.table.clone()],
3283 changed_rows: changed_rows.clone(),
3284 outbox_count,
3285 duration_ms: duration_ms(started),
3286 });
3287 let _ = event_tx.publish_event(SyncWorkerEvent::CrdtFieldChanged {
3288 command_id,
3289 client_commit_id: client_commit_id.clone(),
3290 table: key.table.clone(),
3291 row_id: key.row_id.clone(),
3292 field: key.field.clone(),
3293 changed_tables: vec![key.table.clone()],
3294 payload_json: crdt_event_payload_json.clone(),
3295 duration_ms: duration_ms(started),
3296 });
3297 }
3298 }
3299 Err(error) => {
3300 let message = error.message_text();
3301 let kind = error.kind();
3302 for command_id in batch.command_ids {
3303 let _ = event_tx.publish_event(SyncWorkerEvent::LocalWriteFailed {
3304 command_id,
3305 error: SyncularError::message(kind, &message),
3306 payload_json: Some(crdt_field_failure_payload_from_parts(
3307 "crdtFieldYjsUpdate",
3308 &key.table,
3309 &key.row_id,
3310 &key.field,
3311 )),
3312 duration_ms: duration_ms(started),
3313 });
3314 }
3315 }
3316 }
3317 }
3318 should_sync
3319}
3320
3321fn worker_counts<S, T>(client: &mut SyncularClient<S, T>) -> Result<(usize, usize)>
3322where
3323 S: SyncStore + SyncStateStore,
3324 T: SyncTransport,
3325{
3326 let outbox_count = client
3327 .outbox_summaries()?
3328 .into_iter()
3329 .filter(|item| item.status != "acked")
3330 .count();
3331 let conflict_count = client.conflict_summaries()?.len();
3332 Ok((outbox_count, conflict_count))
3333}
3334
3335fn worker_outbox_count<S, T>(client: &mut SyncularClient<S, T>) -> usize
3336where
3337 S: SyncStore + SyncStateStore,
3338 T: SyncTransport,
3339{
3340 worker_counts(client)
3341 .map(|(outbox_count, _)| outbox_count)
3342 .unwrap_or(0)
3343}
3344
3345fn retry_scheduled_after_error<S, T>(client: &mut SyncularClient<S, T>) -> bool
3346where
3347 S: SyncStore + SyncStateStore,
3348 T: SyncTransport,
3349{
3350 client
3351 .outbox_summaries()
3352 .map(|items| {
3353 items
3354 .into_iter()
3355 .any(|item| item.status == "pending" || item.status == "sending")
3356 })
3357 .unwrap_or(false)
3358}
3359
3360fn next_retry_timeout<S, T>(client: &mut SyncularClient<S, T>) -> Option<Duration>
3361where
3362 S: SyncStore + SyncStateStore,
3363 T: SyncTransport,
3364 SyncularClient<S, T>: SyncWorkerClientExt,
3365{
3366 let next = [
3367 client.worker_next_outbox_retry_at_ms().ok().flatten(),
3368 client.worker_next_blob_upload_retry_at_ms().ok().flatten(),
3369 ]
3370 .into_iter()
3371 .flatten()
3372 .min()?;
3373 Some(duration_until_ms(next))
3374}
3375
3376fn due_now(next: Result<Option<i64>>) -> bool {
3377 next.ok()
3378 .flatten()
3379 .is_some_and(|next_attempt_at| next_attempt_at <= now_ms())
3380}
3381
3382fn duration_until_ms(timestamp_ms: i64) -> Duration {
3383 let now = now_ms();
3384 if timestamp_ms <= now {
3385 Duration::ZERO
3386 } else {
3387 Duration::from_millis((timestamp_ms - now) as u64)
3388 }
3389}
3390
3391fn retry_wakeup_command_id(prefix: &str) -> String {
3392 format!("{prefix}-{}", now_ms())
3393}
3394
3395fn duration_ms(started: Instant) -> u64 {
3396 started.elapsed().as_millis().try_into().unwrap_or(u64::MAX)
3397}
3398
3399#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3400struct YjsBatchKey {
3401 table: String,
3402 row_id: String,
3403 field: String,
3404}
3405
3406#[derive(Debug, Default)]
3407struct PendingYjsBatch {
3408 command_ids: Vec<String>,
3409 updates: Vec<YjsUpdateEnvelope>,
3410 materialized: Option<Value>,
3411 server_payload: Option<Value>,
3412 auto_sync: bool,
3413}
3414
3415#[derive(Debug, Deserialize)]
3416#[serde(rename_all = "camelCase")]
3417struct SaveYjsUpdate {
3418 table: String,
3419 row_id: String,
3420 field: String,
3421 update: YjsUpdateEnvelope,
3422 #[serde(default)]
3423 materialized: Option<Value>,
3424 #[serde(default)]
3425 server_payload: Option<Value>,
3426}
3427
3428fn validate_save_yjs_update_json(update_json: &str) -> Result<()> {
3429 validate_crdt_request_json_size(update_json)?;
3430 let update: SaveYjsUpdate = serde_json::from_str(update_json)?;
3431 validate_yjs_update_envelope_size(&update.update)
3432}
3433
3434#[derive(Debug, Deserialize)]
3435#[serde(rename_all = "camelCase")]
3436struct WorkerEncryptedCrdtRequest {
3437 table: String,
3438 #[serde(default)]
3439 row_id: Option<String>,
3440 #[serde(default)]
3441 field: Option<String>,
3442}
3443
3444#[derive(Debug, Deserialize)]
3445#[cfg_attr(not(feature = "native"), allow(dead_code))]
3446#[serde(rename_all = "camelCase")]
3447struct WorkerCrdtFieldTextRequest {
3448 table: String,
3449 row_id: String,
3450 field: String,
3451 next_text: String,
3452}
3453
3454fn validate_worker_crdt_field_text_request_json(request_json: &str) -> Result<()> {
3455 validate_crdt_request_json_size(request_json)?;
3456 let request: WorkerCrdtFieldTextRequest = serde_json::from_str(request_json)?;
3457 validate_yjs_text_input_size(&request.next_text)
3458}
3459
3460#[derive(Debug, Deserialize)]
3461#[cfg_attr(not(feature = "native"), allow(dead_code))]
3462#[serde(rename_all = "camelCase")]
3463struct WorkerCrdtFieldCompactionRequest {
3464 table: String,
3465 row_id: String,
3466 field: String,
3467 #[serde(default)]
3468 min_uncheckpointed_updates: Option<i64>,
3469}
3470
3471struct WorkerCrdtFieldIdentity {
3472 table: String,
3473 row_id: String,
3474 field: String,
3475}
3476
3477#[cfg(feature = "native")]
3478struct WorkerCrdtFieldIdentityRef<'a> {
3479 table: &'a str,
3480 row_id: &'a str,
3481 field: &'a str,
3482}
3483
3484impl WorkerEncryptedCrdtRequest {
3485 fn from_json(request_json: &str) -> Result<Self> {
3486 serde_json::from_str(request_json).map_err(Into::into)
3487 }
3488
3489 #[cfg(feature = "native")]
3490 fn field_identity_ref(&self) -> Option<WorkerCrdtFieldIdentityRef<'_>> {
3491 Some(WorkerCrdtFieldIdentityRef {
3492 table: &self.table,
3493 row_id: self.row_id.as_deref()?,
3494 field: self.field.as_deref()?,
3495 })
3496 }
3497
3498 fn field_identity(self) -> Option<WorkerCrdtFieldIdentity> {
3499 Some(WorkerCrdtFieldIdentity {
3500 table: self.table,
3501 row_id: self.row_id?,
3502 field: self.field?,
3503 })
3504 }
3505}
3506
3507#[cfg(feature = "native")]
3508impl WorkerCrdtFieldIdentityRef<'_> {
3509 fn id(&self) -> CrdtFieldId {
3510 CrdtFieldId::new(self.table, self.row_id, self.field)
3511 }
3512}
3513
3514impl WorkerCrdtFieldTextRequest {
3515 fn from_json(request_json: &str) -> Result<Self> {
3516 serde_json::from_str(request_json).map_err(Into::into)
3517 }
3518
3519 #[cfg(feature = "native")]
3520 fn id(&self) -> CrdtFieldId {
3521 CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
3522 }
3523}
3524
3525impl WorkerCrdtFieldCompactionRequest {
3526 fn from_json(request_json: &str) -> Result<Self> {
3527 serde_json::from_str(request_json).map_err(Into::into)
3528 }
3529
3530 #[cfg(feature = "native")]
3531 fn id(&self) -> CrdtFieldId {
3532 CrdtFieldId::new(self.table.clone(), self.row_id.clone(), self.field.clone())
3533 }
3534}
3535
3536#[cfg(feature = "native")]
3537fn crdt_field_write_tables_for_worker(field: &CrdtField) -> Vec<String> {
3538 match field.sync_mode() {
3539 CrdtFieldSyncMode::ServerMerge => vec![field.table().to_string()],
3540 CrdtFieldSyncMode::EncryptedUpdateLog => {
3541 vec![field.table().to_string(), CRDT_UPDATES_TABLE.to_string()]
3542 }
3543 }
3544}
3545
3546#[cfg(feature = "native")]
3547fn crdt_field_compaction_tables_for_worker(field: &CrdtField) -> Vec<String> {
3548 match field.sync_mode() {
3549 CrdtFieldSyncMode::ServerMerge => Vec::new(),
3550 CrdtFieldSyncMode::EncryptedUpdateLog => vec![CRDT_CHECKPOINTS_TABLE.to_string()],
3551 }
3552}
3553
3554#[cfg(feature = "native")]
3555fn crdt_field_changed_row_for_worker(field: &CrdtField, client_commit_id: &str) -> SyncChangedRow {
3556 let crdt_field_changes = vec![sync_changed_crdt_field_from_metadata(
3557 field.field_metadata(),
3558 )];
3559 SyncChangedRow {
3560 table: field.table().to_string(),
3561 row_id: Some(field.row_id().to_string()),
3562 operation: "update".to_string(),
3563 changed_fields: vec![field.field().to_string(), field.state_column().to_string()],
3564 crdt_fields: crdt_field_changes
3565 .iter()
3566 .map(|field| field.state_column.clone())
3567 .collect(),
3568 crdt_field_changes,
3569 commit_id: Some(client_commit_id.to_string()),
3570 commit_seq: None,
3571 subscription_id: None,
3572 server_version: None,
3573 }
3574}
3575
3576#[cfg(feature = "native")]
3577fn crdt_field_compacted_row_for_worker(
3578 field: &CrdtField,
3579 client_commit_id: &str,
3580) -> SyncChangedRow {
3581 let crdt_field_changes = vec![sync_changed_crdt_field_from_metadata(
3582 field.field_metadata(),
3583 )];
3584 SyncChangedRow {
3585 table: field.table().to_string(),
3586 row_id: Some(field.row_id().to_string()),
3587 operation: "compact".to_string(),
3588 changed_fields: vec![field.state_column().to_string()],
3589 crdt_fields: crdt_field_changes
3590 .iter()
3591 .map(|field| field.state_column.clone())
3592 .collect(),
3593 crdt_field_changes,
3594 commit_id: Some(client_commit_id.to_string()),
3595 commit_seq: None,
3596 subscription_id: None,
3597 server_version: None,
3598 }
3599}
3600
3601#[cfg(feature = "native")]
3602fn crdt_field_event_payload_for_worker<T>(
3603 client: &mut SyncularClient<DieselSqliteStore, T>,
3604 field: &CrdtField,
3605) -> Option<Value>
3606where
3607 T: SyncTransport,
3608{
3609 let mut payload = crdt_field_base_payload_for_worker(field);
3610 match client.materialize_crdt_field(field) {
3611 Ok(materialization) => {
3612 payload.insert("materializationAvailable".to_string(), json!(true));
3613 payload.insert(
3614 "hasState".to_string(),
3615 json!(materialization.state_base64.is_some()),
3616 );
3617 payload.insert(
3618 "stateVectorBase64".to_string(),
3619 json!(materialization.state_vector_base64),
3620 );
3621 }
3622 Err(error) => {
3623 payload.insert("materializationAvailable".to_string(), json!(false));
3624 payload.insert(
3625 "materializationError".to_string(),
3626 json!(error.message_text()),
3627 );
3628 }
3629 }
3630 Some(Value::Object(payload))
3631}
3632
3633#[cfg(feature = "native")]
3634fn crdt_field_compaction_payload_for_worker<T>(
3635 client: &mut SyncularClient<DieselSqliteStore, T>,
3636 field: &CrdtField,
3637 receipt: &CrdtFieldCompactionReceipt,
3638 checkpoint_created: bool,
3639 min_uncheckpointed_updates: i64,
3640) -> Option<Value>
3641where
3642 T: SyncTransport,
3643{
3644 let mut payload = crdt_field_event_payload_for_worker(client, field)
3645 .and_then(|value| value.as_object().cloned())
3646 .unwrap_or_else(|| crdt_field_base_payload_for_worker(field));
3647 payload.insert("checkpointCreated".to_string(), json!(checkpoint_created));
3648 payload.insert(
3649 "minUncheckpointedUpdates".to_string(),
3650 json!(min_uncheckpointed_updates),
3651 );
3652 payload.insert("before".to_string(), json!(&receipt.before));
3653 payload.insert("after".to_string(), json!(&receipt.after));
3654 payload.insert(
3655 "encryptedStreamBefore".to_string(),
3656 json!(&receipt.encrypted_stream_before),
3657 );
3658 payload.insert(
3659 "encryptedStreamAfter".to_string(),
3660 json!(&receipt.encrypted_stream_after),
3661 );
3662 Some(Value::Object(payload))
3663}
3664
3665#[cfg(feature = "native")]
3666fn crdt_field_compaction_payload_for_worker_current<T>(
3667 client: &mut SyncularClient<DieselSqliteStore, T>,
3668 field: &CrdtField,
3669 checkpoint_created: bool,
3670 min_uncheckpointed_updates: i64,
3671) -> Option<Value>
3672where
3673 T: SyncTransport,
3674{
3675 let mut payload = crdt_field_event_payload_for_worker(client, field)
3676 .and_then(|value| value.as_object().cloned())
3677 .unwrap_or_else(|| crdt_field_base_payload_for_worker(field));
3678 payload.insert("checkpointCreated".to_string(), json!(checkpoint_created));
3679 payload.insert(
3680 "minUncheckpointedUpdates".to_string(),
3681 json!(min_uncheckpointed_updates),
3682 );
3683 Some(Value::Object(payload))
3684}
3685
3686#[cfg(feature = "native")]
3687fn crdt_field_base_payload_for_worker(field: &CrdtField) -> serde_json::Map<String, Value> {
3688 let mut payload = serde_json::Map::new();
3689 payload.insert("syncMode".to_string(), json!(field.sync_mode()));
3690 payload.insert("kind".to_string(), json!(field.field_metadata().kind));
3691 payload.insert("stateColumn".to_string(), json!(field.state_column()));
3692 payload.insert("containerKey".to_string(), json!(field.container_key()));
3693 payload.insert("rowIdField".to_string(), json!(field.row_id_field()));
3694 payload
3695}
3696
3697#[cfg(feature = "native")]
3698fn encrypted_crdt_min_uncheckpointed_updates(request_json: &str) -> i64 {
3699 serde_json::from_str::<WorkerCrdtFieldCompactionRequest>(request_json)
3700 .ok()
3701 .and_then(|request| request.min_uncheckpointed_updates)
3702 .unwrap_or(1)
3703}
3704
3705#[cfg(feature = "native")]
3706#[derive(Debug, Clone, Default, Deserialize)]
3707#[serde(rename_all = "camelCase")]
3708struct WorkerBlobStoreOptions {
3709 mime_type: Option<String>,
3710 immediate: Option<bool>,
3711 cache_local: Option<bool>,
3712}
3713
3714#[cfg(feature = "native")]
3715#[derive(Debug, Clone, Default, Deserialize)]
3716#[serde(rename_all = "camelCase")]
3717struct WorkerBlobRetrieveOptions {
3718 cache_local: Option<bool>,
3719}