use std::{collections::BTreeMap, sync::Weak};
use loro_common::{ContainerID, IdLp, LoroResult, PeerID};
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use crate::{
configure::Configure,
container::{idx::ContainerIdx, map::MapSet},
delta::{MapValue, ResolvedMapDelta, ResolvedMapValue},
diff_calc::DiffMode,
event::{Diff, Index, InternalDiff},
handler::ValueOrHandler,
op::{Op, RawOp, RawOpContent},
InternalString, LoroDocInner, LoroValue,
};
use super::{ApplyLocalOpReturn, ContainerState, DiffApplyContext};
/// Maximum number of entries kept in the inline `SmallVec` representations
/// (`MapEntries::SortedTiny` and `ChildContainers::Tiny`). Inserting a new key
/// into a full inline list promotes it to a `BTreeMap` / `FxHashMap`; removing
/// an entry from the promoted form demotes it back once it holds at most this
/// many entries.
const TINY_MAP_MAX: usize = 4;
/// Key/value storage for a map container.
///
/// Small maps live inline in a `SmallVec` kept sorted by key (looked up with
/// binary search); larger maps live in a `BTreeMap`. Both variants iterate in
/// ascending key order.
#[derive(Debug, Clone)]
enum MapEntries {
    /// At most `TINY_MAP_MAX` entries, strictly ascending by key (no duplicates).
    SortedTiny(SmallVec<[(InternalString, MapValue); 1]>),
    /// Used once an insertion would grow the inline list past `TINY_MAP_MAX`.
    Tree(BTreeMap<InternalString, MapValue>),
}
impl Default for MapEntries {
fn default() -> Self {
Self::SortedTiny(SmallVec::new())
}
}
/// Borrowing iterator over [`MapEntries`] that wraps the iterator of whichever
/// representation is active. Items come out in ascending key order.
enum MapEntriesIter<'a> {
    /// Iterates the sorted inline entry list.
    SortedTiny(std::slice::Iter<'a, (InternalString, MapValue)>),
    /// Iterates the B-tree representation.
    Tree(std::collections::btree_map::Iter<'a, InternalString, MapValue>),
}
impl<'a> Iterator for MapEntriesIter<'a> {
    type Item = (&'a InternalString, &'a MapValue);

    /// Advances the active inner iterator. Slice items are `&(K, V)`, so they are
    /// split into the `(&K, &V)` shape that the B-tree iterator already yields.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::Tree(inner) => inner.next(),
            Self::SortedTiny(inner) => {
                let entry = inner.next()?;
                Some((&entry.0, &entry.1))
            }
        }
    }
}
impl MapEntries {
    /// Debug-only check that an inline entry list is strictly ascending by key,
    /// which also rules out duplicate keys.
    fn debug_assert_sorted_tiny(entries: &[(InternalString, MapValue)]) {
        debug_assert!(entries.windows(2).all(|pair| pair[0].0 < pair[1].0));
    }

    /// Binary-searches a sorted inline entry list for `key`: `Ok(pos)` when the
    /// key is present, `Err(pos)` with the insertion point otherwise.
    fn search_tiny(
        entries: &[(InternalString, MapValue)],
        key: &InternalString,
    ) -> Result<usize, usize> {
        Self::debug_assert_sorted_tiny(entries);
        entries.binary_search_by(|entry| entry.0.cmp(key))
    }

    /// Returns `true` when no entries are stored.
    fn is_empty(&self) -> bool {
        match self {
            Self::Tree(map) => map.is_empty(),
            Self::SortedTiny(entries) => entries.is_empty(),
        }
    }

    /// Looks up the entry stored under `key`.
    fn get(&self, key: &InternalString) -> Option<&MapValue> {
        match self {
            Self::Tree(map) => map.get(key),
            Self::SortedTiny(entries) => match Self::search_tiny(entries, key) {
                Ok(pos) => Some(&entries[pos].1),
                Err(_) => None,
            },
        }
    }

