use crate::{
db::journal::{JournalBatch, JournalSequence, codec::RawJournalBatch},
error::InternalError,
};
use ic_memory::stable_structures::{
BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
};
use std::collections::BTreeSet;
use std::ops::Bound::{Excluded, Included, Unbounded};
/// Map key under which the encoded fold watermark record is stored.
///
/// `JournalTailStore::append_batch` refuses batches that carry this sequence.
const FOLD_WATERMARK_CONTROL_SEQUENCE: JournalSequence = JournalSequence::new(0);

/// Magic prefix that opens every encoded fold watermark record.
const FOLD_WATERMARK_MAGIC: &[u8] = b"ICYDB-FOLD-WATERMARK";

/// Version byte written directly after the magic prefix.
const FOLD_WATERMARK_VERSION: u8 = 1;

/// Exact length of an encoded record: magic, one version byte, then the folded sequence and
/// the fold epoch as two 8-byte big-endian integers.
const FOLD_WATERMARK_BYTES: usize =
    FOLD_WATERMARK_MAGIC.len() + std::mem::size_of::<u8>() + 2 * std::mem::size_of::<u64>();
/// Signal returned by a tail visitor after it has handled one journal batch.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum JournalTailVisit {
    /// Keep walking: the next batch (if any) is handed to the visitor.
    Continue,
    /// End the walk; no further batch is handed to the visitor.
    Stop,
}

impl JournalTailVisit {
    /// Returns `true` for [`JournalTailVisit::Stop`] and `false` for
    /// [`JournalTailVisit::Continue`].
    const fn should_stop(self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly whether it stops.
        match self {
            Self::Stop => true,
            Self::Continue => false,
        }
    }
}
/// Value pair persisted in the journal tail's control slot: the highest folded journal
/// sequence together with a fold epoch counter.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct FoldWatermark {
    /// Highest journal sequence recorded as folded.
    highest_folded_journal_sequence: JournalSequence,
    /// Epoch counter stored next to the sequence.
    fold_epoch: u64,
}

impl FoldWatermark {
    /// Watermark with sequence 0 and epoch 0.
    ///
    /// `JournalTailStore::fold_watermark` reports this value while no record is persisted.
    #[must_use]
    pub(in crate::db) const fn initial() -> Self {
        Self::new(JournalSequence::new(0), 0)
    }

    /// Builds a watermark from its two components without validating them.
    #[must_use]
    pub(in crate::db) const fn new(
        highest_folded_journal_sequence: JournalSequence,
        fold_epoch: u64,
    ) -> Self {
        Self {
            highest_folded_journal_sequence,
            fold_epoch,
        }
    }

    /// Returns the highest folded journal sequence carried by this watermark.
    #[must_use]
    pub(in crate::db) const fn highest_folded_journal_sequence(self) -> JournalSequence {
        let Self {
            highest_folded_journal_sequence,
            ..
        } = self;
        highest_folded_journal_sequence
    }

    /// Returns the fold epoch carried by this watermark.
    #[must_use]
    pub(in crate::db) const fn fold_epoch(self) -> u64 {
        let Self { fold_epoch, .. } = self;
        fold_epoch
    }
}
/// Journal tail kept in a stable B-tree map keyed by journal sequence.
///
/// The entry stored under `FOLD_WATERMARK_CONTROL_SEQUENCE` (sequence 0) is not a journal
/// batch: it carries the encoded fold watermark record. Every other entry is a raw batch.
pub struct JournalTailStore {
    map: StableBTreeMap<JournalSequence, RawJournalBatch, VirtualMemory<DefaultMemoryImpl>>,
}

