Skip to main content

icydb_core/db/query/intent/
query.rs

1use crate::{
2    db::{
3        predicate::{CoercionId, CompareOp, MissingRowPolicy, Predicate},
4        query::{
5            builder::aggregate::AggregateExpr,
6            explain::{
7                ExplainAccessPath, ExplainExecutionNodeDescriptor, ExplainExecutionNodeType,
8                ExplainOrderPushdown, ExplainPlan, ExplainPredicate,
9            },
10            expr::{FilterExpr, SortExpr},
11            intent::{QueryError, access_plan_to_entity_keys, model::QueryModel},
12            plan::{AccessPlannedQuery, LoadSpec, QueryMode},
13        },
14    },
15    traits::{EntityKind, EntityValue, SingletonEntity},
16    value::Value,
17};
18
19///
20/// Query
21///
22/// Typed, declarative query intent for a specific entity type.
23///
24/// This intent is:
25/// - schema-agnostic at construction
26/// - normalized and validated only during planning
27/// - free of access-path decisions
28///
29
30#[derive(Debug)]
31pub struct Query<E: EntityKind> {
32    intent: QueryModel<'static, E::Key>,
33}
34
35impl<E: EntityKind> Query<E> {
36    /// Create a new intent with an explicit missing-row policy.
37    /// Ignore favors idempotency and may mask index/data divergence on deletes.
38    /// Use Error to surface missing rows during scan/delete execution.
39    #[must_use]
40    pub const fn new(consistency: MissingRowPolicy) -> Self {
41        Self {
42            intent: QueryModel::new(E::MODEL, consistency),
43        }
44    }
45
46    /// Return the intent mode (load vs delete).
47    #[must_use]
48    pub const fn mode(&self) -> QueryMode {
49        self.intent.mode()
50    }
51
52    #[must_use]
53    pub(crate) fn has_explicit_order(&self) -> bool {
54        self.intent.has_explicit_order()
55    }
56
57    #[must_use]
58    pub(crate) const fn has_grouping(&self) -> bool {
59        self.intent.has_grouping()
60    }
61
62    #[must_use]
63    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
64        match self.intent.mode() {
65            QueryMode::Load(spec) => Some(spec),
66            QueryMode::Delete(_) => None,
67        }
68    }
69
70    /// Add a predicate, implicitly AND-ing with any existing predicate.
71    #[must_use]
72    pub fn filter(mut self, predicate: Predicate) -> Self {
73        self.intent = self.intent.filter(predicate);
74        self
75    }
76
77    /// Apply a dynamic filter expression.
78    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
79        let Self { intent } = self;
80        let intent = intent.filter_expr(expr)?;
81
82        Ok(Self { intent })
83    }
84
85    /// Apply a dynamic sort expression.
86    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
87        let Self { intent } = self;
88        let intent = intent.sort_expr(expr)?;
89
90        Ok(Self { intent })
91    }
92
93    /// Append an ascending sort key.
94    #[must_use]
95    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
96        self.intent = self.intent.order_by(field);
97        self
98    }
99
100    /// Append a descending sort key.
101    #[must_use]
102    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
103        self.intent = self.intent.order_by_desc(field);
104        self
105    }
106
107    /// Enable DISTINCT semantics for this query.
108    #[must_use]
109    pub fn distinct(mut self) -> Self {
110        self.intent = self.intent.distinct();
111        self
112    }
113
114    /// Add one GROUP BY field.
115    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
116        let Self { intent } = self;
117        let intent = intent.push_group_field(field.as_ref())?;
118
119        Ok(Self { intent })
120    }
121
122    /// Add one aggregate terminal via composable aggregate expression.
123    #[must_use]
124    pub fn aggregate(mut self, aggregate: AggregateExpr) -> Self {
125        self.intent = self.intent.push_group_aggregate(aggregate);
126        self
127    }
128
129    /// Override grouped hard limits for grouped execution budget enforcement.
130    #[must_use]
131    pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
132        self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
133        self
134    }
135
136    /// Add one grouped HAVING compare clause over one grouped key field.
137    pub fn having_group(
138        self,
139        field: impl AsRef<str>,
140        op: CompareOp,
141        value: Value,
142    ) -> Result<Self, QueryError> {
143        let field = field.as_ref().to_owned();
144        let Self { intent } = self;
145        let intent = intent.push_having_group_clause(&field, op, value)?;
146
147        Ok(Self { intent })
148    }
149
150    /// Add one grouped HAVING compare clause over one grouped aggregate output.
151    pub fn having_aggregate(
152        self,
153        aggregate_index: usize,
154        op: CompareOp,
155        value: Value,
156    ) -> Result<Self, QueryError> {
157        let Self { intent } = self;
158        let intent = intent.push_having_aggregate_clause(aggregate_index, op, value)?;
159
160        Ok(Self { intent })
161    }
162
163    /// Set the access path to a single primary key lookup.
164    pub(crate) fn by_id(self, id: E::Key) -> Self {
165        let Self { intent } = self;
166        Self {
167            intent: intent.by_id(id),
168        }
169    }
170
171    /// Set the access path to a primary key batch lookup.
172    pub(crate) fn by_ids<I>(self, ids: I) -> Self
173    where
174        I: IntoIterator<Item = E::Key>,
175    {
176        let Self { intent } = self;
177        Self {
178            intent: intent.by_ids(ids),
179        }
180    }
181
182    /// Mark this intent as a delete query.
183    #[must_use]
184    pub fn delete(mut self) -> Self {
185        self.intent = self.intent.delete();
186        self
187    }
188
189    /// Apply a limit to the current mode.
190    ///
191    /// Load limits bound result size; delete limits bound mutation size.
192    /// For scalar load queries, any use of `limit` or `offset` requires an
193    /// explicit `order_by(...)` so pagination is deterministic.
194    /// GROUP BY queries use canonical grouped-key order by default.
195    #[must_use]
196    pub fn limit(mut self, limit: u32) -> Self {
197        self.intent = self.intent.limit(limit);
198        self
199    }
200
201    /// Apply an offset to a load intent.
202    ///
203    /// Scalar pagination requires an explicit `order_by(...)`.
204    /// GROUP BY queries use canonical grouped-key order by default.
205    /// Delete intents reject `offset(...)` during planning.
206    #[must_use]
207    pub fn offset(mut self, offset: u32) -> Self {
208        self.intent = self.intent.offset(offset);
209        self
210    }
211
212    /// Explain this intent without executing it.
213    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
214        let plan = self.planned()?;
215
216        Ok(plan.explain())
217    }
218
219    /// Return a stable plan hash for this intent.
220    ///
221    /// The hash is derived from the canonical explain projection and is suitable
222    /// for diagnostics, explain diffing, and cache key construction.
223    pub fn plan_hash_hex(&self) -> Result<String, QueryError> {
224        Ok(self.explain()?.fingerprint().to_string())
225    }
226
227    /// Explain executor-selected scalar load execution shape without running it.
228    pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
229    where
230        E: EntityValue,
231    {
232        let executable = self.plan()?.into_executable();
233
234        executable
235            .explain_load_execution_node_descriptor()
236            .map_err(QueryError::execute)
237    }
238
239    /// Explain executor-selected scalar load execution shape as deterministic text.
240    pub fn explain_execution_text(&self) -> Result<String, QueryError>
241    where
242        E: EntityValue,
243    {
244        Ok(self.explain_execution()?.render_text_tree())
245    }
246
247    /// Explain executor-selected scalar load execution shape as canonical JSON.
248    pub fn explain_execution_json(&self) -> Result<String, QueryError>
249    where
250        E: EntityValue,
251    {
252        Ok(self.explain_execution()?.render_json_canonical())
253    }
254
255    /// Explain executor-selected scalar load execution shape with route diagnostics.
256    pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
257    where
258        E: EntityValue,
259    {
260        let executable = self.plan()?.into_executable();
261        let descriptor = executable
262            .explain_load_execution_node_descriptor()
263            .map_err(QueryError::execute)?;
264        let route_diagnostics = executable
265            .explain_load_execution_verbose_diagnostics()
266            .map_err(QueryError::execute)?;
267        let explain = self.explain()?;
268
269        // Phase 1: render descriptor tree with node-local metadata.
270        let mut lines = vec![descriptor.render_text_tree_verbose()];
271        lines.extend(route_diagnostics);
272
273        // Phase 2: add descriptor-stage summaries for key execution operators.
274        lines.push(format!(
275            "diagnostic.descriptor.has_top_n_seek={}",
276            contains_execution_node_type(&descriptor, ExplainExecutionNodeType::TopNSeek)
277        ));
278        lines.push(format!(
279            "diagnostic.descriptor.has_index_range_limit_pushdown={}",
280            contains_execution_node_type(
281                &descriptor,
282                ExplainExecutionNodeType::IndexRangeLimitPushdown,
283            )
284        ));
285        lines.push(format!(
286            "diagnostic.descriptor.has_index_predicate_prefilter={}",
287            contains_execution_node_type(
288                &descriptor,
289                ExplainExecutionNodeType::IndexPredicatePrefilter,
290            )
291        ));
292        lines.push(format!(
293            "diagnostic.descriptor.has_residual_predicate_filter={}",
294            contains_execution_node_type(
295                &descriptor,
296                ExplainExecutionNodeType::ResidualPredicateFilter,
297            )
298        ));
299
300        // Phase 3: append logical-plan diagnostics relevant to verbose explain.
301        lines.push(format!("diagnostic.plan.mode={:?}", explain.mode()));
302        lines.push(format!(
303            "diagnostic.plan.order_pushdown={}",
304            plan_order_pushdown_label(explain.order_pushdown())
305        ));
306        lines.push(format!(
307            "diagnostic.plan.predicate_pushdown={}",
308            plan_predicate_pushdown_label(explain.predicate(), explain.access())
309        ));
310        lines.push(format!("diagnostic.plan.distinct={}", explain.distinct()));
311        lines.push(format!("diagnostic.plan.page={:?}", explain.page()));
312        lines.push(format!(
313            "diagnostic.plan.consistency={:?}",
314            explain.consistency()
315        ));
316
317        Ok(lines.join("\n"))
318    }
319
320    /// Plan this intent into a neutral planned query contract.
321    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
322        let plan = self.build_plan()?;
323        let _projection = plan.projection_spec(E::MODEL);
324
325        Ok(PlannedQuery::new(plan))
326    }
327
328    /// Compile this intent into query-owned handoff state.
329    ///
330    /// This boundary intentionally does not expose executor runtime shape.
331    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
332        let plan = self.build_plan()?;
333        let _projection = plan.projection_spec(E::MODEL);
334
335        Ok(CompiledQuery::new(plan))
336    }
337
338    // Build a logical plan for the current intent.
339    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
340        let plan_value = self.intent.build_plan_model()?;
341        let (logical, access) = plan_value.into_parts();
342        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
343        let plan = AccessPlannedQuery::from_parts(logical, access);
344
345        Ok(plan)
346    }
347}
348
349fn contains_execution_node_type(
350    descriptor: &ExplainExecutionNodeDescriptor,
351    target: ExplainExecutionNodeType,
352) -> bool {
353    descriptor.node_type() == target
354        || descriptor
355            .children()
356            .iter()
357            .any(|child| contains_execution_node_type(child, target))
358}
359
360fn plan_order_pushdown_label(order_pushdown: &ExplainOrderPushdown) -> String {
361    match order_pushdown {
362        ExplainOrderPushdown::MissingModelContext => "missing_model_context".to_string(),
363        ExplainOrderPushdown::EligibleSecondaryIndex { index, prefix_len } => {
364            format!("eligible(index={index},prefix_len={prefix_len})",)
365        }
366        ExplainOrderPushdown::Rejected(reason) => format!("rejected({reason:?})"),
367    }
368}
369
370fn plan_predicate_pushdown_label(
371    predicate: &ExplainPredicate,
372    access: &ExplainAccessPath,
373) -> String {
374    let access_label = match access {
375        ExplainAccessPath::ByKey { .. } => "by_key",
376        ExplainAccessPath::ByKeys { keys } if keys.is_empty() => "empty_access_contract",
377        ExplainAccessPath::ByKeys { .. } => "by_keys",
378        ExplainAccessPath::KeyRange { .. } => "key_range",
379        ExplainAccessPath::IndexPrefix { .. } => "index_prefix",
380        ExplainAccessPath::IndexMultiLookup { .. } => "index_multi_lookup",
381        ExplainAccessPath::IndexRange { .. } => "index_range",
382        ExplainAccessPath::FullScan => "full_scan",
383        ExplainAccessPath::Union(_) => "union",
384        ExplainAccessPath::Intersection(_) => "intersection",
385    };
386    if matches!(predicate, ExplainPredicate::None) {
387        return "none".to_string();
388    }
389    if matches!(access, ExplainAccessPath::FullScan) {
390        if explain_predicate_contains_non_strict_compare(predicate) {
391            return "fallback(non_strict_compare_coercion)".to_string();
392        }
393        if explain_predicate_contains_empty_prefix_starts_with(predicate) {
394            return "fallback(starts_with_empty_prefix)".to_string();
395        }
396        if explain_predicate_contains_is_null(predicate) {
397            return "fallback(is_null_full_scan)".to_string();
398        }
399        if explain_predicate_contains_text_scan_operator(predicate) {
400            return "fallback(text_operator_full_scan)".to_string();
401        }
402
403        return format!("fallback({access_label})");
404    }
405
406    format!("applied({access_label})")
407}
408
409fn explain_predicate_contains_non_strict_compare(predicate: &ExplainPredicate) -> bool {
410    match predicate {
411        ExplainPredicate::Compare { coercion, .. } => coercion.id != CoercionId::Strict,
412        ExplainPredicate::And(children) | ExplainPredicate::Or(children) => children
413            .iter()
414            .any(explain_predicate_contains_non_strict_compare),
415        ExplainPredicate::Not(inner) => explain_predicate_contains_non_strict_compare(inner),
416        ExplainPredicate::None
417        | ExplainPredicate::True
418        | ExplainPredicate::False
419        | ExplainPredicate::IsNull { .. }
420        | ExplainPredicate::IsMissing { .. }
421        | ExplainPredicate::IsEmpty { .. }
422        | ExplainPredicate::IsNotEmpty { .. }
423        | ExplainPredicate::TextContains { .. }
424        | ExplainPredicate::TextContainsCi { .. } => false,
425    }
426}
427
428fn explain_predicate_contains_is_null(predicate: &ExplainPredicate) -> bool {
429    match predicate {
430        ExplainPredicate::IsNull { .. } => true,
431        ExplainPredicate::And(children) | ExplainPredicate::Or(children) => {
432            children.iter().any(explain_predicate_contains_is_null)
433        }
434        ExplainPredicate::Not(inner) => explain_predicate_contains_is_null(inner),
435        ExplainPredicate::None
436        | ExplainPredicate::True
437        | ExplainPredicate::False
438        | ExplainPredicate::Compare { .. }
439        | ExplainPredicate::IsMissing { .. }
440        | ExplainPredicate::IsEmpty { .. }
441        | ExplainPredicate::IsNotEmpty { .. }
442        | ExplainPredicate::TextContains { .. }
443        | ExplainPredicate::TextContainsCi { .. } => false,
444    }
445}
446
447fn explain_predicate_contains_empty_prefix_starts_with(predicate: &ExplainPredicate) -> bool {
448    match predicate {
449        ExplainPredicate::Compare {
450            op: CompareOp::StartsWith,
451            value: Value::Text(prefix),
452            ..
453        } => prefix.is_empty(),
454        ExplainPredicate::And(children) | ExplainPredicate::Or(children) => children
455            .iter()
456            .any(explain_predicate_contains_empty_prefix_starts_with),
457        ExplainPredicate::Not(inner) => explain_predicate_contains_empty_prefix_starts_with(inner),
458        ExplainPredicate::None
459        | ExplainPredicate::True
460        | ExplainPredicate::False
461        | ExplainPredicate::Compare { .. }
462        | ExplainPredicate::IsNull { .. }
463        | ExplainPredicate::IsMissing { .. }
464        | ExplainPredicate::IsEmpty { .. }
465        | ExplainPredicate::IsNotEmpty { .. }
466        | ExplainPredicate::TextContains { .. }
467        | ExplainPredicate::TextContainsCi { .. } => false,
468    }
469}
470
471fn explain_predicate_contains_text_scan_operator(predicate: &ExplainPredicate) -> bool {
472    match predicate {
473        ExplainPredicate::Compare {
474            op: CompareOp::EndsWith,
475            ..
476        }
477        | ExplainPredicate::TextContains { .. }
478        | ExplainPredicate::TextContainsCi { .. } => true,
479        ExplainPredicate::And(children) | ExplainPredicate::Or(children) => children
480            .iter()
481            .any(explain_predicate_contains_text_scan_operator),
482        ExplainPredicate::Not(inner) => explain_predicate_contains_text_scan_operator(inner),
483        ExplainPredicate::Compare { .. }
484        | ExplainPredicate::None
485        | ExplainPredicate::True
486        | ExplainPredicate::False
487        | ExplainPredicate::IsNull { .. }
488        | ExplainPredicate::IsMissing { .. }
489        | ExplainPredicate::IsEmpty { .. }
490        | ExplainPredicate::IsNotEmpty { .. } => false,
491    }
492}
493
494impl<E> Query<E>
495where
496    E: EntityKind + SingletonEntity,
497    E::Key: Default,
498{
499    /// Set the access path to the singleton primary key.
500    pub(crate) fn only(self) -> Self {
501        let Self { intent } = self;
502
503        Self {
504            intent: intent.only(E::Key::default()),
505        }
506    }
507}
508
509///
510/// PlannedQuery
511///
512/// Neutral query-owned planned contract produced by query planning.
513/// Stores logical + access shape without executor compilation state.
514///
515
516#[derive(Debug)]
517pub struct PlannedQuery<E: EntityKind> {
518    plan: AccessPlannedQuery<E::Key>,
519}
520
521impl<E: EntityKind> PlannedQuery<E> {
522    #[must_use]
523    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
524        Self { plan }
525    }
526
527    #[must_use]
528    pub fn explain(&self) -> ExplainPlan {
529        self.plan.explain_with_model(E::MODEL)
530    }
531
532    /// Return the stable plan hash for this planned query.
533    #[must_use]
534    pub fn plan_hash_hex(&self) -> String {
535        self.explain().fingerprint().to_string()
536    }
537}
538
539///
540/// CompiledQuery
541///
542/// Query-owned compiled handoff produced by `Query::plan()`.
543/// This type intentionally carries only logical/access query semantics.
544/// Executor runtime shape is derived explicitly at the executor boundary.
545///
546
547#[derive(Clone, Debug)]
548pub struct CompiledQuery<E: EntityKind> {
549    plan: AccessPlannedQuery<E::Key>,
550}
551
552impl<E: EntityKind> CompiledQuery<E> {
553    #[must_use]
554    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
555        Self { plan }
556    }
557
558    #[must_use]
559    pub fn explain(&self) -> ExplainPlan {
560        self.plan.explain_with_model(E::MODEL)
561    }
562
563    /// Return the stable plan hash for this compiled query.
564    #[must_use]
565    pub fn plan_hash_hex(&self) -> String {
566        self.explain().fingerprint().to_string()
567    }
568
569    /// Borrow planner-lowered projection semantics for this compiled query.
570    #[must_use]
571    #[cfg(test)]
572    pub(crate) fn projection_spec(&self) -> crate::db::query::plan::expr::ProjectionSpec {
573        self.plan.projection_spec(E::MODEL)
574    }
575
576    #[must_use]
577    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
578        self.plan
579    }
580}
581
582///
583/// TESTS
584///
585
586#[cfg(test)]
587mod tests {
588    use super::*;
589    use crate::{db::predicate::CoercionSpec, types::Ulid};
590
591    fn strict_compare(field: &str, op: CompareOp, value: Value) -> ExplainPredicate {
592        ExplainPredicate::Compare {
593            field: field.to_string(),
594            op,
595            value,
596            coercion: CoercionSpec::new(CoercionId::Strict),
597        }
598    }
599
600    #[test]
601    fn predicate_pushdown_label_prefix_like_and_equivalent_range_share_label() {
602        let starts_with_predicate = strict_compare(
603            "name",
604            CompareOp::StartsWith,
605            Value::Text("foo".to_string()),
606        );
607        let equivalent_range_predicate = ExplainPredicate::And(vec![
608            strict_compare("name", CompareOp::Gte, Value::Text("foo".to_string())),
609            strict_compare("name", CompareOp::Lt, Value::Text("fop".to_string())),
610        ]);
611        let access = ExplainAccessPath::IndexRange {
612            name: "idx_name",
613            fields: vec!["name"],
614            prefix_len: 0,
615            prefix: Vec::new(),
616            lower: std::ops::Bound::Included(Value::Text("foo".to_string())),
617            upper: std::ops::Bound::Excluded(Value::Text("fop".to_string())),
618        };
619
620        assert_eq!(
621            plan_predicate_pushdown_label(&starts_with_predicate, &access),
622            plan_predicate_pushdown_label(&equivalent_range_predicate, &access),
623            "equivalent prefix-like and bounded-range shapes should report identical pushdown reason labels",
624        );
625        assert_eq!(
626            plan_predicate_pushdown_label(&starts_with_predicate, &access),
627            "applied(index_range)"
628        );
629    }
630
631    #[test]
632    fn predicate_pushdown_label_distinguishes_is_null_and_non_strict_full_scan_fallbacks() {
633        let is_null_predicate = ExplainPredicate::IsNull {
634            field: "group".to_string(),
635        };
636        let non_strict_predicate = ExplainPredicate::Compare {
637            field: "group".to_string(),
638            op: CompareOp::Eq,
639            value: Value::Uint(7),
640            coercion: CoercionSpec::new(CoercionId::NumericWiden),
641        };
642        let access = ExplainAccessPath::FullScan;
643
644        assert_eq!(
645            plan_predicate_pushdown_label(&is_null_predicate, &access),
646            "fallback(is_null_full_scan)"
647        );
648        assert_eq!(
649            plan_predicate_pushdown_label(&non_strict_predicate, &access),
650            "fallback(non_strict_compare_coercion)"
651        );
652    }
653
654    #[test]
655    fn predicate_pushdown_label_reports_none_when_no_predicate_is_present() {
656        let predicate = ExplainPredicate::None;
657        let access = ExplainAccessPath::ByKey {
658            key: Value::Ulid(Ulid::from_u128(7)),
659        };
660
661        assert_eq!(plan_predicate_pushdown_label(&predicate, &access), "none");
662    }
663
664    #[test]
665    fn predicate_pushdown_label_reports_empty_access_contract_for_impossible_shapes() {
666        let predicate = ExplainPredicate::Or(vec![
667            ExplainPredicate::IsNull {
668                field: "id".to_string(),
669            },
670            ExplainPredicate::And(vec![
671                ExplainPredicate::Compare {
672                    field: "id".to_string(),
673                    op: CompareOp::In,
674                    value: Value::List(Vec::new()),
675                    coercion: CoercionSpec::new(CoercionId::Strict),
676                },
677                ExplainPredicate::True,
678            ]),
679        ]);
680        let access = ExplainAccessPath::ByKeys { keys: Vec::new() };
681
682        assert_eq!(
683            plan_predicate_pushdown_label(&predicate, &access),
684            "applied(empty_access_contract)"
685        );
686    }
687
688    #[test]
689    fn predicate_pushdown_label_distinguishes_empty_prefix_starts_with_full_scan_fallback() {
690        let empty_prefix_predicate = ExplainPredicate::Compare {
691            field: "label".to_string(),
692            op: CompareOp::StartsWith,
693            value: Value::Text(String::new()),
694            coercion: CoercionSpec::new(CoercionId::Strict),
695        };
696        let non_empty_prefix_predicate = ExplainPredicate::Compare {
697            field: "label".to_string(),
698            op: CompareOp::StartsWith,
699            value: Value::Text("l".to_string()),
700            coercion: CoercionSpec::new(CoercionId::Strict),
701        };
702        let access = ExplainAccessPath::FullScan;
703
704        assert_eq!(
705            plan_predicate_pushdown_label(&empty_prefix_predicate, &access),
706            "fallback(starts_with_empty_prefix)"
707        );
708        assert_eq!(
709            plan_predicate_pushdown_label(&non_empty_prefix_predicate, &access),
710            "fallback(full_scan)"
711        );
712    }
713
714    #[test]
715    fn predicate_pushdown_label_reports_text_operator_full_scan_fallback() {
716        let text_contains = ExplainPredicate::TextContainsCi {
717            field: "label".to_string(),
718            value: Value::Text("needle".to_string()),
719        };
720        let ends_with = ExplainPredicate::Compare {
721            field: "label".to_string(),
722            op: CompareOp::EndsWith,
723            value: Value::Text("fix".to_string()),
724            coercion: CoercionSpec::new(CoercionId::Strict),
725        };
726        let access = ExplainAccessPath::FullScan;
727
728        assert_eq!(
729            plan_predicate_pushdown_label(&text_contains, &access),
730            "fallback(text_operator_full_scan)"
731        );
732        assert_eq!(
733            plan_predicate_pushdown_label(&ends_with, &access),
734            "fallback(text_operator_full_scan)"
735        );
736    }
737
738    #[test]
739    fn predicate_pushdown_label_keeps_collection_contains_on_generic_full_scan_fallback() {
740        let collection_contains = ExplainPredicate::Compare {
741            field: "tags".to_string(),
742            op: CompareOp::Contains,
743            value: Value::Uint(7),
744            coercion: CoercionSpec::new(CoercionId::CollectionElement),
745        };
746        let access = ExplainAccessPath::FullScan;
747
748        assert_eq!(
749            plan_predicate_pushdown_label(&collection_contains, &access),
750            "fallback(non_strict_compare_coercion)"
751        );
752        assert_ne!(
753            plan_predicate_pushdown_label(&collection_contains, &access),
754            "fallback(text_operator_full_scan)"
755        );
756    }
757
758    #[test]
759    fn predicate_pushdown_label_non_strict_ends_with_uses_non_strict_fallback_precedence() {
760        let non_strict_ends_with = ExplainPredicate::Compare {
761            field: "label".to_string(),
762            op: CompareOp::EndsWith,
763            value: Value::Text("fix".to_string()),
764            coercion: CoercionSpec::new(CoercionId::TextCasefold),
765        };
766        let access = ExplainAccessPath::FullScan;
767
768        assert_eq!(
769            plan_predicate_pushdown_label(&non_strict_ends_with, &access),
770            "fallback(non_strict_compare_coercion)"
771        );
772        assert_ne!(
773            plan_predicate_pushdown_label(&non_strict_ends_with, &access),
774            "fallback(text_operator_full_scan)"
775        );
776    }
777}