use crate::{
db::{
codec::MAX_ROW_BYTES,
commit::prepared_op::PreparedIndexDeltaKind,
data::{DataKey, RawDataKey},
index::{IndexStore, RawIndexEntry, RawIndexKey},
},
error::InternalError,
types::Ulid,
};
use canic_cdk::structures::Storable;
use std::{borrow::Cow, cell::RefCell, thread::LocalKey};
/// Label string associated with commit markers.
/// NOTE(review): not referenced by the code visible here — presumably used for
/// storage or diagnostic naming elsewhere; confirm against callers.
pub(crate) const COMMIT_LABEL: &str = "CommitMarker";
/// Byte length of a commit id (the `id` array carried by `CommitMarker`).
pub(in crate::db) const COMMIT_ID_BYTES: usize = 16;
/// Byte length of the schema fingerprint attached to every row op.
const COMMIT_SCHEMA_FINGERPRINT_BYTES: usize = 16;
/// Current commit marker format version.
/// NOTE(review): the payload writer/reader visible here does not emit or check a
/// version byte; confirm where this constant is consumed.
pub(in crate::db) const COMMIT_MARKER_FORMAT_VERSION_CURRENT: u8 = 1;
/// Fixed-size schema fingerprint stored alongside each row op.
pub(in crate::db) type CommitSchemaFingerprint = [u8; COMMIT_SCHEMA_FINGERPRINT_BYTES];
/// 16 MiB commit size limit, in bytes.
/// NOTE(review): not enforced by any code visible here — presumably checked where
/// encoded commits are stored or loaded; confirm.
pub(crate) const MAX_COMMIT_BYTES: u32 = 16 * 1024 * 1024;
/// One row-level operation recorded inside a commit marker.
///
/// `validate_commit_marker_shape` requires a non-empty `entity_path`, at least
/// one of `before`/`after`, and payloads no larger than `MAX_ROW_BYTES`.
#[derive(Clone, Debug)]
pub(in crate::db) struct CommitRowOp {
/// Path naming the entity the row belongs to. The payload decoder stores an
/// owned copy; `Cow` lets other callers pass a `'static` string without copying.
pub(crate) entity_path: Cow<'static, str>,
/// Raw (stored-form) data key of the affected row.
pub(crate) key: RawDataKey,
/// Optional `before` payload bytes.
/// NOTE(review): presumably the row image prior to the operation — confirm
/// against the code that applies commits.
pub(crate) before: Option<Vec<u8>>,
/// Optional `after` payload bytes.
/// NOTE(review): presumably the row image after the operation — confirm
/// against the code that applies commits.
pub(crate) after: Option<Vec<u8>>,
/// Schema fingerprint encoded as the last field of the row op.
pub(crate) schema_fingerprint: CommitSchemaFingerprint,
}
impl CommitRowOp {
    /// Assembles a row op from parts that are already in their typed form.
    ///
    /// `entity_path` accepts anything convertible into `Cow<'static, str>`; the
    /// conversion happens here so callers can pass literals or owned strings.
    #[must_use]
    pub(crate) fn new(
        entity_path: impl Into<Cow<'static, str>>,
        key: RawDataKey,
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: CommitSchemaFingerprint,
    ) -> Self {
        let entity_path = entity_path.into();
        Self {
            entity_path,
            key,
            before,
            after,
            schema_fingerprint,
        }
    }

    /// Builds a row op whose key arrives as raw bytes.
    ///
    /// The bytes are run through `decode_data_key`; only the raw key half of its
    /// result is kept.
    ///
    /// # Errors
    /// Propagates the error from `decode_data_key` when `key` is rejected.
    pub(crate) fn try_new_bytes(
        entity_path: impl Into<Cow<'static, str>>,
        key: &[u8],
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: CommitSchemaFingerprint,
    ) -> Result<Self, InternalError> {
        decode_data_key(key).map(|(raw_key, _decoded)| {
            Self::new(entity_path, raw_key, before, after, schema_fingerprint)
        })
    }
}
/// One index-level operation tied to a specific index store.
///
/// NOTE(review): nothing in the visible code constructs or consumes this type;
/// the field notes below are inferred from the field types — confirm against the
/// commit preparation/apply code.
#[derive(Clone, Debug)]
pub(crate) struct CommitIndexOp {
/// Thread-local index store this operation targets.
pub(crate) store: &'static LocalKey<RefCell<IndexStore>>,
/// Raw key of the index entry.
pub(crate) key: RawIndexKey,
/// Optional raw index entry (presumably `None` means the entry is removed — TODO confirm).
pub(crate) value: Option<RawIndexEntry>,
/// Kind of index delta this operation represents.
pub(crate) delta_kind: PreparedIndexDeltaKind,
}
/// A commit marker: a commit id plus the row operations it records.
///
/// Its binary form is written by `write_commit_marker_payload` and read back by
/// `decode_commit_marker_payload`.
#[derive(Clone, Debug)]
pub(crate) struct CommitMarker {
/// Commit id; `CommitMarker::new` fills it from `generate_commit_id`.
pub(crate) id: [u8; COMMIT_ID_BYTES],
/// Row operations, encoded in this order.
pub(crate) row_ops: Vec<CommitRowOp>,
}
impl CommitMarker {
    /// Wraps `row_ops` in a marker stamped with a newly generated commit id.
    ///
    /// # Errors
    /// Returns whatever `generate_commit_id` reports when no id can be produced.
    pub(crate) fn new(row_ops: Vec<CommitRowOp>) -> Result<Self, InternalError> {
        generate_commit_id().map(|id| Self { id, row_ops })
    }

