use crate::{
db::{
data::DataKey,
executor::stream::key::{KeyOrderComparator, OrderedKeyStream},
},
error::InternalError,
};
use std::{cell::Cell, rc::Rc};
pub(in crate::db::executor) struct DistinctOrderedKeyStream<S> {
inner: S,
last_emitted: Option<DataKey>,
comparator: KeyOrderComparator,
deduped_keys_counter: Option<Rc<Cell<u64>>>,
}
impl<S> DistinctOrderedKeyStream<S> {
#[must_use]
pub(in crate::db::executor) const fn new(inner: S, comparator: KeyOrderComparator) -> Self {
Self {
inner,
last_emitted: None,
comparator,
deduped_keys_counter: None,
}
}
#[must_use]
pub(in crate::db::executor) const fn new_with_dedup_counter(
inner: S,
comparator: KeyOrderComparator,
deduped_keys_counter: Rc<Cell<u64>>,
) -> Self {
Self {
inner,
last_emitted: None,
comparator,
deduped_keys_counter: Some(deduped_keys_counter),
}
}
}
impl<S> OrderedKeyStream for DistinctOrderedKeyStream<S>
where
S: OrderedKeyStream,
{
fn next_key(&mut self) -> Result<Option<DataKey>, InternalError> {
loop {
let Some(next) = self.inner.next_key()? else {
return Ok(None);
};
if let Some(last) = self.last_emitted.as_ref() {
if self.comparator.compare_data_keys(last, &next).is_gt() {
return Err(InternalError::query_executor_invariant(
"distinct ordered stream received non-monotonic key order",
));
}
if last == &next {
if let Some(counter) = self.deduped_keys_counter.as_ref() {
counter.set(counter.get().saturating_add(1));
}
continue;
}
}
self.last_emitted = Some(next.clone());
return Ok(Some(next));
}
}
}