    /// Stores `value` under `key` and returns the previous entry, if any.
    ///
    /// A new key that does not fit in a full inline list promotes the whole
    /// collection to a `BTreeMap`.
    fn insert(&mut self, key: InternalString, value: MapValue) -> Option<MapValue> {
        let entries = match self {
            Self::Tree(map) => return map.insert(key, value),
            Self::SortedTiny(entries) => entries,
        };
        let slot = match Self::search_tiny(entries, &key) {
            Ok(pos) => return Some(std::mem::replace(&mut entries[pos].1, value)),
            Err(pos) => pos,
        };
        if entries.len() < TINY_MAP_MAX {
            entries.insert(slot, (key, value));
            Self::debug_assert_sorted_tiny(entries);
            return None;
        }
        // The inline list is full: move everything into a B-tree first.
        let mut tree: BTreeMap<InternalString, MapValue> = entries.drain(..).collect();
        let previous = tree.insert(key, value);
        *self = Self::Tree(tree);
        previous
    }

    /// Removes the entry stored under `key` and returns it, if any.
    ///
    /// A B-tree that shrinks to `TINY_MAP_MAX` entries or fewer is demoted back
    /// to the inline form.
    fn remove(&mut self, key: &InternalString) -> Option<MapValue> {
        let map = match self {
            Self::SortedTiny(entries) => {
                let pos = Self::search_tiny(entries, key).ok()?;
                return Some(entries.remove(pos).1);
            }
            Self::Tree(map) => map,
        };
        let removed = map.remove(key)?;
        if map.len() <= TINY_MAP_MAX {
            // B-tree iteration is ascending, so the demoted list is already sorted.
            let demoted = std::mem::take(map).into_iter().collect();
            *self = Self::SortedTiny(demoted);
        }
        Some(removed)
    }

    /// Iterates `(key, value)` pairs in ascending key order.
    fn iter(&self) -> MapEntriesIter<'_> {
        match self {
            Self::Tree(map) => MapEntriesIter::Tree(map.iter()),
            Self::SortedTiny(entries) => {
                Self::debug_assert_sorted_tiny(entries);
                MapEntriesIter::SortedTiny(entries.iter())
            }
        }
    }
}
/// Reverse index from a child container ID to the map key it is stored under.
///
/// Small indexes are kept unsorted in a `SmallVec` and searched linearly;
/// larger ones switch to an `FxHashMap`.
#[derive(Debug, Clone)]
enum ChildContainers {
    /// At most `TINY_MAP_MAX` `(container id, key)` pairs, in no particular order.
    Tiny(SmallVec<[(ContainerID, InternalString); 1]>),
    /// Used once an insertion would grow the inline list past `TINY_MAP_MAX`.
    Map(FxHashMap<ContainerID, InternalString>),
}
impl Default for ChildContainers {
fn default() -> Self {
Self::Tiny(SmallVec::new())
}
}
impl ChildContainers {
    /// Returns the map key under which child container `id` is registered.
    fn get(&self, id: &ContainerID) -> Option<&InternalString> {
        match self {
            Self::Map(map) => map.get(id),
            Self::Tiny(entries) => entries
                .iter()
                .find(|(entry_id, _)| entry_id == id)
                .map(|(_, key)| key),
        }
    }

    /// Returns `true` when child container `id` is registered.
    fn contains_key(&self, id: &ContainerID) -> bool {
        self.get(id).is_some()
    }

    /// Registers `id` under `key`, returning the key it was previously registered
    /// under, if any. A new id that does not fit in a full inline list promotes
    /// the index to a hash map.
    fn insert(&mut self, id: ContainerID, key: InternalString) -> Option<InternalString> {
        let entries = match self {
            Self::Map(map) => return map.insert(id, key),
            Self::Tiny(entries) => entries,
        };
        if let Some(slot) = entries.iter_mut().find(|(entry_id, _)| *entry_id == id) {
            return Some(std::mem::replace(&mut slot.1, key));
        }
        if entries.len() < TINY_MAP_MAX {
            entries.push((id, key));
            return None;
        }
        // The inline list is full: move everything into a hash map first.
        let mut map = FxHashMap::default();
        entries.drain(..).for_each(|(child, child_key)| {
            map.insert(child, child_key);
        });
        let previous = map.insert(id, key);
        *self = Self::Map(map);
        previous
    }

