use crate::{
db::access::{AccessPath, ExecutableAccessPlan, SemanticIndexRangeSpec},
model::index::IndexModel,
traits::KeyValueCodec,
value::Value,
};
/// Tree-shaped access plan over keys of type `K`.
///
/// A plan is either a single leaf access path or a composite that combines
/// child plans. Composites built through [`AccessPlan::union`] and
/// [`AccessPlan::intersection`] are flattened and collapsed by those
/// constructors; building the variants directly bypasses that step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum AccessPlan<K> {
    /// Leaf plan wrapping exactly one boxed access path.
    Path(Box<AccessPath<K>>),
    /// Composite plan combining its child plans as a union.
    Union(Vec<AccessPlan<K>>),
    /// Composite plan combining its child plans as an intersection.
    Intersection(Vec<AccessPlan<K>>),
}
impl<K> AccessPlan<K> {
    /// Wraps a single access path as a leaf plan.
    #[must_use]
    pub(crate) fn path(path: AccessPath<K>) -> Self {
        let boxed = Box::new(path);
        Self::Path(boxed)
    }

    /// Leaf plan for `AccessPath::ByKey` with the given key.
    #[must_use]
    pub(crate) fn by_key(key: K) -> Self {
        let leaf = AccessPath::ByKey(key);
        Self::path(leaf)
    }

    /// Leaf plan for `AccessPath::ByKeys` with the given key list.
    ///
    /// An empty list yields the "explicit empty" plan recognised by
    /// [`Self::is_explicit_empty`].
    #[must_use]
    pub(crate) fn by_keys(keys: Vec<K>) -> Self {
        let leaf = AccessPath::ByKeys(keys);
        Self::path(leaf)
    }

    /// Leaf plan for `AccessPath::KeyRange` with the given `start` and `end`.
    #[must_use]
    pub(crate) fn key_range(start: K, end: K) -> Self {
        let leaf = AccessPath::KeyRange { start, end };
        Self::path(leaf)
    }

    /// Leaf plan for `AccessPath::IndexPrefix` on `index` with `values`.
    #[must_use]
    pub(crate) fn index_prefix(index: IndexModel, values: Vec<Value>) -> Self {
        let leaf = AccessPath::IndexPrefix { index, values };
        Self::path(leaf)
    }

    /// Leaf plan for `AccessPath::IndexMultiLookup` on `index` with `values`.
    #[must_use]
    pub(crate) fn index_multi_lookup(index: IndexModel, values: Vec<Value>) -> Self {
        let leaf = AccessPath::IndexMultiLookup { index, values };
        Self::path(leaf)
    }

    /// Leaf plan for `AccessPath::IndexRange` described by `spec`.
    #[must_use]
    pub(crate) fn index_range(spec: SemanticIndexRangeSpec) -> Self {
        let leaf = AccessPath::IndexRange { spec };
        Self::path(leaf)
    }

    /// Leaf plan for `AccessPath::FullScan`.
    #[must_use]
    pub(crate) fn full_scan() -> Self {
        Self::path(AccessPath::FullScan)
    }

    /// Builds a canonical union of `children`.
    ///
    /// Nested `Union` children are flattened recursively, then explicit-empty
    /// leaves are dropped. If nothing remains and at least one explicit-empty
    /// leaf was dropped, the result is the explicit-empty plan. Otherwise the
    /// remaining children are collapsed: none gives a full scan, one gives
    /// that child itself, and several give a `Union`.
    #[must_use]
    pub(crate) fn union(children: Vec<Self>) -> Self {
        let mut flattened = Vec::new();
        for child in children {
            Self::append_flattened_child(&mut flattened, child, true);
        }

        // A shrinking length is the only way to tell "no children at all"
        // apart from "only explicit-empty children" after the filter.
        let len_before_filter = flattened.len();
        flattened.retain(|plan| !plan.is_explicit_empty());
        let dropped_explicit_empty = flattened.len() != len_before_filter;

        if dropped_explicit_empty && flattened.is_empty() {
            return Self::by_keys(Vec::new());
        }
        Self::collapse_canonical_composite(flattened, true)
    }

    /// Builds a canonical intersection of `children`.
    ///
    /// Nested `Intersection` children are flattened recursively. If any
    /// flattened child is an explicit-empty leaf, the first such child is
    /// returned as the whole result. Otherwise the children are collapsed:
    /// none gives a full scan, one gives that child itself, and several give
    /// an `Intersection`.
    #[must_use]
    pub(crate) fn intersection(children: Vec<Self>) -> Self {
        let mut flattened = Vec::new();
        for child in children {
            Self::append_flattened_child(&mut flattened, child, false);
        }

        match flattened.iter().position(Self::is_explicit_empty) {
            Some(index) => flattened.remove(index),
            None => Self::collapse_canonical_composite(flattened, false),
        }
    }

