Skip to main content

volt_client_grpc/
sync_helpers.rs

1//! Sync Helper Utilities
2//!
3//! This module provides high-level helper functions and types for common
4//! synchronization patterns with Y.js/yrs documents and Volt servers.
5//!
6//! ## Features
7//!
8//! - **`ensure_sync_database`**: Ensures a sync database resource exists
9//! - **`yrs_to_json`**: Convert yrs types to JSON for debugging/display
10//! - **`SyncManager`**: High-level manager that handles subdoc syncing automatically
11//!
12//! ## Example
13//!
14//! ```rust,ignore
15//! use volt_client_grpc::{VoltClient, SyncManager};
16//! use yrs::Doc;
17//!
18//! // Create a SyncManager for easy document syncing
19//! let doc = Doc::new();
20//! let mut manager = SyncManager::new(doc, "@my-sync-db", "my-document");
21//!
22//! // Start syncing - subdocs are automatically handled
23//! manager.start(&client).await?;
24//!
25//! // Access the document
26//! let doc = manager.doc();
27//! ```
28
29use std::cell::RefCell;
30use std::collections::HashMap;
31use std::rc::Rc;
32
33use tokio::sync::mpsc;
34use yrs::{Any, Doc, Map, Options, ReadTxn, Transact};
35
36use crate::error::{Result, VoltError};
37use crate::sync_provider::{SyncEvent, SyncProvider};
38use crate::VoltClient;
39
40// ============================================================================
41// Sync Database Helpers
42// ============================================================================
43
44/// Ensures a sync database resource exists, creating it if necessary.
45///
46/// This is a common operation when setting up document synchronization.
47///
48/// # Arguments
49/// * `client` - The VoltClient to use
50/// * `database_alias` - The alias for the sync database (e.g., "@my-sync-db")
51///
52/// # Example
53/// ```rust,ignore
54/// use volt_client_grpc::sync_helpers::ensure_sync_database;
55///
56/// ensure_sync_database(&client, "@my-sync-db").await?;
57/// ```
58pub async fn ensure_sync_database(client: &VoltClient, database_alias: &str) -> Result<()> {
59    // Try to get the resource
60    let request = serde_json::json!({
61        "resource_id": database_alias,
62        "include_attributes": false,
63        "include_protobuf": false
64    });
65
66    // Check if the resource exists
67    let needs_creation = match client.get_resource(request).await {
68        Ok(response) => {
69            tracing::debug!("Got sync database resource response: {:?}", response);
70
71            // Check if the response indicates "not found" - the API returns Ok even when resource doesn't exist
72            let is_not_found = response
73                .status
74                .as_ref()
75                .map(|s| s.message.to_lowercase().contains("not found"))
76                .unwrap_or(false);
77
78            if is_not_found {
79                tracing::debug!("Response status indicates resource not found");
80                true
81            } else {
82                tracing::debug!("Sync database '{}' exists", database_alias);
83                false
84            }
85        }
86        Err(e) => {
87            let err_msg = e.to_string().to_lowercase();
88            if err_msg.contains("not found") {
89                tracing::debug!("Error indicates resource not found: {}", e);
90                true
91            } else {
92                return Err(VoltError::ServerError(format!(
93                    "Failed to get sync database resource: {}",
94                    e
95                )));
96            }
97        }
98    };
99
100    if needs_creation {
101        tracing::info!("Sync database '{}' not found, creating...", database_alias);
102
103        // Remove @ prefix for the name
104        let name = if database_alias.starts_with('@') {
105            &database_alias[1..]
106        } else {
107            database_alias
108        };
109
110        // Create the sync database resource
111        let resource_create_request = serde_json::json!({
112            "create": true,
113            "create_in_parent_id": "",
114            "purge_attributes": false,
115            "resource": {
116                "id": "",
117                "name": name,
118                "alias": [database_alias],
119                "description": "",
120                "share_mode": 0,
121                "volt_id": "",
122                "attribute": [],
123                "version": 0,
124                "owner": "",
125                "created": 0,
126                "modified": 0,
127                "status": 0,
128                "kind": [
129                    "volt:database",
130                    "volt:sqlite-database",
131                    "volt:sync-database"
132                ],
133                "online_status": 0,
134                "size": 0,
135                "store": "",
136                "content_hash": "",
137                "child": [],
138                "data": {}
139            }
140        });
141
142        client
143            .save_resource(resource_create_request)
144            .await
145            .map_err(|e| {
146                VoltError::ServerError(format!("Failed to create sync database: {}", e))
147            })?;
148
149        tracing::info!("Sync database '{}' created", database_alias);
150    }
151
152    Ok(())
153}
154
155// ============================================================================
156// Y.js to JSON Conversion Utilities
157// ============================================================================
158
159/// Convert a yrs Map to a JSON Value.
160///
161/// This is useful for debugging and displaying document contents.
162///
163/// # Arguments
164/// * `map` - The MapRef to convert
165/// * `txn` - A read transaction
166///
167/// # Example
168/// ```rust,ignore
169/// use volt_client_grpc::sync_helpers::map_to_json;
170///
171/// let map = doc.get_or_insert_map("");
172/// let txn = doc.transact();
173/// let json = map_to_json(&map, &txn);
174/// println!("{}", serde_json::to_string_pretty(&json)?);
175/// ```
176pub fn map_to_json<T: ReadTxn>(map: &yrs::MapRef, txn: &T) -> serde_json::Value {
177    let mut json_map = serde_json::Map::new();
178
179    for (key, value) in map.iter(txn) {
180        let json_value = out_to_json(value);
181        json_map.insert(key.to_string(), json_value);
182    }
183
184    serde_json::Value::Object(json_map)
185}
186
187/// Convert a yrs Map to a pretty-printed JSON string.
188pub fn map_to_json_string<T: ReadTxn>(map: &yrs::MapRef, txn: &T) -> String {
189    let json = map_to_json(map, txn);
190    serde_json::to_string_pretty(&json).unwrap_or_else(|_| "{}".to_string())
191}
192
193/// Convert a yrs::Out value to a JSON Value.
194pub fn out_to_json(value: yrs::Out) -> serde_json::Value {
195    match value {
196        yrs::Out::Any(any) => any_to_json(any),
197        yrs::Out::YText(_text) => {
198            // For YText, we can't get length without a transaction
199            serde_json::Value::String("[YText]".to_string())
200        }
201        yrs::Out::YArray(_arr) => serde_json::Value::String("[YArray]".to_string()),
202        yrs::Out::YMap(_) => serde_json::Value::String("[YMap]".to_string()),
203        yrs::Out::YXmlElement(_) => serde_json::Value::String("[YXmlElement]".to_string()),
204        yrs::Out::YXmlText(_) => serde_json::Value::String("[YXmlText]".to_string()),
205        yrs::Out::YXmlFragment(_) => serde_json::Value::String("[YXmlFragment]".to_string()),
206        yrs::Out::YDoc(subdoc) => {
207            serde_json::json!({
208                "__subdoc__": subdoc.guid().to_string()
209            })
210        }
211        #[allow(unreachable_patterns)]
212        _ => serde_json::Value::Null,
213    }
214}
215
216/// Convert a yrs::Any value to a JSON Value.
217pub fn any_to_json(any: Any) -> serde_json::Value {
218    match any {
219        Any::Null => serde_json::Value::Null,
220        Any::Undefined => serde_json::Value::Null,
221        Any::Bool(b) => serde_json::Value::Bool(b),
222        Any::Number(n) => serde_json::json!(n),
223        Any::BigInt(n) => serde_json::json!(n),
224        Any::String(s) => serde_json::Value::String(s.to_string()),
225        Any::Buffer(b) => serde_json::json!(b.as_ref()),
226        Any::Array(arr) => {
227            let values: Vec<serde_json::Value> =
228                arr.iter().map(|v| any_to_json(v.clone())).collect();
229            serde_json::Value::Array(values)
230        }
231        Any::Map(m) => {
232            let obj: serde_json::Map<String, serde_json::Value> = m
233                .iter()
234                .map(|(k, v)| (k.to_string(), any_to_json(v.clone())))
235                .collect();
236            serde_json::Value::Object(obj)
237        }
238    }
239}
240
241// ============================================================================
242// SyncManager - High-level sync management with automatic subdoc handling
243// ============================================================================
244
245/// Event emitted by SyncManager
246#[derive(Debug, Clone)]
247pub enum SyncManagerEvent {
248    /// Main document syncing started
249    Syncing,
250    /// Main document synced
251    Synced,
252    /// Main document updated
253    Updated,
254    /// A subdoc started syncing
255    SubdocSyncing { guid: String, doc: Doc },
256    /// A subdoc synced
257    SubdocSynced { guid: String, doc: Doc },
258    /// A subdoc was updated
259    SubdocUpdated { guid: String, doc: Doc },
260    /// A subdoc was removed
261    SubdocRemoved { guid: String },
262    /// Disconnected
263    Disconnected,
264    /// Error occurred
265    Error(String),
266}
267
268/// Tracked subdoc information
269struct SubdocInfo {
270    provider: SyncProvider,
271    #[allow(dead_code)]
272    doc: Doc,
273}
274
275/// SyncManager provides high-level document synchronization with automatic subdoc handling.
276///
277/// This manager automatically:
278/// - Creates SyncProviders for subdocuments
279/// - Tracks and cleans up subdoc providers
280/// - Emits unified events for both main doc and subdocs
281///
282/// # Example
283///
284/// ```rust,ignore
285/// use volt_client_grpc::{VoltClient, SyncManager};
286/// use yrs::Doc;
287///
288/// let doc = Doc::new();
289/// let (mut manager, mut events) = SyncManager::new(doc, "@my-sync-db", "my-document");
290///
291/// // Start syncing
292/// manager.start(&client).await?;
293///
294/// // Handle events
295/// while let Some(event) = events.recv().await {
296///     match event {
297///         SyncManagerEvent::Synced => println!("Document synced!"),
298///         SyncManagerEvent::SubdocUpdated { guid, doc } => {
299///             println!("Subdoc {} updated", guid);
300///         }
301///         _ => {}
302///     }
303/// }
304/// ```
305pub struct SyncManager {
306    /// The main document
307    doc: Doc,
308    /// Database ID
309    database_id: String,
310    /// Document ID
311    document_id: String,
312    /// Main document provider
313    provider: Option<SyncProvider>,
314    /// Main document event receiver
315    provider_event_rx: Option<mpsc::Receiver<SyncEvent>>,
316    /// Subdoc providers (single-threaded)
317    subdoc_providers: Rc<RefCell<HashMap<String, SubdocInfo>>>,
318    /// Manager event sender
319    event_tx: mpsc::Sender<SyncManagerEvent>,
320    /// Whether auto-sync subdocs is enabled
321    auto_sync_subdocs: bool,
322}
323
324impl SyncManager {
325    /// Create a new SyncManager.
326    ///
327    /// # Arguments
328    /// * `doc` - The Y.js document to sync
329    /// * `database_id` - ID/alias of the sync database
330    /// * `document_id` - ID of the document to sync
331    ///
332    /// # Returns
333    /// A tuple of (SyncManager, event_receiver)
334    pub fn new(
335        doc: Doc,
336        database_id: impl Into<String>,
337        document_id: impl Into<String>,
338    ) -> (Self, mpsc::Receiver<SyncManagerEvent>) {
339        let (event_tx, event_rx) = mpsc::channel(100);
340
341        let manager = Self {
342            doc,
343            database_id: database_id.into(),
344            document_id: document_id.into(),
345            provider: None,
346            provider_event_rx: None,
347            subdoc_providers: Rc::new(RefCell::new(HashMap::new())),
348            event_tx,
349            auto_sync_subdocs: true,
350        };
351
352        (manager, event_rx)
353    }
354
355    /// Enable or disable automatic subdoc syncing.
356    ///
357    /// When enabled (default), the manager automatically creates SyncProviders
358    /// for subdocuments detected in the main document.
359    pub fn set_auto_sync_subdocs(&mut self, enabled: bool) {
360        self.auto_sync_subdocs = enabled;
361    }
362
363    /// Get a reference to the main document.
364    pub fn doc(&self) -> &Doc {
365        &self.doc
366    }
367
368    /// Get a clone of the main document.
369    pub fn doc_clone(&self) -> Doc {
370        self.doc.clone()
371    }
372
373    /// Get the database ID.
374    pub fn database_id(&self) -> &str {
375        &self.database_id
376    }
377
378    /// Get the document ID.
379    pub fn document_id(&self) -> &str {
380        &self.document_id
381    }
382
383    /// Get the list of currently synced subdoc GUIDs.
384    pub fn subdoc_guids(&self) -> Vec<String> {
385        self.subdoc_providers.borrow().keys().cloned().collect()
386    }
387
388    /// Check if a subdoc is being synced.
389    pub fn is_subdoc_synced(&self, guid: &str) -> bool {
390        self.subdoc_providers.borrow().contains_key(guid)
391    }
392
393    /// Get a clone of a subdoc by GUID.
394    ///
395    /// Returns None if the subdoc is not currently being synced.
396    pub fn get_subdoc(&self, guid: &str) -> Option<Doc> {
397        self.subdoc_providers
398            .borrow()
399            .get(guid)
400            .map(|info| info.doc.clone())
401    }
402
403    /// Start synchronization.
404    ///
405    /// This starts the main document provider and sets up event handling.
406    /// If `auto_sync_subdocs` is enabled, subdocs will be automatically synced.
407    ///
408    /// # Arguments
409    /// * `client` - The VoltClient to use for synchronization
410    pub async fn start(&mut self, client: &VoltClient) -> Result<()> {
411        self.start_with_options(client, false, true).await
412    }
413
414    /// Start synchronization with options.
415    ///
416    /// # Arguments
417    /// * `client` - The VoltClient to use
418    /// * `read_only` - If true, only receive updates
419    /// * `read_only_fallback` - If true, downgrade to read-only if write access denied
420    pub async fn start_with_options(
421        &mut self,
422        client: &VoltClient,
423        read_only: bool,
424        read_only_fallback: bool,
425    ) -> Result<()> {
426        // Create the main provider
427        let (mut provider, event_rx) = SyncProvider::new(
428            self.doc.clone(),
429            self.database_id.clone(),
430            self.document_id.clone(),
431        );
432
433        // Start the provider
434        provider
435            .start(client, read_only, read_only_fallback)
436            .await?;
437
438        self.provider = Some(provider);
439        self.provider_event_rx = Some(event_rx);
440
441        Ok(())
442    }
443
444    /// Run the event processing loop.
445    ///
446    /// This should be called after `start()` to process events.
447    /// It will run until the connection is closed or an error occurs.
448    ///
449    /// # Arguments
450    /// * `client` - The VoltClient (needed for starting subdoc providers)
451    ///
452    /// # Example
453    /// ```rust,ignore
454    /// // In a spawned task:
455    /// manager.start(&client).await?;
456    /// manager.run_event_loop(&client).await;
457    /// ```
458    pub async fn run_event_loop(&mut self, client: &VoltClient) {
459        let event_rx = match self.provider_event_rx.take() {
460            Some(rx) => rx,
461            None => {
462                let _ = self
463                    .event_tx
464                    .send(SyncManagerEvent::Error(
465                        "Event receiver not available".to_string(),
466                    ))
467                    .await;
468                return;
469            }
470        };
471
472        self.process_events(event_rx, client).await;
473    }
474
475    /// Process events from the main provider.
476    async fn process_events(
477        &mut self,
478        mut event_rx: mpsc::Receiver<SyncEvent>,
479        client: &VoltClient,
480    ) {
481        while let Some(event) = event_rx.recv().await {
482            match event {
483                SyncEvent::Syncing => {
484                    let _ = self.event_tx.send(SyncManagerEvent::Syncing).await;
485                }
486                SyncEvent::Synced => {
487                    let _ = self.event_tx.send(SyncManagerEvent::Synced).await;
488
489                    // Auto-sync subdocs if enabled
490                    if self.auto_sync_subdocs {
491                        if let Err(e) = self.sync_all_subdocs(client).await {
492                            let _ = self
493                                .event_tx
494                                .send(SyncManagerEvent::Error(format!(
495                                    "Failed to sync subdocs: {}",
496                                    e
497                                )))
498                                .await;
499                        }
500                    }
501                }
502                SyncEvent::Updated => {
503                    let _ = self.event_tx.send(SyncManagerEvent::Updated).await;
504                }
505                SyncEvent::Disconnected => {
506                    let _ = self.event_tx.send(SyncManagerEvent::Disconnected).await;
507                    break;
508                }
509                SyncEvent::Error(err) => {
510                    let _ = self.event_tx.send(SyncManagerEvent::Error(err)).await;
511                }
512                SyncEvent::SubdocLoaded { guid, doc: _ } => {
513                    // A subdoc was loaded - sync it if auto-sync is enabled
514                    if self.auto_sync_subdocs {
515                        if let Err(e) = self.sync_subdoc(client, guid.to_string()).await {
516                            let _ = self
517                                .event_tx
518                                .send(SyncManagerEvent::Error(format!(
519                                    "Failed to sync subdoc {}: {}",
520                                    guid, e
521                                )))
522                                .await;
523                        }
524                    }
525                }
526                SyncEvent::SubdocRemoved { guid } => {
527                    let guid_str = guid.to_string();
528                    self.stop_subdoc(&guid_str).await;
529                    let _ = self
530                        .event_tx
531                        .send(SyncManagerEvent::SubdocRemoved { guid: guid_str })
532                        .await;
533                }
534            }
535        }
536    }
537
538    /// Sync all subdocs found in the document.
539    async fn sync_all_subdocs(&mut self, client: &VoltClient) -> Result<()> {
540        let subdoc_guids: Vec<_> = {
541            let txn = self.doc.transact();
542            txn.subdoc_guids().collect()
543        };
544
545        tracing::debug!("Found {} subdocs to sync", subdoc_guids.len());
546
547        for guid_arc in subdoc_guids {
548            let guid = guid_arc.to_string();
549            self.sync_subdoc(client, guid).await?;
550        }
551
552        Ok(())
553    }
554
555    /// Sync a specific subdoc.
556    async fn sync_subdoc(&mut self, client: &VoltClient, guid: String) -> Result<()> {
557        // Check if already syncing
558        if self.subdoc_providers.borrow().contains_key(&guid) {
559            tracing::debug!("Subdoc {} already syncing", guid);
560            return Ok(());
561        }
562
563        tracing::info!("🔗 Setting up sync for subdoc (GUID: {})", guid);
564
565        // Create a new Doc with the same GUID
566        let mut subdoc_opts = Options::default();
567        subdoc_opts.guid = yrs::uuid_v4(); // This will be overwritten
568                                           // Set the GUID from the string - yrs expects Arc<str>
569        subdoc_opts.guid = std::sync::Arc::from(guid.as_str());
570
571        let synced_subdoc = Doc::with_options(subdoc_opts);
572
573        // Create SyncProvider for the subdoc
574        let (mut subdoc_provider, mut subdoc_event_rx) = SyncProvider::new(
575            synced_subdoc.clone(),
576            self.database_id.clone(),
577            guid.clone(),
578        );
579
580        // Start the subdoc provider
581        subdoc_provider.start(client, false, true).await?;
582
583        // Clone for the task and final event before moving into storage
584        let subdoc_for_task = synced_subdoc.clone();
585        let subdoc_for_event = synced_subdoc.clone();
586
587        // Store the provider
588        self.subdoc_providers.borrow_mut().insert(
589            guid.clone(),
590            SubdocInfo {
591                provider: subdoc_provider,
592                doc: synced_subdoc,
593            },
594        );
595
596        // Spawn task to handle subdoc events
597        let event_tx = self.event_tx.clone();
598        let guid_for_task = guid.clone();
599        tokio::task::spawn_local(async move {
600            while let Some(event) = subdoc_event_rx.recv().await {
601                let manager_event = match event {
602                    SyncEvent::Syncing => SyncManagerEvent::SubdocSyncing {
603                        guid: guid_for_task.clone(),
604                        doc: subdoc_for_task.clone(),
605                    },
606                    SyncEvent::Synced => SyncManagerEvent::SubdocSynced {
607                        guid: guid_for_task.clone(),
608                        doc: subdoc_for_task.clone(),
609                    },
610                    SyncEvent::Updated => SyncManagerEvent::SubdocUpdated {
611                        guid: guid_for_task.clone(),
612                        doc: subdoc_for_task.clone(),
613                    },
614                    SyncEvent::Disconnected => SyncManagerEvent::SubdocRemoved {
615                        guid: guid_for_task.clone(),
616                    },
617                    SyncEvent::Error(e) => {
618                        SyncManagerEvent::Error(format!("Subdoc {} error: {}", guid_for_task, e))
619                    }
620                    _ => continue,
621                };
622                if event_tx.send(manager_event).await.is_err() {
623                    break;
624                }
625            }
626        });
627
628        let _ = self
629            .event_tx
630            .send(SyncManagerEvent::SubdocSyncing {
631                guid,
632                doc: subdoc_for_event,
633            })
634            .await;
635
636        Ok(())
637    }
638
639    /// Stop syncing a specific subdoc.
640    async fn stop_subdoc(&mut self, guid: &str) {
641        if let Some(mut info) = self.subdoc_providers.borrow_mut().remove(guid) {
642            tracing::debug!("Stopping subdoc provider: {}", guid);
643            info.provider.stop().await;
644        }
645    }
646
647    /// Stop all synchronization.
648    pub async fn stop(&mut self) {
649        // Stop all subdoc providers
650        let guids: Vec<String> = self.subdoc_providers.borrow().keys().cloned().collect();
651        for guid in guids {
652            self.stop_subdoc(&guid).await;
653        }
654
655        // Stop main provider
656        if let Some(ref mut provider) = self.provider {
657            provider.stop().await;
658        }
659        self.provider = None;
660
661        let _ = self.event_tx.send(SyncManagerEvent::Disconnected).await;
662    }
663
664    /// Get the main document's root map as JSON.
665    pub fn root_map_to_json(&self) -> serde_json::Value {
666        let map = self.doc.get_or_insert_map("");
667        let txn = self.doc.transact();
668        map_to_json(&map, &txn)
669    }
670
671    /// Get the main document's root map as a JSON string.
672    pub fn root_map_to_json_string(&self) -> String {
673        serde_json::to_string_pretty(&self.root_map_to_json()).unwrap_or_else(|_| "{}".to_string())
674    }
675}