use crate::{
arena::SharedArena, configure::Configure, container::idx::ContainerIdx,
state::container_store::FRONTIERS_KEY, utils::kv_wrapper::KvWrapper, version::Frontiers,
};
use bytes::Bytes;
use loro_common::ContainerID;
use rustc_hash::FxHashMap;
use std::ops::Bound;
use super::ContainerWrapper;
/// Container store that keeps decoded containers in memory and their encoded
/// form in a key-value store, loading entries from the KV store on demand.
///
/// As used by the methods in this file:
/// - `flush` writes not-yet-flushed entries of `store` into `kv`, so `store`
///   may be ahead of `kv` between flushes.
/// - when `all_loaded` is `true`, lookups never consult `kv`.
pub(crate) struct InnerStore {
/// Arena used to translate between `ContainerIdx` and `ContainerID` and to
/// register containers / parent links discovered while decoding.
arena: SharedArena,
/// Decoded containers that are currently materialized in memory.
store: FxHashMap<ContainerIdx, ContainerWrapper>,
/// Encoded containers, keyed by the bytes of their `ContainerID`.
kv: KvWrapper,
/// `true` when no lazy loading from `kv` is needed (set by `new`, `load_all`
/// and `decode_twice`; cleared by `decode`).
all_loaded: bool,
/// Document configuration; `flush` reads `deleted_root_containers` from it.
config: Configure,
}
/// Opaque `Debug` representation: only the type name is printed, none of the
/// fields are exposed.
impl std::fmt::Debug for InnerStore {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct` prints just the name, so write it directly.
        formatter.write_str("InnerStore")
    }
}
impl InnerStore {
/// Returns the container stored under `idx`, creating it if necessary.
///
/// Resolution order on a miss in the in-memory map:
/// 1. if lazy loading is active (`all_loaded == false`) and the KV store has
///    an entry for the container's id, the container is decoded from it;
/// 2. otherwise `f` is called to build a fresh container.
///
/// The container id is only looked up (and serialized into a key) when the KV
/// store is actually going to be consulted.
///
/// # Panics
///
/// Panics if lazy loading is active and `idx` is unknown to the arena.
pub(super) fn get_or_insert_with(
    &mut self,
    idx: ContainerIdx,
    f: impl FnOnce() -> ContainerWrapper,
) -> &mut ContainerWrapper {
    use std::collections::hash_map::Entry;

    match self.store.entry(idx) {
        Entry::Occupied(slot) => slot.into_mut(),
        Entry::Vacant(slot) => {
            if !self.all_loaded {
                // Only pay for the id lookup and key serialization when the
                // KV store may still hold an entry we have not loaded.
                let id = self.arena.get_container_id(idx).unwrap();
                let key = id.to_bytes();
                if let Some(bytes) = self.kv.get(&key) {
                    return slot.insert(ContainerWrapper::new_from_bytes(bytes));
                }
            }
            slot.insert(f())
        }
    }
}
/// Makes sure a container exists in the in-memory map for `idx`.
///
/// Does nothing when the container is already present. Otherwise the
/// container is decoded from the KV store when lazy loading is active and the
/// KV store has an entry for it; failing that, `f` is called to create it.
///
/// The map is probed once through the entry API instead of a `contains_key`
/// check followed by a separate `insert`.
///
/// # Panics
///
/// Panics if lazy loading is active and `idx` is unknown to the arena.
pub(super) fn ensure_container(
    &mut self,
    idx: ContainerIdx,
    f: impl FnOnce() -> ContainerWrapper,
) {
    use std::collections::hash_map::Entry;

    let slot = match self.store.entry(idx) {
        Entry::Occupied(_) => return,
        Entry::Vacant(slot) => slot,
    };

    if !self.all_loaded {
        let id = self.arena.get_container_id(idx).unwrap();
        let key = id.to_bytes();
        if let Some(bytes) = self.kv.get(&key) {
            slot.insert(ContainerWrapper::new_from_bytes(bytes));
            return;
        }
    }

    slot.insert(f());
}
/// Returns a mutable reference to the container stored under `idx`, if any.
///
/// On a miss in the in-memory map the container is decoded from the KV store
/// when lazy loading is active and the KV store has an entry for it.
/// Returns `None` when the container is in neither place.
///
/// The map is probed once: the reference is taken straight from the entry
/// rather than through a second `get_mut` lookup.
///
/// # Panics
///
/// Panics if the container is not in memory, lazy loading is active, and
/// `idx` is unknown to the arena.
pub(crate) fn get_mut(&mut self, idx: ContainerIdx) -> Option<&mut ContainerWrapper> {
    use std::collections::hash_map::Entry;

    match self.store.entry(idx) {
        Entry::Occupied(slot) => Some(slot.into_mut()),
        Entry::Vacant(slot) => {
            if self.all_loaded {
                // Everything persisted is already in memory; nothing to load.
                return None;
            }
            let id = self.arena.get_container_id(idx).unwrap();
            let key = id.to_bytes();
            let bytes = self.kv.get(&key)?;
            Some(slot.insert(ContainerWrapper::new_from_bytes(bytes)))
        }
    }
}
/// Reports whether a container with the given `id` exists in this store.
///
/// The in-memory map is checked first. If the container is not there and lazy
/// loading is active, the KV store is consulted; a hit registers the id in the
/// arena and loads the decoded container into memory as a side effect (hence
/// `&mut self`).
pub(crate) fn contains_id(&mut self, id: &ContainerID) -> bool {
    let in_memory = match self.arena.id_to_idx(id) {
        Some(idx) => self.store.contains_key(&idx),
        None => false,
    };
    if in_memory {
        return true;
    }

    if self.all_loaded {
        return false;
    }

    let key = id.to_bytes();
    match self.kv.get(&key) {
        Some(bytes) => {
            let idx = self.arena.register_container(id);
            let wrapper = ContainerWrapper::new_from_bytes(bytes);
            self.store.insert(idx, wrapper);
            true
        }
        None => false,
    }
}
pub(crate) fn iter_all_containers_mut(
&mut self,
) -> impl Iterator<Item = (&ContainerIdx, &mut ContainerWrapper)> {
self.load_all();
self.store.iter_mut()
}
/// Iterates over the ids of every container in the store.
///
/// All entries still held only in the KV store are loaded into memory first.
///
/// # Panics
///
/// The returned iterator panics if a stored index is unknown to the arena.
pub(crate) fn iter_all_container_ids(&mut self) -> impl Iterator<Item = ContainerID> + '_ {
    self.load_all();
    let arena = &self.arena;
    self.store
        .keys()
        .map(move |idx| arena.get_container_id(*idx).unwrap())
}
/// Serializes the whole store.
///
/// Pending in-memory changes are flushed into the KV store first, then the
/// KV store is exported as a single `Bytes` blob.
pub(crate) fn encode(&mut self) -> Bytes {
// Must run before `export` so the exported bytes include unflushed containers.
self.flush();
self.kv.export()
}
/// Writes every in-memory container that is not yet flushed into the KV store.
///
/// A container is skipped (and left unflushed) when its state is empty, it is
/// a root container, and its id is listed in the configuration's
/// `deleted_root_containers`. Every container that is written gets marked as
/// flushed.
///
/// # Panics
///
/// Panics if an unflushed container's index is unknown to the arena.
pub(crate) fn flush(&mut self) {
    // The lock guard lives until the end of the function, covering the write.
    let deleted_roots = self.config.deleted_root_containers.lock();
    let arena = &self.arena;

    let pending = self.store.iter_mut().filter_map(|(idx, wrapper)| {
        if wrapper.is_flushed() {
            return None;
        }

        let id = arena.get_container_id(*idx).unwrap();
        let is_deleted_empty_root =
            wrapper.is_state_empty() && id.is_root() && deleted_roots.contains(&id);
        if is_deleted_empty_root {
            return None;
        }

        let key: Bytes = id.to_bytes().into();
        let encoded = wrapper.encode();
        wrapper.set_flushed(true);
        Some((key, encoded))
    });

    self.kv.set_all(pending);
}
/// Returns a clone of the underlying KV store wrapper.
///
/// NOTE(review): this uses `clone()` whereas `decode` uses `arc_clone()`;
/// presumably `clone()` yields an independent copy — confirm against `KvWrapper`.
pub(crate) fn get_kv_clone(&self) -> KvWrapper {
self.kv.clone()
}
/// Loads an encoded store into the (empty) KV store and switches to lazy
/// loading.
///
/// Steps, in order:
/// 1. import `bytes` into the KV store;
/// 2. remove the entry under `FRONTIERS_KEY` and decode it, if present;
/// 3. install a parent resolver on the arena that looks a child's parent up
///    by decoding the child's entry from the KV store;
/// 4. clear the in-memory map and mark the store as not fully loaded.
///
/// Returns the decoded frontiers, or `None` when the blob carried none.
///
/// # Errors
///
/// Returns `LoroError::DecodeError` when the import fails, and propagates the
/// error from `Frontiers::decode`.
///
/// # Panics
///
/// Panics if the KV store is not empty when called.
pub(crate) fn decode(
    &mut self,
    bytes: bytes::Bytes,
) -> Result<Option<Frontiers>, loro_common::LoroError> {
    assert!(self.kv.is_empty());

    self.kv
        .import(bytes)
        .map_err(|msg| loro_common::LoroError::DecodeError(msg.into_boxed_str()))?;

    let frontiers = match self.kv.remove(FRONTIERS_KEY) {
        Some(raw) => Some(Frontiers::decode(&raw)?),
        None => None,
    };

    let shared_kv = self.kv.arc_clone();
    self.arena
        .set_parent_resolver(Some(move |child_id: ContainerID| {
            let key = child_id.to_bytes();
            let raw = shared_kv.get(&key)?;
            let wrapper = ContainerWrapper::new_from_bytes(raw);
            wrapper.parent().cloned()
        }));

    self.store.clear();
    self.all_loaded = false;
    Ok(frontiers)
}
/// Imports two encoded stores into the (empty) KV store, one after the other,
/// and eagerly loads every resulting entry into memory.
///
/// After both imports the `FRONTIERS_KEY` entry is dropped. Each remaining
/// entry is decoded, its id and its parent's id (if any) are registered in the
/// arena, the parent link is recorded, and the container is placed in the
/// in-memory map, replacing any container previously stored under that index.
/// The store is marked as fully loaded afterwards.
///
/// # Errors
///
/// Returns `LoroError::DecodeError` when either import fails.
///
/// # Panics
///
/// Panics if the KV store is not empty when called.
pub(crate) fn decode_twice(
    &mut self,
    bytes_a: bytes::Bytes,
    bytes_b: bytes::Bytes,
) -> Result<(), loro_common::LoroError> {
    assert!(self.kv.is_empty());

    for blob in [bytes_a, bytes_b] {
        self.kv
            .import(blob)
            .map_err(|msg| loro_common::LoroError::DecodeError(msg.into_boxed_str()))?;
    }
    self.kv.remove(FRONTIERS_KEY);

    self.kv.with_kv(|kv| {
        self.arena.with_guards(|guards| {
            for (raw_id, raw_state) in kv.scan(Bound::Unbounded, Bound::Unbounded) {
                let cid = ContainerID::from_bytes(&raw_id);
                let wrapper = ContainerWrapper::new_from_bytes(raw_state);
                let parent = wrapper.parent();
                let idx = guards.register_container(&cid);
                let parent_idx = parent
                    .as_ref()
                    .map(|parent_id| guards.register_container(parent_id));
                guards.set_parent(idx, parent_idx);
                // A previously stored container under `idx` is deliberately replaced.
                self.store.insert(idx, wrapper);
            }
        });
    });

    self.all_loaded = true;
    Ok(())
}
/// Materializes every KV-store entry in memory, if that has not been done yet.
///
/// Each key is registered in the arena. Entries whose container is already in
/// the in-memory map are left untouched (the in-memory copy may hold changes
/// that `flush` has not written back yet); all others are decoded and
/// inserted. Afterwards the store is marked as fully loaded, so later calls
/// return immediately.
pub fn load_all(&mut self) {
    if !self.all_loaded {
        self.kv.with_kv(|kv| {
            let entries = kv.scan(Bound::Unbounded, Bound::Unbounded);
            self.arena.with_guards(|guards| {
                for (raw_id, raw_state) in entries {
                    let cid = ContainerID::from_bytes(&raw_id);
                    let idx = guards.register_container(&cid);
                    if self.store.contains_key(&idx) {
                        continue;
                    }
                    self.store
                        .insert(idx, ContainerWrapper::new_from_bytes(raw_state));
                }
            });
        });
        self.all_loaded = true;
    }
}
/// Reports whether a snapshot may be imported into this store.
///
/// That is the case only when the KV store is empty and every container
/// currently in memory has an empty state.
pub(crate) fn can_import_snapshot(&self) -> bool {
    self.kv.is_empty()
        && self
            .store
            .iter()
            .all(|(_, wrapper)| wrapper.is_state_empty())
}
}
impl InnerStore {
pub(crate) fn new(arena: SharedArena, config: Configure) -> Self {
Self {
arena,
store: FxHashMap::default(),
kv: KvWrapper::new_mem(),
all_loaded: true,
config,
}
}
pub(crate) fn fork(&mut self, arena: SharedArena, config: &Configure) -> InnerStore {
let bytes = self.encode();
let mut new_store = Self::new(arena, config.clone());
new_store.decode(bytes).unwrap();
new_store
}
}