    /// Unregisters `id`, returning the key it was registered under, if any.
    /// A hash map that shrinks to `TINY_MAP_MAX` entries or fewer is demoted back
    /// to the inline form.
    fn remove(&mut self, id: &ContainerID) -> Option<InternalString> {
        let map = match self {
            Self::Tiny(entries) => {
                let pos = entries.iter().position(|(entry_id, _)| entry_id == id)?;
                // Order is irrelevant in the inline list, so an O(1) swap-remove is fine.
                return Some(entries.swap_remove(pos).1);
            }
            Self::Map(map) => map,
        };
        let removed = map.remove(id)?;
        if map.len() <= TINY_MAP_MAX {
            let demoted = std::mem::take(map).into_iter().collect();
            *self = Self::Tiny(demoted);
        }
        Some(removed)
    }
}
/// State of a map container: the current entry for each key plus indexes
/// derived from those entries.
#[derive(Debug, Clone)]
pub struct MapState {
    /// Index of the container this state belongs to.
    idx: ContainerIdx,
    /// Entries by key. An entry may carry a `None` value; such keys are kept in
    /// the map but are skipped by `get`, `to_map`, and the live-entry count.
    map: MapEntries,
    /// Reverse index from each child container stored as a value to its key.
    child_containers: ChildContainers,
    /// Number of entries in `map` whose value is `Some`.
    size: usize,
}
impl ContainerState for MapState {
    fn container_idx(&self) -> ContainerIdx {
        self.idx
    }

    /// The state is empty only when it holds no entries at all. Entries whose
    /// value is `None` still count, so this may be `false` while `len()` is 0.
    fn is_state_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Applies a map diff and returns a resolved diff of the entries that changed.
    ///
    /// In `Checkout` and `Linear` modes every incoming entry overwrites the current
    /// one. In any other mode an incoming entry is skipped when the current entry
    /// compares greater than it, and stored otherwise. Removals (`None` entries)
    /// are only expected in `Checkout` mode.
    ///
    /// # Panics
    /// Panics if `doc` can no longer be upgraded, or if a removal arrives in a mode
    /// other than `Checkout`.
    fn apply_diff_and_convert(
        &mut self,
        diff: InternalDiff,
        DiffApplyContext { doc, mode }: DiffApplyContext,
    ) -> Diff {
        let InternalDiff::Map(delta) = diff else {
            unreachable!()
        };
        let doc = &doc.upgrade().unwrap();
        let overwrite = matches!(mode, DiffMode::Checkout | DiffMode::Linear);
        let mut resolved_delta = ResolvedMapDelta::new();
        for (key, incoming) in delta.updated {
            match incoming {
                None => {
                    assert_eq!(mode, DiffMode::Checkout);
                    self.remove(&key);
                    resolved_delta = resolved_delta.with_entry(key, ResolvedMapValue::new_unset());
                }
                Some(incoming) => {
                    let accepted = overwrite
                        || !matches!(self.map.get(&key), Some(current) if current > &incoming);
                    if !accepted {
                        continue;
                    }
                    self.insert(key.clone(), incoming.clone());
                    let resolved = ResolvedMapValue {
                        idlp: IdLp::new(incoming.peer, incoming.lamp),
                        value: incoming.value.map(|v| ValueOrHandler::from_value(v, doc)),
                    };
                    resolved_delta = resolved_delta.with_entry(key, resolved);
                }
            }
        }
        Diff::Map(resolved_delta)
    }

