use std::sync::Arc;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueNeed {
#[allow(dead_code)]
None,
CountOnly,
ExistsOnly,
Predicate,
Projection,
Whole,
Numeric,
}
impl ValueNeed {
pub(crate) fn requires_payload(self) -> bool {
!matches!(
self,
ValueNeed::None | ValueNeed::CountOnly | ValueNeed::ExistsOnly
)
}
pub(crate) fn merge(self, other: Self) -> Self {
use ValueNeed::*;
match (self, other) {
(Whole, _) | (_, Whole) => Whole,
(Numeric, _) | (_, Numeric) => Numeric,
(Projection, _) | (_, Projection) => Projection,
(Predicate, _) | (_, Predicate) => Predicate,
(ExistsOnly, _) | (_, ExistsOnly) => ExistsOnly,
(CountOnly, _) | (_, CountOnly) => CountOnly,
(None, None) => None,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FieldPath {
keys: Arc<[Arc<str>]>,
}
impl FieldPath {
pub fn single(key: Arc<str>) -> Self {
Self {
keys: Arc::from([key]),
}
}
pub fn chain(keys: Arc<[Arc<str>]>) -> Self {
Self { keys }
}
#[cfg(test)]
pub fn keys(&self) -> &[Arc<str>] {
&self.keys
}
pub fn prefixed(&self, prefix: &[Arc<str>]) -> Self {
let mut keys = Vec::with_capacity(prefix.len() + self.keys.len());
keys.extend(prefix.iter().cloned());
keys.extend(self.keys.iter().cloned());
Self { keys: keys.into() }
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FieldSet {
paths: Vec<FieldPath>,
}
impl FieldSet {
pub fn new() -> Self {
Self { paths: Vec::new() }
}
pub fn single(key: Arc<str>) -> Self {
let mut out = Self::new();
out.insert(FieldPath::single(key));
out
}
pub fn chain(keys: Arc<[Arc<str>]>) -> Self {
let mut out = Self::new();
out.insert(FieldPath::chain(keys));
out
}
pub fn insert(&mut self, path: FieldPath) {
if !self.paths.iter().any(|existing| existing == &path) {
self.paths.push(path);
}
}
pub fn extend(&mut self, other: &FieldSet) {
for path in other.paths.iter() {
self.insert(path.clone());
}
}
#[cfg(test)]
pub fn paths(&self) -> &[FieldPath] {
&self.paths
}
pub fn prefixed(&self, prefix: &[Arc<str>]) -> Self {
let mut out = Self::new();
for path in self.paths.iter() {
out.insert(path.prefixed(prefix));
}
out
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FieldDemand {
None,
Fields(FieldSet),
Whole,
}
impl FieldDemand {
pub fn merge(self, other: Self) -> Self {
match (self, other) {
(Self::Whole, _) | (_, Self::Whole) => Self::Whole,
(Self::None, need) | (need, Self::None) => need,
(Self::Fields(mut lhs), Self::Fields(rhs)) => {
lhs.extend(&rhs);
Self::Fields(lhs)
}
}
}
pub fn is_none(&self) -> bool {
matches!(self, Self::None)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DemandLanes {
pub scan_need: FieldDemand,
pub result_need: FieldDemand,
}
impl DemandLanes {
pub const NONE: Self = Self {
scan_need: FieldDemand::None,
result_need: FieldDemand::None,
};
pub const RESULT: Self = Self {
scan_need: FieldDemand::None,
result_need: FieldDemand::Whole,
};
pub fn merge_scan(&mut self, need: FieldDemand) {
self.scan_need = self.scan_need.clone().merge(need);
}
pub fn merge_result(&mut self, need: FieldDemand) {
self.result_need = self.result_need.clone().merge(need);
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PullDemand {
All,
FirstInput(usize),
LastInput(usize),
NthInput(usize),
UntilOutput(usize),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkResultDemand {
None,
UntilMatch,
UntilFailure,
}
impl SinkResultDemand {
#[cfg(test)]
pub(crate) fn can_short_circuit(self) -> bool {
!matches!(self, Self::None)
}
}
impl PullDemand {
pub(crate) fn is_zero(self) -> bool {
matches!(
self,
PullDemand::FirstInput(0) | PullDemand::LastInput(0) | PullDemand::UntilOutput(0)
)
}
pub(crate) fn cap_inputs(self, n: usize) -> Self {
match self {
PullDemand::All | PullDemand::UntilOutput(_) | PullDemand::LastInput(_) => {
PullDemand::FirstInput(n)
}
PullDemand::FirstInput(m) => PullDemand::FirstInput(m.min(n)),
PullDemand::NthInput(i) => {
if i < n {
PullDemand::NthInput(i)
} else {
PullDemand::FirstInput(n)
}
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Demand {
pub pull: PullDemand,
pub value: ValueNeed,
pub order: bool,
}
impl Demand {
pub const RESULT: Demand = Demand {
pull: PullDemand::All,
value: ValueNeed::Whole,
order: true,
};
pub fn all(value: ValueNeed) -> Self {
Self {
pull: PullDemand::All,
value,
order: true,
}
}
pub fn first(value: ValueNeed) -> Self {
Self {
pull: PullDemand::FirstInput(1),
value,
order: false,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::{FieldDemand, FieldSet, PullDemand, SinkResultDemand};
fn paths(need: FieldDemand) -> Vec<String> {
match need {
FieldDemand::Fields(fields) => fields
.paths()
.iter()
.map(|path| {
path.keys()
.iter()
.map(|key| key.as_ref())
.collect::<Vec<_>>()
.join(".")
})
.collect(),
FieldDemand::None => Vec::new(),
FieldDemand::Whole => vec!["*".to_string()],
}
}
#[test]
fn field_sets_prefix_nested_paths() {
let mut fields = FieldSet::chain(Arc::from([Arc::<str>::from("name")]));
fields.insert(super::FieldPath::chain(Arc::from([
Arc::<str>::from("address"),
Arc::<str>::from("city"),
])));
let prefixed = fields.prefixed(&[Arc::from("user")]);
assert_eq!(
paths(FieldDemand::Fields(prefixed)),
vec!["user.name", "user.address.city"]
);
}
#[test]
fn pull_demand_caps_inputs_without_crossing_prefix_bounds() {
assert_eq!(PullDemand::All.cap_inputs(3), PullDemand::FirstInput(3));
assert_eq!(
PullDemand::UntilOutput(2).cap_inputs(3),
PullDemand::FirstInput(3)
);
assert_eq!(
PullDemand::LastInput(2).cap_inputs(3),
PullDemand::FirstInput(3)
);
assert_eq!(
PullDemand::FirstInput(5).cap_inputs(3),
PullDemand::FirstInput(3)
);
assert_eq!(PullDemand::NthInput(2).cap_inputs(3), PullDemand::NthInput(2));
assert_eq!(
PullDemand::NthInput(3).cap_inputs(3),
PullDemand::FirstInput(3)
);
}
#[test]
fn pull_demand_zero_only_matches_no_read_variants() {
assert!(PullDemand::FirstInput(0).is_zero());
assert!(PullDemand::LastInput(0).is_zero());
assert!(PullDemand::UntilOutput(0).is_zero());
assert!(!PullDemand::NthInput(0).is_zero());
assert!(!PullDemand::All.is_zero());
}
#[test]
fn sink_result_demand_is_separate_from_row_output_demand() {
assert!(SinkResultDemand::UntilMatch.can_short_circuit());
assert!(SinkResultDemand::UntilFailure.can_short_circuit());
assert!(!SinkResultDemand::None.can_short_circuit());
}
}
pub trait DemandOperator {
fn propagate_demand(&self, downstream: Demand) -> Demand;
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(test)]
pub struct DemandStep<Op> {
pub op: Op,
pub downstream: Demand,
pub upstream: Demand,
}
#[cfg(test)]
pub fn propagate_demands<Op>(ops: &[Op], final_demand: Demand) -> Vec<DemandStep<Op>>
where
Op: DemandOperator + Clone,
{
let mut demand = final_demand;
let mut out = Vec::with_capacity(ops.len());
for op in ops.iter().rev() {
let upstream = op.propagate_demand(demand);
out.push(DemandStep {
op: op.clone(),
downstream: demand,
upstream,
});
demand = upstream;
}
out.reverse();
out
}
#[cfg(test)]
pub fn source_demand<Op>(ops: &[Op], final_demand: Demand) -> Demand
where
Op: DemandOperator,
{
ops.iter()
.rev()
.fold(final_demand, |demand, op| op.propagate_demand(demand))
}