hocuspocus_rs_ws/
doc_sync.rs

// Portions of this module are adapted from the Hocuspocus JavaScript server
// (https://github.com/ueberdosis/hocuspocus) and y-sweet
// (https://github.com/y-sweet/y-sweet), both distributed under the MIT license.
// Adapted code retains the original license terms.
5
6use crate::{doc_connection::DOC_NAME, store::Store, sync::awareness::Awareness, sync_kv::SyncKv};
7use anyhow::{anyhow, Context, Result};
8use std::sync::{Arc, RwLock};
9use yrs::{updates::decoder::Decode, Doc, ReadTxn, StateVector, Subscription, Transact, Update};
10use yrs_kvstore::DocOps;
11
/// A document bundled with the key-value store it is loaded from and the
/// update observer that writes changes back to that store.
///
/// The document itself is owned by the `Awareness` value; it is reached
/// through the `doc` field of the guarded `Awareness`.
pub struct DocWithSyncKv {
    /// Shared, lock-protected awareness state that wraps the document.
    awareness: Arc<RwLock<Awareness>>,
    /// Shared key-value store that document updates are pushed to.
    sync_kv: Arc<SyncKv>,
    /// Handle returned when the update observer was registered. It is never
    /// read; it is only held for as long as this struct lives.
    #[allow(unused)] // acts as RAII guard
    subscription: Subscription,
}
18
19impl DocWithSyncKv {
20    pub fn awareness(&self) -> Arc<RwLock<Awareness>> {
21        self.awareness.clone()
22    }
23
24    pub fn sync_kv(&self) -> Arc<SyncKv> {
25        self.sync_kv.clone()
26    }
27
28    pub async fn new<F>(key: &str, store: Arc<dyn Store>, dirty_callback: F) -> Result<Self>
29    where
30        F: Fn() + Send + Sync + 'static,
31    {
32        let sync_kv = SyncKv::new(store, key, dirty_callback)
33            .await
34            .context("Failed to create SyncKv")?;
35
36        let sync_kv = Arc::new(sync_kv);
37        let doc = Doc::new();
38
39        {
40            let mut txn = doc.transact_mut();
41            sync_kv
42                .load_doc(DOC_NAME, &mut txn)
43                .map_err(|_| anyhow!("Failed to load doc"))?;
44        }
45
46        let subscription = {
47            let sync_kv = sync_kv.clone();
48            doc.observe_update_v1(move |_, event| {
49                sync_kv.push_update(DOC_NAME, &event.update).unwrap();
50                sync_kv
51                    .flush_doc_with(DOC_NAME, Default::default())
52                    .unwrap();
53            })
54            .map_err(|_| anyhow!("Failed to subscribe to updates"))?
55        };
56
57        let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
58        Ok(Self {
59            awareness,
60            sync_kv,
61            subscription,
62        })
63    }
64
65    pub fn as_update(&self) -> Vec<u8> {
66        let awareness_guard = self.awareness.read().unwrap();
67        let doc = &awareness_guard.doc;
68
69        let txn = doc.transact();
70
71        txn.encode_state_as_update_v1(&StateVector::default())
72    }
73
74    pub fn apply_update(&self, update: &[u8]) -> Result<()> {
75        let awareness_guard = self.awareness.write().unwrap();
76        let doc = &awareness_guard.doc;
77
78        let update: Update =
79            Update::decode_v1(update).map_err(|_| anyhow!("Failed to decode update"))?;
80
81        let mut txn = doc.transact_mut();
82        txn.apply_update(update);
83
84        Ok(())
85    }
86}