    /// Applies a diff, discarding the resolved form.
    fn apply_diff(&mut self, diff: InternalDiff, ctx: DiffApplyContext) -> LoroResult<()> {
        let _ = self.apply_diff_and_convert(diff, ctx);
        Ok(())
    }

    /// Applies a local map-set op. If it replaced a container value, that
    /// container is reported in `deleted_containers`.
    fn apply_local_op(&mut self, op: &RawOp, _: &Op) -> LoroResult<ApplyLocalOpReturn> {
        let RawOpContent::Map(MapSet { key, value }) = &op.content else {
            unreachable!()
        };
        let previous = self.insert(
            key.clone(),
            MapValue {
                lamp: op.lamport,
                peer: op.id.peer,
                value: value.clone(),
            },
        );
        let mut ans = ApplyLocalOpReturn::default();
        if let Some(MapValue {
            value: Some(LoroValue::Container(replaced)),
            ..
        }) = previous
        {
            ans.deleted_containers.push(replaced);
        }
        Ok(ans)
    }

    /// Builds a diff that, applied to an empty map state, reproduces this state
    /// (entries with a `None` value included).
    fn to_diff(&mut self, doc: &Weak<LoroDocInner>) -> Diff {
        let updated: FxHashMap<_, _> = self
            .map
            .iter()
            .map(|(key, entry)| (key.clone(), ResolvedMapValue::from_map_value(entry.clone(), doc)))
            .collect();
        Diff::Map(ResolvedMapDelta { updated })
    }

    /// Returns the live `key -> value` pairs as a `LoroValue::Map`.
    fn get_value(&mut self) -> LoroValue {
        LoroValue::Map(self.to_map().into())
    }

    /// Returns the key under which child container `id` is stored.
    fn get_child_index(&self, id: &ContainerID) -> Option<Index> {
        let key = self.child_containers.get(id)?;
        Some(Index::Key(key.clone()))
    }

    fn contains_child(&self, id: &ContainerID) -> bool {
        self.child_containers.contains_key(id)
    }

    /// Lists every container stored as a value, in ascending key order.
    fn get_child_containers(&self) -> Vec<ContainerID> {
        self.map
            .iter()
            .filter_map(|(_, entry)| match &entry.value {
                Some(LoroValue::Container(child)) => Some(child.clone()),
                _ => None,
            })
            .collect()
    }

