1use std::cell::RefCell;
30use std::collections::HashMap;
31use std::rc::Rc;
32
33use tokio::sync::mpsc;
34use yrs::{Any, Doc, Map, Options, ReadTxn, Transact};
35
36use crate::error::{Result, VoltError};
37use crate::sync_provider::{SyncEvent, SyncProvider};
38use crate::VoltClient;
39
40pub async fn ensure_sync_database(client: &VoltClient, database_alias: &str) -> Result<()> {
59 let request = serde_json::json!({
61 "resource_id": database_alias,
62 "include_attributes": false,
63 "include_protobuf": false
64 });
65
66 let needs_creation = match client.get_resource(request).await {
68 Ok(response) => {
69 tracing::debug!("Got sync database resource response: {:?}", response);
70
71 let is_not_found = response
73 .status
74 .as_ref()
75 .map(|s| s.message.to_lowercase().contains("not found"))
76 .unwrap_or(false);
77
78 if is_not_found {
79 tracing::debug!("Response status indicates resource not found");
80 true
81 } else {
82 tracing::debug!("Sync database '{}' exists", database_alias);
83 false
84 }
85 }
86 Err(e) => {
87 let err_msg = e.to_string().to_lowercase();
88 if err_msg.contains("not found") {
89 tracing::debug!("Error indicates resource not found: {}", e);
90 true
91 } else {
92 return Err(VoltError::ServerError(format!(
93 "Failed to get sync database resource: {}",
94 e
95 )));
96 }
97 }
98 };
99
100 if needs_creation {
101 tracing::info!("Sync database '{}' not found, creating...", database_alias);
102
103 let name = if database_alias.starts_with('@') {
105 &database_alias[1..]
106 } else {
107 database_alias
108 };
109
110 let resource_create_request = serde_json::json!({
112 "create": true,
113 "create_in_parent_id": "",
114 "purge_attributes": false,
115 "resource": {
116 "id": "",
117 "name": name,
118 "alias": [database_alias],
119 "description": "",
120 "share_mode": 0,
121 "volt_id": "",
122 "attribute": [],
123 "version": 0,
124 "owner": "",
125 "created": 0,
126 "modified": 0,
127 "status": 0,
128 "kind": [
129 "volt:database",
130 "volt:sqlite-database",
131 "volt:sync-database"
132 ],
133 "online_status": 0,
134 "size": 0,
135 "store": "",
136 "content_hash": "",
137 "child": [],
138 "data": {}
139 }
140 });
141
142 client
143 .save_resource(resource_create_request)
144 .await
145 .map_err(|e| {
146 VoltError::ServerError(format!("Failed to create sync database: {}", e))
147 })?;
148
149 tracing::info!("Sync database '{}' created", database_alias);
150 }
151
152 Ok(())
153}
154
155pub fn map_to_json<T: ReadTxn>(map: &yrs::MapRef, txn: &T) -> serde_json::Value {
177 let mut json_map = serde_json::Map::new();
178
179 for (key, value) in map.iter(txn) {
180 let json_value = out_to_json(value);
181 json_map.insert(key.to_string(), json_value);
182 }
183
184 serde_json::Value::Object(json_map)
185}
186
187pub fn map_to_json_string<T: ReadTxn>(map: &yrs::MapRef, txn: &T) -> String {
189 let json = map_to_json(map, txn);
190 serde_json::to_string_pretty(&json).unwrap_or_else(|_| "{}".to_string())
191}
192
193pub fn out_to_json(value: yrs::Out) -> serde_json::Value {
195 match value {
196 yrs::Out::Any(any) => any_to_json(any),
197 yrs::Out::YText(_text) => {
198 serde_json::Value::String("[YText]".to_string())
200 }
201 yrs::Out::YArray(_arr) => serde_json::Value::String("[YArray]".to_string()),
202 yrs::Out::YMap(_) => serde_json::Value::String("[YMap]".to_string()),
203 yrs::Out::YXmlElement(_) => serde_json::Value::String("[YXmlElement]".to_string()),
204 yrs::Out::YXmlText(_) => serde_json::Value::String("[YXmlText]".to_string()),
205 yrs::Out::YXmlFragment(_) => serde_json::Value::String("[YXmlFragment]".to_string()),
206 yrs::Out::YDoc(subdoc) => {
207 serde_json::json!({
208 "__subdoc__": subdoc.guid().to_string()
209 })
210 }
211 #[allow(unreachable_patterns)]
212 _ => serde_json::Value::Null,
213 }
214}
215
216pub fn any_to_json(any: Any) -> serde_json::Value {
218 match any {
219 Any::Null => serde_json::Value::Null,
220 Any::Undefined => serde_json::Value::Null,
221 Any::Bool(b) => serde_json::Value::Bool(b),
222 Any::Number(n) => serde_json::json!(n),
223 Any::BigInt(n) => serde_json::json!(n),
224 Any::String(s) => serde_json::Value::String(s.to_string()),
225 Any::Buffer(b) => serde_json::json!(b.as_ref()),
226 Any::Array(arr) => {
227 let values: Vec<serde_json::Value> =
228 arr.iter().map(|v| any_to_json(v.clone())).collect();
229 serde_json::Value::Array(values)
230 }
231 Any::Map(m) => {
232 let obj: serde_json::Map<String, serde_json::Value> = m
233 .iter()
234 .map(|(k, v)| (k.to_string(), any_to_json(v.clone())))
235 .collect();
236 serde_json::Value::Object(obj)
237 }
238 }
239}
240
241#[derive(Debug, Clone)]
247pub enum SyncManagerEvent {
248 Syncing,
250 Synced,
252 Updated,
254 SubdocSyncing { guid: String, doc: Doc },
256 SubdocSynced { guid: String, doc: Doc },
258 SubdocUpdated { guid: String, doc: Doc },
260 SubdocRemoved { guid: String },
262 Disconnected,
264 Error(String),
266}
267
268struct SubdocInfo {
270 provider: SyncProvider,
271 #[allow(dead_code)]
272 doc: Doc,
273}
274
275pub struct SyncManager {
306 doc: Doc,
308 database_id: String,
310 document_id: String,
312 provider: Option<SyncProvider>,
314 provider_event_rx: Option<mpsc::Receiver<SyncEvent>>,
316 subdoc_providers: Rc<RefCell<HashMap<String, SubdocInfo>>>,
318 event_tx: mpsc::Sender<SyncManagerEvent>,
320 auto_sync_subdocs: bool,
322}
323
324impl SyncManager {
325 pub fn new(
335 doc: Doc,
336 database_id: impl Into<String>,
337 document_id: impl Into<String>,
338 ) -> (Self, mpsc::Receiver<SyncManagerEvent>) {
339 let (event_tx, event_rx) = mpsc::channel(100);
340
341 let manager = Self {
342 doc,
343 database_id: database_id.into(),
344 document_id: document_id.into(),
345 provider: None,
346 provider_event_rx: None,
347 subdoc_providers: Rc::new(RefCell::new(HashMap::new())),
348 event_tx,
349 auto_sync_subdocs: true,
350 };
351
352 (manager, event_rx)
353 }
354
355 pub fn set_auto_sync_subdocs(&mut self, enabled: bool) {
360 self.auto_sync_subdocs = enabled;
361 }
362
363 pub fn doc(&self) -> &Doc {
365 &self.doc
366 }
367
368 pub fn doc_clone(&self) -> Doc {
370 self.doc.clone()
371 }
372
373 pub fn database_id(&self) -> &str {
375 &self.database_id
376 }
377
378 pub fn document_id(&self) -> &str {
380 &self.document_id
381 }
382
383 pub fn subdoc_guids(&self) -> Vec<String> {
385 self.subdoc_providers.borrow().keys().cloned().collect()
386 }
387
388 pub fn is_subdoc_synced(&self, guid: &str) -> bool {
390 self.subdoc_providers.borrow().contains_key(guid)
391 }
392
393 pub fn get_subdoc(&self, guid: &str) -> Option<Doc> {
397 self.subdoc_providers
398 .borrow()
399 .get(guid)
400 .map(|info| info.doc.clone())
401 }
402
403 pub async fn start(&mut self, client: &VoltClient) -> Result<()> {
411 self.start_with_options(client, false, true).await
412 }
413
414 pub async fn start_with_options(
421 &mut self,
422 client: &VoltClient,
423 read_only: bool,
424 read_only_fallback: bool,
425 ) -> Result<()> {
426 let (mut provider, event_rx) = SyncProvider::new(
428 self.doc.clone(),
429 self.database_id.clone(),
430 self.document_id.clone(),
431 );
432
433 provider
435 .start(client, read_only, read_only_fallback)
436 .await?;
437
438 self.provider = Some(provider);
439 self.provider_event_rx = Some(event_rx);
440
441 Ok(())
442 }
443
444 pub async fn run_event_loop(&mut self, client: &VoltClient) {
459 let event_rx = match self.provider_event_rx.take() {
460 Some(rx) => rx,
461 None => {
462 let _ = self
463 .event_tx
464 .send(SyncManagerEvent::Error(
465 "Event receiver not available".to_string(),
466 ))
467 .await;
468 return;
469 }
470 };
471
472 self.process_events(event_rx, client).await;
473 }
474
475 async fn process_events(
477 &mut self,
478 mut event_rx: mpsc::Receiver<SyncEvent>,
479 client: &VoltClient,
480 ) {
481 while let Some(event) = event_rx.recv().await {
482 match event {
483 SyncEvent::Syncing => {
484 let _ = self.event_tx.send(SyncManagerEvent::Syncing).await;
485 }
486 SyncEvent::Synced => {
487 let _ = self.event_tx.send(SyncManagerEvent::Synced).await;
488
489 if self.auto_sync_subdocs {
491 if let Err(e) = self.sync_all_subdocs(client).await {
492 let _ = self
493 .event_tx
494 .send(SyncManagerEvent::Error(format!(
495 "Failed to sync subdocs: {}",
496 e
497 )))
498 .await;
499 }
500 }
501 }
502 SyncEvent::Updated => {
503 let _ = self.event_tx.send(SyncManagerEvent::Updated).await;
504 }
505 SyncEvent::Disconnected => {
506 let _ = self.event_tx.send(SyncManagerEvent::Disconnected).await;
507 break;
508 }
509 SyncEvent::Error(err) => {
510 let _ = self.event_tx.send(SyncManagerEvent::Error(err)).await;
511 }
512 SyncEvent::SubdocLoaded { guid, doc: _ } => {
513 if self.auto_sync_subdocs {
515 if let Err(e) = self.sync_subdoc(client, guid.to_string()).await {
516 let _ = self
517 .event_tx
518 .send(SyncManagerEvent::Error(format!(
519 "Failed to sync subdoc {}: {}",
520 guid, e
521 )))
522 .await;
523 }
524 }
525 }
526 SyncEvent::SubdocRemoved { guid } => {
527 let guid_str = guid.to_string();
528 self.stop_subdoc(&guid_str).await;
529 let _ = self
530 .event_tx
531 .send(SyncManagerEvent::SubdocRemoved { guid: guid_str })
532 .await;
533 }
534 }
535 }
536 }
537
538 async fn sync_all_subdocs(&mut self, client: &VoltClient) -> Result<()> {
540 let subdoc_guids: Vec<_> = {
541 let txn = self.doc.transact();
542 txn.subdoc_guids().collect()
543 };
544
545 tracing::debug!("Found {} subdocs to sync", subdoc_guids.len());
546
547 for guid_arc in subdoc_guids {
548 let guid = guid_arc.to_string();
549 self.sync_subdoc(client, guid).await?;
550 }
551
552 Ok(())
553 }
554
555 async fn sync_subdoc(&mut self, client: &VoltClient, guid: String) -> Result<()> {
557 if self.subdoc_providers.borrow().contains_key(&guid) {
559 tracing::debug!("Subdoc {} already syncing", guid);
560 return Ok(());
561 }
562
563 tracing::info!("🔗 Setting up sync for subdoc (GUID: {})", guid);
564
565 let mut subdoc_opts = Options::default();
567 subdoc_opts.guid = yrs::uuid_v4(); subdoc_opts.guid = std::sync::Arc::from(guid.as_str());
570
571 let synced_subdoc = Doc::with_options(subdoc_opts);
572
573 let (mut subdoc_provider, mut subdoc_event_rx) = SyncProvider::new(
575 synced_subdoc.clone(),
576 self.database_id.clone(),
577 guid.clone(),
578 );
579
580 subdoc_provider.start(client, false, true).await?;
582
583 let subdoc_for_task = synced_subdoc.clone();
585 let subdoc_for_event = synced_subdoc.clone();
586
587 self.subdoc_providers.borrow_mut().insert(
589 guid.clone(),
590 SubdocInfo {
591 provider: subdoc_provider,
592 doc: synced_subdoc,
593 },
594 );
595
596 let event_tx = self.event_tx.clone();
598 let guid_for_task = guid.clone();
599 tokio::task::spawn_local(async move {
600 while let Some(event) = subdoc_event_rx.recv().await {
601 let manager_event = match event {
602 SyncEvent::Syncing => SyncManagerEvent::SubdocSyncing {
603 guid: guid_for_task.clone(),
604 doc: subdoc_for_task.clone(),
605 },
606 SyncEvent::Synced => SyncManagerEvent::SubdocSynced {
607 guid: guid_for_task.clone(),
608 doc: subdoc_for_task.clone(),
609 },
610 SyncEvent::Updated => SyncManagerEvent::SubdocUpdated {
611 guid: guid_for_task.clone(),
612 doc: subdoc_for_task.clone(),
613 },
614 SyncEvent::Disconnected => SyncManagerEvent::SubdocRemoved {
615 guid: guid_for_task.clone(),
616 },
617 SyncEvent::Error(e) => {
618 SyncManagerEvent::Error(format!("Subdoc {} error: {}", guid_for_task, e))
619 }
620 _ => continue,
621 };
622 if event_tx.send(manager_event).await.is_err() {
623 break;
624 }
625 }
626 });
627
628 let _ = self
629 .event_tx
630 .send(SyncManagerEvent::SubdocSyncing {
631 guid,
632 doc: subdoc_for_event,
633 })
634 .await;
635
636 Ok(())
637 }
638
639 async fn stop_subdoc(&mut self, guid: &str) {
641 if let Some(mut info) = self.subdoc_providers.borrow_mut().remove(guid) {
642 tracing::debug!("Stopping subdoc provider: {}", guid);
643 info.provider.stop().await;
644 }
645 }
646
647 pub async fn stop(&mut self) {
649 let guids: Vec<String> = self.subdoc_providers.borrow().keys().cloned().collect();
651 for guid in guids {
652 self.stop_subdoc(&guid).await;
653 }
654
655 if let Some(ref mut provider) = self.provider {
657 provider.stop().await;
658 }
659 self.provider = None;
660
661 let _ = self.event_tx.send(SyncManagerEvent::Disconnected).await;
662 }
663
664 pub fn root_map_to_json(&self) -> serde_json::Value {
666 let map = self.doc.get_or_insert_map("");
667 let txn = self.doc.transact();
668 map_to_json(&map, &txn)
669 }
670
671 pub fn root_map_to_json_string(&self) -> String {
673 serde_json::to_string_pretty(&self.root_map_to_json()).unwrap_or_else(|_| "{}".to_string())
674 }
675}