use crate::{
db::{
codec::MAX_ROW_BYTES,
commit::prepared_op::PreparedIndexDeltaKind,
data::{DecodedDataStoreKey, RawDataStoreKey},
index::{IndexEntryValue, IndexStore, RawIndexStoreKey},
journal::{
JournalBatch, decode_journal_batch, encode_journal_batch, journal_batch_encoded_len,
},
},
error::InternalError,
runtime::now_millis,
};
use ic_memory::stable_structures::Storable;
use std::{
borrow::Cow,
cell::RefCell,
collections::BTreeSet,
sync::atomic::{AtomicU64, Ordering},
thread::LocalKey,
};
/// Length in bytes of a commit identifier (the `id` array of a `CommitMarker`).
pub(in crate::db) const COMMIT_ID_BYTES: usize = 16;
/// Length in bytes of the schema fingerprint carried by every row op.
const COMMIT_SCHEMA_FINGERPRINT_BYTES: usize = 16;
/// Current commit marker format version.
// NOTE(review): not referenced by the code visible in this file; presumably
// consumed by the marker envelope codec elsewhere — confirm.
pub(in crate::db) const COMMIT_MARKER_FORMAT_VERSION_CURRENT: u8 = 2;
/// Fixed-size schema fingerprint stored alongside each `CommitRowOp`.
pub(in crate::db) type CommitSchemaFingerprint = [u8; COMMIT_SCHEMA_FINGERPRINT_BYTES];
/// Counter consumed by `generate_commit_id`; starts at 1 and is advanced by
/// one for every id generated.
static COMMIT_ID_SEQUENCE: AtomicU64 = AtomicU64::new(1);
/// Commit size limit in bytes (16 MiB).
// NOTE(review): the enforcement site is not visible in this file — confirm
// where this limit is applied.
pub(crate) const MAX_COMMIT_BYTES: u32 = 16 * 1024 * 1024;
/// One row-level operation recorded inside a commit marker.
///
/// `validate_commit_row_op_shape` requires a non-empty `entity_path`, at
/// least one of `before` / `after`, payloads no larger than `MAX_ROW_BYTES`,
/// and a `key` that decodes as a data store key.
#[derive(Clone, Debug)]
pub(in crate::db) struct CommitRowOp {
/// Entity path string; encoded in the marker payload as length-prefixed
/// UTF-8 bytes.
pub(crate) entity_path: Cow<'static, str>,
/// Raw data store key of the affected row.
pub(crate) key: RawDataStoreKey,
/// Optional "before" payload bytes (presumably the row image prior to the
/// change — confirm against the commit preparation code).
pub(crate) before: Option<Vec<u8>>,
/// Optional "after" payload bytes (presumably the row image following the
/// change — confirm against the commit preparation code).
pub(crate) after: Option<Vec<u8>>,
/// Schema fingerprint written verbatim after the row payloads.
pub(crate) schema_fingerprint: CommitSchemaFingerprint,
}
impl CommitRowOp {
    /// Builds a row op directly from its parts.
    ///
    /// No validation is performed here; the only conversion is turning
    /// `entity_path` into a `Cow<'static, str>`.
    #[must_use]
    pub(crate) fn new(
        entity_path: impl Into<Cow<'static, str>>,
        key: RawDataStoreKey,
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: CommitSchemaFingerprint,
    ) -> Self {
        let entity_path = entity_path.into();
        Self {
            entity_path,
            key,
            before,
            after,
            schema_fingerprint,
        }
    }

