use std::{
collections::{BTreeMap, HashMap, HashSet},
ops::{Bound, RangeBounds},
};
use reifydb_core::{
actors::drop::{DropMessage, DropRequest},
common::CommitVersion,
delta::Delta,
encoded::{
key::{EncodedKey, EncodedKeyRange},
row::EncodedRow,
},
event::metric::{MultiCommittedEvent, MultiDelete, MultiWrite},
interface::store::{
EntryKind, MultiVersionBatch, MultiVersionCommit, MultiVersionContains, MultiVersionGet,
MultiVersionGetPrevious, MultiVersionRow, MultiVersionStore,
},
};
use reifydb_type::util::{cowvec::CowVec, hex};
use tracing::{instrument, warn};
use super::{
StandardMultiStore,
router::{classify_key, classify_range, is_single_version_semantics_key},
version::{VersionedGetResult, get_at_version},
};
use crate::{
Result,
tier::{RangeCursor, TierBatch, TierStorage},
};
/// Maximum number of entries fetched from a single tier per scan round
/// inside `scan_tier_chunk` / `scan_tier_chunk_rev`.
const TIER_SCAN_CHUNK_SIZE: usize = 4096;
impl MultiVersionGet for StandardMultiStore {
#[instrument(name = "store::multi::get", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0))]
fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<MultiVersionRow>> {
let table = classify_key(key);
if let Some(hot) = &self.hot {
match get_at_version(hot, table, key.as_ref(), version)? {
VersionedGetResult::Value {
value,
version: v,
} => {
return Ok(Some(MultiVersionRow {
key: key.clone(),
row: EncodedRow(value),
version: v,
}));
}
VersionedGetResult::Tombstone => return Ok(None),
VersionedGetResult::NotFound => {}
}
}
if let Some(warm) = &self.warm {
match get_at_version(warm, table, key.as_ref(), version)? {
VersionedGetResult::Value {
value,
version: v,
} => {
return Ok(Some(MultiVersionRow {
key: key.clone(),
row: EncodedRow(value),
version: v,
}));
}
VersionedGetResult::Tombstone => return Ok(None),
VersionedGetResult::NotFound => {}
}
}
if let Some(cold) = &self.cold {
match get_at_version(cold, table, key.as_ref(), version)? {
VersionedGetResult::Value {
value,
version: v,
} => {
return Ok(Some(MultiVersionRow {
key: key.clone(),
row: EncodedRow(value),
version: v,
}));
}
VersionedGetResult::Tombstone => return Ok(None),
VersionedGetResult::NotFound => {}
}
}
Ok(None)
}
}
impl MultiVersionContains for StandardMultiStore {
    /// Reports whether `key` has a visible (non-tombstoned) row at `version`.
    #[instrument(name = "store::multi::contains", level = "trace", skip(self), fields(key_hex = %hex::display(key.as_ref()), version = version.0), ret)]
    fn contains(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
        // Delegate to the versioned point lookup and report presence.
        MultiVersionGet::get(self, key, version).map(|row| row.is_some())
    }
}
impl MultiVersionCommit for StandardMultiStore {
    /// Applies one commit's deltas at `version`: writes/tombstones go to the
    /// hot tier, explicit drops are forwarded to the drop actor, and commit
    /// metrics are emitted afterwards.
    #[instrument(name = "store::multi::commit", level = "debug", skip(self, deltas), fields(delta_count = deltas.len(), version = version.0))]
    fn commit(&self, deltas: CowVec<Delta>, version: CommitVersion) -> Result<()> {
        // Without a hot tier there is nowhere to write; the commit is a no-op.
        let Some(storage) = &self.hot else {
            return Ok(());
        };
        let classified = classify_deltas(&deltas);
        let drop_batch = build_drop_batch(classified.explicit_drops, &classified.pending_set_keys, version);
        // NOTE(review): drops are dispatched before the storage write below —
        // if `set` fails, the drop actor has already been notified for this
        // version. Confirm the drop actor tolerates versions that never
        // became durable.
        self.dispatch_drops(drop_batch);
        storage.set(version, classified.batches)?;
        // Metrics are emitted only after the write succeeded.
        self.emit_commit_metrics(classified.writes, classified.deletes, version);
        Ok(())
    }
}
/// Result of splitting one commit's deltas into per-concern buckets.
struct ClassifiedDeltas {
    /// Raw key bytes of `Set` deltas whose keys have single-version
    /// semantics; each later receives a `DropRequest` carrying the commit
    /// version (see `build_drop_batch`).
    pending_set_keys: HashSet<CowVec<u8>>,
    /// Metric records, one per `Set` delta.
    writes: Vec<MultiWrite>,
    /// Metric records, one per `Unset` delta.
    deletes: Vec<MultiDelete>,
    /// Per-table key/value pairs handed to tier storage; `None` = tombstone.
    batches: TierBatch,
    /// Keys explicitly dropped via `Delta::Drop`, tagged with their table.
    explicit_drops: Vec<(EntryKind, EncodedKey)>,
}
#[inline]
/// Walks the commit's deltas once and sorts each into the buckets consumed
/// by `commit`: storage batches per table, metric records, single-version
/// set keys, and explicit drops.
fn classify_deltas(deltas: &CowVec<Delta>) -> ClassifiedDeltas {
    // Build the result in place instead of juggling loose locals.
    let mut acc = ClassifiedDeltas {
        pending_set_keys: HashSet::new(),
        writes: Vec::new(),
        deletes: Vec::new(),
        batches: HashMap::new(),
        explicit_drops: Vec::new(),
    };
    for delta in deltas.iter() {
        let raw_key = delta.key();
        let kind = classify_key(raw_key);
        let single_version = is_single_version_semantics_key(raw_key);
        match delta {
            Delta::Set {
                key,
                row,
            } => {
                // Single-version keys are tracked so older versions can be
                // reclaimed after the commit.
                if single_version {
                    acc.pending_set_keys.insert(key.0.clone());
                }
                acc.writes.push(MultiWrite {
                    key: key.clone(),
                    value_bytes: row.len() as u64,
                });
                acc.batches.entry(kind).or_default().push((key.0.clone(), Some(row.0.clone())));
            }
            Delta::Unset {
                key,
                row,
            } => {
                acc.deletes.push(MultiDelete {
                    key: key.clone(),
                    value_bytes: row.len() as u64,
                });
                // `None` writes a tombstone for the key.
                acc.batches.entry(kind).or_default().push((key.0.clone(), None));
            }
            Delta::Remove {
                key,
            } => {
                // Removes also materialize as tombstones, without metrics.
                acc.batches.entry(kind).or_default().push((key.0.clone(), None));
            }
            Delta::Drop {
                key,
            } => {
                acc.explicit_drops.push((kind, key.clone()));
            }
        }
    }
    acc
}
#[inline]
/// Builds the `DropRequest`s for one commit: one per explicit `Delta::Drop`
/// plus one per single-version set key (so superseded versions can be
/// reclaimed).
///
/// NOTE(review): a key that is both explicitly dropped and freshly set in
/// the same commit produces two requests — confirm the drop actor
/// deduplicates or tolerates this.
fn build_drop_batch(
    explicit_drops: Vec<(EntryKind, EncodedKey)>,
    pending_set_keys: &HashSet<CowVec<u8>>,
    version: CommitVersion,
) -> Vec<DropRequest> {
    let mut drop_batch = Vec::with_capacity(explicit_drops.len() + pending_set_keys.len());
    for (table, key) in explicit_drops {
        // If the same key was also written in this commit, record the commit
        // version so the just-written entry is preserved.
        let pending_version = pending_set_keys.contains(key.as_ref()).then_some(version);
        drop_batch.push(DropRequest {
            table,
            // `key` is owned here: move its bytes instead of cloning them.
            key: key.0,
            commit_version: version,
            pending_version,
        });
    }
    for key in pending_set_keys.iter() {
        let table = classify_key(&EncodedKey(key.clone()));
        drop_batch.push(DropRequest {
            table,
            key: key.clone(),
            commit_version: version,
            pending_version: Some(version),
        });
    }
    drop_batch
}
impl StandardMultiStore {
    /// Forwards accumulated drop requests to the background drop actor.
    #[inline]
    fn dispatch_drops(&self, drop_batch: Vec<DropRequest>) {
        if drop_batch.is_empty() {
            return;
        }
        // Delivery is best-effort: a closed actor channel is logged, not fatal.
        if self.drop_actor.send_blocking(DropMessage::Batch(drop_batch)).is_err() {
            warn!("Failed to send drop batch");
        }
    }
    /// Publishes a commit event unless the commit carried no writes or deletes.
    #[inline]
    fn emit_commit_metrics(&self, writes: Vec<MultiWrite>, deletes: Vec<MultiDelete>, version: CommitVersion) {
        let nothing_to_report = writes.is_empty() && deletes.is_empty();
        if nothing_to_report {
            return;
        }
        self.event_bus.emit(MultiCommittedEvent::new(writes, deletes, vec![], version));
    }
}
/// Tracks per-tier scan progress for a multi-tier range read.
#[derive(Debug, Clone, Default)]
pub struct MultiVersionRangeCursor {
    /// Scan position within the hot tier.
    pub hot: RangeCursor,
    /// Scan position within the warm tier.
    pub warm: RangeCursor,
    /// Scan position within the cold tier.
    pub cold: RangeCursor,
    /// Set once no tier can produce further entries for the range.
    pub exhausted: bool,
}
impl MultiVersionRangeCursor {
pub fn new() -> Self {
Self::default()
}
pub fn is_exhausted(&self) -> bool {
self.exhausted
}
}
/// Borrowed parameters shared by every per-tier chunk scan of one range read.
struct TierScanQuery<'a> {
    /// Tier table the range was classified into.
    table: EntryKind,
    /// Inclusive lower scan bound, in raw key bytes.
    start: &'a [u8],
    /// Inclusive upper scan bound, in raw key bytes.
    end: &'a [u8],
    /// Commit version the scan is evaluated at.
    version: CommitVersion,
    /// The caller's original range, used to re-apply its exact bound
    /// semantics on each returned entry.
    range: &'a EncodedKeyRange,
}
impl StandardMultiStore {
    /// Returns the next forward batch of rows in `range` visible at
    /// `version`, merging entries from the hot, warm and cold tiers.
    ///
    /// `cursor` carries per-tier progress between calls; once it reports
    /// exhaustion, every subsequent call returns an empty, final batch.
    pub fn range_next(
        &self,
        cursor: &mut MultiVersionRangeCursor,
        range: EncodedKeyRange,
        version: CommitVersion,
        batch_size: u64,
    ) -> Result<MultiVersionBatch> {
        // A drained cursor always yields an empty, terminal batch.
        if cursor.exhausted {
            return Ok(MultiVersionBatch {
                items: Vec::new(),
                has_more: false,
            });
        }
        let table = classify_key_range(&range);
        // Inclusive byte bounds approximating `range`; exact semantics for
        // excluded/unbounded edges are restored by the `range.contains`
        // filter inside `scan_tier_chunk`.
        let (start, end) = make_range_bounds(&range);
        let batch_size = batch_size as usize;
        let scan = TierScanQuery {
            table,
            start: &start,
            end: &end,
            version,
            range: &range,
        };
        // key bytes -> (newest version seen, value); `None` value marks a
        // tombstone. BTreeMap keeps keys in ascending order for the batch.
        let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
        while collected.len() < batch_size {
            let mut any_progress = false;
            if let Some(hot) = &self.hot
                && !cursor.hot.exhausted
            {
                let progress = Self::scan_tier_chunk(hot, &mut cursor.hot, &scan, &mut collected)?;
                any_progress |= progress;
            }
            if let Some(warm) = &self.warm
                && !cursor.warm.exhausted
            {
                let progress = Self::scan_tier_chunk(warm, &mut cursor.warm, &scan, &mut collected)?;
                any_progress |= progress;
            }
            if let Some(cold) = &self.cold
                && !cursor.cold.exhausted
            {
                let progress = Self::scan_tier_chunk(cold, &mut cursor.cold, &scan, &mut collected)?;
                any_progress |= progress;
            }
            // No tier yielded entries this round: the range is drained.
            if !any_progress {
                cursor.exhausted = true;
                break;
            }
        }
        // Materialize rows, discarding tombstoned keys.
        // NOTE(review): `collected` can hold more than `batch_size` entries
        // (each chunk inserts everything it read) and the surplus is cut off
        // by `take` even though the tier cursors already advanced past it —
        // confirm `RangeCursor` re-serves those entries on the next call,
        // otherwise keys could be skipped between batches.
        let items: Vec<MultiVersionRow> = collected
            .into_iter()
            .take(batch_size)
            .filter_map(|(key_bytes, (v, value))| {
                value.map(|val| MultiVersionRow {
                    key: EncodedKey(CowVec::new(key_bytes)),
                    row: EncodedRow(val),
                    version: v,
                })
            })
            .collect();
        let has_more = items.len() >= batch_size || !cursor.exhausted;
        Ok(MultiVersionBatch {
            items,
            has_more,
        })
    }
    /// Pulls one chunk (up to `TIER_SCAN_CHUNK_SIZE` entries) from a single
    /// tier and merges it into `collected`, keeping only the newest version
    /// seen per key. Returns `Ok(false)` when the tier produced nothing.
    fn scan_tier_chunk<S: TierStorage>(
        storage: &S,
        cursor: &mut RangeCursor,
        scan: &TierScanQuery,
        collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
    ) -> Result<bool> {
        let batch = storage.range_next(
            scan.table,
            cursor,
            Bound::Included(scan.start),
            Bound::Included(scan.end),
            scan.version,
            TIER_SCAN_CHUNK_SIZE,
        )?;
        if batch.entries.is_empty() {
            return Ok(false);
        }
        for entry in batch.entries {
            let original_key = entry.key.as_slice().to_vec();
            let entry_version = entry.version;
            let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
            // The tier scan used widened inclusive bounds; re-check against
            // the caller's exact range semantics.
            if !scan.range.contains(&original_key_encoded) {
                continue;
            }
            // Prefer the entry with the highest commit version per key so
            // the freshest tier wins regardless of scan order.
            let should_update = match collected.get(&original_key) {
                None => true,
                Some((existing_version, _)) => entry_version > *existing_version,
            };
            if should_update {
                collected.insert(original_key, (entry_version, entry.value));
            }
        }
        Ok(true)
    }
    /// Creates a forward streaming iterator over `range` at `version`,
    /// fetching `batch_size` rows per underlying call.
    pub fn range(
        &self,
        range: EncodedKeyRange,
        version: CommitVersion,
        batch_size: usize,
    ) -> MultiVersionRangeIter {
        MultiVersionRangeIter {
            store: self.clone(),
            cursor: MultiVersionRangeCursor::new(),
            range,
            version,
            batch_size,
            current_batch: Vec::new(),
            current_index: 0,
        }
    }
    /// Creates a reverse (highest-key-first) streaming iterator over `range`
    /// at `version`.
    pub fn range_rev(
        &self,
        range: EncodedKeyRange,
        version: CommitVersion,
        batch_size: usize,
    ) -> MultiVersionRangeRevIter {
        MultiVersionRangeRevIter {
            store: self.clone(),
            cursor: MultiVersionRangeCursor::new(),
            range,
            version,
            batch_size,
            current_batch: Vec::new(),
            current_index: 0,
        }
    }
    /// Reverse counterpart of [`Self::range_next`]: identical tier merge,
    /// but results are taken from the high end of the collected key space.
    fn range_rev_next(
        &self,
        cursor: &mut MultiVersionRangeCursor,
        range: EncodedKeyRange,
        version: CommitVersion,
        batch_size: u64,
    ) -> Result<MultiVersionBatch> {
        if cursor.exhausted {
            return Ok(MultiVersionBatch {
                items: Vec::new(),
                has_more: false,
            });
        }
        let table = classify_key_range(&range);
        let (start, end) = make_range_bounds(&range);
        let batch_size = batch_size as usize;
        let scan = TierScanQuery {
            table,
            start: &start,
            end: &end,
            version,
            range: &range,
        };
        let mut collected: BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)> = BTreeMap::new();
        while collected.len() < batch_size {
            let mut any_progress = false;
            if let Some(hot) = &self.hot
                && !cursor.hot.exhausted
            {
                let progress = Self::scan_tier_chunk_rev(hot, &mut cursor.hot, &scan, &mut collected)?;
                any_progress |= progress;
            }
            if let Some(warm) = &self.warm
                && !cursor.warm.exhausted
            {
                let progress =
                    Self::scan_tier_chunk_rev(warm, &mut cursor.warm, &scan, &mut collected)?;
                any_progress |= progress;
            }
            if let Some(cold) = &self.cold
                && !cursor.cold.exhausted
            {
                let progress =
                    Self::scan_tier_chunk_rev(cold, &mut cursor.cold, &scan, &mut collected)?;
                any_progress |= progress;
            }
            if !any_progress {
                cursor.exhausted = true;
                break;
            }
        }
        // Walk the ordered map backwards so the batch comes out
        // highest-key-first, then drop tombstones. The same surplus-cutoff
        // caveat as in `range_next` applies here.
        let items: Vec<MultiVersionRow> = collected
            .into_iter()
            .rev()
            .take(batch_size)
            .filter_map(|(key_bytes, (v, value))| {
                value.map(|val| MultiVersionRow {
                    key: EncodedKey(CowVec::new(key_bytes)),
                    row: EncodedRow(val),
                    version: v,
                })
            })
            .collect();
        let has_more = items.len() >= batch_size || !cursor.exhausted;
        Ok(MultiVersionBatch {
            items,
            has_more,
        })
    }
    /// Reverse counterpart of [`Self::scan_tier_chunk`], driven by the
    /// tier's reverse scan API.
    fn scan_tier_chunk_rev<S: TierStorage>(
        storage: &S,
        cursor: &mut RangeCursor,
        scan: &TierScanQuery,
        collected: &mut BTreeMap<Vec<u8>, (CommitVersion, Option<CowVec<u8>>)>,
    ) -> Result<bool> {
        let batch = storage.range_rev_next(
            scan.table,
            cursor,
            Bound::Included(scan.start),
            Bound::Included(scan.end),
            scan.version,
            TIER_SCAN_CHUNK_SIZE,
        )?;
        if batch.entries.is_empty() {
            return Ok(false);
        }
        for entry in batch.entries {
            let original_key = entry.key.as_slice().to_vec();
            let entry_version = entry.version;
            let original_key_encoded = EncodedKey(CowVec::new(original_key.clone()));
            // Re-apply the caller's exact range semantics.
            if !scan.range.contains(&original_key_encoded) {
                continue;
            }
            // Keep the newest version seen for this key across all tiers.
            let should_update = match collected.get(&original_key) {
                None => true,
                Some((existing_version, _)) => entry_version > *existing_version,
            };
            if should_update {
                collected.insert(original_key, (entry_version, entry.value));
            }
        }
        Ok(true)
    }
}
impl MultiVersionGetPrevious for StandardMultiStore {
    /// Looks up the newest row for `key` visible strictly before
    /// `before_version`.
    ///
    /// Returns `Ok(None)` when there is no earlier version or the previous
    /// entry is a tombstone.
    fn get_previous_version(
        &self,
        key: &EncodedKey,
        before_version: CommitVersion,
    ) -> Result<Option<MultiVersionRow>> {
        // Nothing can precede version 0.
        if before_version.0 == 0 {
            return Ok(None);
        }
        // NOTE(review): unlike `get`, only the hot tier is consulted and its
        // absence panics — confirm version history never lives in warm/cold.
        let storage = self.hot.as_ref().expect("hot storage required for version lookups");
        let table = classify_key(key);
        let prev_version = CommitVersion(before_version.0 - 1);
        // `?` propagates storage errors; manual Ok/Err re-wrapping removed.
        match get_at_version(storage, table, key.as_ref(), prev_version)? {
            VersionedGetResult::Value {
                value,
                version,
            } => Ok(Some(MultiVersionRow {
                key: key.clone(),
                // `value` is already the owned buffer type (see `get`, which
                // wraps it directly) — the previous `CowVec::new(value.to_vec())`
                // round-trip copied the bytes for nothing.
                row: EncodedRow(value),
                version,
            })),
            VersionedGetResult::Tombstone | VersionedGetResult::NotFound => Ok(None),
        }
    }
}
// Marker impl: `MultiVersionStore` adds no methods here; the capability
// impls above (get/contains/commit/previous-version) supply the behavior.
impl MultiVersionStore for StandardMultiStore {}
/// Batched forward iterator over a versioned key range; refills from
/// `StandardMultiStore::range_next` `batch_size` rows at a time.
pub struct MultiVersionRangeIter {
    store: StandardMultiStore,
    /// Per-tier progress, shared across successive batch fetches.
    cursor: MultiVersionRangeCursor,
    range: EncodedKeyRange,
    version: CommitVersion,
    batch_size: usize,
    /// Rows buffered from the most recent batch fetch.
    current_batch: Vec<MultiVersionRow>,
    /// Index of the next row in `current_batch` to yield.
    current_index: usize,
}
impl Iterator for MultiVersionRangeIter {
    type Item = Result<MultiVersionRow>;
    /// Yields buffered rows, refilling from `range_next` as needed.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Serve from the buffered batch first.
            if self.current_index < self.current_batch.len() {
                let item = self.current_batch[self.current_index].clone();
                self.current_index += 1;
                return Some(Ok(item));
            }
            if self.cursor.exhausted {
                return None;
            }
            // Guard the degenerate zero batch size, which would otherwise
            // spin forever fetching empty batches.
            if self.batch_size == 0 {
                return None;
            }
            match self.store.range_next(
                &mut self.cursor,
                self.range.clone(),
                self.version,
                self.batch_size as u64,
            ) {
                Ok(batch) => {
                    // BUGFIX: an empty batch is not necessarily the end — a
                    // fetch can collect only tombstones (filtered out of
                    // `items`) while the cursor still has data. The previous
                    // code returned `None` here and truncated iteration;
                    // instead, loop and keep fetching until the cursor
                    // itself reports exhaustion.
                    self.current_batch = batch.items;
                    self.current_index = 0;
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
/// Batched reverse (highest-key-first) iterator over a versioned key range;
/// refills from `StandardMultiStore::range_rev_next`.
pub struct MultiVersionRangeRevIter {
    store: StandardMultiStore,
    /// Per-tier progress, shared across successive batch fetches.
    cursor: MultiVersionRangeCursor,
    range: EncodedKeyRange,
    version: CommitVersion,
    batch_size: usize,
    /// Rows buffered from the most recent batch fetch.
    current_batch: Vec<MultiVersionRow>,
    /// Index of the next row in `current_batch` to yield.
    current_index: usize,
}
impl Iterator for MultiVersionRangeRevIter {
    type Item = Result<MultiVersionRow>;
    /// Yields buffered rows highest-key-first, refilling from
    /// `range_rev_next` as needed.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Serve from the buffered batch first.
            if self.current_index < self.current_batch.len() {
                let item = self.current_batch[self.current_index].clone();
                self.current_index += 1;
                return Some(Ok(item));
            }
            if self.cursor.exhausted {
                return None;
            }
            // Guard the degenerate zero batch size, which would otherwise
            // spin forever fetching empty batches.
            if self.batch_size == 0 {
                return None;
            }
            match self.store.range_rev_next(
                &mut self.cursor,
                self.range.clone(),
                self.version,
                self.batch_size as u64,
            ) {
                Ok(batch) => {
                    // BUGFIX: an all-tombstone fetch yields empty `items`
                    // while the cursor still has data; the previous code
                    // returned `None` here and truncated iteration. Loop and
                    // keep fetching until the cursor reports exhaustion.
                    self.current_batch = batch.items;
                    self.current_index = 0;
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
/// Maps a key range onto the tier table it targets, falling back to the
/// multi-version table when the range matches no specific classification.
fn classify_key_range(range: &EncodedKeyRange) -> EntryKind {
    match classify_range(range) {
        Some(kind) => kind,
        None => EntryKind::Multi,
    }
}
/// Converts an `EncodedKeyRange` into the inclusive byte bounds the tier
/// scans operate on.
///
/// NOTE(review): `Excluded` bounds are widened to their key bytes and an
/// unbounded end becomes 256 bytes of 0xFF; callers re-filter each scanned
/// entry with `range.contains`, which restores exactness at the excluded
/// edges — but keys sorting above the 0xFF sentinel would be missed
/// entirely. Confirm key encoding keeps keys below that ceiling.
fn make_range_bounds(range: &EncodedKeyRange) -> (Vec<u8>, Vec<u8>) {
    let start = match &range.start {
        Bound::Included(key) | Bound::Excluded(key) => key.as_ref().to_vec(),
        Bound::Unbounded => Vec::new(),
    };
    let end = match &range.end {
        Bound::Included(key) | Bound::Excluded(key) => key.as_ref().to_vec(),
        Bound::Unbounded => vec![0xFFu8; 256],
    };
    (start, end)
}