// loro-internal 1.12.0
//
// Loro internal library. Do not use it directly, as it is not stable.
// Documentation
use crate::{
    arena::SharedArena, configure::Configure, container::idx::ContainerIdx,
    state::container_store::FRONTIERS_KEY, utils::kv_wrapper::KvWrapper, version::Frontiers,
};
use bytes::Bytes;
use loro_common::ContainerID;
use rustc_hash::FxHashMap;
use std::ops::Bound;

use super::ContainerWrapper;

/// Two-tier container storage: decoded containers live in `store`, while their
/// encoded form lives in `kv` and is decoded on demand.
///
/// The invariants about this struct:
///
/// - `kv` is either the same or older than `store`.
/// - if `all_loaded` is true, then `store` contains all the entries from `kv`
///
/// Invariants: it should be agnostic to the users of this struct whether a container is stored in `kv` or `store`
pub(crate) struct InnerStore {
    /// Used to translate between `ContainerIdx` and `ContainerID` and to register
    /// containers that are discovered while reading `kv`.
    arena: SharedArena,
    /// Decoded containers, keyed by their arena index.
    store: FxHashMap<ContainerIdx, ContainerWrapper>,
    /// Encoded containers, keyed by the bytes of their `ContainerID`.
    kv: KvWrapper,
    /// When `true`, lookups never fall back to `kv`.
    all_loaded: bool,
    /// `flush` reads `deleted_root_containers` from here to decide which empty
    /// root containers are not written to `kv`.
    config: Configure,
}

/// Opaque `Debug` output: only the type name is printed, none of the fields.
impl std::fmt::Debug for InnerStore {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("InnerStore");
        builder.finish()
    }
}

/// This impl block contains all the mutation code that may break the invariants of this struct
impl InnerStore {
    /// Returns the container stored under `idx`, creating it if it is absent.
    ///
    /// Lookup order:
    /// 1. the decoded entry in `store`;
    /// 2. the encoded entry in `kv` (only consulted while `all_loaded` is false);
    /// 3. a fresh container produced by `f`.
    ///
    /// Whatever is found or created is cached in `store` before it is returned.
    ///
    /// # Panics
    ///
    /// Panics if `kv` has to be consulted and `idx` is not known to the arena.
    pub(super) fn get_or_insert_with(
        &mut self,
        idx: ContainerIdx,
        f: impl FnOnce() -> ContainerWrapper,
    ) -> &mut ContainerWrapper {
        use std::collections::hash_map::Entry;

        match self.store.entry(idx) {
            Entry::Occupied(e) => e.into_mut(),
            Entry::Vacant(e) => {
                // Resolve the id and build the key only when `kv` may hold an entry
                // that has not been decoded yet; this mirrors `ensure_container` and
                // `get_mut` and avoids the work entirely once everything is loaded.
                if !self.all_loaded {
                    let id = self.arena.get_container_id(idx).unwrap();
                    let key = id.to_bytes();
                    if let Some(v) = self.kv.get(&key) {
                        return e.insert(ContainerWrapper::new_from_bytes(v));
                    }
                }

                e.insert(f())
            }
        }
    }

    /// Makes sure `store` has an entry for `idx`, without returning it.
    ///
    /// Does nothing when the entry already exists. Otherwise the container is
    /// decoded from `kv` when it is present there (checked only while
    /// `all_loaded` is false), and built with `f` as a last resort.
    pub(super) fn ensure_container(
        &mut self,
        idx: ContainerIdx,
        f: impl FnOnce() -> ContainerWrapper,
    ) {
        use std::collections::hash_map::Entry;

        if let Entry::Vacant(slot) = self.store.entry(idx) {
            // Look for an encoded copy first, but only if `kv` may still hold
            // entries that have not been decoded.
            let encoded = if self.all_loaded {
                None
            } else {
                let id = self.arena.get_container_id(idx).unwrap();
                let key = id.to_bytes();
                self.kv.get(&key)
            };

            let container = match encoded {
                Some(bytes) => ContainerWrapper::new_from_bytes(bytes),
                None => f(),
            };
            slot.insert(container);
        }
    }

    /// Returns a mutable reference to the container under `idx`, if there is one.
    ///
    /// A container that so far only exists in `kv` is decoded and cached in
    /// `store` first. Returns `None` when neither tier has an entry.
    pub(crate) fn get_mut(&mut self, idx: ContainerIdx) -> Option<&mut ContainerWrapper> {
        use std::collections::hash_map::Entry;

        match self.store.entry(idx) {
            Entry::Occupied(loaded) => Some(loaded.into_mut()),
            Entry::Vacant(slot) => {
                if self.all_loaded {
                    // Nothing can be hiding in `kv`.
                    return None;
                }

                let id = self.arena.get_container_id(idx).unwrap();
                let key = id.to_bytes();
                self.kv
                    .get(&key)
                    .map(|encoded| slot.insert(ContainerWrapper::new_from_bytes(encoded)))
            }
        }
    }