    /// Builds a row op from an encoded data key.
    ///
    /// The key bytes are run through `decode_data_key`; only the raw key is
    /// kept, the decoded form is discarded once it has proven the key valid.
    ///
    /// # Errors
    /// Propagates any error returned by `decode_data_key`.
    pub(crate) fn try_new_bytes(
        entity_path: impl Into<Cow<'static, str>>,
        key: &[u8],
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: CommitSchemaFingerprint,
    ) -> Result<Self, InternalError> {
        decode_data_key(key).map(|(raw_key, _decoded)| {
            Self::new(entity_path, raw_key, before, after, schema_fingerprint)
        })
    }
}
/// One index-level operation prepared as part of a commit.
#[derive(Clone, Debug)]
pub(crate) struct CommitIndexOp {
/// Thread-local handle to the `IndexStore` this operation targets.
pub(crate) index_store: &'static LocalKey<RefCell<IndexStore>>,
/// Raw key of the index entry.
pub(crate) key: RawIndexStoreKey,
/// Optional index entry value.
// NOTE(review): presumably `None` means the entry is to be removed — confirm
// against the code that applies these operations.
pub(crate) value: Option<IndexEntryValue>,
/// Delta classification; set by the `unchanged`, `index_insert` and
/// `index_remove` constructors.
pub(crate) delta_kind: PreparedIndexDeltaKind,
}
impl CommitIndexOp {
pub(crate) const fn unchanged(
index_store: &'static LocalKey<RefCell<IndexStore>>,
key: RawIndexStoreKey,
value: Option<IndexEntryValue>,
) -> Self {
Self {
index_store,
key,
value,
delta_kind: PreparedIndexDeltaKind::None,
}
}
pub(crate) const fn index_insert(
index_store: &'static LocalKey<RefCell<IndexStore>>,
key: RawIndexStoreKey,
value: Option<IndexEntryValue>,
) -> Self {
Self {
index_store,
key,
value,
delta_kind: PreparedIndexDeltaKind::IndexInsert,
}
}
pub(crate) const fn index_remove(
index_store: &'static LocalKey<RefCell<IndexStore>>,
key: RawIndexStoreKey,
value: Option<IndexEntryValue>,
) -> Self {
Self {
index_store,
key,
value,
delta_kind: PreparedIndexDeltaKind::IndexRemove,
}
}
}
/// A commit marker: an identifier plus the row operations and journal
/// batches that belong to one commit.
///
/// `CommitMarker::from_parts` checks the marker with
/// `validate_commit_marker_shape` before returning it.
#[derive(Clone, Debug)]
pub(crate) struct CommitMarker {
/// Commit identifier; every embedded journal batch must carry the same id
/// to pass `validate_commit_marker_shape`.
pub(crate) id: [u8; COMMIT_ID_BYTES],
/// Row operations, encoded in this order in the marker payload.
pub(crate) row_ops: Vec<CommitRowOp>,
/// Journal batches embedded in the marker, encoded after the row ops.
pub(in crate::db) journal_batches: Vec<JournalBatch>,
}
impl CommitMarker {
    /// Test-only constructor: generates a fresh commit id and builds a marker
    /// with the given row ops and no journal batches.
    ///
    /// # Errors
    /// Fails when id generation fails or the marker does not pass shape
    /// validation.
    #[cfg(test)]
    pub(crate) fn new(row_ops: Vec<CommitRowOp>) -> Result<Self, InternalError> {
        generate_commit_id().and_then(|id| Self::from_parts(id, row_ops, Vec::new()))
    }

    /// Assembles a marker from its parts and validates its shape.
    ///
    /// # Errors
    /// Returns whatever `validate_commit_marker_shape` reports for the
    /// assembled marker.
    pub(in crate::db) fn from_parts(
        id: [u8; COMMIT_ID_BYTES],
        row_ops: Vec<CommitRowOp>,
        journal_batches: Vec<JournalBatch>,
    ) -> Result<Self, InternalError> {
        let candidate = Self {
            id,
            row_ops,
            journal_batches,
        };
        validate_commit_marker_shape(&candidate).map(|()| candidate)
    }

