use std::{
any::Any,
collections::BTreeMap,
fmt::Debug,
sync::{Arc, Mutex},
};
use yrs::{updates::decoder::Decode, GetString, Observable, ReadTxn, StateVector, Text, Transact};
use yrs::{Update, WriteTxn};
use crate::{
error::{MutationError, StateError},
property::{
backend::{Operation, PropertyBackend},
PropertyName, Value,
},
};
/// Property backend whose properties live in a single `yrs` document.
///
/// Besides the document itself, the struct keeps the bookkeeping needed by
/// `to_operations` (the last reported state vector) and by the per-field
/// change notifications (one broadcast per field name).
#[derive(Debug)]
pub struct YrsBackend {
/// The `yrs` document that all methods of this backend read from and write to.
pub(crate) doc: yrs::Doc,
/// State vector captured at construction and replaced on every `to_operations` call;
/// `to_operations` encodes its diff relative to this value.
previous_state: Mutex<StateVector>,
/// Broadcasts keyed by field name, created on demand by `listen_field` and `field_broadcast_id`.
field_broadcasts: Mutex<BTreeMap<PropertyName, ankurah_signals::broadcast::Broadcast>>,
}
impl Default for YrsBackend {
fn default() -> Self { Self::new() }
}
impl YrsBackend {
pub fn new() -> Self {
let doc = yrs::Doc::new();
let starting_state = doc.transact().state_vector();
Self { doc, previous_state: Mutex::new(starting_state), field_broadcasts: Mutex::new(BTreeMap::new()) }
}
/// Returns the current contents of the text stored under `property_name`.
///
/// Returns `None` when `get_text` finds nothing under that name.
pub fn get_string(&self, property_name: impl AsRef<str>) -> Option<String> {
    let read_txn = self.doc.transact();
    let text_ref = read_txn.get_text(property_name.as_ref())?;
    Some(text_ref.get_string(&read_txn))
}
/// Inserts `value` at `index` into the text stored under `property_name`.
///
/// The text is obtained with `get_or_insert_text`, and the edit runs in its own
/// write transaction that ends when this method returns. As written, the method
/// always returns `Ok(())`.
///
/// NOTE(review): the `yrs` documentation describes `Text::insert` as panicking
/// when `index` is past the end of the text — confirm that callers validate it.
pub fn insert(&self, property_name: impl AsRef<str>, index: u32, value: &str) -> Result<(), MutationError> {
    // Resolve the text handle first, then open the write transaction used for the edit.
    let target = self.doc.get_or_insert_text(property_name.as_ref());
    let mut write_txn = self.doc.transact_mut();
    target.insert(&mut write_txn, index, value);
    Ok(())
}
/// Removes the range of `length` starting at `index` from the text stored under `property_name`.
///
/// The text is obtained with `get_or_insert_text`, and the edit runs in its own
/// write transaction that ends when this method returns. As written, the method
/// always returns `Ok(())`.
pub fn delete(&self, property_name: impl AsRef<str>, index: u32, length: u32) -> Result<(), MutationError> {
    // Resolve the text handle first, then open the write transaction used for the edit.
    let target = self.doc.get_or_insert_text(property_name.as_ref());
    let mut write_txn = self.doc.transact_mut();
    target.remove_range(&mut write_txn, index, length);
    Ok(())
}
/// Applies one v2-encoded `yrs` update to the document and records which listened fields it touched.
///
/// Every field name that currently has a broadcast registered is observed while
/// the update is applied; the names whose observer fires are inserted into
/// `changed_fields`.
///
/// # Errors
/// Returns the error converted from `StateError::SerializationError` when `update`
/// cannot be decoded, and `MutationError::UpdateFailed` when applying the decoded
/// update fails.
fn apply_update(&self, update: &[u8], changed_fields: &Arc<Mutex<std::collections::HashSet<String>>>) -> Result<(), MutationError> {
    // Decode before anything else: malformed bytes are rejected without opening a
    // write transaction or calling `get_or_insert_text` for the listened fields.
    let update = Update::decode_v2(update).map_err(|e| StateError::SerializationError(Box::new(e)))?;

    let mut txn = self.doc.transact_mut();
    // The subscriptions are held in `_subs` until the end of the function so the
    // observers are still registered when the transaction is committed below.
    let _subs: Vec<yrs::Subscription> = self
        .field_broadcasts
        .lock()
        .unwrap()
        .keys()
        .map(|field| {
            let changed_fields = Arc::clone(changed_fields);
            let field = field.to_string();
            txn.get_or_insert_text(field.as_str()).observe(move |_, _| {
                changed_fields.lock().unwrap().insert(field.clone());
            })
        })
        .collect();

    txn.apply_update(update).map_err(|e| MutationError::UpdateFailed(Box::new(e)))?;
    txn.commit();
    Ok(())
}
/// Reads the text stored under `property_name` using the supplied read transaction.
///
/// Returns the text wrapped in `Value::String`, or `None` when `get_text` finds
/// nothing under that name.
fn get_property_string(&self, trx: &yrs::Transaction, property_name: &PropertyName) -> Option<Value> {
    // Look the text up by `&str` instead of cloning the whole name for the call.
    trx.get_text(property_name.as_str()).map(|text_ref| Value::String(text_ref.get_string(trx)))
}
}
impl PropertyBackend for YrsBackend {
/// Converts this shared backend into an `Arc<dyn Any + Send + Sync>`.
fn as_arc_dyn_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
    // The unsizing coercion happens at the return site; no explicit cast is needed.
    self
}
/// Exposes this backend as a `&dyn Debug` trait object.
fn as_debug(&self) -> &dyn Debug {
    // Coerced to the trait object at the return site.
    self
}
/// Creates an independent copy of this backend by round-tripping it through its state buffer.
///
/// The copy is built with `from_state_buffer`, so it starts with no field
/// broadcasts and with its `to_operations` baseline set to the copied state.
///
/// # Panics
/// Panics if the state cannot be encoded or if the freshly encoded buffer cannot
/// be loaded again; the signature leaves no way to report either as an error.
fn fork(&self) -> Arc<dyn PropertyBackend> {
    let state_buffer = self.to_state_buffer().expect("failed to encode the yrs document state while forking");
    let forked = Self::from_state_buffer(&state_buffer).expect("failed to load a state buffer that to_state_buffer just produced");
    Arc::new(forked)
}
fn properties(&self) -> Vec<String> {
let trx = Transact::transact(&self.doc);
let root_refs = trx.root_refs();
root_refs.map(|(name, _)| name.to_owned()).collect()
}
fn property_value(&self, property_name: &PropertyName) -> Option<Value> {
let trx = Transact::transact(&self.doc);
self.get_property_string(&trx, property_name)
}
/// Returns every property name reported by `root_refs` together with its current value.
///
/// Names and values are read inside one read transaction, so the listing and
/// the values come from the same view of the document. A value is `None` when
/// `get_property_string` finds no text under that name.
fn property_values(&self) -> BTreeMap<PropertyName, Option<Value>> {
    let trx = Transact::transact(&self.doc);
    let values: BTreeMap<PropertyName, Option<Value>> = trx
        .root_refs()
        .map(|(name, _)| {
            let name = name.to_owned();
            let value = self.get_property_string(&trx, &name);
            (name, value)
        })
        .collect();
    values
}
/// Returns the name of this backend, `"yrs"`.
fn property_backend_name() -> String {
    String::from("yrs")
}
/// Encodes the document as a v2 `yrs` update.
///
/// The update is encoded relative to a default state vector. `fork` feeds the
/// result straight back into `from_state_buffer` to rebuild the backend. As
/// written, this always returns `Ok`.
fn to_state_buffer(&self) -> Result<Vec<u8>, StateError> {
    let read_txn = self.doc.transact();
    let baseline = yrs::StateVector::default();
    Ok(read_txn.encode_state_as_update_v2(&baseline))
}
fn from_state_buffer(state_buffer: &Vec<u8>) -> std::result::Result<Self, crate::error::RetrievalError> {
let doc = yrs::Doc::new();
let mut txn = doc.transact_mut();
let update = yrs::Update::decode_v2(state_buffer).map_err(|e| crate::error::RetrievalError::FailedUpdate(Box::new(e)))?;
txn.apply_update(update).map_err(|e| crate::error::RetrievalError::FailedUpdate(Box::new(e)))?;
txn.commit(); drop(txn);
let starting_state = doc.transact().state_vector();
Ok(Self { doc, previous_state: Mutex::new(starting_state), field_broadcasts: Mutex::new(BTreeMap::new()) })
}
/// Reports the changes made to the document since the previous call (or since construction).
///
/// The diff is encoded relative to the stored state vector, which is then
/// replaced with the current one. Returns `Ok(None)` when the diff equals
/// `Update::EMPTY_V2`, otherwise a single `Operation` carrying the diff.
fn to_operations(&self) -> Result<Option<Vec<Operation>>, MutationError> {
    let mut baseline = self.previous_state.lock().unwrap();
    let txn = self.doc.transact_mut();
    let diff = txn.encode_diff_v2(&baseline);
    // Advance the baseline while the same transaction is still open.
    *baseline = txn.state_vector();
    if diff == Update::EMPTY_V2 {
        return Ok(None);
    }
    Ok(Some(vec![Operation { diff }]))
}
/// Applies each operation's diff in order, then notifies the listeners of every field that changed.
///
/// Processing stops at the first operation that fails. Fields recorded as
/// changed by then are still notified before the error is returned, because
/// earlier operations have already been applied to the document.
///
/// # Errors
/// Returns the error from the first `apply_update` call that fails.
fn apply_operations(&self, operations: &Vec<Operation>) -> Result<(), MutationError> {
    let changed_fields = Arc::new(Mutex::new(std::collections::HashSet::new()));
    // Keep the outcome instead of returning early, so the notifications below also
    // run when a later operation fails.
    let outcome = operations.iter().try_for_each(|operation| self.apply_update(&operation.diff, &changed_fields));

    let field_broadcasts = self.field_broadcasts.lock().expect("field_broadcasts lock is poisoned");
    for field_name in changed_fields.lock().unwrap().iter() {
        if let Some(broadcast) = field_broadcasts.get(field_name) {
            broadcast.send(());
        }
    }
    outcome
}
/// Registers `listener` on the broadcast of `field_name` and returns the resulting guard.
///
/// The broadcast for the field is created on first use.
///
/// # Panics
/// Panics if the `field_broadcasts` mutex is poisoned.
fn listen_field(
    &self,
    field_name: &PropertyName,
    listener: ankurah_signals::signal::Listener,
) -> ankurah_signals::signal::ListenerGuard {
    let mut registry = self.field_broadcasts.lock().expect("other thread panicked, panic here too");
    let field_broadcast = registry.entry(field_name.clone()).or_default();
    field_broadcast.reference().listen(listener).into()
}
}
impl YrsBackend {
    /// Returns the id of the broadcast for `field_name`, creating the broadcast
    /// first if the field does not have one yet.
    ///
    /// # Panics
    /// Panics if the `field_broadcasts` mutex is poisoned.
    pub fn field_broadcast_id(&self, field_name: &PropertyName) -> ankurah_signals::broadcast::BroadcastId {
        let mut registry = self.field_broadcasts.lock().expect("other thread panicked, panic here too");
        let id = registry.entry(field_name.clone()).or_default().id();
        id
    }
}