use crate::block::{BlockCell, ClientID, ItemContent, ItemPtr};
use crate::block_store::BlockStore;
use crate::branch::{Branch, BranchPtr};
use crate::doc::{DocAddr, Options};
use crate::error::Error;
use crate::event::SubdocsEvent;
use crate::id_set::DeleteSet;
use crate::slice::ItemSlice;
use crate::types::{Path, PathSegment, TypeRef};
use crate::update::PendingUpdate;
use crate::updates::encoder::{Encode, Encoder};
use crate::StateVector;
use crate::{
Doc, Observer, OffsetKind, Snapshot, TransactionCleanupEvent, TransactionMut, UpdateEvent,
Uuid, ID,
};
use arc_swap::{ArcSwap, DefaultStrategy, Guard};
use async_lock::futures::{Read, Write};
use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::borrow::Borrow;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::sync::Arc;
/// Central state container of a document: block storage, root types, pending
/// data, sub-documents and event handlers.
pub struct Store {
/// Identifier of the local client; used to look up the local clock in `blocks`.
pub(crate) client_id: ClientID,
/// Offset kind copied from the [Options] this store was created with.
pub(crate) offset_kind: OffsetKind,
/// Copied from the `skip_gc` option; snapshot encoding is refused unless it is `true`.
pub(crate) skip_gc: bool,
/// Root-level types, keyed by their name.
pub(crate) types: HashMap<Arc<str>, Box<Branch>>,
/// Storage of the blocks known to this store.
pub(crate) blocks: BlockStore,
/// Pending update, if any (presumably one that could not be integrated yet — TODO confirm).
pub(crate) pending: Option<PendingUpdate>,
/// Pending delete set, if any (presumably deletions awaiting their target blocks — TODO confirm).
pub(crate) pending_ds: Option<DeleteSet>,
/// Sub-documents registered in this store, keyed by document address.
pub(crate) subdocs: HashMap<DocAddr, Doc>,
/// Lazily allocated event handlers; `None` until something sets it.
pub(crate) events: Option<Box<StoreEvents>>,
/// Parent item; when present, this store is reported as a sub-document.
pub(crate) parent: Option<ItemPtr>,
/// For an item, the set of branches associated with it through links
/// (presumably the branches that link to the item — TODO confirm direction).
pub(crate) linked_by: HashMap<ItemPtr, HashSet<BranchPtr>>,
}
impl Store {
/// Creates an empty store configured from the given document `options`.
///
/// Only `client_id`, `offset_kind` and `skip_gc` are read from `options`; every
/// collection starts out empty and every optional field starts as `None`.
pub(crate) fn new(options: &Options) -> Self {
    Store {
        // Settings inherited from the document options.
        client_id: options.client_id,
        offset_kind: options.offset_kind,
        skip_gc: options.skip_gc,
        // Containers, all initially empty.
        types: HashMap::new(),
        blocks: BlockStore::default(),
        subdocs: HashMap::new(),
        linked_by: HashMap::new(),
        // Optional state, all initially absent.
        pending: None,
        pending_ds: None,
        events: None,
        parent: None,
    }
}
/// Returns a shared reference to the pending update, or `None` when there is none.
pub fn pending_update(&self) -> Option<&PendingUpdate> {
    let pending = &self.pending;
    pending.as_ref()
}
/// Returns a mutable reference to the pending update, or `None` when there is none.
pub fn pending_update_mut(&mut self) -> Option<&mut PendingUpdate> {
    let pending = &mut self.pending;
    pending.as_mut()
}
/// Returns a shared reference to the pending delete set, or `None` when there is none.
pub fn pending_ds(&self) -> Option<&DeleteSet> {
    let pending = &self.pending_ds;
    pending.as_ref()
}
/// Returns a mutable reference to the pending delete set, or `None` when there is none.
pub fn pending_ds_mut(&mut self) -> Option<&mut DeleteSet> {
    let pending = &mut self.pending_ds;
    pending.as_mut()
}
/// Returns `true` when this store has a parent item, i.e. it belongs to a sub-document.
pub fn is_subdoc(&self) -> bool {
    let parent = &self.parent;
    parent.is_some()
}
pub fn get_local_state(&self) -> u32 {
self.blocks.get_clock(&self.client_id)
}
/// Looks up a root-level type by name.
///
/// Returns a pointer to the branch registered under `key`, or `None` when no
/// root type with that name exists.
pub(crate) fn get_type<K: Borrow<str>>(&self, key: K) -> Option<BranchPtr> {
    self.types.get(key.borrow()).map(BranchPtr::from)
}
/// Returns the root-level type registered under `key`, creating it if missing.
///
/// * Existing entry: `repair_type_ref(type_ref)` is invoked on it before it is
///   returned.
/// * Missing entry: a fresh branch of kind `type_ref` is created, named after
///   `key`, stored in the root type map and returned.
pub(crate) fn get_or_create_type<K: Into<Arc<str>>>(
    &mut self,
    key: K,
    type_ref: TypeRef,
) -> BranchPtr {
    let name: Arc<str> = key.into();
    let slot = self.types.entry(name.clone());
    match slot {
        Entry::Vacant(vacant) => {
            let mut created = Branch::new(type_ref);
            // The pointer is taken before the branch is moved into the map
            // (assumes the pointer stays valid across that move — TODO confirm).
            let mut created_ptr = BranchPtr::from(&mut created);
            created_ptr.name = Some(name);
            vacant.insert(created);
            created_ptr
        }
        Entry::Occupied(occupied) => {
            let mut existing = BranchPtr::from(occupied.get());
            existing.repair_type_ref(type_ref);
            existing
        }
    }
}
pub fn encode_state_from_snapshot<E: Encoder>(
&self,
snapshot: &Snapshot,
encoder: &mut E,
) -> Result<(), Error> {
if !self.skip_gc {
return Err(Error::Gc);
}
self.write_blocks_to(&snapshot.state_map, encoder);
snapshot.delete_set.encode(encoder);
Ok(())
}
/// Writes, for every client present both in `sv` and in the local block store,
/// the blocks from that client's first block up to (but not including) the clock
/// given by `sv`.
///
/// Output layout: the number of clients, then per client (in descending client
/// id order) the number of blocks, the client id, a starting clock of `0`, and
/// the blocks themselves. The last block is trimmed so it ends at `clock - 1`.
///
/// Clients whose effective clock is `0` are skipped: there is nothing to write
/// for them, and `clock - 1` below would underflow otherwise.
///
/// # Panics
///
/// Panics if a selected client has no block list or if no block covers
/// `clock - 1`.
pub(crate) fn write_blocks_to<E: Encoder>(&self, sv: &StateVector, encoder: &mut E) {
    let local_sv = self.blocks.get_state_vector();
    let mut diff = Vec::with_capacity(sv.len());
    for (&client_id, &clock) in sv.iter() {
        if local_sv.contains_client(&client_id) {
            // Never reach past what is known locally.
            let clock = clock.min(local_sv.get(&client_id));
            // A zero clock denotes an empty range; skipping it keeps the client
            // count consistent and avoids `0 - 1` in the loop below.
            if clock > 0 {
                diff.push((client_id, clock));
            }
        }
    }
    // Descending client id order.
    diff.sort_by(|a, b| b.0.cmp(&a.0));
    encoder.write_var(diff.len());
    for (client, clock) in diff {
        let blocks = self.blocks.get_client(&client).unwrap();
        let clock = clock.min(blocks.clock() + 1);
        // Index of the block containing the last clock to be written.
        let last_idx = blocks.find_pivot(clock - 1).unwrap();
        encoder.write_var(last_idx + 1);
        encoder.write_client(client);
        encoder.write_var(0);
        for i in 0..last_idx {
            let block = blocks[i].as_slice();
            block.encode(encoder);
        }
        // The final block may extend past the requested clock: cut off the excess.
        let last_block = &blocks[last_idx];
        let mut slice = last_block.as_slice();
        slice.trim_end(slice.clock_end() - (clock - 1));
        slice.encode(encoder);
    }
}
/// Encodes everything the holder of state vector `sv` is missing: first the
/// blocks selected by [Store::write_blocks_from], then a delete set built from
/// the whole block store.
pub fn encode_diff<E: Encoder>(&self, sv: &StateVector, encoder: &mut E) {
    self.write_blocks_from(sv, encoder);
    DeleteSet::from(&self.blocks).encode(encoder);
}
/// Writes the blocks that are known locally but not covered by the remote state
/// vector `sv`.
///
/// Output layout: the number of clients, then per client (in descending client
/// id order) the number of blocks, the client id, the clock at which writing
/// starts, and the blocks themselves. The first block is trimmed at its start
/// so that it begins exactly at that clock.
///
/// # Panics
///
/// Panics if a selected client has no block list or if no block covers the
/// starting clock.
pub(crate) fn write_blocks_from<E: Encoder>(&self, sv: &StateVector, encoder: &mut E) {
    let local_sv = self.blocks.get_state_vector();
    let mut missing = Self::diff_state_vectors(&local_sv, sv);
    // Descending client id order (stable sort).
    missing.sort_by(|(lhs, _), (rhs, _)| rhs.cmp(lhs));
    encoder.write_var(missing.len());
    for (client, requested_clock) in missing {
        let blocks = self.blocks.get_client(&client).unwrap();
        // Never start before the first block that is actually stored.
        let first_stored_clock = blocks.get(0).map(|i| i.clock_start()).unwrap_or_default();
        let clock = requested_clock.max(first_stored_clock);
        let start = blocks.find_pivot(clock).unwrap();

        encoder.write_var(blocks.len() - start);
        encoder.write_client(client);
        encoder.write_var(clock);

        // The first block may begin before `clock`: skip its leading part.
        let first_block = blocks.get(start).unwrap();
        let mut head = first_block.as_slice();
        head.trim_start(clock - first_block.clock_start());
        head.encode(encoder);

        // All subsequent blocks are written in full.
        for i in (start + 1)..blocks.len() {
            blocks[i].as_slice().encode(encoder);
        }
    }
}
/// Computes, per client, the clock from which local blocks have to be sent to a
/// peer whose state is `remote_sv`.
///
/// * A client listed in `remote_sv` is included with the remote clock when the
///   local clock is strictly greater.
/// * A client known locally but not listed in `remote_sv` at all is included
///   with clock `0`.
///
/// Each client appears at most once in the result. Membership (rather than a
/// clock of `0`) decides the second case, so a remote vector that explicitly
/// lists a client at clock `0` does not produce a duplicate entry.
fn diff_state_vectors(local_sv: &StateVector, remote_sv: &StateVector) -> Vec<(ClientID, u32)> {
    let mut diff = Vec::new();
    for (client, &remote_clock) in remote_sv.iter() {
        if local_sv.get(client) > remote_clock {
            diff.push((*client, remote_clock));
        }
    }
    for (client, _) in local_sv.iter() {
        // Clients listed by the remote were fully handled by the loop above.
        if !remote_sv.contains_client(client) {
            diff.push((*client, 0));
        }
    }
    diff
}
/// Resolves `path` to the (possibly nested) shared type it points at.
///
/// The first segment must be a key naming a root-level type. Each following
/// segment descends one level: a key looks into the current type's map, an
/// index looks at the element at that position.
///
/// Returns `None` when the path is empty, does not start with a key, names a
/// missing root type or child, or passes through content that is not a type.
pub fn get_type_from_path(&self, path: &Path) -> Option<BranchPtr> {
    let mut segments = path.iter();
    let root_name = match segments.next() {
        Some(PathSegment::Key(name)) => name,
        _ => return None,
    };
    let mut current = self.get_type(root_name.clone())?;
    for segment in segments {
        match segment {
            PathSegment::Key(key) => {
                let child = current.map.get(key)?;
                match &child.content {
                    ItemContent::Type(child_branch) => {
                        current = BranchPtr::from(child_branch.as_ref());
                    }
                    _ => return None,
                }
            }
            PathSegment::Index(index) => match current.get_at(*index) {
                Some((ItemContent::Type(child_branch), _)) => current = child_branch.into(),
                _ => return None,
            },
        }
    }
    Some(current)
}
/// Makes the range described by `slice` stand on a block of its own and returns
/// a pointer to that block.
///
/// When the slice is not left-adjacent, the underlying item is spliced at
/// `slice.start`; when it is not right-adjacent, the resulting block is spliced
/// again at the slice length. Every block produced by a splice is inserted into
/// the owning client's block list right after the block it was cut from. If the
/// source item is flagged as linked, its `linked_by` entry is copied onto each
/// newly created block.
///
/// # Panics
///
/// Panics if the slice's client has no block list, if a pivot lookup fails, if
/// the located cell is not an item, or if the right-hand splice yields nothing.
pub(crate) fn materialize(&mut self, mut slice: ItemSlice) -> ItemPtr {
let id = slice.id().clone();
let blocks = self.blocks.get_client_mut(&id.client).unwrap();
// Links registered for the source item (cloned), to be propagated to split-off blocks.
let mut links = None;
let item = slice.ptr.deref();
if item.info.is_linked() {
links = self.linked_by.get(&slice.ptr).cloned();
}
// Position of the block produced by the left split, if one happened; lets the
// right split skip a second pivot search.
let mut index = None;
let mut ptr = if slice.adjacent_left() {
// Presumably the slice already starts at the item's beginning — no left split needed.
slice.ptr
} else {
let mut i = blocks.find_pivot(id.clock).unwrap();
if let Some(new) = slice.ptr.splice(slice.start, OffsetKind::Utf16) {
if let Some(source) = links.clone() {
let dest = self.linked_by.entry(ItemPtr::from(&new)).or_default();
dest.extend(source);
}
// The split-off part goes right behind the block it was cut from.
blocks.insert(i + 1, BlockCell::Block(new));
i += 1;
index = Some(i);
}
let ptr = blocks[i].as_item().unwrap();
// Re-anchor the slice on the block at `i`: same length, now starting at offset 0.
slice = ItemSlice::new(ptr, 0, slice.end - slice.start);
ptr
};
if !slice.adjacent_right() {
let i = if let Some(i) = index {
i
} else {
let last_id = slice.last_id();
blocks.find_pivot(last_id.clock).unwrap()
};
// Cut off everything past the slice; `ptr` keeps the part covered by the slice.
let new = ptr.splice(slice.len(), OffsetKind::Utf16).unwrap();
if let Some(source) = links {
let dest = self.linked_by.entry(ItemPtr::from(&new)).or_default();
dest.extend(source);
}
blocks.insert(i + 1, BlockCell::Block(new));
}
ptr
}
/// Returns an iterator over the sub-documents registered in this store.
pub fn subdocs(&self) -> SubdocsIter {
    let docs = self.subdocs.values();
    SubdocsIter(docs)
}
/// Returns an iterator over the GUIDs of the sub-documents registered in this store.
pub fn subdoc_guids(&self) -> SubdocGuids {
    let docs = self.subdocs.values();
    SubdocGuids(docs)
}
pub(crate) fn follow_redone(&self, id: &ID) -> Option<ItemSlice> {
let mut next_id = Some(*id);
let mut slice = None;
while let Some(next) = next_id.as_mut() {
slice = self.blocks.get_item_clean_start(next);
if let Some(slice) = &slice {
next_id = slice.ptr.redone;
} else {
break;
}
}
slice
}
}
impl Encode for Store {
    /// Encodes the store as a diff against a default (empty) state vector.
    fn encode<E: Encoder>(&self, encoder: &mut E) {
        let empty = StateVector::default();
        self.encode_diff(&empty, encoder)
    }
}
impl std::fmt::Debug for Store {
    /// Debug output is identical to the [std::fmt::Display] output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        <Self as std::fmt::Display>::fmt(self, f)
    }
}
impl std::fmt::Display for Store {
    /// Renders the store as a debug struct named after the client id.
    ///
    /// Empty collections and absent optional values are left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.client_id.to_string();
        let mut out = f.debug_struct(&name);
        if !self.types.is_empty() {
            out.field("root types", &self.types);
        }
        if !self.blocks.is_empty() {
            out.field("blocks", &self.blocks);
        }
        if let Some(pending) = &self.pending {
            out.field("pending", pending);
        }
        if let Some(pending_ds) = &self.pending_ds {
            out.field("pending delete set", pending_ds);
        }
        if let Some(parent) = &self.parent {
            out.field("parent block", parent.id());
        }
        if !self.linked_by.is_empty() {
            out.field("links", &self.linked_by);
        }
        out.finish()
    }
}
/// Cloneable handle to a reference-counted [StoreInner]; every clone refers to
/// the same underlying store and options.
#[repr(transparent)]
#[derive(Debug, Clone)]
pub(crate) struct DocStore(pub(crate) Arc<StoreInner>);
impl DocStore {
pub fn new(options: Options, parent: Option<ItemPtr>) -> Self {
let mut store = Store::new(&options);
let options = ArcSwap::new(options.into());
store.parent = parent;
DocStore(Arc::new(StoreInner {
options,
store: RwLock::new(store),
}))
}
pub(crate) fn try_read(&self) -> Option<RwLockReadGuard<Store>> {
self.0.store.try_read()
}
#[cfg(not(target_family = "wasm"))]
pub(crate) fn read_blocking(&self) -> RwLockReadGuard<Store> {
self.0.store.read_blocking()
}
#[cfg(target_family = "wasm")]
pub(crate) fn read_blocking(&self) -> RwLockReadGuard<Store> {
self.0.store.try_read().unwrap()
}
pub(crate) fn read_async(&self) -> Read<Store> {
self.0.store.read()
}
pub(crate) fn try_write(&self) -> Option<RwLockWriteGuard<Store>> {
self.0.store.try_write()
}
#[cfg(not(target_family = "wasm"))]
pub(crate) fn write_blocking(&self) -> RwLockWriteGuard<Store> {
self.0.store.write_blocking()
}
#[cfg(target_family = "wasm")]
pub(crate) fn write_blocking(&self) -> RwLockWriteGuard<Store> {
self.0.store.try_write().unwrap()
}
pub(crate) fn write_async(&self) -> Write<Store> {
self.0.store.write()
}
pub(crate) fn options(&self) -> Guard<Arc<Options>, DefaultStrategy> {
self.0.options.load()
}
pub(crate) fn set_should_load(&self, should_load: bool) -> bool {
self.0
.options
.rcu(|options| {
let mut options = options.deref().clone();
options.should_load = should_load;
options
})
.should_load
}
pub(crate) fn set_subdoc_data(&self, client_id: ClientID, collection_id: Option<Arc<str>>) {
self.0.options.rcu(|options| {
let mut options = options.deref().clone();
options.client_id = client_id;
if options.collection_id.is_none() {
options.collection_id = collection_id.clone();
}
options
});
}
}
/// Shared state behind a [DocStore]: atomically swappable options plus the
/// store itself behind a read-write lock.
#[derive(Debug)]
pub(crate) struct StoreInner {
// Document options; replaced as a whole through `ArcSwap` rather than mutated in place.
options: ArcSwap<Options>,
// The store; access goes through the lock's read/write guards.
store: RwLock<Store>,
}
/// Iterator over the sub-documents stored in a [Store]; a thin wrapper around
/// the values iterator of the sub-document map.
#[repr(transparent)]
pub struct SubdocsIter<'doc>(std::collections::hash_map::Values<'doc, DocAddr, Doc>);