impl JournalTailStore {
    /// Builds the store on top of `memory` by calling `StableBTreeMap::init`.
    #[must_use]
    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
        let map = StableBTreeMap::init(memory);
        Self { map }
    }

    /// Stores `batch` under its own journal sequence.
    ///
    /// Re-appending a batch whose encoded bytes equal the stored ones succeeds without
    /// writing anything.
    ///
    /// # Errors
    ///
    /// Fails when the batch uses the reserved control sequence, when the batch cannot be
    /// encoded, or when the sequence is already occupied by different bytes.
    pub(in crate::db) fn append_batch(
        &mut self,
        batch: &JournalBatch,
    ) -> Result<(), InternalError> {
        let sequence = batch.journal_sequence();
        if sequence == FOLD_WATERMARK_CONTROL_SEQUENCE {
            return Err(InternalError::store_corruption(
                "journal batch sequence 0 is reserved for fold watermark control",
            ));
        }

        let encoded = RawJournalBatch::from_batch(batch)?;
        match self.map.get(&sequence) {
            None => {
                self.map.insert(sequence, encoded);
                Ok(())
            }
            // Same bytes already stored: treat the append as an idempotent replay.
            Some(stored) if stored.as_bytes() == encoded.as_bytes() => Ok(()),
            Some(_) => Err(InternalError::store_corruption(format!(
                "journal tail sequence {} already maps to different batch bytes",
                sequence.get(),
            ))),
        }
    }

    /// Returns the sequence the next appended batch should use: the successor of the larger
    /// of the folded watermark sequence and the highest stored batch key.
    ///
    /// # Errors
    ///
    /// Fails when the persisted watermark cannot be decoded or when that larger sequence
    /// has no successor.
    pub(in crate::db) fn next_append_sequence(&self) -> Result<JournalSequence, InternalError> {
        let folded = self.fold_watermark()?.highest_folded_journal_sequence();

        // Walking backwards, the first key that is not the control slot is the highest
        // stored batch key; the control slot can only show up when no batch is stored.
        let highest_stored = self
            .map
            .iter()
            .rev()
            .map(|entry| *entry.key())
            .find(|key| *key != FOLD_WATERMARK_CONTROL_SEQUENCE);

        let last_sequence = highest_stored
            .filter(|key| *key > folded)
            .unwrap_or(folded);

        last_sequence.next().ok_or_else(|| {
            InternalError::store_corruption("journal tail sequence space exhausted before append")
        })
    }

    /// Reads the persisted fold watermark, or [`FoldWatermark::initial`] when the control
    /// slot is empty.
    ///
    /// # Errors
    ///
    /// Fails when the control record does not decode.
    pub(in crate::db) fn fold_watermark(&self) -> Result<FoldWatermark, InternalError> {
        match self.map.get(&FOLD_WATERMARK_CONTROL_SEQUENCE) {
            Some(raw) => decode_fold_watermark(raw.as_bytes()),
            None => Ok(FoldWatermark::initial()),
        }
    }

    /// Writes `watermark` into the control slot.
    ///
    /// The pair `(sequence, epoch)` is compared lexicographically with the stored pair, so
    /// a higher sequence is accepted regardless of its epoch.
    ///
    /// # Errors
    ///
    /// Fails when the stored watermark cannot be decoded or when `watermark` orders before
    /// it.
    pub(in crate::db) fn persist_fold_watermark(
        &mut self,
        watermark: FoldWatermark,
    ) -> Result<(), InternalError> {
        let current = self.fold_watermark()?;
        let proposed_position = (
            watermark.highest_folded_journal_sequence(),
            watermark.fold_epoch(),
        );
        let current_position = (
            current.highest_folded_journal_sequence(),
            current.fold_epoch(),
        );
        if proposed_position < current_position {
            return Err(InternalError::store_corruption(
                "journal fold watermark cannot move backward",
            ));
        }

        let record = RawJournalBatch::from_control_bytes(encode_fold_watermark(watermark));
        self.map.insert(FOLD_WATERMARK_CONTROL_SEQUENCE, record);
        Ok(())
    }

    /// Removes every stored batch whose sequence lies in `1..=watermark`.
    ///
    /// The control record is never removed; passing the control sequence is a no-op.
    pub(in crate::db) fn clear_batches_through(&mut self, watermark: JournalSequence) {
        if watermark == FOLD_WATERMARK_CONTROL_SEQUENCE {
            return;
        }

        // Collect the keys first: the range iterator borrows the map, so entries cannot be
        // removed while it is still alive.
        let first_batch_sequence = JournalSequence::new(1);
        let doomed: Vec<JournalSequence> = self
            .map
            .range((Included(first_batch_sequence), Included(watermark)))
            .map(|entry| *entry.key())
            .collect();
        for sequence in doomed {
            let _ = self.map.remove(&sequence);
        }
    }

    /// Hands every batch stored above `watermark` to `visitor`, in map iteration order,
    /// until the visitor asks to stop or the tail ends.
    ///
    /// Each entry is validated before the visitor sees it: keys must be contiguous starting
    /// at the successor of `watermark`, the decoded batch must carry the sequence it is
    /// stored under, and no batch id may repeat within one walk.
    ///
    /// # Errors
    ///
    /// Fails on any validation failure above, when a batch does not decode, or when
    /// `visitor` returns an error.
    pub(in crate::db) fn visit_batches_after(
        &self,
        watermark: JournalSequence,
        mut visitor: impl FnMut(&JournalBatch) -> Result<JournalTailVisit, InternalError>,
    ) -> Result<(), InternalError> {
        let mut next_expected = watermark.next();
        let mut batch_ids_seen = BTreeSet::new();

        for entry in self.map.range((Excluded(watermark), Unbounded)) {
            let stored_key = *entry.key();

            // `None` means the previous sequence had no successor, yet an entry follows it.
            let Some(expected_sequence) = next_expected else {
                return Err(InternalError::store_corruption(
                    "journal tail contains batch after maximum fold watermark",
                ));
            };
            if stored_key != expected_sequence {
                return Err(InternalError::store_corruption(format!(
                    "journal tail sequence gap after watermark: expected {}, found {}",
                    expected_sequence.get(),
                    stored_key.get(),
                )));
            }

            let batch = entry.value().decode()?;
            let decoded_sequence = batch.journal_sequence();
            if decoded_sequence != stored_key {
                return Err(InternalError::store_corruption(format!(
                    "journal batch sequence {} disagrees with journal tail key {}",
                    decoded_sequence.get(),
                    stored_key.get(),
                )));
            }

            let first_sighting = batch_ids_seen.insert(batch.batch_id());
            if !first_sighting {
                return Err(InternalError::store_corruption(
                    "journal tail contains duplicate batch id above fold watermark",
                ));
            }

            if visitor(&batch)?.should_stop() {
                return Ok(());
            }
            next_expected = stored_key.next();
        }

        Ok(())
    }

    /// Number of stored entries, not counting the control record when it is present.
    #[must_use]
    pub(in crate::db) fn len(&self) -> u64 {
        let control_entries = u64::from(self.map.contains_key(&FOLD_WATERMARK_CONTROL_SEQUENCE));
        self.map.len().saturating_sub(control_entries)
    }

    /// Returns `true` when [`Self::len`] is zero.
    #[must_use]
    pub(in crate::db) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Test-only reset: clears the whole map, control record included.
    #[cfg(test)]
    pub(in crate::db) fn clear(&mut self) {
        self.map.clear_new();
    }
}
/// Serializes `watermark` into the control record layout: magic prefix, version byte, folded
/// sequence (big-endian), fold epoch (big-endian).
fn encode_fold_watermark(watermark: FoldWatermark) -> Vec<u8> {
    let version_field = [FOLD_WATERMARK_VERSION];
    let sequence_field = watermark
        .highest_folded_journal_sequence()
        .get()
        .to_be_bytes();
    let epoch_field = watermark.fold_epoch().to_be_bytes();

    // Field order here must mirror the offsets used by `decode_fold_watermark`.
    let fields: [&[u8]; 4] = [
        FOLD_WATERMARK_MAGIC,
        &version_field,
        &sequence_field,
        &epoch_field,
    ];
    fields.concat()
}
/// Parses a control record produced by `encode_fold_watermark`.
///
/// Checks run in this order: exact byte length, magic prefix, version byte.
///
/// # Errors
///
/// Fails when any of those checks does not hold.
fn decode_fold_watermark(bytes: &[u8]) -> Result<FoldWatermark, InternalError> {
    let found_len = bytes.len();
    if found_len != FOLD_WATERMARK_BYTES {
        return Err(InternalError::store_corruption(format!(
            "journal fold watermark has invalid byte length: expected {FOLD_WATERMARK_BYTES}, found {}",
            found_len,
        )));
    }

    let Some(payload) = bytes.strip_prefix(FOLD_WATERMARK_MAGIC) else {
        return Err(InternalError::store_corruption(
            "journal fold watermark magic mismatch",
        ));
    };

    // The length check above guarantees `payload` is exactly 1 + 8 + 8 bytes, so the
    // indexing and fixed-width copies below cannot go out of bounds.
    let version = payload[0];
    if version != FOLD_WATERMARK_VERSION {
        return Err(InternalError::store_corruption(format!(
            "unsupported journal fold watermark version: {version}",
        )));
    }

    let mut sequence_field = [0u8; 8];
    let mut epoch_field = [0u8; 8];
    sequence_field.copy_from_slice(&payload[1..9]);
    epoch_field.copy_from_slice(&payload[9..]);

    let highest_folded = JournalSequence::new(u64::from_be_bytes(sequence_field));
    let fold_epoch = u64::from_be_bytes(epoch_field);
    Ok(FoldWatermark::new(highest_folded, fold_epoch))
}