    /// Returns the wrapped access path for a leaf plan, `None` for composites.
    #[must_use]
    pub(crate) fn as_path(&self) -> Option<&AccessPath<K>> {
        if let Self::Path(path) = self {
            Some(&**path)
        } else {
            None
        }
    }

    /// Returns `true` when this plan is a leaf whose path reports
    /// `is_full_scan()`.
    #[must_use]
    pub(crate) const fn is_single_full_scan(&self) -> bool {
        match self {
            Self::Path(path) => path.is_full_scan(),
            Self::Union(_) | Self::Intersection(_) => false,
        }
    }

    /// Returns `true` when this plan is a leaf `AccessPath::ByKeys` whose key
    /// list is empty.
    #[must_use]
    pub(crate) fn is_explicit_empty(&self) -> bool {
        match self {
            Self::Path(path) => {
                matches!(&**path, AccessPath::ByKeys(keys) if keys.is_empty())
            }
            Self::Union(_) | Self::Intersection(_) => false,
        }
    }

    /// Returns `true` when this plan is a leaf for which the path's
    /// `as_by_key()` is `Some`, or `as_by_keys()` yields at most one key.
    /// Composite plans always return `false`.
    #[must_use]
    pub(crate) fn is_singleton_or_empty_primary_key_access(&self) -> bool {
        self.as_path().is_some_and(|path| {
            path.as_by_key().is_some() || path.as_by_keys().is_some_and(|keys| keys.len() <= 1)
        })
    }

    /// Leaf-only projection: forwards to the path's `as_index_prefix()`.
    /// Composite plans yield `None`.
    #[must_use]
    pub(crate) fn as_index_prefix_path(&self) -> Option<(&IndexModel, &[Value])> {
        self.as_path()?.as_index_prefix()
    }

    /// Leaf-only projection: forwards to the path's `as_index_range()`.
    /// Composite plans yield `None`.
    #[must_use]
    pub(crate) fn as_index_range_path(&self) -> Option<&SemanticIndexRangeSpec> {
        self.as_path()?.as_index_range()
    }

    /// Leaf-only projection: forwards to the path's `as_key_range()`.
    /// Composite plans yield `None`.
    #[must_use]
    pub(crate) fn as_primary_key_range_path(&self) -> Option<(&K, &K)> {
        self.as_path()?.as_key_range()
    }

    /// Leaf-only projection: forwards to the path's `as_by_key()`.
    /// Composite plans yield `None`.
    #[must_use]
    pub(crate) fn as_by_key_path(&self) -> Option<&K> {
        self.as_path()?.as_by_key()
    }

    /// Leaf-only projection: forwards to the path's `as_by_keys()`.
    /// Composite plans yield `None`.
    #[must_use]
    pub(crate) fn as_by_keys_path(&self) -> Option<&[K]> {
        self.as_path()?.as_by_keys()
    }

    /// Leaf-only projection: forwards to the path's `selected_index_model()`.
    /// Composite plans yield `None`.
    #[must_use]
    pub(crate) fn selected_index_model(&self) -> Option<&IndexModel> {
        self.as_path()?.selected_index_model()
    }