    /// Forking a map state is a plain deep copy; the configuration is unused.
    fn fork(&self, _config: &Configure) -> Self {
        self.clone()
    }
}
impl MapState {
pub fn new(idx: ContainerIdx) -> Self {
Self {
idx,
map: Default::default(),
child_containers: Default::default(),
size: 0,
}
}
pub fn insert(&mut self, key: InternalString, value: MapValue) -> Option<MapValue> {
let value_yes = value.value.is_some();
if let Some(LoroValue::Container(id)) = &value.value {
self.child_containers.insert(id.clone(), key.clone());
}
let result = self.map.insert(key.clone(), value);
if let Some(Some(LoroValue::Container(c))) = result.as_ref().map(|x| &x.value) {
self.child_containers.remove(c);
}
match (&result, value_yes) {
(Some(x), true) if x.value.is_none() => {
self.size += 1;
}
(None, true) => {
self.size += 1;
}
(Some(x), false) if x.value.is_some() => {
self.size -= 1;
}
_ => {}
};
result
}
pub fn remove(&mut self, key: &InternalString) {
let result = self.map.remove(key);
if let Some(x) = result {
if x.value.is_some() {
self.size -= 1;
}
if let Some(LoroValue::Container(id)) = x.value {
self.child_containers.remove(&id);
}
};
}
pub fn iter(&self) -> impl Iterator<Item = (&InternalString, &MapValue)> {
self.map.iter()
}
fn to_map(&self) -> FxHashMap<String, LoroValue> {
let mut ans = FxHashMap::with_capacity_and_hasher(self.len(), Default::default());
for (key, value) in self.map.iter() {
if value.value.is_none() {
continue;
}
ans.insert(key.to_string(), value.value.as_ref().cloned().unwrap());
}
ans
}
pub fn get(&self, k: &str) -> Option<&LoroValue> {
match self.map.get(&k.into()) {
Some(value) => match &value.value {
Some(v) => Some(v),
None => None,
},
None => None,
}
}
pub fn len(&self) -> usize {
self.size
}
pub fn get_last_edit_peer(&self, key: &str) -> Option<PeerID> {
self.map.get(&key.into()).map(|v| v.peer)
}
}
mod snapshot {
use std::collections::BTreeMap;
use loro_common::{InternalString, LoroValue};
use rustc_hash::{FxHashMap, FxHashSet};
use serde_columnar::Itertools;
use crate::{
delta::MapValue,
encoding::value_register::ValueRegister,
state::{
decode_peer_from_table, decode_peer_table, read_state_leb_u64, state_decode_error,
ContainerCreationContext, ContainerState, FastStateSnapshot,
},
};
use super::MapState;
impl FastStateSnapshot for MapState {
fn encode_snapshot_fast<W: std::io::prelude::Write>(&mut self, mut w: W) {
let value = self.get_value().into_map().unwrap();
postcard::to_io(&*value, &mut w).unwrap();
let keys_with_none_value = self
.map
.iter()
.filter_map(|(k, v)| if v.value.is_some() { None } else { Some(k) })
.collect_vec();
postcard::to_io(&keys_with_none_value, &mut w).unwrap();
let mut peer_register = ValueRegister::new();
for (_, v) in self.map.iter() {
peer_register.register(&v.peer);
}
leb128::write::unsigned(&mut w, peer_register.vec().len() as u64).unwrap();
for p in peer_register.vec() {
w.write_all(&p.to_le_bytes()).unwrap();
}
let mut keys: Vec<&InternalString> = self.map.iter().map(|(key, _)| key).collect();
keys.sort_unstable();
for key in keys.into_iter() {
let value = self.map.get(key).unwrap();
let peer_idx = peer_register.register(&value.peer);
leb128::write::unsigned(&mut w, peer_idx as u64).unwrap();
leb128::write::unsigned(&mut w, value.lamp as u64).unwrap();
}
}
fn decode_value(bytes: &[u8]) -> loro_common::LoroResult<(loro_common::LoroValue, &[u8])> {
let (value, bytes) = postcard::take_from_bytes::<FxHashMap<String, LoroValue>>(bytes)
.map_err(|_| {
loro_common::LoroError::DecodeError(
"Decode map value failed".to_string().into_boxed_str(),
)
})?;
Ok((LoroValue::Map(value.into()), bytes))
}
fn decode_snapshot_fast(
idx: crate::container::idx::ContainerIdx,
(value, bytes): (loro_common::LoroValue, &[u8]),
_ctx: ContainerCreationContext,
) -> loro_common::LoroResult<Self>
where
Self: Sized,
{
let value = value.into_map().unwrap();
let (keys_with_none_value, mut bytes) =
postcard::take_from_bytes::<Vec<InternalString>>(bytes).map_err(|_| {
loro_common::LoroError::DecodeError(
"Decode map keys_with_none_value failed"
.to_string()
.into_boxed_str(),
)
})?;
let keys_with_none_value: FxHashSet<_> = keys_with_none_value.into_iter().collect();
let peers = decode_peer_table(&mut bytes, "Decode map state failed")?;
let mut ans = MapState::new(idx);
let mut keys: Vec<_> = value.keys().map(|x| x.as_str().into()).collect();
keys.extend(keys_with_none_value.iter().cloned());
keys.sort_unstable();
for key in keys {
let peer_idx =
usize::try_from(read_state_leb_u64(&mut bytes, "Decode map state failed")?)
.map_err(|_| {
state_decode_error("Decode map state failed: peer index overflow")
})?;
let lamp =
u32::try_from(read_state_leb_u64(&mut bytes, "Decode map state failed")?)
.map_err(|_| {
state_decode_error("Decode map state failed: lamport overflow")
})?;
let peer = decode_peer_from_table(&peers, peer_idx, "Decode map state failed")?;
if keys_with_none_value.contains(&key) {
ans.insert(
key,
MapValue {
value: None,
lamp,
peer,
},
);
} else {
let value = value.get(&*key).unwrap();
ans.insert(
key,
MapValue {
value: Some(value.clone()),
lamp,
peer,
},
);
}
}
if !bytes.is_empty() {
return Err(loro_common::LoroError::DecodeError(
"Decode map state failed".to_string().into_boxed_str(),
));
}
Ok(ans)
}
}
impl MapState {
    /// Decodes the leading value section of a map snapshot into an ordered
    /// `BTreeMap`, returning it together with the unread remainder of `bytes`.
    pub(crate) fn decode_value_as_btree_map(
        bytes: &[u8],
    ) -> loro_common::LoroResult<(BTreeMap<String, LoroValue>, &[u8])> {
        match postcard::take_from_bytes::<BTreeMap<String, LoroValue>>(bytes) {
            Ok(decoded) => Ok(decoded),
            Err(_) => Err(loro_common::LoroError::DecodeError(
                "Decode map value failed".to_string().into_boxed_str(),
            )),
        }
    }
}
#[cfg(test)]
mod map_snapshot_test {
    use loro_common::LoroValue;

