#[cfg(test)]
mod test;
use crate::{
DagMapId, MapxOrdRawKey, Orphan,
common::error::{Result, VsdbError},
};
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
fmt,
ops::{Deref, DerefMut},
};
use vsdb_core::{
basic::mapx_raw::{self, MapxRaw},
common::RawBytes,
};
/// Alias for the node type returned by the prune operations
/// (`DagMapRaw::prune` and its private helper).
type DagHead = DagMapRaw;
/// A node in a DAG of raw byte maps.
///
/// Each node carries its own key/value entries, a handle to an optional
/// parent node, and a map of child nodes. `get` consults the node's own
/// entries first and then walks up the parent chain.
#[derive(Clone, Debug, Default)]
pub struct DagMapRaw {
/// Entries written directly on this node; an empty value is read back as
/// "deleted" by `get` / `get_mut`.
data: MapxRaw,
/// Handle to the parent node, or `None` when this node has no parent.
parent: Orphan<Option<DagMapRaw>>,
/// Child nodes, keyed by the ID bytes generated in `new`.
children: MapxOrdRawKey<DagMapRaw>,
}
impl Serialize for DagMapRaw {
    /// Encodes the node as a fixed-size 3-tuple in the order
    /// `(data, parent, children)`.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeTuple;

        let Self {
            data,
            parent,
            children,
        } = self;

        let mut tuple = serializer.serialize_tuple(3)?;
        tuple.serialize_element(data)?;
        tuple.serialize_element(parent)?;
        tuple.serialize_element(children)?;
        tuple.end()
    }
}
impl<'de> Deserialize<'de> for DagMapRaw {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct Vis;
impl<'de> serde::de::Visitor<'de> for Vis {
type Value = DagMapRaw;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("DagMapRaw")
}
fn visit_seq<A: serde::de::SeqAccess<'de>>(
self,
mut seq: A,
) -> std::result::Result<DagMapRaw, A::Error> {
let data = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
let parent = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
let children = seq
.next_element()?
.ok_or_else(|| serde::de::Error::invalid_length(2, &self))?;
Ok(DagMapRaw {
data,
parent,
children,
})
}
}
deserializer.deserialize_tuple(3, Vis)
}
}
impl DagMapRaw {
/// Creates an empty node attached to `parent`.
///
/// The new node stores a shadow of the `parent` handle. When the handle
/// currently holds a node, the new node is also registered in that node's
/// `children` map under a freshly generated ID (the little-endian bytes of
/// the generated number).
///
/// As written, this function always returns `Ok`; in debug builds it asserts
/// that the generated child ID is not already present.
pub fn new(parent: &mut Orphan<Option<Self>>) -> Result<Self> {
    let node = Self {
        // NOTE(review): `shadow` is unsafe; presumably the new handle is meant
        // to alias the caller's `parent` — confirm the required invariants.
        parent: unsafe { parent.shadow() },
        ..Default::default()
    };

    if let Some(owner) = parent.get_mut().as_mut() {
        let id_bytes = super::gen_dag_map_id_num().to_le_bytes();
        debug_assert!(
            owner.children.get(id_bytes).is_none(),
            "Child ID already exists — possible ID counter rollback"
        );
        owner.children.insert(id_bytes, &node);
    }

    Ok(node)
}
/// Builds a new `DagMapRaw` whose three fields are shadows of this node's
/// fields.
///
/// # Safety
///
/// This forwards to the unsafe `shadow` methods of the underlying fields.
/// NOTE(review): the exact aliasing contract is defined by those methods and
/// is not visible here — confirm before relying on it.
#[inline(always)]
pub unsafe fn shadow(&self) -> Self {
    unsafe {
        let data = self.data.shadow();
        let parent = self.parent.shadow();
        let children = self.children.shadow();
        Self {
            data,
            parent,
            children,
        }
    }
}
#[inline(always)]
pub fn instance_id(&self) -> u64 {
self.data.instance_id()
}
/// Hands this node to `crate::common::save_instance_meta`, keyed by its
/// instance ID, and returns that ID.
///
/// # Errors
///
/// Propagates any error returned by `save_instance_meta`.
pub fn save_meta(&self) -> Result<u64> {
    let instance_id = self.data.instance_id();
    crate::common::save_instance_meta(instance_id, self)?;
    Ok(instance_id)
}
/// Loads a node via `crate::common::load_instance_meta` using the given
/// instance ID (the counterpart of `save_meta`).
///
/// # Errors
///
/// Returns whatever error `load_instance_meta` reports.
pub fn from_meta(instance_id: u64) -> Result<Self> {
    let restored: Result<Self> = crate::common::load_instance_meta(instance_id);
    restored
}
/// Returns `true` when the node has no entries, no parent, and no children.
///
/// The checks run in that order and stop at the first one that fails.
#[inline(always)]
pub fn is_dead(&self) -> bool {
    if self.data.iter().next().is_some() {
        return false;
    }
    if self.parent.get_value().is_some() {
        return false;
    }
    self.no_children()
}
/// Returns `true` when the `children` map yields no entries.
#[inline(always)]
pub fn no_children(&self) -> bool {
    let mut entries = self.children.inner.iter();
    entries.next().is_none()
}
pub fn get(&self, key: impl AsRef<[u8]>) -> Option<RawBytes> {
const MAX_DEPTH: usize = 1024;
let key = key.as_ref();
let mut hdr = self;
let mut hdr_owned;
for _ in 0..MAX_DEPTH {
if let Some(v) = hdr.data.get(key) {
return if v.is_empty() { None } else { Some(v) };
}
match hdr.parent.get_value() {
Some(p) => {
hdr_owned = p;
hdr = &hdr_owned;
}
_ => {
return None;
}
}
}
None
}
/// Returns a mutable guard for `key` as stored on this node.
///
/// Only this node's own `data` is consulted; ancestors are not searched.
/// Yields `None` when the key is absent here or its stored value is empty
/// (a tombstone).
#[inline(always)]
pub fn get_mut(&mut self, key: impl AsRef<[u8]>) -> Option<ValueMut<'_>> {
    let slot = self.data.get_mut(key.as_ref())?;
    if slot.is_empty() {
        return None;
    }

