use crate::{
db::{
data::DecodedDataStoreKey,
executor::stream::key::{KeyOrderComparator, OrderedKeyStream},
key_taxonomy::PrimaryKeyValue,
},
error::InternalError,
types::EntityTag,
};
type RowKeyWitness = (EntityTag, PrimaryKeyValue);
const fn row_key_witness(key: &DecodedDataStoreKey) -> RowKeyWitness {
(key.entity_tag(), key.primary_key_value())
}
fn witness_matches_key(witness: &RowKeyWitness, key: &DecodedDataStoreKey) -> bool {
witness.0 == key.entity_tag() && witness.1 == key.primary_key_value()
}
struct StreamSideState {
item: Option<DecodedDataStoreKey>,
done: bool,
last_key: Option<RowKeyWitness>,
comparator: KeyOrderComparator,
strict_monotonicity: bool,
}
impl StreamSideState {
const fn new(comparator: KeyOrderComparator) -> Self {
Self {
item: None,
done: false,
last_key: None,
comparator,
strict_monotonicity: true,
}
}
fn ensure_item<S>(&mut self, stream: &mut S) -> Result<(), InternalError>
where
S: OrderedKeyStream,
{
if self.done || self.item.is_some() {
return Ok(());
}
match stream.next_key()? {
Some(key) => self.push_key(key)?,
None => self.done = true,
}
Ok(())
}
fn push_key(&mut self, key: DecodedDataStoreKey) -> Result<(), InternalError> {
self.validate_monotonicity(&key)?;
self.item = Some(key);
Ok(())
}
fn entity_monotonicity_required() -> InternalError {
InternalError::query_executor_invariant("")
}
fn key_monotonicity_required() -> InternalError {
InternalError::query_executor_invariant("")
}
fn validate_monotonicity(&self, current: &DecodedDataStoreKey) -> Result<(), InternalError> {
if !self.strict_monotonicity {
return Ok(());
}
let Some((previous_entity, previous_key)) = self.last_key.as_ref() else {
return Ok(());
};
let (current_entity, current_key) = row_key_witness(current);
if *previous_entity != current_entity {
return Err(Self::entity_monotonicity_required());
}
if !self
.comparator
.violates_monotonicity(previous_key, ¤t_key)
{
return Ok(());
}
Err(Self::key_monotonicity_required())
}
fn take_item(&mut self) -> Option<DecodedDataStoreKey> {
let key = self.item.take()?;
self.last_key = Some(row_key_witness(&key));
Some(key)
}
fn clear_item(&mut self) {
if let Some(key) = self.item.take() {
self.last_key = Some(row_key_witness(&key));
}
}
}
struct OrderedPairState {
left: StreamSideState,
right: StreamSideState,
}
impl OrderedPairState {
const fn new(comparator: KeyOrderComparator) -> Self {
Self {
left: StreamSideState::new(comparator),
right: StreamSideState::new(comparator),
}
}
}
pub(in crate::db::executor) struct MergeOrderedKeyStream<A, B> {
left: A,
right: B,
pair: OrderedPairState,
comparator: KeyOrderComparator,
last_emitted: Option<RowKeyWitness>,
}
impl<A, B> MergeOrderedKeyStream<A, B>
where
A: OrderedKeyStream,
B: OrderedKeyStream,
{
#[must_use]
pub(in crate::db::executor) const fn new_with_comparator(
left: A,
right: B,
comparator: KeyOrderComparator,
) -> Self {
Self {
left,
right,
pair: OrderedPairState::new(comparator),
comparator,
last_emitted: None,
}
}
fn ensure_left_item(&mut self) -> Result<(), InternalError> {
self.pair.left.ensure_item(&mut self.left)
}
fn ensure_right_item(&mut self) -> Result<(), InternalError> {
self.pair.right.ensure_item(&mut self.right)
}
}
impl<A, B> OrderedKeyStream for MergeOrderedKeyStream<A, B>
where
A: OrderedKeyStream,
B: OrderedKeyStream,
{
fn next_key(&mut self) -> Result<Option<DecodedDataStoreKey>, InternalError> {
loop {
self.ensure_left_item()?;
self.ensure_right_item()?;
if self.pair.left.item.is_none() && self.pair.right.item.is_none() {
return Ok(None);
}
let next = match (self.pair.left.item.as_ref(), self.pair.right.item.as_ref()) {
(Some(left_key), Some(right_key)) => {
if left_key == right_key {
self.pair.right.clear_item();
self.pair.left.take_item()
} else {
let choose_left = self
.comparator
.compare_data_keys(left_key, right_key)
.is_lt();
if choose_left {
self.pair.left.take_item()
} else {
self.pair.right.take_item()
}
}
}
(Some(_), None) => self.pair.left.take_item(),
(None, Some(_)) => self.pair.right.take_item(),
(None, None) => None,
};
let Some(next) = next else {
return Ok(None);
};
if self
.last_emitted
.as_ref()
.is_some_and(|last| witness_matches_key(last, &next))
{
continue;
}
self.last_emitted = Some(row_key_witness(&next));
return Ok(Some(next));
}
}
}
pub(in crate::db::executor) struct IntersectOrderedKeyStream<A, B> {
left: A,
right: B,
pair: OrderedPairState,
comparator: KeyOrderComparator,
last_emitted: Option<RowKeyWitness>,
}
impl<A, B> IntersectOrderedKeyStream<A, B>
where
A: OrderedKeyStream,
B: OrderedKeyStream,
{
#[must_use]
pub(in crate::db::executor) const fn new_with_comparator(
left: A,
right: B,
comparator: KeyOrderComparator,
) -> Self {
Self {
left,
right,
pair: OrderedPairState::new(comparator),
comparator,
last_emitted: None,
}
}
fn ensure_left_item(&mut self) -> Result<(), InternalError> {
self.pair.left.ensure_item(&mut self.left)
}
fn ensure_right_item(&mut self) -> Result<(), InternalError> {
self.pair.right.ensure_item(&mut self.right)
}
}
impl<A, B> OrderedKeyStream for IntersectOrderedKeyStream<A, B>
where
A: OrderedKeyStream,
B: OrderedKeyStream,
{
fn next_key(&mut self) -> Result<Option<DecodedDataStoreKey>, InternalError> {
loop {
if self.pair.left.done || self.pair.right.done {
return Ok(None);
}
self.ensure_left_item()?;
self.ensure_right_item()?;
let (Some(left_key), Some(right_key)) =
(self.pair.left.item.as_ref(), self.pair.right.item.as_ref())
else {
return Ok(None);
};
if left_key == right_key {
let Some(next) = self.pair.left.take_item() else {
return Ok(None);
};
self.pair.right.clear_item();
if self
.last_emitted
.as_ref()
.is_some_and(|last| witness_matches_key(last, &next))
{
continue;
}
self.last_emitted = Some(row_key_witness(&next));
return Ok(Some(next));
}
let advance_left = self
.comparator
.compare_data_keys(left_key, right_key)
.is_lt();
if advance_left {
self.pair.left.clear_item();
} else {
self.pair.right.clear_item();
}
}
}
}