    /// Corruption error for a length prefix that could not be read in full.
    fn payload_truncated_length(label: &'static str) -> InternalError {
        let message = format!("{label} decode failed: truncated length",);
        InternalError::commit_corruption(message)
    }

    /// Corruption error for a field body that could not be read in full.
    fn payload_truncated_bytes(label: &'static str) -> InternalError {
        let message = format!("{label} decode failed: truncated bytes",);
        InternalError::commit_corruption(message)
    }

    /// Corruption error for a fixed-size field whose slice could not be converted
    /// into the expected array.
    fn payload_invalid_fixed_size(label: &'static str) -> InternalError {
        let message = format!("{label} decode failed: invalid fixed-size payload",);
        InternalError::commit_corruption(message)
    }

    /// Corruption error for a row payload (`label` names which one) whose length
    /// `len` exceeds `MAX_ROW_BYTES`.
    fn row_op_payload_too_large(label: &str, len: usize) -> InternalError {
        let message = format!(
            "row op {label} payload exceeds max size: {len} bytes (limit {MAX_ROW_BYTES})",
        );
        InternalError::commit_corruption(message)
    }

    /// Corruption error wrapping the reason a row-op key failed to decode.
    fn row_op_key_decode_failed(err: impl std::fmt::Display) -> InternalError {
        let message = format!("row op key decode: {err}");
        InternalError::commit_corruption(message)
    }
}
/// Encoded width of the marker id at the start of a payload.
const COMMIT_MARKER_ID_BYTES: usize = COMMIT_ID_BYTES;
/// Encoded width of the schema fingerprint that ends each row op.
const COMMIT_MARKER_SCHEMA_FINGERPRINT_BYTES: usize = COMMIT_SCHEMA_FINGERPRINT_BYTES;
/// Flag bit: the row op carries a `before` payload.
const COMMIT_MARKER_FLAG_BEFORE: u8 = 0b0000_0001;
/// Flag bit: the row op carries an `after` payload.
const COMMIT_MARKER_FLAG_AFTER: u8 = 0b0000_0010;
/// All defined flag bits; the decoder rejects a flags byte with any other bit set.
const COMMIT_MARKER_FLAG_MASK: u8 = COMMIT_MARKER_FLAG_BEFORE | COMMIT_MARKER_FLAG_AFTER;
/// Encoded width of the row-op count (a `u32`).
const COMMIT_MARKER_ROW_COUNT_BYTES: usize = 4;
/// Produces a fresh commit id from the bytes of a newly generated ULID.
///
/// # Errors
/// A ULID generation failure is converted with
/// `InternalError::commit_id_generation_failed`.
pub(in crate::db) fn generate_commit_id() -> Result<[u8; COMMIT_ID_BYTES], InternalError> {
    match Ulid::try_generate() {
        Ok(ulid) => Ok(ulid.to_bytes()),
        Err(err) => Err(InternalError::commit_id_generation_failed(err)),
    }
}
/// Test-only helper: encodes `marker` into a freshly allocated buffer.
///
/// The buffer is pre-sized with `commit_marker_payload_capacity`.
///
/// # Errors
/// Fails when the computed size does not fit in a `u32`, or when
/// `write_commit_marker_payload` fails.
#[cfg(test)]
pub(in crate::db) fn encode_commit_marker_payload(
    marker: &CommitMarker,
) -> Result<Vec<u8>, InternalError> {
    let capacity = commit_marker_payload_capacity(marker);
    let exceeds_u32 = capacity > u32::MAX as usize;
    if exceeds_u32 {
        let err = InternalError::commit_marker_payload_exceeds_u32_length_limit(
            "commit marker payload",
            capacity,
        );
        return Err(err);
    }

    let mut buffer = Vec::with_capacity(capacity);
    write_commit_marker_payload(&mut buffer, marker).map(|()| buffer)
}
/// Test-only helper: encodes a marker made of `marker_id` and exactly one row op
/// into a freshly allocated buffer.
///
/// The buffer is pre-sized with `single_row_commit_marker_payload_capacity`.
///
/// # Errors
/// Fails when the computed size does not fit in a `u32`, or when
/// `write_single_row_commit_marker_payload` fails.
#[cfg(test)]
pub(in crate::db) fn encode_single_row_commit_marker_payload(
    marker_id: [u8; COMMIT_ID_BYTES],
    row_op: &CommitRowOp,
) -> Result<Vec<u8>, InternalError> {
    let capacity = single_row_commit_marker_payload_capacity(row_op);
    let exceeds_u32 = capacity > u32::MAX as usize;
    if exceeds_u32 {
        let err = InternalError::commit_marker_payload_exceeds_u32_length_limit(
            "commit marker payload",
            capacity,
        );
        return Err(err);
    }

    let mut buffer = Vec::with_capacity(capacity);
    write_single_row_commit_marker_payload(&mut buffer, marker_id, row_op).map(|()| buffer)
}
/// Encoded size of a marker holding exactly `row_op`: the id, the row count and
/// the row op itself. Additions saturate rather than overflow.
pub(in crate::db) fn single_row_commit_marker_payload_capacity(row_op: &CommitRowOp) -> usize {
    let header_bytes = COMMIT_MARKER_ID_BYTES.saturating_add(COMMIT_MARKER_ROW_COUNT_BYTES);
    let row_bytes = commit_row_op_payload_capacity(row_op);
    header_bytes.saturating_add(row_bytes)
}
pub(in crate::db) fn commit_marker_payload_capacity(marker: &CommitMarker) -> usize {
let mut capacity = COMMIT_MARKER_ID_BYTES + COMMIT_MARKER_ROW_COUNT_BYTES;
for row_op in &marker.row_ops {
capacity = capacity.saturating_add(commit_row_op_payload_capacity(row_op));
}
capacity
}
/// Appends to `out` the encoding of a marker with id `marker_id` and exactly one
/// row op: the id bytes, a row count of 1, then the row op.
///
/// # Errors
/// Propagates failures from writing the row count or the row op.
pub(in crate::db) fn write_single_row_commit_marker_payload(
    out: &mut Vec<u8>,
    marker_id: [u8; COMMIT_ID_BYTES],
    row_op: &CommitRowOp,
) -> Result<(), InternalError> {
    out.extend_from_slice(marker_id.as_slice());
    write_len_u32(out, 1, "commit marker row count")?;
    write_commit_row_op(out, row_op)
}
pub(in crate::db) fn write_commit_marker_payload(
out: &mut Vec<u8>,
marker: &CommitMarker,
) -> Result<(), InternalError> {
out.extend_from_slice(&marker.id);
write_len_u32(out, marker.row_ops.len(), "commit marker row count")?;
for row_op in &marker.row_ops {
write_commit_row_op(out, row_op)?;
}
Ok(())
}
/// Encoded size of a single row op.
///
/// Counts the length-prefixed entity path, the length-prefixed fixed-size key,
/// the flags byte, the schema fingerprint, and a length-prefixed entry for each
/// of `before`/`after` that is present.
fn commit_row_op_payload_capacity(row_op: &CommitRowOp) -> usize {
    let fixed_bytes = (4 + row_op.entity_path.len())
        .saturating_add(4 + DataKey::STORED_SIZE_USIZE)
        .saturating_add(1)
        .saturating_add(COMMIT_MARKER_SCHEMA_FINGERPRINT_BYTES);

    // `Option::iter` yields the payload only when it is present.
    row_op
        .before
        .iter()
        .chain(row_op.after.iter())
        .fold(fixed_bytes, |total, payload| {
            total.saturating_add(4 + payload.len())
        })
}
/// Appends one encoded row op to `out`.
///
/// Field order: length-prefixed entity path, length-prefixed key, one flags
/// byte saying which payloads follow, the `before` payload (if any), the
/// `after` payload (if any), and finally the schema fingerprint.
///
/// # Errors
/// Propagates the first failure from `write_len_prefixed_bytes`.
fn write_commit_row_op(out: &mut Vec<u8>, row_op: &CommitRowOp) -> Result<(), InternalError> {
    write_len_prefixed_bytes(
        out,
        row_op.entity_path.as_bytes(),
        "commit marker entity_path",
    )?;
    write_len_prefixed_bytes(out, row_op.key.as_bytes(), "commit marker key")?;

    let before = row_op.before.as_deref();
    let after = row_op.after.as_deref();

    let before_bit = if before.is_some() {
        COMMIT_MARKER_FLAG_BEFORE
    } else {
        0
    };
    let after_bit = if after.is_some() {
        COMMIT_MARKER_FLAG_AFTER
    } else {
        0
    };
    out.push(before_bit | after_bit);

    // Payloads are written in the same order the flags announce them.
    for (payload, label) in [
        (before, "commit marker before payload"),
        (after, "commit marker after payload"),
    ] {
        if let Some(bytes) = payload {
            write_len_prefixed_bytes(out, bytes, label)?;
        }
    }

    out.extend_from_slice(&row_op.schema_fingerprint);
    Ok(())
}
pub(in crate::db) fn decode_commit_marker_payload(
bytes: &[u8],
) -> Result<CommitMarker, InternalError> {
if bytes.len() < COMMIT_MARKER_ID_BYTES + COMMIT_MARKER_ROW_COUNT_BYTES {
return Err(InternalError::commit_corruption(
"commit marker payload decode: truncated header",
));
}
let mut cursor = 0;
let id = read_fixed_array::<COMMIT_MARKER_ID_BYTES>(bytes, &mut cursor, "commit marker id")?;
let row_op_count = read_len_u32(bytes, &mut cursor, "commit marker row count")? as usize;
let mut row_ops = Vec::with_capacity(row_op_count);
for _ in 0..row_op_count {
let entity_path_bytes =
read_len_prefixed_bytes(bytes, &mut cursor, "commit marker entity_path")?;
let entity_path = std::str::from_utf8(entity_path_bytes).map_err(|_| {
InternalError::commit_corruption("commit marker payload decode: entity_path not utf-8")
})?;
let key = read_len_prefixed_bytes(bytes, &mut cursor, "commit marker key")?;
let flags = *bytes.get(cursor).ok_or_else(|| {
InternalError::commit_corruption("commit marker payload decode: truncated row-op flags")
})?;
cursor = cursor.saturating_add(1);
if flags & !COMMIT_MARKER_FLAG_MASK != 0 {
return Err(InternalError::commit_corruption(
"commit marker payload decode: invalid row-op flags",
));
}
let before = if flags & COMMIT_MARKER_FLAG_BEFORE != 0 {
Some(
read_len_prefixed_bytes(bytes, &mut cursor, "commit marker before payload")?
.to_vec(),
)
} else {
None
};
let after = if flags & COMMIT_MARKER_FLAG_AFTER != 0 {
Some(
read_len_prefixed_bytes(bytes, &mut cursor, "commit marker after payload")?
.to_vec(),
)
} else {
None
};
let schema_fingerprint = read_fixed_array::<COMMIT_MARKER_SCHEMA_FINGERPRINT_BYTES>(
bytes,
&mut cursor,
"commit marker schema fingerprint",
)?;
row_ops.push(CommitRowOp::try_new_bytes(
entity_path.to_owned(),
key,
before,
after,
schema_fingerprint,
)?);
}
if cursor != bytes.len() {
return Err(InternalError::commit_corruption(
"commit marker payload decode: trailing bytes after payload",
));
}
Ok(CommitMarker { id, row_ops })
}
/// Appends `len` to `out` as a little-endian `u32`.
///
/// # Errors
/// Returns the "exceeds u32 length limit" error, tagged with `label`, when `len`
/// does not fit in a `u32`; `out` is left untouched in that case.
fn write_len_u32(out: &mut Vec<u8>, len: usize, label: &'static str) -> Result<(), InternalError> {
    let Ok(prefix) = u32::try_from(len) else {
        return Err(InternalError::commit_marker_payload_exceeds_u32_length_limit(label, len));
    };
    out.extend_from_slice(&prefix.to_le_bytes());
    Ok(())
}
/// Appends `bytes` to `out`, preceded by their length as written by
/// `write_len_u32`.
///
/// # Errors
/// Propagates the error from `write_len_u32`; nothing from `bytes` is appended
/// in that case.
fn write_len_prefixed_bytes(
    out: &mut Vec<u8>,
    bytes: &[u8],
    label: &'static str,
) -> Result<(), InternalError> {
    write_len_u32(out, bytes.len(), label).map(|()| out.extend_from_slice(bytes))
}
/// Reads a little-endian `u32` at `*cursor` and advances the cursor past it.
///
/// # Errors
/// Returns the "truncated length" corruption error for `label` when fewer than
/// four bytes remain; the cursor is not moved in that case.
fn read_len_u32(
    bytes: &[u8],
    cursor: &mut usize,
    label: &'static str,
) -> Result<u32, InternalError> {
    let start = *cursor;
    let end = start.saturating_add(4);
    let Some(window) = bytes.get(start..end) else {
        return Err(CommitMarker::payload_truncated_length(label));
    };
    *cursor = end;

    let mut raw = [0_u8; 4];
    raw.copy_from_slice(window);
    Ok(u32::from_le_bytes(raw))
}
/// Reads exactly `N` bytes at `*cursor` into an array and advances the cursor.
///
/// # Errors
/// Returns the "truncated bytes" corruption error for `label` when fewer than
/// `N` bytes remain (cursor untouched), or the "invalid fixed-size payload"
/// error if the slice cannot be converted into `[u8; N]`.
fn read_fixed_array<const N: usize>(
    bytes: &[u8],
    cursor: &mut usize,
    label: &'static str,
) -> Result<[u8; N], InternalError> {
    let start = *cursor;
    let end = start.saturating_add(N);
    let window = match bytes.get(start..end) {
        Some(window) => window,
        None => return Err(CommitMarker::payload_truncated_bytes(label)),
    };
    *cursor = end;
    <[u8; N]>::try_from(window).map_err(|_| CommitMarker::payload_invalid_fixed_size(label))
}
/// Reads a `u32` length prefix at `*cursor`, then borrows that many bytes that
/// follow it, advancing the cursor past both.
///
/// # Errors
/// Propagates the error from `read_len_u32` when the prefix is truncated.
/// Returns the "truncated bytes" corruption error for `label` when the body is
/// shorter than announced; in that case the cursor has already moved past the
/// prefix.
fn read_len_prefixed_bytes<'a>(
    bytes: &'a [u8],
    cursor: &mut usize,
    label: &'static str,
) -> Result<&'a [u8], InternalError> {
    let body_len = read_len_u32(bytes, cursor, label)? as usize;
    let start = *cursor;
    let end = start.saturating_add(body_len);
    match bytes.get(start..end) {
        Some(body) => {
            *cursor = end;
            Ok(body)
        }
        None => Err(CommitMarker::payload_truncated_bytes(label)),
    }
}
/// Decodes stored data-key bytes into both the raw key and the typed `DataKey`.
///
/// # Errors
/// Returns a length-invalid error when `bytes` is not exactly
/// `DataKey::STORED_SIZE_USIZE` long, and a component-corruption error when
/// `DataKey::try_from_raw` rejects the raw key.
pub(in crate::db) fn decode_data_key(bytes: &[u8]) -> Result<(RawDataKey, DataKey), InternalError> {
    let expected = DataKey::STORED_SIZE_USIZE;
    let actual = bytes.len();
    if actual != expected {
        return Err(InternalError::commit_component_length_invalid(
            "data key", actual, expected,
        ));
    }

    let raw = <RawDataKey as Storable>::from_bytes(Cow::Borrowed(bytes));
    let data_key = match DataKey::try_from_raw(&raw) {
        Ok(data_key) => data_key,
        Err(err) => {
            return Err(InternalError::commit_component_corruption("data key", err));
        }
    };
    Ok((raw, data_key))
}
/// Checks every row op of `marker` for structural validity.
///
/// Per row op, in this order: the entity path must be non-empty, at least one of
/// `before`/`after` must be present, each present payload must not exceed
/// `MAX_ROW_BYTES`, and the raw key must convert via `DataKey::try_from_raw`.
///
/// # Errors
/// Returns a commit-corruption error describing the first violation found.
pub(crate) fn validate_commit_marker_shape(marker: &CommitMarker) -> Result<(), InternalError> {
    let payload_limit = MAX_ROW_BYTES as usize;

    for row_op in &marker.row_ops {
        if row_op.entity_path.is_empty() {
            return Err(InternalError::commit_corruption(
                "row op has empty entity_path",
            ));
        }

        let before = row_op.before.as_ref();
        let after = row_op.after.as_ref();
        if let (None, None) = (before, after) {
            return Err(InternalError::commit_corruption(
                "row op has neither before nor after payload",
            ));
        }

        for (label, payload) in [("before", before), ("after", after)] {
            let Some(bytes) = payload else {
                continue;
            };
            if bytes.len() > payload_limit {
                return Err(CommitMarker::row_op_payload_too_large(label, bytes.len()));
            }
        }

        if let Err(err) = DataKey::try_from_raw(&row_op.key) {
            return Err(CommitMarker::row_op_key_decode_failed(err));
        }
    }
    Ok(())
}