    use crate::container::idx::ContainerIdx;

    use super::*;

    /// Index used for every map state built in these tests.
    fn map_idx() -> ContainerIdx {
        ContainerIdx::from_index_and_type(0, loro_common::ContainerType::Map)
    }

    #[test]
    fn map_fast_snapshot() {
        // (key, value, lamport, peer); key "1" holds a `None` value.
        let entries = [
            ("1", None, 1, 1),
            ("2", Some(LoroValue::I64(0)), 2, 2),
            ("3", Some(LoroValue::Double(1.0)), 3, 3),
        ];
        let mut map = MapState::new(map_idx());
        for (key, value, lamp, peer) in entries {
            map.insert(key.into(), MapValue { value, lamp, peer });
        }

        let mut encoded = Vec::new();
        map.encode_snapshot_fast(&mut encoded);
        assert!(encoded.len() <= 50);

        let (value, rest) = MapState::decode_value(&encoded).unwrap();
        let live = value.clone().into_map().unwrap();
        assert_eq!(live.len(), 2);
        assert_eq!(live.get("2").unwrap(), &LoroValue::I64(0));
        assert_eq!(live.get("3").unwrap(), &LoroValue::Double(1.0));

        let decoded = MapState::decode_snapshot_fast(
            map_idx(),
            (value, rest),
            ContainerCreationContext {
                configure: &Default::default(),
                peer: 0,
            },
        )
        .unwrap();
        assert_eq!(
            decoded.map.get(&"2".into()).unwrap(),
            &MapValue {
                value: Some(LoroValue::I64(0)),
                lamp: 2,
                peer: 2,
            }
        );
        assert_eq!(
            decoded.map.get(&"1".into()).unwrap(),
            &MapValue {
                value: None,
                lamp: 1,
                peer: 1,
            }
        );
    }

    #[test]
    fn map_fast_snapshot_rejects_corrupt_state_metadata() {
        let idx = map_idx();
        let ctx = ContainerCreationContext {
            configure: &Default::default(),
            peer: 0,
        };
        // The value section claims one live key, but the metadata comes from an
        // empty map and therefore has no record for it.
        let value = LoroValue::from(std::collections::HashMap::from([(
            "key".to_string(),
            LoroValue::I64(1),
        )]));
        let mut empty = MapState::new(idx);
        let mut bytes = Vec::new();
        empty.encode_snapshot_fast(&mut bytes);
        let (_, state_bytes) = MapState::decode_value(&bytes).unwrap();
        assert!(MapState::decode_snapshot_fast(idx, (value, state_bytes), ctx).is_err());
    }
}
}