impl<'doc> Iterator for SubdocsIter<'doc> {
    type Item = &'doc Doc;

    /// Yields the next sub-document, or `None` once the map is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let SubdocsIter(docs) = self;
        docs.next()
    }
}
/// Iterator over the GUIDs of the sub-documents stored in a [Store].
#[repr(transparent)]
pub struct SubdocGuids<'doc>(std::collections::hash_map::Values<'doc, DocAddr, Doc>);

impl<'doc> Iterator for SubdocGuids<'doc> {
    type Item = Uuid;

    /// Yields the GUID of the next sub-document, or `None` once the map is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        self.0.next().map(|doc| doc.guid())
    }
}
// Callback type aliases used by `StoreEvents`.
//
// With the `sync` feature enabled every callback must additionally be
// `Send + Sync`; without it only `'static` is required.

/// Callback receiving the transaction and its cleanup event.
#[cfg(feature = "sync")]
pub type TransactionCleanupFn =
Box<dyn Fn(&TransactionMut, &TransactionCleanupEvent) + Send + Sync + 'static>;
/// Callback receiving mutable access to the transaction.
#[cfg(feature = "sync")]
pub type AfterTransactionFn = Box<dyn Fn(&mut TransactionMut) + Send + Sync + 'static>;
/// Callback receiving the transaction and an update event.
#[cfg(feature = "sync")]
pub type UpdateFn = Box<dyn Fn(&TransactionMut, &UpdateEvent) + Send + Sync + 'static>;
/// Callback receiving the transaction and a sub-documents event.
#[cfg(feature = "sync")]
pub type SubdocsFn = Box<dyn Fn(&TransactionMut, &SubdocsEvent) + Send + Sync + 'static>;
/// Callback receiving the transaction and a document.
#[cfg(feature = "sync")]
pub type DestroyFn = Box<dyn Fn(&TransactionMut, &Doc) + Send + Sync + 'static>;
/// Callback receiving the transaction and its cleanup event.
#[cfg(not(feature = "sync"))]
pub type TransactionCleanupFn = Box<dyn Fn(&TransactionMut, &TransactionCleanupEvent) + 'static>;
/// Callback receiving mutable access to the transaction.
#[cfg(not(feature = "sync"))]
pub type AfterTransactionFn = Box<dyn Fn(&mut TransactionMut) + 'static>;
/// Callback receiving the transaction and an update event.
#[cfg(not(feature = "sync"))]
pub type UpdateFn = Box<dyn Fn(&TransactionMut, &UpdateEvent) + 'static>;
/// Callback receiving the transaction and a sub-documents event.
#[cfg(not(feature = "sync"))]
pub type SubdocsFn = Box<dyn Fn(&TransactionMut, &SubdocsEvent) + 'static>;
/// Callback receiving the transaction and a document.
#[cfg(not(feature = "sync"))]
pub type DestroyFn = Box<dyn Fn(&TransactionMut, &Doc) + 'static>;
/// Collection of observers for store-level events; all observers start out in
/// their default state.
#[derive(Default)]
pub struct StoreEvents {
/// Observers triggered by `emit_transaction_cleanup`.
pub transaction_cleanup_events: Observer<TransactionCleanupFn>,
/// Observers triggered by `emit_after_transaction`.
pub after_transaction_events: Observer<AfterTransactionFn>,
/// Observers triggered by `emit_update_v1`.
pub update_v1_events: Observer<UpdateFn>,
/// Observers triggered by `emit_update_v2`.
pub update_v2_events: Observer<UpdateFn>,
/// Observers for sub-document events (not triggered from within this type).
pub subdocs_events: Observer<SubdocsFn>,
/// Observers for destroy events (not triggered from within this type).
pub destroy_events: Observer<DestroyFn>,
}
impl StoreEvents {
    /// Notifies v1 update observers about the changes made by `txn`.
    ///
    /// Nothing happens when there are no subscribers, or when the transaction
    /// neither deleted anything nor changed the state vector.
    pub fn emit_update_v1(&self, txn: &TransactionMut) {
        if !self.update_v1_events.has_subscribers() {
            return;
        }
        let changed = !txn.delete_set.is_empty() || txn.after_state != txn.before_state;
        if changed {
            let update = UpdateEvent::new_v1(txn);
            self.update_v1_events
                .trigger(|callback| callback(txn, &update));
        }
    }

    /// Notifies v2 update observers about the changes made by `txn`.
    ///
    /// Nothing happens when there are no subscribers, or when the transaction
    /// neither deleted anything nor changed the state vector.
    pub fn emit_update_v2(&self, txn: &TransactionMut) {
        if !self.update_v2_events.has_subscribers() {
            return;
        }
        let changed = !txn.delete_set.is_empty() || txn.after_state != txn.before_state;
        if changed {
            let update = UpdateEvent::new_v2(txn);
            self.update_v2_events
                .trigger(|callback| callback(txn, &update));
        }
    }

    /// Invokes every after-transaction observer with mutable access to `txn`.
    pub fn emit_after_transaction(&self, txn: &mut TransactionMut) {
        self.after_transaction_events
            .trigger(|callback| callback(txn));
    }

    /// Notifies transaction-cleanup observers; the event is only built when at
    /// least one subscriber exists.
    pub fn emit_transaction_cleanup(&self, txn: &TransactionMut) {
        if !self.transaction_cleanup_events.has_subscribers() {
            return;
        }
        let event = TransactionCleanupEvent::new(txn);
        self.transaction_cleanup_events
            .trigger(|callback| callback(txn, &event));
    }
}