use crate::{
db::{
data::{DataKey, StorageKey},
executor::stream::key::{KeyOrderComparator, OrderedKeyStream},
},
error::InternalError,
types::EntityTag,
};
type DataKeyWitness = (EntityTag, StorageKey);
const fn data_key_witness(key: &DataKey) -> DataKeyWitness {
(key.entity_tag(), key.storage_key())
}
fn witness_matches_key(witness: &DataKeyWitness, key: &DataKey) -> bool {
witness.0 == key.entity_tag() && witness.1 == key.storage_key()
}
struct StreamSideState {
item: Option<DataKey>,
done: bool,
last_key: Option<DataKeyWitness>,
comparator: KeyOrderComparator,
strict_monotonicity: bool,
name: &'static str,
}
impl StreamSideState {
const fn new(name: &'static str, comparator: KeyOrderComparator) -> Self {
Self {
item: None,
done: false,
last_key: None,
comparator,
strict_monotonicity: true,
name,
}
}
fn ensure_item<S>(
&mut self,
stream: &mut S,
stream_kind: &'static str,
direction_context: &'static str,
) -> Result<(), InternalError>
where
S: OrderedKeyStream,
{
if self.done || self.item.is_some() {
return Ok(());
}
match stream.next_key()? {
Some(key) => self.push_key(key, stream_kind, direction_context)?,
None => self.done = true,
}
Ok(())
}
fn push_key(
&mut self,
key: DataKey,
stream_kind: &'static str,
direction_context: &'static str,
) -> Result<(), InternalError> {
self.validate_monotonicity(&key, stream_kind, direction_context)?;
self.item = Some(key);
Ok(())
}
fn entity_monotonicity_required(
&self,
stream_kind: &'static str,
direction_context: &'static str,
previous_entity: EntityTag,
current_entity: EntityTag,
) -> InternalError {
InternalError::query_executor_invariant(format!(
"{stream_kind} stream {} changed entity while enforcing {} {direction_context} monotonicity (previous entity: {:?}, current entity: {:?})",
self.name,
self.comparator.order_label(),
previous_entity,
current_entity,
))
}
fn key_monotonicity_required(
&self,
stream_kind: &'static str,
direction_context: &'static str,
current_entity: EntityTag,
previous_key: &StorageKey,
current_key: &StorageKey,
) -> InternalError {
InternalError::query_executor_invariant(format!(
"{stream_kind} stream {} emitted out-of-order key for {} {direction_context} (entity: {:?}, previous key: {:?}, current key: {:?})",
self.name,
self.comparator.order_label(),
current_entity,
previous_key,
current_key,
))
}
fn validate_monotonicity(
&self,
current: &DataKey,
stream_kind: &'static str,
direction_context: &'static str,
) -> Result<(), InternalError> {
if !self.strict_monotonicity {
return Ok(());
}
let Some((previous_entity, previous_key)) = self.last_key.as_ref() else {
return Ok(());
};
let (current_entity, current_key) = data_key_witness(current);
if *previous_entity != current_entity {
return Err(self.entity_monotonicity_required(
stream_kind,
direction_context,
*previous_entity,
current_entity,
));
}
if !self
.comparator
.violates_monotonicity(previous_key, ¤t_key)
{
return Ok(());
}
Err(self.key_monotonicity_required(
stream_kind,
direction_context,
current_entity,
previous_key,
¤t_key,
))
}
fn take_item(&mut self) -> Option<DataKey> {
let key = self.item.take()?;
self.last_key = Some(data_key_witness(&key));
Some(key)
}
const fn clear_item(&mut self) {
if let Some(key) = self.item.take() {
self.last_key = Some(data_key_witness(&key));
}
}
}
struct OrderedPairState {
left: StreamSideState,
right: StreamSideState,
}
impl OrderedPairState {
const fn new(comparator: KeyOrderComparator) -> Self {
Self {
left: StreamSideState::new("left", comparator),
right: StreamSideState::new("right", comparator),
}
}
}
pub(in crate::db::executor) struct MergeOrderedKeyStream<A, B> {
left: A,
right: B,
pair: OrderedPairState,
comparator: KeyOrderComparator,
last_emitted: Option<DataKeyWitness>,
}
impl<A, B> MergeOrderedKeyStream<A, B>
where
A: OrderedKeyStream,
B: OrderedKeyStream,
{
#[must_use]
pub(in crate::db::executor) const fn new_with_comparator(
left: A,
right: B,
comparator: KeyOrderComparator,
) -> Self {
Self {
left,
right,
pair: OrderedPairState::new(comparator),
comparator,
last_emitted: None,
}
}
fn ensure_left_item(&mut self) -> Result<(), InternalError> {
self.pair.left.ensure_item(&mut self.left, "merge", "merge")
}
fn ensure_right_item(&mut self) -> Result<(), InternalError> {
self.pair
.right
.ensure_item(&mut self.right, "merge", "merge")
}
}
impl<A, B> OrderedKeyStream for MergeOrderedKeyStream<A, B>
where
A: OrderedKeyStream,
B: OrderedKeyStream,
{
fn next_key(&mut self) -> Result<Option<DataKey>, InternalError> {
loop {
self.ensure_left_item()?;
self.ensure_right_item()?;
if self.pair.left.item.is_none() && self.pair.right.item.is_none() {
return Ok(None);
}
let next = match (self.pair.left.item.as_ref(), self.pair.right.item.as_ref()) {
(Some(left_key), Some(right_key)) => {
if left_key == right_key {
self.pair.right.clear_item();
self.pair.left.take_item()
} else {
let choose_left = self
.comparator
.compare_data_keys(left_key, right_key)
.is_lt();
if choose_left {
self.pair.left.take_item()
} else {
self.pair.right.take_item()
}
}
}
(Some(_), None) => self.pair.left.take_item(),
(None, Some(_)) => self.pair.right.take_item(),
(None, None) => None,
};
let Some(next) = next else {
return Ok(None);
};
if self
.last_emitted
.as_ref()
.is_some_and(|last| witness_matches_key(last, &next))
{
continue;
}
self.last_emitted = Some(data_key_witness(&next));
return Ok(Some(next));
}
}
}
pub(in crate::db::executor) struct IntersectOrderedKeyStream<A, B> {
left: A,
right: B,
pair: OrderedPairState,
comparator: KeyOrderComparator,
last_emitted: Option<DataKeyWitness>,
}
impl<A, B> IntersectOrderedKeyStream<A, B>
where
A: OrderedKeyStream,
B: OrderedKeyStream,
{
#[must_use]
pub(in crate::db::executor) const fn new_with_comparator(
left: A,
right: B,
comparator: KeyOrderComparator,
) -> Self {
Self {
left,
right,
pair: OrderedPairState::new(comparator),
comparator,
last_emitted: None,
}
}
fn ensure_left_item(&mut self) -> Result<(), InternalError> {
self.pair
.left
.ensure_item(&mut self.left, "intersect", "intersection")
}
fn ensure_right_item(&mut self) -> Result<(), InternalError> {
self.pair
.right
.ensure_item(&mut self.right, "intersect", "intersection")
}
}
impl<A, B> OrderedKeyStream for IntersectOrderedKeyStream<A, B>
where
A: OrderedKeyStream,
B: OrderedKeyStream,
{
fn next_key(&mut self) -> Result<Option<DataKey>, InternalError> {
loop {
if self.pair.left.done || self.pair.right.done {
return Ok(None);
}
self.ensure_left_item()?;
self.ensure_right_item()?;
let (Some(left_key), Some(right_key)) =
(self.pair.left.item.as_ref(), self.pair.right.item.as_ref())
else {
return Ok(None);
};
if left_key == right_key {
let Some(next) = self.pair.left.take_item() else {
return Ok(None);
};
self.pair.right.clear_item();
if self
.last_emitted
.as_ref()
.is_some_and(|last| witness_matches_key(last, &next))
{
continue;
}
self.last_emitted = Some(data_key_witness(&next));
return Ok(Some(next));
}
let advance_left = self
.comparator
.compare_data_keys(left_key, right_key)
.is_lt();
if advance_left {
self.pair.left.clear_item();
} else {
self.pair.right.clear_item();
}
}
}
}