    // Working copy handed out to the caller; written back on drop if modified.
    let snapshot = slot.clone();
    Some(ValueMut {
        value: snapshot,
        inner: slot,
        dirty: false,
    })
}
/// Stores `value` under `key` on this node.
///
/// Empty values are reserved as deletion markers; passing one trips a
/// `debug_assert!` in debug builds. Use `remove` to delete a key.
#[inline(always)]
pub fn insert(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
    let (key, value) = (key.as_ref(), value.as_ref());
    debug_assert!(
        !value.is_empty(),
        "empty value is a tombstone; call remove() instead"
    );
    self.data.insert(key, value)
}
/// Marks `key` as deleted on this node by writing an empty value (tombstone).
///
/// The tombstone makes `get` return `None` for `key` even if an ancestor
/// still holds a value for it.
#[inline(always)]
pub fn remove(&mut self, key: impl AsRef<[u8]>) {
    const TOMBSTONE: [u8; 0] = [];
    self.data.insert(key.as_ref(), TOMBSTONE)
}
#[inline(always)]
pub fn prune(self) -> Result<DagHead> {
self.prune_mainline()
}
fn prune_mainline(mut self) -> Result<DagHead> {
let p = match self.parent.get_value() {
Some(p) => p,
_ => {
return Ok(self);
}
};
const MAX_DEPTH: usize = 1024;
let mut linebuf = vec![p];
while let Some(p) = linebuf.last().unwrap().parent.get_value() {
if linebuf.len() >= MAX_DEPTH {
return Err(VsdbError::Other {
detail: "DAG mainline exceeds MAX_DEPTH — possible cycle".to_owned(),
});
}
linebuf.push(p);
}
let mid = linebuf.len() - 1;
let (others, genesis) = linebuf.split_at_mut(mid);
let mainline_ids: Vec<u64> = {
let mut ids = vec![self.instance_id()];
ids.extend(others.iter().map(|n| n.instance_id()));
ids.push(genesis[0].instance_id());
ids
};
for i in others.iter_mut().rev() {
for (k, v) in i.data.iter() {
genesis[0].data.insert(k, v);
}
for (_, mut child) in i.children.iter_mut() {
if !mainline_ids.contains(&child.instance_id()) {
child.destroy();
}
}
*i.parent.get_mut() = None;
i.data.clear();
i.children.clear();
}
for (k, v) in self.data.iter() {
genesis[0].data.insert(k, v);
}
let mut exclude_targets = vec![];
for (id, mut child) in self.children.iter_mut() {
*child.parent.get_mut() = Some(unsafe { genesis[0].shadow() });
genesis[0].children.insert(&id, &child);
exclude_targets.push(id);
}
*self.parent.get_mut() = None;
self.data.clear();
self.children.clear();
genesis[0].prune_children_exclude(&exclude_targets);
Ok(linebuf.pop().unwrap())
}
/// Removes and destroys the children whose IDs appear in `include_targets`.
#[inline(always)]
pub fn prune_children_include(&mut self, include_targets: &[impl AsRef<DagMapId>]) {
    let exclude_mode = false;
    self.prune_children(include_targets, exclude_mode);
}
/// Removes and destroys every child whose ID does NOT appear in
/// `exclude_targets`.
#[inline(always)]
pub fn prune_children_exclude(&mut self, exclude_targets: &[impl AsRef<DagMapId>]) {
    let exclude_mode = true;
    self.prune_children(exclude_targets, exclude_mode);
}
/// Shared implementation of `prune_children_include` / `prune_children_exclude`.
///
/// With `exclude_mode == false` the children listed in `targets` are dropped;
/// with `exclude_mode == true` every child NOT listed in `targets` is dropped.
/// Dropped children are first all removed from the `children` map and then
/// each one is destroyed.
fn prune_children(&mut self, targets: &[impl AsRef<DagMapId>], exclude_mode: bool) {
    let listed: HashSet<_> = targets.iter().map(|t| t.as_ref()).collect();

    // A child is dropped when "is listed" disagrees with `exclude_mode`:
    // listed in include mode, or unlisted in exclude mode.
    let doomed: Vec<_> = self
        .children
        .iter()
        .filter(|(id, _)| listed.contains(&id.as_slice()) != exclude_mode)
        .collect();

    for (id, _) in &doomed {
        self.children.remove(id);
    }

    for (_, mut child) in doomed {
        child.destroy();
    }
}
/// Recursively tears down this node and all of its descendants.
///
/// The node's parent link is set to `None`, its entries and its `children`
/// map are cleared, and `destroy` is then invoked on every former child.
#[inline(always)]
pub fn destroy(&mut self) {
    *self.parent.get_mut() = None;
    self.data.clear();

    // Snapshot the children before clearing the map so they can still be
    // visited afterwards.
    let mut descendants: Vec<_> = self.children.iter().map(|(_, child)| child).collect();
    self.children.clear();

    descendants.iter_mut().for_each(|child| child.destroy());
}
/// Reports whether `self` and `other_hdr` refer to the same instance, as
/// decided by comparing their `data` maps.
#[inline(always)]
pub fn is_the_same_instance(&self, other_hdr: &Self) -> bool {
    let (mine, theirs) = (&self.data, &other_hdr.data);
    mine.is_the_same_instance(theirs)
}
}
/// Mutable guard returned by `DagMapRaw::get_mut`.
///
/// Dereferences to a working copy of the value; if the copy was accessed
/// mutably, it is written back into the underlying guard when this value is
/// dropped.
#[derive(Debug)]
pub struct ValueMut<'a> {
/// Working copy of the stored bytes that callers read and modify.
value: RawBytes,
/// Guard from the underlying raw map that receives the write-back.
inner: mapx_raw::ValueMut<'a>,
/// Set once the working copy has been borrowed mutably.
dirty: bool,
}
impl Drop for ValueMut<'_> {
    /// Writes the working copy back into the underlying guard, but only if
    /// it was ever borrowed mutably.
    fn drop(&mut self) {
        if !self.dirty {
            return;
        }
        self.inner.clone_from(&self.value);
    }
}
impl Deref for ValueMut<'_> {
    type Target = RawBytes;

    /// Gives read access to the working copy without marking it dirty.
    fn deref(&self) -> &RawBytes {
        let Self { value, .. } = self;
        value
    }
}
impl DerefMut for ValueMut<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.dirty = true;
&mut self.value
}
}