#![allow(dead_code)]
use std::sync::Arc;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueNeed {
None,
CountOnly,
ExistsOnly,
Predicate,
Projection,
Whole,
Numeric,
}
impl ValueNeed {
pub(crate) fn requires_payload(self) -> bool {
!matches!(
self,
ValueNeed::None | ValueNeed::CountOnly | ValueNeed::ExistsOnly
)
}
pub(crate) fn merge(self, other: Self) -> Self {
use ValueNeed::*;
match (self, other) {
(Whole, _) | (_, Whole) => Whole,
(Numeric, _) | (_, Numeric) => Numeric,
(Projection, _) | (_, Projection) => Projection,
(Predicate, _) | (_, Predicate) => Predicate,
(ExistsOnly, _) | (_, ExistsOnly) => ExistsOnly,
(CountOnly, _) | (_, CountOnly) => CountOnly,
(None, None) => None,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FieldPath {
keys: Arc<[Arc<str>]>,
}
impl FieldPath {
pub fn single(key: Arc<str>) -> Self {
Self {
keys: Arc::from([key]),
}
}
pub fn chain(keys: Arc<[Arc<str>]>) -> Self {
Self { keys }
}
pub fn keys(&self) -> &[Arc<str>] {
&self.keys
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FieldSet {
paths: Vec<FieldPath>,
}
impl FieldSet {
pub fn new() -> Self {
Self { paths: Vec::new() }
}
pub fn single(key: Arc<str>) -> Self {
let mut out = Self::new();
out.insert(FieldPath::single(key));
out
}
pub fn chain(keys: Arc<[Arc<str>]>) -> Self {
let mut out = Self::new();
out.insert(FieldPath::chain(keys));
out
}
pub fn insert(&mut self, path: FieldPath) {
if !self.paths.iter().any(|existing| existing == &path) {
self.paths.push(path);
}
}
pub fn extend(&mut self, other: &FieldSet) {
for path in other.paths.iter() {
self.insert(path.clone());
}
}
pub fn paths(&self) -> &[FieldPath] {
&self.paths
}
pub fn is_empty(&self) -> bool {
self.paths.is_empty()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FieldDemand {
None,
Fields(FieldSet),
Whole,
}
impl FieldDemand {
pub fn merge(self, other: Self) -> Self {
match (self, other) {
(Self::Whole, _) | (_, Self::Whole) => Self::Whole,
(Self::None, need) | (need, Self::None) => need,
(Self::Fields(mut lhs), Self::Fields(rhs)) => {
lhs.extend(&rhs);
Self::Fields(lhs)
}
}
}
pub fn is_none(&self) -> bool {
matches!(self, Self::None)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DemandLanes {
pub scan_need: FieldDemand,
pub result_need: FieldDemand,
}
impl DemandLanes {
pub const NONE: Self = Self {
scan_need: FieldDemand::None,
result_need: FieldDemand::None,
};
pub const RESULT: Self = Self {
scan_need: FieldDemand::None,
result_need: FieldDemand::Whole,
};
pub fn merge_scan(&mut self, need: FieldDemand) {
self.scan_need = self.scan_need.clone().merge(need);
}
pub fn merge_result(&mut self, need: FieldDemand) {
self.result_need = self.result_need.clone().merge(need);
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PullDemand {
All,
FirstInput(usize),
LastInput(usize),
NthInput(usize),
UntilOutput(usize),
}
impl PullDemand {
pub(crate) fn cap_inputs(self, n: usize) -> Self {
match self {
PullDemand::All | PullDemand::UntilOutput(_) | PullDemand::LastInput(_) => {
PullDemand::FirstInput(n)
}
PullDemand::FirstInput(m) => PullDemand::FirstInput(m.min(n)),
PullDemand::NthInput(i) => {
if i < n {
PullDemand::NthInput(i)
} else {
PullDemand::FirstInput(n)
}
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Demand {
pub pull: PullDemand,
pub value: ValueNeed,
pub order: bool,
}
impl Demand {
pub const RESULT: Demand = Demand {
pull: PullDemand::All,
value: ValueNeed::Whole,
order: true,
};
pub fn all(value: ValueNeed) -> Self {
Self {
pull: PullDemand::All,
value,
order: true,
}
}
pub fn first(value: ValueNeed) -> Self {
Self {
pull: PullDemand::FirstInput(1),
value,
order: false,
}
}
}
pub trait DemandOperator {
fn propagate_demand(&self, downstream: Demand) -> Demand;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DemandStep<Op> {
pub op: Op,
pub downstream: Demand,
pub upstream: Demand,
}
pub fn propagate_demands<Op>(ops: &[Op], final_demand: Demand) -> Vec<DemandStep<Op>>
where
Op: DemandOperator + Clone,
{
let mut demand = final_demand;
let mut out = Vec::with_capacity(ops.len());
for op in ops.iter().rev() {
let upstream = op.propagate_demand(demand);
out.push(DemandStep {
op: op.clone(),
downstream: demand,
upstream,
});
demand = upstream;
}
out.reverse();
out
}
pub fn source_demand<Op>(ops: &[Op], final_demand: Demand) -> Demand
where
Op: DemandOperator,
{
ops.iter()
.rev()
.fold(final_demand, |demand, op| op.propagate_demand(demand))
}