    /// Borrows the journal batches embedded in this marker.
    #[allow(
        dead_code,
        reason = "journal runtime recovery consumes embedded batches in a later 0.174 slice"
    )]
    #[must_use]
    pub(in crate::db) fn journal_batches(&self) -> &[JournalBatch] {
        self.journal_batches.as_slice()
    }

    /// Corruption error for a length prefix that runs past the end of the input.
    fn payload_truncated_length(label: &'static str) -> InternalError {
        let message = format!("{label} decode failed: truncated length");
        InternalError::commit_corruption(message)
    }

    /// Corruption error for a byte run that extends past the end of the input.
    fn payload_truncated_bytes(label: &'static str) -> InternalError {
        let message = format!("{label} decode failed: truncated bytes");
        InternalError::commit_corruption(message)
    }

    /// Corruption error for a fixed-size field that could not be converted
    /// into its array form.
    fn payload_invalid_fixed_size(label: &'static str) -> InternalError {
        let message = format!("{label} decode failed: invalid fixed-size payload");
        InternalError::commit_corruption(message)
    }

    /// Corruption error for a row payload longer than `MAX_ROW_BYTES`.
    fn row_op_payload_too_large(label: &str, len: usize) -> InternalError {
        let message = format!(
            "row op {label} payload exceeds max size: {len} bytes (limit {MAX_ROW_BYTES})",
        );
        InternalError::commit_corruption(message)
    }

    /// Corruption error wrapping a failure to decode a row op key.
    fn row_op_key_decode_failed(err: impl std::fmt::Display) -> InternalError {
        let message = format!("row op key decode: {err}");
        InternalError::commit_corruption(message)
    }
}
/// Width in bytes of the marker id at the start of the encoded payload.
const COMMIT_MARKER_ID_BYTES: usize = COMMIT_ID_BYTES;
/// Width in bytes of the schema fingerprint that closes each encoded row op.
const COMMIT_MARKER_SCHEMA_FINGERPRINT_BYTES: usize = COMMIT_SCHEMA_FINGERPRINT_BYTES;
/// Row-op flag bit: a length-prefixed `before` payload follows the flags byte.
const COMMIT_MARKER_FLAG_BEFORE: u8 = 0b0000_0001;
/// Row-op flag bit: a length-prefixed `after` payload is present.
const COMMIT_MARKER_FLAG_AFTER: u8 = 0b0000_0010;
/// All defined row-op flag bits; the decoder rejects a flags byte with any
/// other bit set.
const COMMIT_MARKER_FLAG_MASK: u8 = COMMIT_MARKER_FLAG_BEFORE | COMMIT_MARKER_FLAG_AFTER;
/// Width in bytes of the row count field (written as a little-endian `u32`).
const COMMIT_MARKER_ROW_COUNT_BYTES: usize = 4;
/// Width in bytes of the journal batch count field (written as a
/// little-endian `u32`).
const COMMIT_MARKER_JOURNAL_BATCH_COUNT_BYTES: usize = 4;
/// Generates a new commit id.
///
/// The first 8 bytes are `now_millis()` in big-endian order; the remaining
/// bytes are the big-endian value of `COMMIT_ID_SEQUENCE` as it was before
/// this call incremented it.
///
/// # Errors
/// Returns a commit-id-generation error when incrementing the sequence would
/// overflow `u64`.
pub(in crate::db) fn generate_commit_id() -> Result<[u8; COMMIT_ID_BYTES], InternalError> {
    // `fetch_update` yields the previous counter value on success and fails
    // only when the closure returns `None`, i.e. on overflow.
    let Ok(sequence) = COMMIT_ID_SEQUENCE.fetch_update(
        Ordering::Relaxed,
        Ordering::Relaxed,
        |current| current.checked_add(1),
    ) else {
        return Err(InternalError::commit_id_generation_failed(
            "commit id sequence exhausted",
        ));
    };

    let mut id = [0u8; COMMIT_ID_BYTES];
    let (timestamp_part, sequence_part) = id.split_at_mut(8);
    timestamp_part.copy_from_slice(&now_millis().to_be_bytes());
    sequence_part.copy_from_slice(&sequence.to_be_bytes());
    Ok(id)
}
/// Test-only helper that encodes `marker` into a freshly allocated buffer.
///
/// # Errors
/// Fails when the computed capacity does not fit in a `u32`, or when writing
/// the payload fails.
#[cfg(test)]
pub(in crate::db) fn encode_commit_marker_payload(
    marker: &CommitMarker,
) -> Result<Vec<u8>, InternalError> {
    let capacity = commit_marker_payload_capacity(marker);
    // Reject payloads whose size cannot be expressed as a `u32`.
    if u32::try_from(capacity).is_err() {
        return Err(
            InternalError::commit_marker_payload_exceeds_u32_length_limit(
                "commit marker payload",
                capacity,
            ),
        );
    }

    let mut buffer = Vec::with_capacity(capacity);
    write_commit_marker_payload(&mut buffer, marker).map(|()| buffer)
}
/// Test-only helper that encodes a marker holding exactly one row op and no
/// journal batches into a freshly allocated buffer.
///
/// # Errors
/// Fails when the computed capacity does not fit in a `u32`, or when writing
/// the payload fails.
#[cfg(test)]
pub(in crate::db) fn encode_single_row_commit_marker_payload(
    marker_id: [u8; COMMIT_ID_BYTES],
    row_op: &CommitRowOp,
) -> Result<Vec<u8>, InternalError> {
    let capacity = single_row_commit_marker_payload_capacity(row_op);
    // Reject payloads whose size cannot be expressed as a `u32`.
    if u32::try_from(capacity).is_err() {
        return Err(
            InternalError::commit_marker_payload_exceeds_u32_length_limit(
                "commit marker payload",
                capacity,
            ),
        );
    }

    let mut buffer = Vec::with_capacity(capacity);
    write_single_row_commit_marker_payload(&mut buffer, marker_id, row_op).map(|()| buffer)
}
/// Encoded size of a marker payload that holds exactly `row_op` and no
/// journal batches: id + row count + row op + journal batch count.
///
/// The sum saturates at `usize::MAX` instead of overflowing.
pub(in crate::db) fn single_row_commit_marker_payload_capacity(row_op: &CommitRowOp) -> usize {
    [
        COMMIT_MARKER_ROW_COUNT_BYTES,
        commit_row_op_payload_capacity(row_op),
        COMMIT_MARKER_JOURNAL_BATCH_COUNT_BYTES,
    ]
    .into_iter()
    .fold(COMMIT_MARKER_ID_BYTES, usize::saturating_add)
}
pub(in crate::db) fn commit_marker_payload_capacity(marker: &CommitMarker) -> usize {
let mut capacity = COMMIT_MARKER_ID_BYTES + COMMIT_MARKER_ROW_COUNT_BYTES;
for row_op in &marker.row_ops {
capacity = capacity.saturating_add(commit_row_op_payload_capacity(row_op));
}
capacity = capacity.saturating_add(COMMIT_MARKER_JOURNAL_BATCH_COUNT_BYTES);
for batch in &marker.journal_batches {
capacity = capacity.saturating_add(4 + journal_batch_encoded_len(batch));
}
capacity
}
/// Appends to `out` the payload of a marker with exactly one row op and zero
/// journal batches.
///
/// # Errors
/// Propagates any length-encoding error from the underlying writers.
pub(in crate::db) fn write_single_row_commit_marker_payload(
    out: &mut Vec<u8>,
    marker_id: [u8; COMMIT_ID_BYTES],
    row_op: &CommitRowOp,
) -> Result<(), InternalError> {
    out.extend_from_slice(marker_id.as_slice());

    // Exactly one row op follows the id.
    write_len_u32(out, 1, "commit marker row count")?;
    write_commit_row_op(out, row_op)?;

    // No journal batches: the payload ends with a zero count.
    write_len_u32(out, 0, "commit marker journal batch count")
}
/// Appends the encoded payload of `marker` to `out`.
///
/// Layout: marker id, row count, each row op, journal batch count, then each
/// journal batch as length-prefixed bytes.
///
/// # Errors
/// Propagates length-encoding errors and journal batch encoding errors.
pub(in crate::db) fn write_commit_marker_payload(
    out: &mut Vec<u8>,
    marker: &CommitMarker,
) -> Result<(), InternalError> {
    out.extend_from_slice(marker.id.as_slice());

    let row_ops = &marker.row_ops;
    write_len_u32(out, row_ops.len(), "commit marker row count")?;
    row_ops
        .iter()
        .try_for_each(|row_op| write_commit_row_op(out, row_op))?;

    let batches = &marker.journal_batches;
    write_len_u32(out, batches.len(), "commit marker journal batch count")?;
    batches
        .iter()
        .try_for_each(|batch| -> Result<(), InternalError> {
            let encoded = encode_journal_batch(batch)?;
            write_len_prefixed_bytes(out, &encoded, "commit marker journal batch")
        })
}
/// Encoded size of one row op: length-prefixed entity path and key, one flags
/// byte, the schema fingerprint, plus a length-prefixed entry for each of
/// `before` / `after` that is present.
///
/// The total saturates at `usize::MAX`.
fn commit_row_op_payload_capacity(row_op: &CommitRowOp) -> usize {
    let always_present = (4 + row_op.entity_path.len())
        .saturating_add(4 + row_op.key.as_bytes().len())
        .saturating_add(1)
        .saturating_add(COMMIT_MARKER_SCHEMA_FINGERPRINT_BYTES);

    // Only payloads that exist contribute a length prefix and bytes.
    [&row_op.before, &row_op.after]
        .into_iter()
        .flatten()
        .fold(always_present, |total, payload| {
            total.saturating_add(4 + payload.len())
        })
}
/// Appends one encoded row op to `out`.
///
/// Field order: entity path, key, flags byte, optional `before` payload,
/// optional `after` payload, schema fingerprint.
///
/// # Errors
/// Propagates any length-encoding error from `write_len_prefixed_bytes`.
fn write_commit_row_op(out: &mut Vec<u8>, row_op: &CommitRowOp) -> Result<(), InternalError> {
    write_len_prefixed_bytes(
        out,
        row_op.entity_path.as_bytes(),
        "commit marker entity_path",
    )?;
    write_len_prefixed_bytes(out, row_op.key.as_bytes(), "commit marker key")?;

    let before = row_op.before.as_deref();
    let after = row_op.after.as_deref();

    // The flags byte tells the decoder which optional payloads follow.
    let before_bit = if before.is_some() {
        COMMIT_MARKER_FLAG_BEFORE
    } else {
        0
    };
    let after_bit = if after.is_some() {
        COMMIT_MARKER_FLAG_AFTER
    } else {
        0
    };
    out.push(before_bit | after_bit);

    for (payload, label) in [
        (before, "commit marker before payload"),
        (after, "commit marker after payload"),
    ] {
        if let Some(bytes) = payload {
            write_len_prefixed_bytes(out, bytes, label)?;
        }
    }

    out.extend_from_slice(row_op.schema_fingerprint.as_slice());
    Ok(())
}
pub(in crate::db) fn decode_commit_marker_payload(
bytes: &[u8],
) -> Result<CommitMarker, InternalError> {
if bytes.len() < COMMIT_MARKER_ID_BYTES + COMMIT_MARKER_ROW_COUNT_BYTES {
return Err(InternalError::commit_corruption(
"commit marker payload decode: truncated header",
));
}
let mut cursor = 0;
let id = read_fixed_array::<COMMIT_MARKER_ID_BYTES>(bytes, &mut cursor, "commit marker id")?;
let row_op_count = read_len_u32(bytes, &mut cursor, "commit marker row count")? as usize;
let mut row_ops = Vec::with_capacity(row_op_count);
for _ in 0..row_op_count {
let entity_path_bytes =
read_len_prefixed_bytes(bytes, &mut cursor, "commit marker entity_path")?;
let entity_path = std::str::from_utf8(entity_path_bytes).map_err(|_| {
InternalError::commit_corruption("commit marker payload decode: entity_path not utf-8")
})?;
let key = read_len_prefixed_bytes(bytes, &mut cursor, "commit marker key")?;
let flags = *bytes.get(cursor).ok_or_else(|| {
InternalError::commit_corruption("commit marker payload decode: truncated row-op flags")
})?;
cursor = cursor.saturating_add(1);
if flags & !COMMIT_MARKER_FLAG_MASK != 0 {
return Err(InternalError::commit_corruption(
"commit marker payload decode: invalid row-op flags",
));
}
let before = if flags & COMMIT_MARKER_FLAG_BEFORE != 0 {
Some(
read_len_prefixed_bytes(bytes, &mut cursor, "commit marker before payload")?
.to_vec(),
)
} else {
None
};
let after = if flags & COMMIT_MARKER_FLAG_AFTER != 0 {
Some(
read_len_prefixed_bytes(bytes, &mut cursor, "commit marker after payload")?
.to_vec(),
)
} else {
None
};
let schema_fingerprint = read_fixed_array::<COMMIT_MARKER_SCHEMA_FINGERPRINT_BYTES>(
bytes,
&mut cursor,
"commit marker schema fingerprint",
)?;
row_ops.push(CommitRowOp::try_new_bytes(
entity_path.to_owned(),
key,
before,
after,
schema_fingerprint,
)?);
}
let journal_batch_count =
read_len_u32(bytes, &mut cursor, "commit marker journal batch count")? as usize;
let mut journal_batches = Vec::with_capacity(journal_batch_count);
for _ in 0..journal_batch_count {
let encoded = read_len_prefixed_bytes(bytes, &mut cursor, "commit marker journal batch")?;
journal_batches.push(decode_journal_batch(encoded)?);
}
if cursor != bytes.len() {
return Err(InternalError::commit_corruption(
"commit marker payload decode: trailing bytes after payload",
));
}
Ok(CommitMarker {
id,
row_ops,
journal_batches,
})
}
/// Appends `len` to `out` as a little-endian `u32`.
///
/// # Errors
/// Returns a payload-exceeds-u32 error, tagged with `label`, when `len` does
/// not fit in a `u32`.
fn write_len_u32(out: &mut Vec<u8>, len: usize, label: &'static str) -> Result<(), InternalError> {
    match u32::try_from(len) {
        Ok(narrowed) => {
            out.extend_from_slice(&narrowed.to_le_bytes());
            Ok(())
        }
        Err(_) => Err(InternalError::commit_marker_payload_exceeds_u32_length_limit(label, len)),
    }
}
/// Appends `bytes` to `out`, preceded by its length as a little-endian `u32`.
///
/// # Errors
/// Fails, without appending the bytes, when the length does not fit in a
/// `u32`.
fn write_len_prefixed_bytes(
    out: &mut Vec<u8>,
    bytes: &[u8],
    label: &'static str,
) -> Result<(), InternalError> {
    write_len_u32(out, bytes.len(), label).map(|()| out.extend_from_slice(bytes))
}
/// Reads a little-endian `u32` at `*cursor` and advances the cursor by 4.
///
/// # Errors
/// Returns a "truncated length" corruption error, tagged with `label`, when
/// fewer than 4 bytes remain; the cursor is left untouched in that case.
fn read_len_u32(
    bytes: &[u8],
    cursor: &mut usize,
    label: &'static str,
) -> Result<u32, InternalError> {
    let start = *cursor;
    let end = start.saturating_add(4);

    let Some(&[b0, b1, b2, b3]) = bytes.get(start..end) else {
        return Err(CommitMarker::payload_truncated_length(label));
    };

    *cursor = end;
    Ok(u32::from_le_bytes([b0, b1, b2, b3]))
}
/// Reads exactly `N` bytes at `*cursor` into an array and advances the cursor
/// by `N`.
///
/// # Errors
/// Returns a "truncated bytes" corruption error when fewer than `N` bytes
/// remain, or an "invalid fixed-size payload" error if the slice cannot be
/// converted into `[u8; N]`.
fn read_fixed_array<const N: usize>(
    bytes: &[u8],
    cursor: &mut usize,
    label: &'static str,
) -> Result<[u8; N], InternalError> {
    let start = *cursor;
    let end = start.saturating_add(N);

    let Some(window) = bytes.get(start..end) else {
        return Err(CommitMarker::payload_truncated_bytes(label));
    };
    *cursor = end;

    <[u8; N]>::try_from(window).map_err(|_| CommitMarker::payload_invalid_fixed_size(label))
}
/// Reads a little-endian `u32` length followed by that many bytes, returning
/// a borrowed slice of the payload and advancing the cursor past it.
///
/// # Errors
/// Returns a corruption error, tagged with `label`, when either the length
/// prefix or the payload runs past the end of `bytes`.
fn read_len_prefixed_bytes<'a>(
    bytes: &'a [u8],
    cursor: &mut usize,
    label: &'static str,
) -> Result<&'a [u8], InternalError> {
    let declared_len = read_len_u32(bytes, cursor, label)? as usize;

    let start = *cursor;
    let end = start.saturating_add(declared_len);
    match bytes.get(start..end) {
        Some(payload) => {
            *cursor = end;
            Ok(payload)
        }
        None => Err(CommitMarker::payload_truncated_bytes(label)),
    }
}
/// Decodes `bytes` into both the raw and the decoded form of a data store
/// key.
///
/// # Errors
/// Returns a length-invalid error when `bytes` is longer than
/// `RawDataStoreKey::MAX_STORED_SIZE_USIZE`, and a component-corruption error
/// when the raw key cannot be decoded.
pub(in crate::db) fn decode_data_key(
    bytes: &[u8],
) -> Result<(RawDataStoreKey, DecodedDataStoreKey), InternalError> {
    let max = RawDataStoreKey::MAX_STORED_SIZE_USIZE;
    // Check the length up front, before handing the bytes to `from_bytes`.
    if bytes.len() > max {
        return Err(InternalError::commit_component_length_invalid(
            "data key",
            bytes.len(),
            max,
        ));
    }

    let raw = <RawDataStoreKey as Storable>::from_bytes(Cow::Borrowed(bytes));
    match DecodedDataStoreKey::try_from_raw(&raw) {
        Ok(data_key) => Ok((raw, data_key)),
        Err(err) => Err(InternalError::commit_component_corruption("data key", err)),
    }
}
pub(crate) fn validate_commit_marker_shape(marker: &CommitMarker) -> Result<(), InternalError> {
for row_op in &marker.row_ops {
validate_commit_row_op_shape(row_op)?;
}
let mut batch_ids = BTreeSet::new();
let mut sequences = BTreeSet::new();
for batch in &marker.journal_batches {
if batch.commit_marker_id() != marker.id {
return Err(InternalError::commit_corruption(
"journal batch commit marker id does not match marker id",
));
}
if !batch_ids.insert(batch.batch_id()) {
return Err(InternalError::commit_corruption(
"duplicate journal batch id in commit marker",
));
}
if !sequences.insert(batch.journal_sequence()) {
return Err(InternalError::commit_corruption(
"duplicate journal batch sequence in commit marker",
));
}
}
Ok(())
}
pub(crate) fn validate_commit_row_op_shape(row_op: &CommitRowOp) -> Result<(), InternalError> {
if row_op.entity_path.is_empty() {
return Err(InternalError::commit_corruption(
"row op has empty entity_path",
));
}
if row_op.before.is_none() && row_op.after.is_none() {
return Err(InternalError::commit_corruption(
"row op has neither before nor after payload",
));
}
for (label, payload) in [
("before", row_op.before.as_ref()),
("after", row_op.after.as_ref()),
] {
if let Some(bytes) = payload
&& bytes.len() > MAX_ROW_BYTES as usize
{
return Err(CommitMarker::row_op_payload_too_large(label, bytes.len()));
}
}
DecodedDataStoreKey::try_from_raw(&row_op.key)
.map_err(CommitMarker::row_op_key_decode_failed)?;
Ok(())
}