hocuspocus_rs_ws/
doc_sync.rs1use crate::{doc_connection::DOC_NAME, store::Store, sync::awareness::Awareness, sync_kv::SyncKv};
7use anyhow::{anyhow, Context, Result};
8use std::sync::{Arc, RwLock};
9use yrs::{updates::decoder::Decode, Doc, ReadTxn, StateVector, Subscription, Transact, Update};
10use yrs_kvstore::DocOps;
11
/// A document, held inside its [`Awareness`], paired with the [`SyncKv`] that receives
/// its updates.
pub struct DocWithSyncKv {
    /// Shared awareness state; the document itself is reachable through its `doc` field.
    awareness: Arc<RwLock<Awareness>>,
    /// Shared handle to the store the document is loaded from and pushed into.
    sync_kv: Arc<SyncKv>,
    // Never read after construction; kept so the update subscription lives as long as
    // this struct. NOTE(review): presumably dropping it detaches the observer — confirm
    // against the yrs `Subscription` documentation.
    #[allow(unused)]
    subscription: Subscription,
}
18
19impl DocWithSyncKv {
20 pub fn awareness(&self) -> Arc<RwLock<Awareness>> {
21 self.awareness.clone()
22 }
23
24 pub fn sync_kv(&self) -> Arc<SyncKv> {
25 self.sync_kv.clone()
26 }
27
28 pub async fn new<F>(key: &str, store: Arc<dyn Store>, dirty_callback: F) -> Result<Self>
29 where
30 F: Fn() + Send + Sync + 'static,
31 {
32 let sync_kv = SyncKv::new(store, key, dirty_callback)
33 .await
34 .context("Failed to create SyncKv")?;
35
36 let sync_kv = Arc::new(sync_kv);
37 let doc = Doc::new();
38
39 {
40 let mut txn = doc.transact_mut();
41 sync_kv
42 .load_doc(DOC_NAME, &mut txn)
43 .map_err(|_| anyhow!("Failed to load doc"))?;
44 }
45
46 let subscription = {
47 let sync_kv = sync_kv.clone();
48 doc.observe_update_v1(move |_, event| {
49 sync_kv.push_update(DOC_NAME, &event.update).unwrap();
50 sync_kv
51 .flush_doc_with(DOC_NAME, Default::default())
52 .unwrap();
53 })
54 .map_err(|_| anyhow!("Failed to subscribe to updates"))?
55 };
56
57 let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
58 Ok(Self {
59 awareness,
60 sync_kv,
61 subscription,
62 })
63 }
64
65 pub fn as_update(&self) -> Vec<u8> {
66 let awareness_guard = self.awareness.read().unwrap();
67 let doc = &awareness_guard.doc;
68
69 let txn = doc.transact();
70
71 txn.encode_state_as_update_v1(&StateVector::default())
72 }
73
74 pub fn apply_update(&self, update: &[u8]) -> Result<()> {
75 let awareness_guard = self.awareness.write().unwrap();
76 let doc = &awareness_guard.doc;
77
78 let update: Update =
79 Update::decode_v1(update).map_err(|_| anyhow!("Failed to decode update"))?;
80
81 let mut txn = doc.transact_mut();
82 txn.apply_update(update);
83
84 Ok(())
85 }
86}