use crate::{doc_connection::DOC_NAME, store::Store, sync::awareness::Awareness, sync_kv::SyncKv};
use anyhow::{anyhow, Context, Result};
use std::sync::{Arc, RwLock};
use yrs::{updates::decoder::Decode, Doc, ReadTxn, StateVector, Subscription, Transact, Update};
use yrs_kvstore::DocOps;
pub struct DocWithSyncKv {
awareness: Arc<RwLock<Awareness>>,
sync_kv: Arc<SyncKv>,
#[allow(unused)] subscription: Subscription,
}
impl DocWithSyncKv {
pub fn awareness(&self) -> Arc<RwLock<Awareness>> {
self.awareness.clone()
}
pub fn sync_kv(&self) -> Arc<SyncKv> {
self.sync_kv.clone()
}
pub async fn new<F>(key: &str, store: Arc<dyn Store>, dirty_callback: F) -> Result<Self>
where
F: Fn() + Send + Sync + 'static,
{
let sync_kv = SyncKv::new(store, key, dirty_callback)
.await
.context("Failed to create SyncKv")?;
let sync_kv = Arc::new(sync_kv);
let doc = Doc::new();
{
let mut txn = doc.transact_mut();
sync_kv
.load_doc(DOC_NAME, &mut txn)
.map_err(|_| anyhow!("Failed to load doc"))?;
}
let subscription = {
let sync_kv = sync_kv.clone();
doc.observe_update_v1(move |_, event| {
sync_kv.push_update(DOC_NAME, &event.update).unwrap();
sync_kv
.flush_doc_with(DOC_NAME, Default::default())
.unwrap();
})
.map_err(|_| anyhow!("Failed to subscribe to updates"))?
};
let awareness = Arc::new(RwLock::new(Awareness::new(doc)));
Ok(Self {
awareness,
sync_kv,
subscription,
})
}
pub fn as_update(&self) -> Vec<u8> {
let awareness_guard = self.awareness.read().unwrap();
let doc = &awareness_guard.doc;
let txn = doc.transact();
txn.encode_state_as_update_v1(&StateVector::default())
}
pub fn apply_update(&self, update: &[u8]) -> Result<()> {
let awareness_guard = self.awareness.write().unwrap();
let doc = &awareness_guard.doc;
let update: Update =
Update::decode_v1(update).map_err(|_| anyhow!("Failed to decode update"))?;
let mut txn = doc.transact_mut();
txn.apply_update(update);
Ok(())
}
}