use super::json_writers::timeseries_tags_json_value;
use super::*;
pub(crate) fn extract_equality_prefilter(filter: &Filter) -> Option<(String, Value)> {
use crate::storage::query::ast::{CompareOp, FieldRef};
match filter {
Filter::Compare { field, op, value } if *op == CompareOp::Eq => {
let col = match field {
FieldRef::TableColumn { column, .. } => column.clone(),
_ => return None,
};
if col.starts_with('_') {
return None;
}
Some((col, value.clone()))
}
Filter::And(left, right) => {
extract_equality_prefilter(left).or_else(|| extract_equality_prefilter(right))
}
_ => None,
}
}
pub(crate) fn extract_entity_id_from_filter(
filter: &Option<crate::storage::query::ast::Filter>,
) -> Option<u64> {
use crate::storage::query::ast::{CompareOp, FieldRef, Filter};
let filter = filter.as_ref()?;
match filter {
Filter::Compare { field, op, value } if *op == CompareOp::Eq => {
let field_name = match field {
FieldRef::TableColumn { column, .. } => column.as_str(),
_ => return None,
};
if field_name != "red_entity_id" && field_name != "entity_id" {
return None;
}
match value {
Value::Integer(n) => Some(*n as u64),
Value::UnsignedInteger(n) => Some(*n),
_ => None,
}
}
Filter::And(left, right) => extract_entity_id_from_filter(&Some(*left.clone()))
.or_else(|| extract_entity_id_from_filter(&Some(*right.clone()))),
_ => None,
}
}
pub(crate) fn extract_bloom_key_for_pk(
filter: &crate::storage::query::ast::Filter,
) -> Option<Vec<u8>> {
use crate::storage::query::ast::{CompareOp, FieldRef, Filter};
match filter {
Filter::Compare { field, op, value } if *op == CompareOp::Eq => {
let field_name = match field {
FieldRef::TableColumn { column, .. } => column.as_str(),
_ => return None,
};
if field_name != "red_entity_id" {
return None;
}
let key = match value {
Value::Text(s) => s.as_bytes().to_vec(),
Value::Integer(n) => n.to_le_bytes().to_vec(),
Value::UnsignedInteger(n) => n.to_le_bytes().to_vec(),
_ => return None,
};
Some(key)
}
Filter::And(left, right) => {
extract_bloom_key_for_pk(left).or_else(|| extract_bloom_key_for_pk(right))
}
_ => None,
}
}
pub(crate) fn extract_index_candidate(
filter: &crate::storage::query::ast::Filter,
) -> Option<(String, Vec<u8>)> {
use crate::storage::query::ast::{CompareOp, FieldRef, Filter};
match filter {
Filter::Compare { field, op, value } if *op == CompareOp::Eq => {
let column = match field {
FieldRef::TableColumn { column, .. } => column.clone(),
_ => return None,
};
let bytes = match value {
Value::Text(s) => s.as_bytes().to_vec(),
Value::Integer(n) => n.to_le_bytes().to_vec(),
Value::UnsignedInteger(n) => n.to_le_bytes().to_vec(),
_ => return None,
};
Some((column, bytes))
}
Filter::And(left, right) => {
extract_index_candidate(left).or_else(|| extract_index_candidate(right))
}
_ => None,
}
}
pub(crate) fn extract_all_eq_candidates(
filter: &crate::storage::query::ast::Filter,
out: &mut Vec<(String, Vec<u8>, crate::storage::schema::Value)>,
) {
use crate::storage::query::ast::{CompareOp, FieldRef, Filter};
match filter {
Filter::Compare {
field,
op: CompareOp::Eq,
value,
} => {
let col = match field {
FieldRef::TableColumn { column, .. } => column.clone(),
_ => return,
};
let bytes = match value {
crate::storage::schema::Value::Text(s) => s.as_bytes().to_vec(),
crate::storage::schema::Value::Integer(n) => n.to_le_bytes().to_vec(),
crate::storage::schema::Value::UnsignedInteger(n) => n.to_le_bytes().to_vec(),
_ => return,
};
out.push((col, bytes, value.clone()));
}
Filter::And(left, right) => {
extract_all_eq_candidates(left, out);
extract_all_eq_candidates(right, out);
}
_ => {}
}
}
pub(crate) fn extract_zone_predicates(
filter: &crate::storage::query::ast::Filter,
out: &mut Vec<(
String,
crate::storage::schema::Value,
crate::storage::unified::segment::ZoneColPredKind,
)>,
) {
use crate::storage::query::ast::{CompareOp, FieldRef, Filter};
use crate::storage::unified::segment::ZoneColPredKind;
match filter {
Filter::Compare { field, op, value } => {
let col = match field {
FieldRef::TableColumn { column, .. } => column.as_str(),
_ => return,
};
if col.starts_with('_') || col == "red_entity_id" || col == "entity_id" {
return;
}
let kind = match op {
CompareOp::Eq => ZoneColPredKind::Eq,
CompareOp::Gt => ZoneColPredKind::Gt,
CompareOp::Ge => ZoneColPredKind::Gte,
CompareOp::Lt => ZoneColPredKind::Lt,
CompareOp::Le => ZoneColPredKind::Lte,
_ => return,
};
out.push((col.to_string(), value.clone(), kind));
}
Filter::Between { field, low, high } => {
let col = match field {
FieldRef::TableColumn { column, .. } => column.as_str(),
_ => return,
};
if col.starts_with('_') || col == "red_entity_id" || col == "entity_id" {
return;
}
out.push((col.to_string(), low.clone(), ZoneColPredKind::Gte));
out.push((col.to_string(), high.clone(), ZoneColPredKind::Lte));
}
Filter::And(left, right) => {
extract_zone_predicates(left, out);
extract_zone_predicates(right, out);
}
_ => {}
}
}
pub(crate) fn extract_select_column_names(projections: &[Projection]) -> Vec<String> {
if projections.is_empty() || projections.iter().any(|p| matches!(p, Projection::All)) {
return Vec::new();
}
projections
.iter()
.filter_map(|p| match p {
Projection::Column(c) | Projection::Alias(c, _) if !c.starts_with("LIT:") => {
Some(c.clone())
}
Projection::Field(FieldRef::TableColumn { column: c, .. }, _) => Some(c.clone()),
_ => None,
})
.collect()
}
pub(crate) fn resolve_entity_field<'a>(
entity: &'a crate::storage::unified::entity::UnifiedEntity,
field: &FieldRef,
table_name: &str,
table_alias: &str,
) -> Option<std::borrow::Cow<'a, Value>> {
use std::borrow::Cow;
let (column, document_path) = match field {
FieldRef::TableColumn { table, column } => {
if !table.is_empty()
&& !runtime_table_context_matches(
table.as_str(),
Some(table_name),
Some(table_alias),
)
{
return resolve_entity_document_path(entity, &format!("{table}.{column}"))
.map(Cow::Owned);
}
let document_path = column.contains('.').then_some(column.as_str());
(column.as_str(), document_path)
}
_ => return None,
};
match column {
"red_entity_id" | "entity_id" => {
return Some(Cow::Owned(Value::UnsignedInteger(
entity.logical_id().raw(),
)));
}
"created_at" => {
return Some(Cow::Owned(Value::UnsignedInteger(entity.created_at)));
}
"updated_at" => {
return Some(Cow::Owned(Value::UnsignedInteger(entity.updated_at)));
}
"red_sequence_id" => {
return Some(Cow::Owned(Value::UnsignedInteger(entity.sequence_id)));
}
"red_collection" => {
return Some(Cow::Owned(Value::text(
entity.kind.collection().to_string(),
)));
}
"red_kind" => {
return Some(Cow::Owned(Value::text(
entity.kind.storage_type().to_string(),
)));
}
"row_id" => {
if let crate::storage::unified::entity::EntityKind::TableRow { row_id, .. } =
&entity.kind
{
return Some(Cow::Owned(Value::UnsignedInteger(*row_id)));
}
return None;
}
_ => {}
}
if let Some(row) = entity.data.as_row() {
if let Some(value) = row.get_field(column) {
return Some(Cow::Borrowed(value));
}
if let Some(index) = column
.strip_prefix('c')
.and_then(|index| index.parse::<usize>().ok())
{
if let Some(value) = row.columns.get(index) {
return Some(Cow::Borrowed(value));
}
}
}
if let EntityData::Node(ref node) = entity.data {
if let Some(value) = node.properties.get(column) {
return Some(Cow::Borrowed(value));
}
}
if let EntityData::Edge(ref edge) = entity.data {
if column == "weight" {
return Some(Cow::Owned(Value::Float(edge.weight as f64)));
}
if let Some(value) = edge.properties.get(column) {
return Some(Cow::Borrowed(value));
}
}
if let EntityData::TimeSeries(ref ts) = entity.data {
match column {
"metric" => return Some(Cow::Owned(Value::text(ts.metric.clone()))),
"timestamp_ns" => return Some(Cow::Owned(Value::UnsignedInteger(ts.timestamp_ns))),
"timestamp" | "time" => {
return Some(Cow::Owned(Value::UnsignedInteger(ts.timestamp_ns)));
}
"value" => return Some(Cow::Owned(Value::Float(ts.value))),
"tags" => {
return Some(Cow::Owned(timeseries_tags_json_value(&ts.tags)));
}
_ => {}
}
}
if let Some(path) = document_path {
if let Some(value) = resolve_entity_document_path(entity, path) {
return Some(Cow::Owned(value));
}
}
match &entity.kind {
EntityKind::GraphNode(ref gn) => match column {
"label" => return Some(Cow::Owned(Value::text(gn.label.to_string()))),
"node_type" => return Some(Cow::Owned(Value::text(gn.node_type.to_string()))),
_ => {}
},
EntityKind::GraphEdge(ref ge) => match column {
"label" => return Some(Cow::Owned(Value::text(ge.label.to_string()))),
"from_node" => return Some(Cow::Owned(Value::text(ge.from_node.to_string()))),
"to_node" => return Some(Cow::Owned(Value::text(ge.to_node.to_string()))),
_ => {}
},
_ => {}
}
None
}
pub(crate) fn resolve_entity_document_path(
entity: &crate::storage::unified::entity::UnifiedEntity,
path: &str,
) -> Option<Value> {
let segments = parse_runtime_document_path(path);
let (root, tail) = segments.split_first()?;
if let Some(row) = entity.data.as_row() {
if let Some(value) = row.get_field(root) {
if tail.is_empty() {
return Some(value.clone());
}
return resolve_runtime_document_path_from_value(value, tail);
}
}
if let EntityData::Node(ref node) = entity.data {
if let Some(value) = node.properties.get(root) {
if tail.is_empty() {
return Some(value.clone());
}
return resolve_runtime_document_path_from_value(value, tail);
}
}
if let EntityData::Edge(ref edge) = entity.data {
if let Some(value) = edge.properties.get(root) {
if tail.is_empty() {
return Some(value.clone());
}
return resolve_runtime_document_path_from_value(value, tail);
}
}
if let EntityData::TimeSeries(ref ts) = entity.data {
let root_value = match root.as_str() {
"tags" => Some(timeseries_tags_json_value(&ts.tags)),
_ => None,
}?;
if tail.is_empty() {
return Some(root_value);
}
return resolve_runtime_document_path_from_value(&root_value, tail);
}
None
}
pub(crate) fn try_hash_eq_lookup(
filter: &crate::storage::query::ast::Filter,
table: &str,
idx_store: &super::super::index_store::IndexStore,
) -> Option<Vec<crate::storage::unified::entity::EntityId>> {
use crate::storage::query::ast::{FieldRef, Filter};
if let Filter::In { field, values } = filter {
let col = match field {
FieldRef::TableColumn { column, .. } => column.as_str(),
_ => return None,
};
let idx = idx_store.find_index_for_column(table, col)?;
let lookup_name = idx.hash_lookup_name();
let mut ids = Vec::with_capacity(values.len());
for value in values {
let lookup = match value {
Value::Text(s) => idx_store.hash_lookup(table, lookup_name.as_ref(), s.as_bytes()),
Value::Integer(n) => {
let bytes = n.to_le_bytes();
idx_store.hash_lookup(table, lookup_name.as_ref(), &bytes)
}
Value::UnsignedInteger(n) => {
let bytes = n.to_le_bytes();
idx_store.hash_lookup(table, lookup_name.as_ref(), &bytes)
}
_ => return None,
}
.ok()?;
ids.extend(lookup);
}
return Some(ids);
}
let (col, val_bytes) = extract_index_candidate(filter)?;
let idx = idx_store.find_index_for_column(table, &col)?;
idx_store
.hash_lookup(table, idx.hash_lookup_name().as_ref(), &val_bytes)
.ok()
}
pub(crate) fn evaluate_entity_filter(
entity: &crate::storage::unified::entity::UnifiedEntity,
filter: &Filter,
table_name: &str,
table_alias: &str,
) -> bool {
evaluate_entity_filter_with_db(None, entity, filter, table_name, table_alias)
}
pub(crate) fn evaluate_entity_filter_with_db(
db: Option<&RedDB>,
entity: &crate::storage::unified::entity::UnifiedEntity,
filter: &Filter,
table_name: &str,
table_alias: &str,
) -> bool {
match filter {
Filter::Compare { field, op, value } => {
resolve_entity_field(entity, field, table_name, table_alias)
.as_ref()
.map(|candidate| compare_runtime_values(candidate.as_ref(), value, *op))
.unwrap_or(false)
}
Filter::CompareFields { left, op, right } => {
let left_val = resolve_entity_field(entity, left, table_name, table_alias);
let right_val = resolve_entity_field(entity, right, table_name, table_alias);
match (left_val, right_val) {
(Some(l), Some(r)) => compare_runtime_values(l.as_ref(), r.as_ref(), *op),
_ => false,
}
}
Filter::CompareExpr { .. } => {
let Some(db) = db else { return true };
let table_record = runtime_table_record_from_entity(entity.clone());
let record = match table_record {
Some(r) => Some(r),
None => super::super::record_search_helpers::any_record_from_entity(entity.clone()),
};
let Some(record) = record else {
return false;
};
super::super::join_filter::evaluate_runtime_filter_with_db(
Some(db),
&record,
filter,
Some(table_name),
Some(table_alias),
)
}
Filter::And(left, right) => {
evaluate_entity_filter_with_db(db, entity, left, table_name, table_alias)
&& evaluate_entity_filter_with_db(db, entity, right, table_name, table_alias)
}
Filter::Or(left, right) => {
evaluate_entity_filter_with_db(db, entity, left, table_name, table_alias)
|| evaluate_entity_filter_with_db(db, entity, right, table_name, table_alias)
}
Filter::Not(inner) => {
!evaluate_entity_filter_with_db(db, entity, inner, table_name, table_alias)
}
Filter::IsNull(field) => resolve_entity_field(entity, field, table_name, table_alias)
.map(|value| value.as_ref() == &Value::Null)
.unwrap_or(true),
Filter::IsNotNull(field) => resolve_entity_field(entity, field, table_name, table_alias)
.map(|value| value.as_ref() != &Value::Null)
.unwrap_or(false),
Filter::In { field, values } => {
resolve_entity_field(entity, field, table_name, table_alias)
.as_ref()
.is_some_and(|candidate| {
values.iter().any(|value| {
compare_runtime_values(candidate.as_ref(), value, CompareOp::Eq)
})
})
}
Filter::Between { field, low, high } => {
resolve_entity_field(entity, field, table_name, table_alias)
.as_ref()
.is_some_and(|candidate| {
compare_runtime_values(candidate.as_ref(), low, CompareOp::Ge)
&& compare_runtime_values(candidate.as_ref(), high, CompareOp::Le)
})
}
Filter::Like { field, pattern } => {
resolve_entity_field(entity, field, table_name, table_alias)
.as_ref()
.and_then(|v| runtime_value_text_cow(v.as_ref()))
.is_some_and(|value| like_matches(value.as_ref(), pattern))
}
Filter::StartsWith { field, prefix } => {
resolve_entity_field(entity, field, table_name, table_alias)
.as_ref()
.and_then(|v| runtime_value_text_cow(v.as_ref()))
.is_some_and(|value| value.starts_with(prefix.as_str()))
}
Filter::EndsWith { field, suffix } => {
resolve_entity_field(entity, field, table_name, table_alias)
.as_ref()
.and_then(|v| runtime_value_text_cow(v.as_ref()))
.is_some_and(|value| value.ends_with(suffix.as_str()))
}
Filter::Contains { field, substring } => {
resolve_entity_field(entity, field, table_name, table_alias)
.as_ref()
.and_then(|v| runtime_value_text_cow(v.as_ref()))
.is_some_and(|value| value.contains(substring.as_str()))
}
}
}