use zerocopy::byteorder::{LE, U64};
use crate::{
DbError,
state::NextIds,
wire::{self, MUTATION_OP_PAYLOAD_WORDS, MutationOp},
};
/// An in-memory batch of mutation operations together with the byte blob
/// their payloads can point into.
///
/// `Default` yields a log with no operations and an empty blob.
#[derive(Clone, Debug, Default)]
pub(crate) struct MutationLog {
/// Fixed-size operation records, in the order they were pushed.
pub(super) ops: Vec<MutationOp>,
/// Append-only byte storage; `intern` and `intern_words` add to the end
/// and hand back the `(offset, len)` of what they wrote.
pub(super) blob: Vec<u8>,
}
impl MutationLog {
pub(crate) const fn is_empty(&self) -> bool {
self.ops.is_empty()
}
pub(super) fn intern(&mut self, value: &[u8]) -> (u64, u64) {
let offset = self.blob.len() as u64;
self.blob.extend_from_slice(value);
let len = value.len() as u64;
(offset, len)
}
pub(super) fn intern_words(&mut self, words: &[u64]) -> (u64, u64) {
let offset = self.blob.len() as u64;
for word in words {
self.blob.extend_from_slice(&word.to_le_bytes());
}
let len = size_of_val(words) as u64;
(offset, len)
}
pub(super) fn push(&mut self, op_kind: u32, flags: u32, words: &[u64]) {
let mut payload = [U64::<LE>::new(0); MUTATION_OP_PAYLOAD_WORDS];
for (slot, value) in payload.iter_mut().zip(words) {
*slot = U64::new(*value);
}
self.ops.push(MutationOp {
op_kind: op_kind.into(),
flags: flags.into(),
payload,
});
}
pub(super) fn push_watermark(&mut self, next: NextIds) {
self.push(
wire::OP_NEXT_ID_WATERMARK,
0,
&[
next.element.get(),
next.relation.get(),
next.incidence.get(),
next.role.get(),
next.label.get(),
next.relation_type.get(),
next.property_key.get(),
next.projection.get(),
next.index.get(),
],
);
}
}
pub(super) fn blob_str(blob: &[u8], offset: u64, len: u64, lsn: u64) -> Result<String, DbError> {
let start = usize::try_from(offset).map_err(|_overflow| {
DbError::Storage(crate::error::StorageError::LogCorrupt {
lsn,
reason: "blob offset overflow",
})
})?;
let length = usize::try_from(len).map_err(|_overflow| {
DbError::Storage(crate::error::StorageError::LogCorrupt {
lsn,
reason: "blob length overflow",
})
})?;
let end = start.checked_add(length).ok_or(DbError::Storage(
crate::error::StorageError::LogCorrupt {
lsn,
reason: "blob slice overflow",
},
))?;
let bytes =
blob.get(start..end)
.ok_or(DbError::Storage(crate::error::StorageError::LogCorrupt {
lsn,
reason: "blob slice out of bounds",
}))?;
core::str::from_utf8(bytes)
.map(str::to_owned)
.map_err(|_error| {
DbError::Storage(crate::error::StorageError::LogCorrupt {
lsn,
reason: "blob slice is not UTF-8",
})
})
}
pub(super) fn decode_def_words(
blob: &[u8],
offset: u64,
len: u64,
lsn: u64,
) -> Result<Vec<u64>, DbError> {
let start = usize::try_from(offset).map_err(|_overflow| {
DbError::Storage(crate::error::StorageError::LogCorrupt {
lsn,
reason: "def offset overflow",
})
})?;
let length = usize::try_from(len).map_err(|_overflow| {
DbError::Storage(crate::error::StorageError::LogCorrupt {
lsn,
reason: "def length overflow",
})
})?;
let end = start.checked_add(length).ok_or(DbError::Storage(
crate::error::StorageError::LogCorrupt {
lsn,
reason: "def slice overflow",
},
))?;
let bytes =
blob.get(start..end)
.ok_or(DbError::Storage(crate::error::StorageError::LogCorrupt {
lsn,
reason: "def slice out of bounds",
}))?;
if !bytes.len().is_multiple_of(size_of::<u64>()) {
return Err(DbError::Storage(crate::error::StorageError::LogCorrupt {
lsn,
reason: "def slice is not whole u64 words",
}));
}
Ok(bytes
.chunks_exact(size_of::<u64>())
.map(|chunk| {
let mut word = [0u8; 8];
word.copy_from_slice(chunk);
u64::from_le_bytes(word)
})
.collect())
}