    /// Reports whether a container with the given id exists in either tier.
    ///
    /// Takes `&mut self` because a hit in `kv` has side effects: the id is
    /// registered in the arena and the decoded container is cached in `store`.
    pub(crate) fn contains_id(&mut self, id: &ContainerID) -> bool {
        let decoded_already = match self.arena.id_to_idx(id) {
            Some(idx) => self.store.contains_key(&idx),
            None => false,
        };
        if decoded_already {
            return true;
        }

        if self.all_loaded {
            return false;
        }

        let key = id.to_bytes();
        match self.kv.get(&key) {
            Some(encoded) => {
                let idx = self.arena.register_container(id);
                let container = ContainerWrapper::new_from_bytes(encoded);
                self.store.insert(idx, container);
                true
            }
            None => false,
        }
    }

    /// Iterates mutably over every container together with its index.
    ///
    /// Calls `load_all` first, so entries that so far only exist in `kv` are
    /// decoded into `store` before the iterator is created.
    pub(crate) fn iter_all_containers_mut(
        &mut self,
    ) -> impl Iterator<Item = (&ContainerIdx, &mut ContainerWrapper)> {
        self.load_all();
        self.store.iter_mut()
    }

    /// Iterates over the ids of all containers.
    ///
    /// Every entry of `kv` is decoded into `store` first; the ids are then
    /// resolved through the arena from the keys of `store`.
    ///
    /// # Panics
    ///
    /// The returned iterator panics if an index in `store` is unknown to the arena.
    pub(crate) fn iter_all_container_ids(&mut self) -> impl Iterator<Item = ContainerID> + '_ {
        // PERF: we don't need to load all the containers here
        self.load_all();
        let arena = &self.arena;
        self.store
            .keys()
            .map(move |&idx| arena.get_container_id(idx).unwrap())
    }

    /// Writes all unflushed containers into `kv` (see `flush`) and returns the
    /// bytes produced by exporting `kv`.
    pub(crate) fn encode(&mut self) -> Bytes {
        self.flush();
        self.kv.export()
    }

    /// Writes every container that is not yet flushed from `store` into `kv`.
    ///
    /// Skipped entries:
    /// - containers already marked as flushed;
    /// - root containers whose state is empty and whose id is listed in
    ///   `config.deleted_root_containers`.
    ///
    /// Each written container is marked as flushed afterwards. The lock on
    /// `deleted_root_containers` is held for the whole call.
    pub(crate) fn flush(&mut self) {
        let deleted = self.config.deleted_root_containers.lock();
        let pending = self.store.iter_mut().filter_map(|(idx, wrapper)| {
            if wrapper.is_flushed() {
                return None;
            }

            let id = self.arena.get_container_id(*idx).unwrap();
            let is_deleted_empty_root =
                wrapper.is_state_empty() && id.is_root() && deleted.contains(&id);
            if is_deleted_empty_root {
                return None;
            }

            let key: Bytes = id.to_bytes().into();
            let encoded = wrapper.encode();
            wrapper.set_flushed(true);
            Some((key, encoded))
        });
        self.kv.set_all(pending);
    }

    /// Returns the result of cloning the underlying `kv`.
    ///
    /// Containers in `store` that have not been flushed are not reflected,
    /// because this method does not call `flush`.
    // NOTE(review): `decode` uses a separate `arc_clone`, so `clone` presumably
    // yields an independent copy rather than a shared handle — confirm against
    // `KvWrapper`.
    pub(crate) fn get_kv_clone(&self) -> KvWrapper {
        self.kv.clone()
    }

    /// Loads encoded state into an empty store, deferring the decoding of the
    /// individual containers.
    ///
    /// Steps:
    /// 1. import `bytes` into `kv`;
    /// 2. take the entry under `FRONTIERS_KEY` out of `kv` and decode it, if present;
    /// 3. install a parent resolver on the arena that reads a child's parent
    ///    from its encoded entry in `kv`;
    /// 4. clear `store` and mark the store as not fully loaded.
    ///
    /// Returns the decoded frontiers, or `None` if `bytes` carried none.
    ///
    /// # Errors
    ///
    /// Returns `LoroError::DecodeError` when the import fails, and propagates
    /// the error from `Frontiers::decode`.
    ///
    /// # Panics
    ///
    /// Panics if `kv` is not empty.
    // NOTE(review): when `Frontiers::decode` fails, `kv` has already been
    // populated while `store` and `all_loaded` are untouched — confirm that
    // callers discard the store on error.
    pub(crate) fn decode(
        &mut self,
        bytes: bytes::Bytes,
    ) -> Result<Option<Frontiers>, loro_common::LoroError> {
        assert!(self.kv.is_empty());
        self.kv
            .import(bytes)
            .map_err(|e| loro_common::LoroError::DecodeError(e.into_boxed_str()))?;

        let frontiers = match self.kv.remove(FRONTIERS_KEY) {
            Some(encoded) => Some(Frontiers::decode(&encoded)?),
            None => None,
        };

        // The resolver owns its own handle to `kv`, so it can outlive `&mut self`.
        let kv_handle = self.kv.arc_clone();
        self.arena
            .set_parent_resolver(Some(move |child_id: ContainerID| {
                let key = child_id.to_bytes();
                let encoded = kv_handle.get(&key)?;
                let child = ContainerWrapper::new_from_bytes(encoded);
                child.parent().cloned()
            }));

        self.store.clear();
        self.all_loaded = false;
        Ok(frontiers)
    }

    /// Imports two encoded blobs into an empty `kv` and eagerly decodes every entry.
    ///
    /// `bytes_a` is imported first, then `bytes_b`; the entry under
    /// `FRONTIERS_KEY` is removed and discarded. Every remaining entry is
    /// decoded, the container and its parent (if any) are registered in the
    /// arena, the parent link is recorded, and the container is put into
    /// `store`, replacing any entry already stored under the same index.
    /// Afterwards the store is marked as fully loaded.
    ///
    /// Unlike `decode`, this method does not clear `store` beforehand.
    ///
    /// # Errors
    ///
    /// Returns `LoroError::DecodeError` when either import fails. If the second
    /// import fails, the entries of the first one remain in `kv`.
    ///
    /// # Panics
    ///
    /// Panics if `kv` is not empty.
    pub(crate) fn decode_twice(
        &mut self,
        bytes_a: bytes::Bytes,
        bytes_b: bytes::Bytes,
    ) -> Result<(), loro_common::LoroError> {
        assert!(self.kv.is_empty());
        // TODO: add assert that all containers in the store should be empty right now
        self.kv
            .import(bytes_a)
            .map_err(|e| loro_common::LoroError::DecodeError(e.into_boxed_str()))?;
        self.kv
            .import(bytes_b)
            .map_err(|e| loro_common::LoroError::DecodeError(e.into_boxed_str()))?;
        self.kv.remove(FRONTIERS_KEY);
        self.kv.with_kv(|kv| {
            self.arena.with_guards(|guards| {
                let iter = kv.scan(Bound::Unbounded, Bound::Unbounded);
                for (k, v) in iter {
                    let cid = ContainerID::from_bytes(&k);
                    let c = ContainerWrapper::new_from_bytes(v);
                    let parent = c.parent();
                    let idx = guards.register_container(&cid);
                    let p = parent.as_ref().map(|p| guards.register_container(p));
                    guards.set_parent(idx, p);
                    // A previous entry under the same index is intentionally replaced.
                    self.store.insert(idx, c);
                }
            });
        });

        self.all_loaded = true;
        Ok(())
    }

    /// Decodes every entry of `kv` that is not yet present in `store`.
    ///
    /// Each key found in `kv` is registered in the arena. Entries that already
    /// exist in `store` are left untouched. The store is marked as fully loaded
    /// afterwards, so calling this again is a no-op.
    pub fn load_all(&mut self) {
        if !self.all_loaded {
            self.kv.with_kv(|kv| {
                let entries = kv.scan(Bound::Unbounded, Bound::Unbounded);
                self.arena.with_guards(|guards| {
                    for (key, encoded) in entries {
                        let cid = ContainerID::from_bytes(&key);
                        let idx = guards.register_container(&cid);
                        // An existing entry wins: the content in `store` is guaranteed
                        // to be newer than the content in `kv`.
                        self.store
                            .entry(idx)
                            .or_insert_with(|| ContainerWrapper::new_from_bytes(encoded));
                    }
                });
            });

            self.all_loaded = true;
        }
    }

    /// Returns `true` when `kv` is empty and every container in `store` has an
    /// empty state.
    pub(crate) fn can_import_snapshot(&self) -> bool {
        self.kv.is_empty() && self.store.values().all(|c| c.is_state_empty())
    }
}

impl InnerStore {
    /// Creates an empty store.
    ///
    /// `store` starts empty and `kv` is created with `KvWrapper::new_mem`, so the
    /// store is marked as fully loaded from the start.
    pub(crate) fn new(arena: SharedArena, config: Configure) -> Self {
        let store = FxHashMap::default();
        let kv = KvWrapper::new_mem();
        Self {
            arena,
            store,
            kv,
            all_loaded: true,
            config,
        }
    }

    /// Creates a copy of this store that is bound to another arena and configuration.
    ///
    /// The current state is encoded (which flushes `store` into `kv`) and decoded
    /// into a freshly created store. The frontiers returned by `decode` are discarded.
    ///
    /// # Panics
    ///
    /// Panics if decoding the freshly encoded bytes fails.
    pub(crate) fn fork(&mut self, arena: SharedArena, config: &Configure) -> InnerStore {
        // PERF: we can try to reuse
        let snapshot = self.encode();
        let mut forked = Self::new(arena, config.clone());
        forked.decode(snapshot).unwrap();
        forked
    }
}