    /// Builds the borrowed executable view of this plan by delegating to
    /// `ExecutableAccessPlan::from_access_plan`.
    #[must_use]
    pub(in crate::db) fn executable_contract(&self) -> ExecutableAccessPlan<'_, K> {
        let plan: &Self = self;
        ExecutableAccessPlan::from_access_plan(plan)
    }

    /// Converts every key in the plan with `map_key`, stopping at the first
    /// error.
    ///
    /// Composite nodes are rebuilt through [`AccessPlan::union`] and
    /// [`AccessPlan::intersection`], so the result is canonicalised again
    /// after the keys change.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `map_key`.
    pub(crate) fn map_keys<T, E, F>(self, mut map_key: F) -> Result<AccessPlan<T>, E>
    where
        F: FnMut(K) -> Result<T, E>,
    {
        let mapper = &mut map_key;
        self.map_keys_with(mapper)
    }

    /// Collapses flattened composite children into a canonical plan: no
    /// children gives a full scan, one child gives that child unchanged, and
    /// several give a `Union` when `is_union` is set or an `Intersection`
    /// otherwise.
    fn collapse_canonical_composite(mut children: Vec<Self>, is_union: bool) -> Self {
        match children.len() {
            0 => Self::full_scan(),
            1 => children.pop().expect("single composite child"),
            _ if is_union => Self::Union(children),
            _ => Self::Intersection(children),
        }
    }

    /// Pushes `child` onto `out`, recursively splicing in the children of
    /// nested composites of the same kind: `Union` when `flatten_union` is
    /// `true`, `Intersection` when it is `false`. Every other plan is pushed
    /// as-is.
    fn append_flattened_child(out: &mut Vec<Self>, child: Self, flatten_union: bool) {
        let nested = match (child, flatten_union) {
            (Self::Union(nested), true) | (Self::Intersection(nested), false) => nested,
            (other, _) => {
                out.push(other);
                return;
            }
        };
        for grandchild in nested {
            Self::append_flattened_child(out, grandchild, flatten_union);
        }
    }

    /// Recursive worker behind [`Self::map_keys`]; takes the mapper by mutable
    /// reference so a single closure can be shared across the whole tree.
    fn map_keys_with<T, E, F>(self, map_key: &mut F) -> Result<AccessPlan<T>, E>
    where
        F: FnMut(K) -> Result<T, E>,
    {
        match self {
            Self::Path(path) => {
                let mapped_path = path.map_keys(map_key)?;
                Ok(AccessPlan::path(mapped_path))
            }
            Self::Union(children) => {
                let mapped_children = Self::map_child_plans(children, map_key)?;
                Ok(AccessPlan::union(mapped_children))
            }
            Self::Intersection(children) => {
                let mapped_children = Self::map_child_plans(children, map_key)?;
                Ok(AccessPlan::intersection(mapped_children))
            }
        }
    }

    /// Maps each child plan in order, returning the first error encountered.
    fn map_child_plans<T, E, F>(
        children: Vec<Self>,
        map_key: &mut F,
    ) -> Result<Vec<AccessPlan<T>>, E>
    where
        F: FnMut(K) -> Result<T, E>,
    {
        children
            .into_iter()
            .map(|child| child.map_keys_with(&mut *map_key))
            .collect()
    }
}
impl<K> AccessPlan<K>
where
    K: KeyValueCodec,
{
    /// Converts this plan into a `Value`-keyed plan by calling
    /// `to_key_value()` on every key.
    ///
    /// The mapping closure's error type is `Infallible`, so the `expect`
    /// below is not expected to fire.
    #[must_use]
    pub(crate) fn into_value_plan(self) -> AccessPlan<Value> {
        let converted: Result<AccessPlan<Value>, core::convert::Infallible> =
            self.map_keys(|key| Ok(key.to_key_value()));
        converted.expect("field value conversion is infallible")
    }
}
/// Lets a single access path be used wherever a plan is expected.
impl<K> From<AccessPath<K>> for AccessPlan<K> {
    /// Wraps `value` as a boxed leaf `Path` plan.
    fn from(value: AccessPath<K>) -> Self {
        Self::Path(Box::new(value))
    }
}
#[cfg(test)]
mod tests {
    use crate::db::access::AccessPlan;

    /// Concrete plan type used by every test in this module.
    type Plan = AccessPlan<u64>;

    /// The explicit-empty plan: a by-keys leaf with no keys.
    fn explicit_empty() -> Plan {
        Plan::by_keys(Vec::new())
    }

    #[test]
    fn union_constructor_flattens_and_collapses_single_child() {
        let inner = Plan::union(vec![Plan::by_key(7)]);
        let outer = Plan::union(vec![inner]);

        assert_eq!(outer, Plan::by_key(7));
    }

    #[test]
    fn intersection_constructor_flattens_and_collapses_single_child() {
        let inner = Plan::intersection(vec![Plan::by_key(9)]);
        let outer = Plan::intersection(vec![inner]);

        assert_eq!(outer, Plan::by_key(9));
    }

    #[test]
    fn union_constructor_empty_collapses_to_full_scan() {
        let no_children: Vec<Plan> = Vec::new();

        assert_eq!(Plan::union(no_children), Plan::full_scan());
    }

    #[test]
    fn union_constructor_explicit_empty_is_identity_for_non_empty_children() {
        let children = vec![Plan::by_key(7), explicit_empty()];

        assert_eq!(Plan::union(children), Plan::by_key(7));
    }

    #[test]
    fn union_constructor_only_explicit_empty_children_stays_explicit_empty() {
        let children = vec![explicit_empty(), explicit_empty()];

        assert_eq!(Plan::union(children), explicit_empty());
    }

    #[test]
    fn intersection_constructor_empty_collapses_to_full_scan() {
        let no_children: Vec<Plan> = Vec::new();

        assert_eq!(Plan::intersection(no_children), Plan::full_scan());
    }

    #[test]
    fn intersection_constructor_explicit_empty_annihilates_children() {
        let children = vec![Plan::by_key(9), explicit_empty()];

        assert_eq!(Plan::intersection(children), explicit_empty());
    }

    #[test]
    fn intersection_constructor_nested_explicit_empty_annihilates_children() {
        let inner = Plan::intersection(vec![Plan::by_key(9), explicit_empty()]);
        let outer = Plan::intersection(vec![inner]);

        assert_eq!(outer, explicit_empty());
    }
}