Skip to main content

icydb_core/db/query/
admission.rs

1//! Module: db::query::admission
2//! Responsibility: shared read-admission vocabulary for query surfaces.
3//! Does not own: physical planning, executor runtime, or SQL/fluent lowering.
4//! Boundary: describes policy, proven bounds, and stable rejection diagnostics.
5
6use std::fmt::Write as _;
7use std::num::{NonZeroU32, NonZeroU64};
8use std::ops::Bound;
9
10use crate::{
11    db::{
12        access::IndexBranchSetOrderedSuffix,
13        query::plan::{
14            AccessPlanProjection, AccessPlannedQuery, GroupPlan, QueryMode, ResidualFilterShape,
15            ScalarPlan, project_access_plan,
16        },
17    },
18    value::Value,
19};
20use icydb_diagnostic_code::{
21    Diagnostic, DiagnosticCode, DiagnosticDetail, ErrorCode, ErrorOrigin, QueryReadAdmissionCode,
22};
23
/// Execution lane a query runs under, as chosen by the public or internal caller surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionLane {
    /// Bounded, caller-facing read path used by generated canister query endpoints.
    PublicRead,
    /// Trusted/admin ad-hoc read path; the embedder supplies explicit budgets.
    AdminAdHoc,
    /// EXPLAIN-only path: describes planning and admission without executing rows.
    DiagnosticExplain,
    /// Test-only lane for local harnesses that need to bypass production policy.
    DevTest,
}

impl QueryAdmissionLane {
    /// Stable lowercase label that identifies this lane in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::DevTest => "dev_test",
            Self::DiagnosticExplain => "diagnostic_explain",
            Self::AdminAdHoc => "admin_ad_hoc",
            Self::PublicRead => "public_read",
        }
    }

    /// Whether this lane may execute and return data rows.
    ///
    /// Only the diagnostic EXPLAIN lane answers `false`.
    #[must_use]
    pub const fn executes_rows(self) -> bool {
        match self {
            Self::DiagnosticExplain => false,
            Self::PublicRead | Self::AdminAdHoc | Self::DevTest => true,
        }
    }
}
55
/// How trustworthy a bound is when it is handed to read admission.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryBoundKind {
    /// The count is known exactly before execution.
    Exact,
    /// A conservative upper bound was proven before execution.
    ConservativeUpperBound,
    /// The executor enforces a runtime cap while producing rows.
    EnforcedRuntimeCap,
    /// Only a planner estimate exists; not safe as public admission authority.
    EstimateOnly,
    /// No bound is available at all.
    Unavailable,
}

impl QueryBoundKind {
    /// Stable lowercase label that identifies this bound quality in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Unavailable => "unavailable",
            Self::EstimateOnly => "estimate_only",
            Self::EnforcedRuntimeCap => "enforced_runtime_cap",
            Self::ConservativeUpperBound => "conservative_upper_bound",
            Self::Exact => "exact",
        }
    }

    /// Whether this bound quality counts as acceptable proof for public reads.
    ///
    /// Estimates and missing bounds are the two kinds that do not qualify.
    #[must_use]
    pub const fn admits_public_read(self) -> bool {
        match self {
            Self::Exact | Self::ConservativeUpperBound | Self::EnforcedRuntimeCap => true,
            Self::EstimateOnly | Self::Unavailable => false,
        }
    }
}
93
/// Outcome of read admission for one selected plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionDecision {
    /// The lane policy allows the selected plan.
    Admitted,
    /// The selected plan is turned away before execution.
    Rejected,
}

impl QueryAdmissionDecision {
    /// Stable lowercase label that identifies this decision in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Rejected => "rejected",
            Self::Admitted => "admitted",
        }
    }

    /// Whether the selected plan may execute.
    #[must_use]
    pub const fn is_admitted(self) -> bool {
        match self {
            Self::Admitted => true,
            Self::Rejected => false,
        }
    }
}
119
/// Coarse class of the selected access path, shared by admission and EXPLAIN.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionAccessKind {
    /// The access class has not been summarized yet.
    Unknown,
    /// A single direct primary-key lookup.
    ByKey,
    /// Several direct primary-key lookups.
    ByKeys,
    /// Access over a primary-key range.
    KeyRange,
    /// Access over a secondary-index prefix.
    IndexPrefix,
    /// Multi-lookup access over a secondary index.
    IndexMultiLookup,
    /// Branch-set access over a secondary index.
    IndexBranchSet,
    /// Access over a secondary-index range.
    IndexRange,
    /// Scan of the full entity.
    FullScan,
    /// Union of several access paths.
    Union,
    /// Intersection of several access paths.
    Intersection,
}

impl QueryAdmissionAccessKind {
    /// Stable lowercase label that identifies this access class in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            // Primary-key driven access.
            Self::ByKey => "by_key",
            Self::ByKeys => "by_keys",
            Self::KeyRange => "key_range",
            // Secondary-index driven access.
            Self::IndexPrefix => "index_prefix",
            Self::IndexMultiLookup => "index_multi_lookup",
            Self::IndexBranchSet => "index_branch_set",
            Self::IndexRange => "index_range",
            // Composite paths.
            Self::Union => "union",
            Self::Intersection => "intersection",
            // Everything else.
            Self::FullScan => "full_scan",
            Self::Unknown => "unknown",
        }
    }

    /// Whether this access class is backed by a secondary index.
    #[must_use]
    pub const fn is_secondary_index(self) -> bool {
        match self {
            Self::IndexPrefix
            | Self::IndexMultiLookup
            | Self::IndexBranchSet
            | Self::IndexRange => true,
            Self::Unknown
            | Self::ByKey
            | Self::ByKeys
            | Self::KeyRange
            | Self::FullScan
            | Self::Union
            | Self::Intersection => false,
        }
    }

    /// Whether this access class is a full entity scan.
    #[must_use]
    pub const fn is_full_scan(self) -> bool {
        match self {
            Self::FullScan => true,
            Self::Unknown
            | Self::ByKey
            | Self::ByKeys
            | Self::KeyRange
            | Self::IndexPrefix
            | Self::IndexMultiLookup
            | Self::IndexBranchSet
            | Self::IndexRange
            | Self::Union
            | Self::Intersection => false,
        }
    }
}
181
/// Coarse statement shape (scalar, grouped, or delete) consulted by read admission.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionPlanShape {
    /// Scalar read, including projection-only and global-aggregate scalar plans.
    ScalarRead,
    /// Grouped aggregate read.
    GroupedAggregate,
    /// Delete; surfaced only for diagnostics, public read lanes must not execute it.
    Delete,
}

impl QueryAdmissionPlanShape {
    /// Stable lowercase label that identifies this plan shape in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Delete => "delete",
            Self::GroupedAggregate => "grouped_aggregate",
            Self::ScalarRead => "scalar_read",
        }
    }
}
204
/// Shape of the residual filter left after access planning, as seen by admission.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionResidualFilter {
    /// Access planning left no residual runtime filter.
    Absent,
    /// A predicate-native residual filter remains.
    Predicate,
    /// An expression-backed residual filter remains.
    Expression,
    /// Both the expression and predicate residual forms remain available.
    ExpressionAndPredicate,
}

impl QueryAdmissionResidualFilter {
    /// Stable lowercase label that identifies this residual shape in diagnostics.
    ///
    /// Note that `Absent` is labelled `"none"`, not `"absent"`.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::ExpressionAndPredicate => "expression_and_predicate",
            Self::Expression => "expression",
            Self::Predicate => "predicate",
            Self::Absent => "none",
        }
    }

    /// Whether no residual runtime filter remains.
    #[must_use]
    pub const fn is_absent(self) -> bool {
        match self {
            Self::Absent => true,
            Self::Predicate | Self::Expression | Self::ExpressionAndPredicate => false,
        }
    }
}
236
/// State of the query's ORDER BY as far as read admission is concerned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionOrdering {
    /// The caller requested no visible ordering.
    None,
    /// Ordering was requested but is not yet resolved into executor slots.
    Requested,
    /// Ordering carries a planner-resolved executor contract.
    Resolved,
}

impl QueryAdmissionOrdering {
    /// Stable lowercase label that identifies this ordering state in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Resolved => "resolved",
            Self::Requested => "requested",
            Self::None => "none",
        }
    }
}
259
/// Facts about a grouped query that read admission takes into account.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueryAdmissionGroupedSummary {
    // Number of GROUP BY fields.
    group_field_count: u32,
    // Number of aggregate expressions.
    aggregate_count: u32,
    // Number of aggregate expressions carrying DISTINCT state.
    distinct_aggregate_count: u32,
    // Maximum group count for grouped execution.
    max_groups: u64,
    // Maximum bytes per group accumulator for grouped execution.
    max_group_bytes: u64,
    // Whether a HAVING residual expression is present.
    having_filter: bool,
}

impl QueryAdmissionGroupedSummary {
    /// Assemble a grouped admission summary from planner-owned grouped facts.
    ///
    /// Every argument is stored as given; no validation is performed here.
    #[must_use]
    pub const fn new(
        group_field_count: u32,
        aggregate_count: u32,
        distinct_aggregate_count: u32,
        max_groups: u64,
        max_group_bytes: u64,
        having_filter: bool,
    ) -> Self {
        Self {
            having_filter,
            max_group_bytes,
            max_groups,
            distinct_aggregate_count,
            aggregate_count,
            group_field_count,
        }
    }

    /// Number of GROUP BY fields.
    #[must_use]
    pub const fn group_field_count(self) -> u32 {
        let Self {
            group_field_count, ..
        } = self;
        group_field_count
    }

    /// Number of aggregate expressions.
    #[must_use]
    pub const fn aggregate_count(self) -> u32 {
        let Self {
            aggregate_count, ..
        } = self;
        aggregate_count
    }

    /// Number of aggregate expressions that carry DISTINCT state.
    #[must_use]
    pub const fn distinct_aggregate_count(self) -> u32 {
        let Self {
            distinct_aggregate_count,
            ..
        } = self;
        distinct_aggregate_count
    }

    /// Maximum group count for grouped execution.
    #[must_use]
    pub const fn max_groups(self) -> u64 {
        let Self { max_groups, .. } = self;
        max_groups
    }

    /// Maximum bytes per group accumulator for grouped execution.
    #[must_use]
    pub const fn max_group_bytes(self) -> u64 {
        let Self {
            max_group_bytes, ..
        } = self;
        max_group_bytes
    }

    /// Whether the grouped plan carries a HAVING residual expression.
    #[must_use]
    pub const fn has_having_filter(self) -> bool {
        let Self { having_filter, .. } = self;
        having_filter
    }
}
328
/// Budgets that read admission applies to grouped/aggregate reads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GroupedAdmissionPolicy {
    // Cap on output groups; `None` means no group budget was configured.
    groups: Option<NonZeroU32>,
    // Cap on bytes per group accumulator; `None` means no byte budget was configured.
    group_bytes: Option<NonZeroU32>,
    // Cap on distinct entries for distinct-style aggregates, when configured.
    distinct_entries: Option<NonZeroU32>,
}

impl GroupedAdmissionPolicy {
    /// Build a policy with no budgets, which rejects grouped reads unless a
    /// later slice enables them.
    #[must_use]
    pub const fn disabled() -> Self {
        Self {
            distinct_entries: None,
            group_bytes: None,
            groups: None,
        }
    }

    /// Build a grouped policy carrying explicit group and memory budgets.
    ///
    /// The distinct-entry budget is optional and stored exactly as supplied.
    #[must_use]
    pub const fn bounded(
        max_groups: NonZeroU32,
        max_group_bytes: NonZeroU32,
        max_distinct_entries: Option<NonZeroU32>,
    ) -> Self {
        let groups = Some(max_groups);
        let group_bytes = Some(max_group_bytes);

        Self {
            groups,
            group_bytes,
            distinct_entries: max_distinct_entries,
        }
    }

    /// Maximum allowed output groups, if configured.
    #[must_use]
    pub const fn max_groups(&self) -> Option<NonZeroU32> {
        self.groups
    }

    /// Maximum allowed bytes per group accumulator, if configured.
    #[must_use]
    pub const fn max_group_bytes(&self) -> Option<NonZeroU32> {
        self.group_bytes
    }

    /// Maximum allowed distinct entries for distinct-style aggregates, if configured.
    #[must_use]
    pub const fn max_distinct_entries(&self) -> Option<NonZeroU32> {
        self.distinct_entries
    }

    /// Whether both hard budgets (group count and per-group bytes) are present.
    #[must_use]
    pub const fn has_hard_limits(&self) -> bool {
        matches!((self.groups, self.group_bytes), (Some(_), Some(_)))
    }

    /// Project this admission policy into grouped execution caps.
    ///
    /// Yields `None` unless both the group-count and per-group byte budgets
    /// are configured.
    #[must_use]
    #[cfg(feature = "sql")]
    pub(in crate::db) const fn execution_config(
        &self,
    ) -> Option<crate::db::query::plan::GroupedExecutionConfig> {
        let (Some(groups), Some(group_bytes)) = (self.groups, self.group_bytes) else {
            return None;
        };

        // `as` is a lossless u32 -> u64 widening here; `u64::from` is not
        // callable from a `const fn`.
        let group_cap = groups.get() as u64;
        let byte_cap = group_bytes.get() as u64;

        Some(crate::db::query::plan::GroupedExecutionConfig::with_hard_limits(
            group_cap, byte_cap,
        ))
    }
}
403
// Whether a surface demands a caller-visible LIMIT.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimitRequirement {
    Required,
    Optional,
}

// Whether the selected plan must use an index-backed access path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IndexRequirement {
    Required,
    Optional,
}

// Whether a full entity scan may execute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FullScanPolicy {
    Allow,
    Reject,
}

// Whether materialized ORDER BY execution is permitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MaterializedSortPolicy {
    Allow,
    Reject,
}

// Whether non-zero OFFSET execution is permitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OffsetPolicy {
    Allow,
    RejectNonZero,
}
433
434/// Read-admission policy attached to one query surface.
435#[derive(Clone, Debug, Eq, PartialEq)]
436pub struct QueryAdmissionPolicy {
437    lane: QueryAdmissionLane,
438    limit_requirement: LimitRequirement,
439    max_returned_rows: Option<NonZeroU32>,
440    max_scanned_rows: Option<NonZeroU64>,
441    max_response_bytes: Option<NonZeroU32>,
442    index_requirement: IndexRequirement,
443    offset_policy: OffsetPolicy,
444    full_scan_policy: FullScanPolicy,
445    materialized_sort_policy: MaterializedSortPolicy,
446    max_materialized_rows: Option<NonZeroU32>,
447    max_projection_columns: Option<NonZeroU32>,
448    grouped: GroupedAdmissionPolicy,
449}
450
451impl QueryAdmissionPolicy {
452    /// Build the safe default policy for caller-facing bounded read endpoints.
453    #[must_use]
454    pub const fn public_read(
455        max_returned_rows: NonZeroU32,
456        max_response_bytes: NonZeroU32,
457    ) -> Self {
458        Self {
459            lane: QueryAdmissionLane::PublicRead,
460            limit_requirement: LimitRequirement::Required,
461            max_returned_rows: Some(max_returned_rows),
462            max_scanned_rows: None,
463            max_response_bytes: Some(max_response_bytes),
464            index_requirement: IndexRequirement::Required,
465            offset_policy: OffsetPolicy::RejectNonZero,
466            full_scan_policy: FullScanPolicy::Reject,
467            materialized_sort_policy: MaterializedSortPolicy::Reject,
468            max_materialized_rows: None,
469            max_projection_columns: None,
470            grouped: GroupedAdmissionPolicy::disabled(),
471        }
472    }
473
474    /// Return this policy with explicit grouped execution budgets attached.
475    ///
476    /// Public read policies still reject grouped queries unless the selected
477    /// plan is executed with matching group-count and per-group byte caps.
478    #[must_use]
479    pub const fn with_grouped_policy(mut self, grouped: GroupedAdmissionPolicy) -> Self {
480        self.grouped = grouped;
481        self
482    }
483
484    /// Build a trusted ad-hoc policy with explicit execution budgets.
485    #[must_use]
486    pub const fn admin_ad_hoc(
487        max_returned_rows: NonZeroU32,
488        max_scanned_rows: NonZeroU64,
489        max_response_bytes: NonZeroU32,
490    ) -> Self {
491        Self {
492            lane: QueryAdmissionLane::AdminAdHoc,
493            limit_requirement: LimitRequirement::Optional,
494            max_returned_rows: Some(max_returned_rows),
495            max_scanned_rows: Some(max_scanned_rows),
496            max_response_bytes: Some(max_response_bytes),
497            index_requirement: IndexRequirement::Optional,
498            offset_policy: OffsetPolicy::Allow,
499            full_scan_policy: FullScanPolicy::Allow,
500            materialized_sort_policy: MaterializedSortPolicy::Allow,
501            max_materialized_rows: Some(max_returned_rows),
502            max_projection_columns: None,
503            grouped: GroupedAdmissionPolicy::disabled(),
504        }
505    }
506
507    /// Build an EXPLAIN-only policy that cannot execute rows.
508    #[must_use]
509    pub const fn diagnostic_explain() -> Self {
510        Self {
511            lane: QueryAdmissionLane::DiagnosticExplain,
512            limit_requirement: LimitRequirement::Optional,
513            max_returned_rows: None,
514            max_scanned_rows: None,
515            max_response_bytes: None,
516            index_requirement: IndexRequirement::Optional,
517            offset_policy: OffsetPolicy::Allow,
518            full_scan_policy: FullScanPolicy::Allow,
519            materialized_sort_policy: MaterializedSortPolicy::Allow,
520            max_materialized_rows: None,
521            max_projection_columns: None,
522            grouped: GroupedAdmissionPolicy::disabled(),
523        }
524    }
525
526    /// Build an unbounded test policy for local harnesses only.
527    #[must_use]
528    pub const fn dev_test_unbounded() -> Self {
529        Self {
530            lane: QueryAdmissionLane::DevTest,
531            limit_requirement: LimitRequirement::Optional,
532            max_returned_rows: None,
533            max_scanned_rows: None,
534            max_response_bytes: None,
535            index_requirement: IndexRequirement::Optional,
536            offset_policy: OffsetPolicy::Allow,
537            full_scan_policy: FullScanPolicy::Allow,
538            materialized_sort_policy: MaterializedSortPolicy::Allow,
539            max_materialized_rows: None,
540            max_projection_columns: None,
541            grouped: GroupedAdmissionPolicy::disabled(),
542        }
543    }
544
545    /// Return the lane this policy governs.
546    #[must_use]
547    pub const fn lane(&self) -> QueryAdmissionLane {
548        self.lane
549    }
550
551    /// Return whether the surface requires caller-visible LIMIT.
552    #[must_use]
553    pub const fn require_limit(&self) -> bool {
554        matches!(self.limit_requirement, LimitRequirement::Required)
555    }
556
557    /// Return the maximum rows that may be returned.
558    #[must_use]
559    pub const fn max_returned_rows(&self) -> Option<NonZeroU32> {
560        self.max_returned_rows
561    }
562
563    /// Return the maximum rows that may be scanned.
564    #[must_use]
565    pub const fn max_scanned_rows(&self) -> Option<NonZeroU64> {
566        self.max_scanned_rows
567    }
568
569    /// Return the maximum response bytes.
570    #[must_use]
571    pub const fn max_response_bytes(&self) -> Option<NonZeroU32> {
572        self.max_response_bytes
573    }
574
575    /// Return whether the selected plan must use an index-backed path.
576    #[must_use]
577    pub const fn require_index(&self) -> bool {
578        matches!(self.index_requirement, IndexRequirement::Required)
579    }
580
581    /// Return whether this surface rejects non-zero OFFSET execution.
582    #[must_use]
583    pub const fn reject_non_zero_offset(&self) -> bool {
584        matches!(self.offset_policy, OffsetPolicy::RejectNonZero)
585    }
586
587    /// Return whether a full entity scan may execute.
588    #[must_use]
589    pub const fn allow_full_scan(&self) -> bool {
590        matches!(self.full_scan_policy, FullScanPolicy::Allow)
591    }
592
593    /// Return whether this surface permits materialized ORDER BY execution.
594    #[must_use]
595    pub const fn allow_materialized_sort(&self) -> bool {
596        matches!(self.materialized_sort_policy, MaterializedSortPolicy::Allow)
597    }
598
599    /// Return the maximum rows that may be materialized for sort/projection work.
600    #[must_use]
601    pub const fn max_materialized_rows(&self) -> Option<NonZeroU32> {
602        self.max_materialized_rows
603    }
604
605    /// Return the maximum projected columns.
606    #[must_use]
607    pub const fn max_projection_columns(&self) -> Option<NonZeroU32> {
608        self.max_projection_columns
609    }
610
611    /// Return grouped/aggregate budgets.
612    #[must_use]
613    pub const fn grouped(&self) -> GroupedAdmissionPolicy {
614        self.grouped
615    }
616
617    /// Return whether public-read construction kept the mandatory finite caps.
618    #[must_use]
619    pub const fn public_caps_are_finite(&self) -> bool {
620        !matches!(self.lane, QueryAdmissionLane::PublicRead)
621            || (self.max_returned_rows.is_some() && self.max_response_bytes.is_some())
622    }
623
624    /// Apply this policy to one already-summarized plan.
625    #[must_use]
626    pub fn evaluate(&self, mut summary: QueryAdmissionSummary) -> QueryAdmissionSummary {
627        summary.lane = self.lane;
628
629        match self.rejection_for_summary(&summary) {
630            Some(rejection) => summary.reject(rejection),
631            None => summary.admit(),
632        }
633    }
634
635    fn rejection_for_summary(
636        &self,
637        summary: &QueryAdmissionSummary,
638    ) -> Option<QueryAdmissionRejection> {
639        if !self.lane.executes_rows() {
640            return Some(QueryAdmissionRejection::DiagnosticLaneDoesNotExecute);
641        }
642
643        if matches!(summary.plan_shape(), QueryAdmissionPlanShape::Delete) {
644            return Some(QueryAdmissionRejection::UnsupportedStatementForQueryLane);
645        }
646
647        if let Some(rejection) = self.grouped_rejection(summary) {
648            return Some(rejection);
649        }
650
651        if !self.allow_full_scan() && summary.selected_access().is_full_scan() {
652            return Some(QueryAdmissionRejection::UnboundedFullScanRejected);
653        }
654
655        if self.require_index()
656            && !access_satisfies_index_requirement(summary.selected_access(), summary.scan_bound())
657        {
658            return Some(QueryAdmissionRejection::PublicQueryRequiresIndex);
659        }
660
661        if self.require_limit() && summary.limit().is_none() && summary.grouped().is_none() {
662            return Some(QueryAdmissionRejection::PublicQueryRequiresLimit);
663        }
664
665        if self.reject_non_zero_offset() && summary.offset().unwrap_or_default() != 0 {
666            return Some(QueryAdmissionRejection::PublicQueryOffsetRejected);
667        }
668
669        if let Some(rejection) = self.returned_row_bound_rejection(summary) {
670            return Some(rejection);
671        }
672
673        if let Some(rejection) = self.scan_bound_rejection(summary) {
674            return Some(rejection);
675        }
676
677        self.materialization_rejection(summary)
678    }
679
680    fn grouped_rejection(
681        &self,
682        summary: &QueryAdmissionSummary,
683    ) -> Option<QueryAdmissionRejection> {
684        let grouped = summary.grouped()?;
685        let Some(max_groups) = self.grouped.max_groups() else {
686            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
687        };
688        let Some(max_group_bytes) = self.grouped.max_group_bytes() else {
689            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
690        };
691
692        if grouped.max_groups() == u64::MAX || grouped.max_group_bytes() == u64::MAX {
693            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
694        }
695
696        if grouped.max_groups() > u64::from(max_groups.get())
697            || grouped.max_group_bytes() > u64::from(max_group_bytes.get())
698        {
699            return Some(QueryAdmissionRejection::GroupedQueryExceedsBudget);
700        }
701
702        if grouped.distinct_aggregate_count() > 0 && self.grouped.max_distinct_entries().is_none() {
703            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
704        }
705
706        None
707    }
708
709    fn returned_row_bound_rejection(
710        &self,
711        summary: &QueryAdmissionSummary,
712    ) -> Option<QueryAdmissionRejection> {
713        let max_returned_rows = self.max_returned_rows?;
714
715        if matches!(
716            summary.returned_row_bound_kind(),
717            QueryBoundKind::EstimateOnly
718        ) {
719            return Some(QueryAdmissionRejection::EstimatedOnlyBoundRejected);
720        }
721
722        if !summary.returned_row_bound_kind().admits_public_read() {
723            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
724        }
725
726        let Some(returned_row_bound) = summary.returned_row_bound() else {
727            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
728        };
729
730        if returned_row_bound > max_returned_rows.get() {
731            return Some(QueryAdmissionRejection::ReturnedRowBoundExceedsPolicy);
732        }
733
734        None
735    }
736
737    fn scan_bound_rejection(
738        &self,
739        summary: &QueryAdmissionSummary,
740    ) -> Option<QueryAdmissionRejection> {
741        let max_scanned_rows = self.max_scanned_rows?;
742
743        if matches!(summary.scan_bound_kind(), QueryBoundKind::EstimateOnly) {
744            return Some(QueryAdmissionRejection::EstimatedOnlyBoundRejected);
745        }
746
747        if !summary.scan_bound_kind().admits_public_read() {
748            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
749        }
750
751        let Some(scan_bound) = summary.scan_bound() else {
752            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
753        };
754
755        if scan_bound > max_scanned_rows.get() {
756            return Some(QueryAdmissionRejection::ScanBoundExceedsPolicy);
757        }
758
759        None
760    }
761
762    fn materialization_rejection(
763        &self,
764        summary: &QueryAdmissionSummary,
765    ) -> Option<QueryAdmissionRejection> {
766        if !self.allow_materialized_sort() && summary.materialization().materialized_sort() {
767            return Some(QueryAdmissionRejection::SortRequiresMaterialization);
768        }
769
770        let max_materialized_rows = self.max_materialized_rows?;
771        let materialized_rows = summary.materialization().materialized_rows()?;
772
773        if materialized_rows > max_materialized_rows.get() {
774            Some(QueryAdmissionRejection::MaterializationExceedsBudget)
775        } else {
776            None
777        }
778    }
779}
780
781/// Materialization facts relevant to read admission.
782#[derive(Clone, Copy, Debug, Eq, PartialEq)]
783pub struct QueryMaterializationSummary {
784    materialized_sort: bool,
785    materialized_rows: Option<u32>,
786    row_bound_kind: QueryBoundKind,
787}
788
789impl QueryMaterializationSummary {
790    /// Build a summary for a plan that does not materialize rows for sorting.
791    #[must_use]
792    pub const fn none() -> Self {
793        Self {
794            materialized_sort: false,
795            materialized_rows: None,
796            row_bound_kind: QueryBoundKind::Unavailable,
797        }
798    }
799
800    /// Build a summary for a plan that materializes rows for sorting.
801    #[must_use]
802    pub const fn sort(materialized_rows: Option<u32>, row_bound_kind: QueryBoundKind) -> Self {
803        Self {
804            materialized_sort: true,
805            materialized_rows,
806            row_bound_kind,
807        }
808    }
809
810    /// Return whether the plan materializes rows for sorting.
811    #[must_use]
812    pub const fn materialized_sort(&self) -> bool {
813        self.materialized_sort
814    }
815
816    /// Return the row materialization bound, if known.
817    #[must_use]
818    pub const fn materialized_rows(&self) -> Option<u32> {
819        self.materialized_rows
820    }
821
822    /// Return the quality of the materialization row bound.
823    #[must_use]
824    pub const fn row_bound_kind(&self) -> QueryBoundKind {
825        self.row_bound_kind
826    }
827}
828
/// Stable reason why read admission turned a plan away.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionRejection {
    /// An explicit LIMIT is required for public reads.
    PublicQueryRequiresLimit,
    /// A proven index-backed access path is required for public reads.
    PublicQueryRequiresIndex,
    /// The selected plan is an unbounded full scan.
    UnboundedFullScanRejected,
    /// The policy requires a scan bound and none was available.
    ScanBoundUnavailable,
    /// The proven scan bound is larger than the policy allows.
    ScanBoundExceedsPolicy,
    /// The policy requires proof and only an estimate was available.
    EstimatedOnlyBoundRejected,
    /// ORDER BY would require materializing rows.
    SortRequiresMaterialization,
    /// Materialization is larger than the policy allows.
    MaterializationExceedsBudget,
    /// Projection bytes may exceed the response budget.
    ProjectionResponseMayExceedLimit,
    /// Grouped reads need explicit group and memory budgets.
    GroupedQueryRequiresLimits,
    /// Grouped read planning is larger than the policy allows.
    GroupedQueryExceedsBudget,
    /// Diagnostic lanes do not execute rows.
    DiagnosticLaneDoesNotExecute,
    /// Introspection is disabled for the selected lane.
    IntrospectionDisabledForLane,
    /// The selected lane does not support this statement shape.
    UnsupportedStatementForQueryLane,
    /// Public read endpoints do not permit non-zero OFFSET execution.
    PublicQueryOffsetRejected,
    /// The returned-row bound is larger than the selected policy allows.
    ReturnedRowBoundExceedsPolicy,
}
865
866impl QueryAdmissionRejection {
867    /// Return a stable lowercase diagnostic label for this rejection.
868    #[must_use]
869    pub const fn as_str(self) -> &'static str {
870        match self {
871            Self::PublicQueryRequiresLimit => "public_query_requires_limit",
872            Self::PublicQueryRequiresIndex => "public_query_requires_index",
873            Self::UnboundedFullScanRejected => "unbounded_full_scan_rejected",
874            Self::ScanBoundUnavailable => "scan_bound_unavailable",
875            Self::ScanBoundExceedsPolicy => "scan_bound_exceeds_policy",
876            Self::EstimatedOnlyBoundRejected => "estimated_only_bound_rejected",
877            Self::SortRequiresMaterialization => "sort_requires_materialization",
878            Self::MaterializationExceedsBudget => "materialization_exceeds_budget",
879            Self::ProjectionResponseMayExceedLimit => "projection_response_may_exceed_limit",
880            Self::GroupedQueryRequiresLimits => "grouped_query_requires_limits",
881            Self::GroupedQueryExceedsBudget => "grouped_query_exceeds_budget",
882            Self::DiagnosticLaneDoesNotExecute => "diagnostic_lane_does_not_execute",
883            Self::IntrospectionDisabledForLane => "introspection_disabled_for_lane",
884            Self::UnsupportedStatementForQueryLane => "unsupported_statement_for_query_lane",
885            Self::PublicQueryOffsetRejected => "public_query_offset_rejected",
886            Self::ReturnedRowBoundExceedsPolicy => "returned_row_bound_exceeds_policy",
887        }
888    }
889
890    /// Return the compact diagnostic detail code for this rejection.
891    #[must_use]
892    pub const fn code(self) -> QueryReadAdmissionCode {
893        match self {
894            Self::PublicQueryRequiresLimit => QueryReadAdmissionCode::PublicQueryRequiresLimit,
895            Self::PublicQueryRequiresIndex => QueryReadAdmissionCode::PublicQueryRequiresIndex,
896            Self::UnboundedFullScanRejected => QueryReadAdmissionCode::UnboundedFullScanRejected,
897            Self::ScanBoundUnavailable => QueryReadAdmissionCode::ScanBoundUnavailable,
898            Self::ScanBoundExceedsPolicy => QueryReadAdmissionCode::ScanBoundExceedsPolicy,
899            Self::EstimatedOnlyBoundRejected => QueryReadAdmissionCode::EstimatedOnlyBoundRejected,
900            Self::SortRequiresMaterialization => {
901                QueryReadAdmissionCode::SortRequiresMaterialization
902            }
903            Self::MaterializationExceedsBudget => {
904                QueryReadAdmissionCode::MaterializationExceedsBudget
905            }
906            Self::ProjectionResponseMayExceedLimit => {
907                QueryReadAdmissionCode::ProjectionResponseMayExceedLimit
908            }
909            Self::GroupedQueryRequiresLimits => QueryReadAdmissionCode::GroupedQueryRequiresLimits,
910            Self::GroupedQueryExceedsBudget => QueryReadAdmissionCode::GroupedQueryExceedsBudget,
911            Self::DiagnosticLaneDoesNotExecute => {
912                QueryReadAdmissionCode::DiagnosticLaneDoesNotExecute
913            }
914            Self::IntrospectionDisabledForLane => {
915                QueryReadAdmissionCode::IntrospectionDisabledForLane
916            }
917            Self::UnsupportedStatementForQueryLane => {
918                QueryReadAdmissionCode::UnsupportedStatementForQueryLane
919            }
920            Self::PublicQueryOffsetRejected => QueryReadAdmissionCode::PublicQueryOffsetRejected,
921            Self::ReturnedRowBoundExceedsPolicy => {
922                QueryReadAdmissionCode::ReturnedRowBoundExceedsPolicy
923            }
924        }
925    }
926
927    /// Return a compact diagnostic payload for this rejection.
928    #[must_use]
929    pub const fn diagnostic(self) -> Diagnostic {
930        Diagnostic::new(
931            DiagnosticCode::QueryReadAdmission,
932            ErrorOrigin::Query,
933            Some(DiagnosticDetail::QueryReadAdmission {
934                reason: self.code(),
935            }),
936        )
937    }
938
939    /// Return the public wire code for this rejection.
940    #[must_use]
941    pub const fn error_code(self) -> ErrorCode {
942        self.diagnostic().error_code()
943    }
944}
945
/// Read-admission result and plan facts for diagnostics and EXPLAIN.
///
/// All fields are private and exposed through same-named accessor methods.
/// `decision` and `rejection` are written together by the constructors and the
/// private `admit`/`reject` helpers so the two stay aligned.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct QueryAdmissionSummary {
    /// Admission lane the summary was produced for.
    lane: QueryAdmissionLane,
    /// Final admitted/rejected decision.
    decision: QueryAdmissionDecision,
    /// Scalar/grouped statement shape.
    plan_shape: QueryAdmissionPlanShape,
    /// Selected access class.
    selected_access: QueryAdmissionAccessKind,
    /// Selected index name, if one exists.
    selected_index: Option<String>,
    /// Caller-visible LIMIT, if present.
    limit: Option<u32>,
    /// Caller-visible OFFSET. `from_plan` always stores `Some(..)`; the
    /// `admitted`/`rejected` constructors leave it `None`.
    offset: Option<u32>,
    /// Scan bound, if known.
    scan_bound: Option<u64>,
    /// Quality of `scan_bound`.
    scan_bound_kind: QueryBoundKind,
    /// Returned-row bound, if known.
    returned_row_bound: Option<u32>,
    /// Quality of `returned_row_bound`.
    returned_row_bound_kind: QueryBoundKind,
    /// Response-byte bound, if known. No constructor in this impl sets it to `Some`.
    response_byte_bound: Option<u32>,
    /// Quality of `response_byte_bound`.
    response_byte_bound_kind: QueryBoundKind,
    /// Post-access residual filter facts.
    residual_filter: QueryAdmissionResidualFilter,
    /// ORDER BY facts.
    ordering: QueryAdmissionOrdering,
    /// Grouped query facts; `None` when the plan is not grouped.
    grouped: Option<QueryAdmissionGroupedSummary>,
    /// Materialization facts; replaced via `with_materialization`.
    materialization: QueryMaterializationSummary,
    /// Rejection reason; `Some` only when `decision` is rejected.
    rejection: Option<QueryAdmissionRejection>,
}
968
969impl QueryAdmissionSummary {
970    /// Build an admitted summary with unknown bound details.
971    #[must_use]
972    pub const fn admitted(
973        lane: QueryAdmissionLane,
974        selected_access: QueryAdmissionAccessKind,
975    ) -> Self {
976        Self {
977            lane,
978            decision: QueryAdmissionDecision::Admitted,
979            plan_shape: QueryAdmissionPlanShape::ScalarRead,
980            selected_access,
981            selected_index: None,
982            limit: None,
983            offset: None,
984            scan_bound: None,
985            scan_bound_kind: QueryBoundKind::Unavailable,
986            returned_row_bound: None,
987            returned_row_bound_kind: QueryBoundKind::Unavailable,
988            response_byte_bound: None,
989            response_byte_bound_kind: QueryBoundKind::Unavailable,
990            residual_filter: QueryAdmissionResidualFilter::Absent,
991            ordering: QueryAdmissionOrdering::None,
992            grouped: None,
993            materialization: QueryMaterializationSummary::none(),
994            rejection: None,
995        }
996    }
997
998    /// Build a rejected summary with unknown bound details.
999    #[must_use]
1000    pub const fn rejected(
1001        lane: QueryAdmissionLane,
1002        selected_access: QueryAdmissionAccessKind,
1003        rejection: QueryAdmissionRejection,
1004    ) -> Self {
1005        Self {
1006            lane,
1007            decision: QueryAdmissionDecision::Rejected,
1008            plan_shape: QueryAdmissionPlanShape::ScalarRead,
1009            selected_access,
1010            selected_index: None,
1011            limit: None,
1012            offset: None,
1013            scan_bound: None,
1014            scan_bound_kind: QueryBoundKind::Unavailable,
1015            returned_row_bound: None,
1016            returned_row_bound_kind: QueryBoundKind::Unavailable,
1017            response_byte_bound: None,
1018            response_byte_bound_kind: QueryBoundKind::Unavailable,
1019            residual_filter: QueryAdmissionResidualFilter::Absent,
1020            ordering: QueryAdmissionOrdering::None,
1021            grouped: None,
1022            materialization: QueryMaterializationSummary::none(),
1023            rejection: Some(rejection),
1024        }
1025    }
1026
1027    /// Build one admitted summary from the already-selected access plan.
1028    #[must_use]
1029    pub(in crate::db) fn from_plan(lane: QueryAdmissionLane, plan: &AccessPlannedQuery) -> Self {
1030        let access = summarize_access_plan(plan);
1031        let grouped = plan.grouped_plan().map(summarize_grouped_plan);
1032        let (limit, offset) = scalar_limit_and_offset(plan.scalar_plan());
1033        let (returned_row_bound, returned_row_bound_kind) =
1034            returned_row_bound_from_plan(limit, grouped);
1035        let scan_bound_kind = access.scan_bound_kind();
1036        Self {
1037            lane,
1038            decision: QueryAdmissionDecision::Admitted,
1039            plan_shape: plan_shape(plan),
1040            selected_access: access.kind,
1041            selected_index: access.selected_index,
1042            limit,
1043            offset: Some(offset),
1044            scan_bound: access.exact_scan_bound,
1045            scan_bound_kind,
1046            returned_row_bound,
1047            returned_row_bound_kind,
1048            response_byte_bound: None,
1049            response_byte_bound_kind: QueryBoundKind::Unavailable,
1050            residual_filter: admission_residual_filter(plan.residual_filter_shape()),
1051            ordering: admission_ordering(plan),
1052            grouped,
1053            materialization: QueryMaterializationSummary::none(),
1054            rejection: None,
1055        }
1056    }
1057
1058    const fn admit(mut self) -> Self {
1059        self.decision = QueryAdmissionDecision::Admitted;
1060        self.rejection = None;
1061        self
1062    }
1063
1064    const fn reject(mut self, rejection: QueryAdmissionRejection) -> Self {
1065        self.decision = QueryAdmissionDecision::Rejected;
1066        self.rejection = Some(rejection);
1067        self
1068    }
1069
1070    /// Return the admission lane.
1071    #[must_use]
1072    pub const fn lane(&self) -> QueryAdmissionLane {
1073        self.lane
1074    }
1075
1076    /// Return the final decision.
1077    #[must_use]
1078    pub const fn decision(&self) -> QueryAdmissionDecision {
1079        self.decision
1080    }
1081
1082    /// Return the scalar/grouped statement shape.
1083    #[must_use]
1084    pub const fn plan_shape(&self) -> QueryAdmissionPlanShape {
1085        self.plan_shape
1086    }
1087
1088    /// Return the selected access class.
1089    #[must_use]
1090    pub const fn selected_access(&self) -> QueryAdmissionAccessKind {
1091        self.selected_access
1092    }
1093
1094    /// Return the selected index name, if one exists.
1095    #[must_use]
1096    pub fn selected_index(&self) -> Option<&str> {
1097        self.selected_index.as_deref()
1098    }
1099
1100    /// Return the caller-visible LIMIT, if present.
1101    #[must_use]
1102    pub const fn limit(&self) -> Option<u32> {
1103        self.limit
1104    }
1105
1106    /// Return the caller-visible OFFSET, if present.
1107    #[must_use]
1108    pub const fn offset(&self) -> Option<u32> {
1109        self.offset
1110    }
1111
1112    /// Return the scan bound, if known.
1113    #[must_use]
1114    pub const fn scan_bound(&self) -> Option<u64> {
1115        self.scan_bound
1116    }
1117
1118    /// Return the quality of the scan bound.
1119    #[must_use]
1120    pub const fn scan_bound_kind(&self) -> QueryBoundKind {
1121        self.scan_bound_kind
1122    }
1123
1124    /// Return the returned-row bound, if known.
1125    #[must_use]
1126    pub const fn returned_row_bound(&self) -> Option<u32> {
1127        self.returned_row_bound
1128    }
1129
1130    /// Return the quality of the returned-row bound.
1131    #[must_use]
1132    pub const fn returned_row_bound_kind(&self) -> QueryBoundKind {
1133        self.returned_row_bound_kind
1134    }
1135
1136    /// Return the response-byte bound, if known.
1137    #[must_use]
1138    pub const fn response_byte_bound(&self) -> Option<u32> {
1139        self.response_byte_bound
1140    }
1141
1142    /// Return the quality of the response-byte bound.
1143    #[must_use]
1144    pub const fn response_byte_bound_kind(&self) -> QueryBoundKind {
1145        self.response_byte_bound_kind
1146    }
1147
1148    /// Return post-access residual filter facts.
1149    #[must_use]
1150    pub const fn residual_filter(&self) -> QueryAdmissionResidualFilter {
1151        self.residual_filter
1152    }
1153
1154    /// Return ORDER BY facts.
1155    #[must_use]
1156    pub const fn ordering(&self) -> QueryAdmissionOrdering {
1157        self.ordering
1158    }
1159
1160    /// Return grouped query facts, if this is a grouped plan.
1161    #[must_use]
1162    pub const fn grouped(&self) -> Option<QueryAdmissionGroupedSummary> {
1163        self.grouped
1164    }
1165
1166    /// Return materialization facts.
1167    #[must_use]
1168    pub const fn materialization(&self) -> QueryMaterializationSummary {
1169        self.materialization
1170    }
1171
1172    /// Return a copy of this summary with route-derived materialization facts attached.
1173    #[must_use]
1174    #[cfg_attr(not(feature = "sql"), allow(dead_code))]
1175    pub(in crate::db) const fn with_materialization(
1176        mut self,
1177        materialization: QueryMaterializationSummary,
1178    ) -> Self {
1179        self.materialization = materialization;
1180        self
1181    }
1182
1183    /// Return the rejection reason, when the decision is rejected.
1184    #[must_use]
1185    pub const fn rejection(&self) -> Option<QueryAdmissionRejection> {
1186        self.rejection
1187    }
1188
1189    /// Render this summary as a stable top-level verbose EXPLAIN block.
1190    #[must_use]
1191    pub(in crate::db) fn render_text_block(&self) -> String {
1192        let mut out = String::from("admission:");
1193        push_text_field(&mut out, "lane", self.lane().as_str());
1194        push_text_field(&mut out, "decision", self.decision().as_str());
1195        push_text_field(
1196            &mut out,
1197            "reason",
1198            self.rejection()
1199                .map_or("none", QueryAdmissionRejection::as_str),
1200        );
1201        push_text_field(&mut out, "plan_shape", self.plan_shape().as_str());
1202        push_text_field(&mut out, "selected_access", self.selected_access().as_str());
1203        push_text_field(
1204            &mut out,
1205            "selected_index",
1206            self.selected_index().unwrap_or("none"),
1207        );
1208        push_text_option_u32(&mut out, "limit", self.limit());
1209        push_text_option_u32(&mut out, "offset", self.offset());
1210        push_text_option_u64(&mut out, "scan_bound", self.scan_bound());
1211        push_text_field(&mut out, "scan_bound_kind", self.scan_bound_kind().as_str());
1212        push_text_option_u32(&mut out, "returned_row_bound", self.returned_row_bound());
1213        push_text_field(
1214            &mut out,
1215            "returned_row_bound_kind",
1216            self.returned_row_bound_kind().as_str(),
1217        );
1218        push_text_option_u32(&mut out, "response_byte_bound", self.response_byte_bound());
1219        push_text_field(
1220            &mut out,
1221            "response_byte_bound_kind",
1222            self.response_byte_bound_kind().as_str(),
1223        );
1224        push_text_field(&mut out, "residual_filter", self.residual_filter().as_str());
1225        push_text_field(&mut out, "ordering", self.ordering().as_str());
1226        push_text_bool(
1227            &mut out,
1228            "materialized_sort",
1229            self.materialization().materialized_sort(),
1230        );
1231        push_text_option_u32(
1232            &mut out,
1233            "materialized_rows",
1234            self.materialization().materialized_rows(),
1235        );
1236        push_text_field(
1237            &mut out,
1238            "materialized_row_bound_kind",
1239            self.materialization().row_bound_kind().as_str(),
1240        );
1241
1242        if let Some(grouped) = self.grouped() {
1243            push_text_bool(&mut out, "grouped", true);
1244            push_text_u64(
1245                &mut out,
1246                "group_field_count",
1247                u64::from(grouped.group_field_count()),
1248            );
1249            push_text_u64(
1250                &mut out,
1251                "aggregate_count",
1252                u64::from(grouped.aggregate_count()),
1253            );
1254            push_text_u64(
1255                &mut out,
1256                "distinct_aggregate_count",
1257                u64::from(grouped.distinct_aggregate_count()),
1258            );
1259            push_text_u64(&mut out, "max_groups", grouped.max_groups());
1260            push_text_u64(&mut out, "max_group_bytes", grouped.max_group_bytes());
1261            push_text_bool(&mut out, "having_filter", grouped.has_having_filter());
1262        } else {
1263            push_text_bool(&mut out, "grouped", false);
1264        }
1265
1266        out
1267    }
1268}
1269
/// Append one `key=value` line to `out`, preceded by a newline and two-space indent.
fn push_text_field(out: &mut String, key: &str, value: &str) {
    out.extend(["\n  ", key, "=", value]);
}
1277
1278fn push_text_bool(out: &mut String, key: &str, value: bool) {
1279    push_text_field(out, key, if value { "true" } else { "false" });
1280}
1281
/// Append one `key=<decimal>` line to `out`, preceded by a newline and two-space indent.
fn push_text_u64(out: &mut String, key: &str, value: u64) {
    // Formatting into a `String` cannot fail, so the result is deliberately ignored.
    let _ = write!(out, "\n  {key}={value}");
}
1289
1290fn push_text_option_u32(out: &mut String, key: &str, value: Option<u32>) {
1291    match value {
1292        Some(value) => push_text_u64(out, key, u64::from(value)),
1293        None => push_text_field(out, key, "none"),
1294    }
1295}
1296
1297fn push_text_option_u64(out: &mut String, key: &str, value: Option<u64>) {
1298    match value {
1299        Some(value) => push_text_u64(out, key, value),
1300        None => push_text_field(out, key, "none"),
1301    }
1302}
1303
// Keep the staged extractor live before admission enforcement calls it directly.
// Coercing `from_plan` to a plain fn pointer in an anonymous const references the
// function and pins its signature at compile time without running anything.
// NOTE(review): presumably this exists to avoid a dead-code warning while no
// non-test caller exists — confirm and remove once enforcement calls it.
const _: fn(QueryAdmissionLane, &AccessPlannedQuery) -> QueryAdmissionSummary =
    QueryAdmissionSummary::from_plan;
1307
1308const fn access_satisfies_index_requirement(
1309    kind: QueryAdmissionAccessKind,
1310    scan_bound: Option<u64>,
1311) -> bool {
1312    kind.is_secondary_index()
1313        || matches!(
1314            (kind, scan_bound),
1315            (
1316                QueryAdmissionAccessKind::ByKey | QueryAdmissionAccessKind::ByKeys,
1317                Some(_)
1318            )
1319        )
1320}
1321
/// Stateless `AccessPlanProjection<Value>` implementor that folds an access plan
/// into an `AdmissionAccessSummary`.
struct AdmissionAccessProjection;
1323
/// Access facts extracted from one access plan node for admission summaries.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AdmissionAccessSummary {
    /// Access class of the node.
    kind: QueryAdmissionAccessKind,
    /// Index name; set only by the secondary-index constructor.
    selected_index: Option<String>,
    /// Exact number of rows the access can touch, when it is known up front
    /// (the projection sets it for `ByKey` and `ByKeys` only).
    exact_scan_bound: Option<u64>,
}
1330
1331impl AdmissionAccessSummary {
1332    const fn non_index(kind: QueryAdmissionAccessKind, exact_scan_bound: Option<u64>) -> Self {
1333        Self {
1334            kind,
1335            selected_index: None,
1336            exact_scan_bound,
1337        }
1338    }
1339
1340    fn secondary_index(kind: QueryAdmissionAccessKind, index_name: &str) -> Self {
1341        Self {
1342            kind,
1343            selected_index: Some(index_name.to_string()),
1344            exact_scan_bound: None,
1345        }
1346    }
1347
1348    const fn composite(kind: QueryAdmissionAccessKind) -> Self {
1349        Self {
1350            kind,
1351            selected_index: None,
1352            exact_scan_bound: None,
1353        }
1354    }
1355
1356    const fn scan_bound_kind(&self) -> QueryBoundKind {
1357        if self.exact_scan_bound.is_some() {
1358            QueryBoundKind::Exact
1359        } else {
1360            QueryBoundKind::Unavailable
1361        }
1362    }
1363}
1364
1365impl AccessPlanProjection<Value> for AdmissionAccessProjection {
1366    type Output = AdmissionAccessSummary;
1367
1368    fn by_key(&mut self, _key: &Value) -> Self::Output {
1369        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::ByKey, Some(1))
1370    }
1371
1372    fn by_keys(&mut self, keys: &[Value]) -> Self::Output {
1373        AdmissionAccessSummary::non_index(
1374            QueryAdmissionAccessKind::ByKeys,
1375            Some(u64::try_from(keys.len()).unwrap_or(u64::MAX)),
1376        )
1377    }
1378
1379    fn key_range(&mut self, _start: &Value, _end: &Value) -> Self::Output {
1380        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::KeyRange, None)
1381    }
1382
1383    fn index_prefix(
1384        &mut self,
1385        index_name: &str,
1386        _index_fields: &[String],
1387        _prefix_len: usize,
1388        _values: &[Value],
1389    ) -> Self::Output {
1390        AdmissionAccessSummary::secondary_index(QueryAdmissionAccessKind::IndexPrefix, index_name)
1391    }
1392
1393    fn index_multi_lookup(
1394        &mut self,
1395        index_name: &str,
1396        _index_fields: &[String],
1397        _values: &[Value],
1398    ) -> Self::Output {
1399        AdmissionAccessSummary::secondary_index(
1400            QueryAdmissionAccessKind::IndexMultiLookup,
1401            index_name,
1402        )
1403    }
1404
1405    fn index_branch_set(
1406        &mut self,
1407        index_name: &str,
1408        _index_fields: &[String],
1409        _fixed_values: &[Value],
1410        _branch_values: &[Value],
1411        _ordered_suffix: IndexBranchSetOrderedSuffix,
1412    ) -> Self::Output {
1413        AdmissionAccessSummary::secondary_index(
1414            QueryAdmissionAccessKind::IndexBranchSet,
1415            index_name,
1416        )
1417    }
1418
1419    fn index_range(
1420        &mut self,
1421        index_name: &str,
1422        _index_fields: &[String],
1423        _prefix_len: usize,
1424        _prefix: &[Value],
1425        _lower: &Bound<Value>,
1426        _upper: &Bound<Value>,
1427    ) -> Self::Output {
1428        AdmissionAccessSummary::secondary_index(QueryAdmissionAccessKind::IndexRange, index_name)
1429    }
1430
1431    fn full_scan(&mut self) -> Self::Output {
1432        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::FullScan, None)
1433    }
1434
1435    fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
1436        AdmissionAccessSummary::composite(QueryAdmissionAccessKind::Union)
1437    }
1438
1439    fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
1440        AdmissionAccessSummary::composite(QueryAdmissionAccessKind::Intersection)
1441    }
1442}
1443
1444fn summarize_access_plan(plan: &AccessPlannedQuery) -> AdmissionAccessSummary {
1445    project_access_plan(&plan.access, &mut AdmissionAccessProjection)
1446}
1447
1448fn summarize_grouped_plan(plan: &GroupPlan) -> QueryAdmissionGroupedSummary {
1449    QueryAdmissionGroupedSummary::new(
1450        u32::try_from(plan.group.group_fields.len()).unwrap_or(u32::MAX),
1451        u32::try_from(plan.group.aggregates.len()).unwrap_or(u32::MAX),
1452        u32::try_from(
1453            plan.group
1454                .aggregates
1455                .iter()
1456                .filter(|aggregate| aggregate.distinct)
1457                .count(),
1458        )
1459        .unwrap_or(u32::MAX),
1460        plan.group.execution.max_groups(),
1461        plan.group.execution.max_group_bytes(),
1462        plan.having_expr.is_some(),
1463    )
1464}
1465
1466const fn scalar_limit_and_offset(plan: &ScalarPlan) -> (Option<u32>, u32) {
1467    match plan.mode {
1468        QueryMode::Load(load) => match &plan.page {
1469            Some(page) => (page.limit, page.offset),
1470            None => (load.limit(), load.offset()),
1471        },
1472        QueryMode::Delete(delete) => match plan.delete_limit {
1473            Some(delete_limit) => (delete_limit.limit, delete_limit.offset),
1474            None => (delete.limit(), delete.offset()),
1475        },
1476    }
1477}
1478
1479fn returned_row_bound_from_plan(
1480    limit: Option<u32>,
1481    grouped: Option<QueryAdmissionGroupedSummary>,
1482) -> (Option<u32>, QueryBoundKind) {
1483    if let Some(limit) = limit {
1484        return (Some(limit), QueryBoundKind::EnforcedRuntimeCap);
1485    }
1486
1487    let Some(grouped) = grouped else {
1488        return (None, QueryBoundKind::Unavailable);
1489    };
1490    if grouped.max_groups() == u64::MAX {
1491        return (None, QueryBoundKind::Unavailable);
1492    }
1493
1494    (
1495        Some(u32::try_from(grouped.max_groups()).unwrap_or(u32::MAX)),
1496        QueryBoundKind::ConservativeUpperBound,
1497    )
1498}
1499
1500const fn admission_residual_filter(shape: ResidualFilterShape) -> QueryAdmissionResidualFilter {
1501    match shape {
1502        ResidualFilterShape::Absent => QueryAdmissionResidualFilter::Absent,
1503        ResidualFilterShape::Predicate => QueryAdmissionResidualFilter::Predicate,
1504        ResidualFilterShape::Expression => QueryAdmissionResidualFilter::Expression,
1505        ResidualFilterShape::ExpressionAndPredicate => {
1506            QueryAdmissionResidualFilter::ExpressionAndPredicate
1507        }
1508    }
1509}
1510
1511fn admission_ordering(plan: &AccessPlannedQuery) -> QueryAdmissionOrdering {
1512    if plan.scalar_plan().order.is_none() {
1513        return QueryAdmissionOrdering::None;
1514    }
1515
1516    if plan.resolved_order().is_some() {
1517        QueryAdmissionOrdering::Resolved
1518    } else {
1519        QueryAdmissionOrdering::Requested
1520    }
1521}
1522
1523const fn plan_shape(plan: &AccessPlannedQuery) -> QueryAdmissionPlanShape {
1524    if plan.grouped_plan().is_some() {
1525        return QueryAdmissionPlanShape::GroupedAggregate;
1526    }
1527
1528    match plan.scalar_plan().mode {
1529        QueryMode::Load(_) => QueryAdmissionPlanShape::ScalarRead,
1530        QueryMode::Delete(_) => QueryAdmissionPlanShape::Delete,
1531    }
1532}
1533
1534#[cfg(test)]
1535mod tests {
1536    use std::num::{NonZeroU32, NonZeroU64};
1537
1538    use crate::{
1539        db::{
1540            access::{AccessPath, SemanticIndexAccessContract},
1541            predicate::{MissingRowPolicy, Predicate},
1542            query::plan::{
1543                AccessPlannedQuery, AggregateKind, DeleteLimitSpec, DeleteSpec, FieldSlot,
1544                GroupAggregateSpec, GroupSpec, GroupedExecutionConfig, OrderDirection, OrderSpec,
1545                OrderTerm, PageSpec, QueryMode,
1546                expr::{Expr, FieldId},
1547            },
1548        },
1549        model::index::IndexModel,
1550        value::Value,
1551    };
1552
1553    use super::{
1554        GroupedAdmissionPolicy, QueryAdmissionAccessKind, QueryAdmissionDecision,
1555        QueryAdmissionLane, QueryAdmissionOrdering, QueryAdmissionPlanShape, QueryAdmissionPolicy,
1556        QueryAdmissionRejection, QueryAdmissionResidualFilter, QueryAdmissionSummary,
1557        QueryBoundKind, QueryMaterializationSummary,
1558    };
1559
    // Shared index fixture: a generated index model over the single field `tag`.
    // NOTE(review): presumably consumed by the index-path tests further down in
    // this module — confirm against those tests.
    const ADMISSION_INDEX_FIELDS: [&str; 1] = ["tag"];
    const ADMISSION_INDEX: IndexModel = IndexModel::generated(
        "admission::tag",
        "admission::tag_store",
        &ADMISSION_INDEX_FIELDS,
        false,
    );
1567
1568    #[test]
1569    fn public_read_policy_has_safe_finite_defaults() {
1570        let max_rows = NonZeroU32::new(50).expect("test max rows is non-zero");
1571        let max_bytes = NonZeroU32::new(32_768).expect("test max bytes is non-zero");
1572        let policy = QueryAdmissionPolicy::public_read(max_rows, max_bytes);
1573
1574        assert_eq!(policy.lane(), QueryAdmissionLane::PublicRead);
1575        assert!(policy.require_limit());
1576        assert!(policy.require_index());
1577        assert!(policy.reject_non_zero_offset());
1578        assert!(!policy.allow_full_scan());
1579        assert!(!policy.allow_materialized_sort());
1580        assert_eq!(policy.max_returned_rows(), Some(max_rows));
1581        assert_eq!(policy.max_response_bytes(), Some(max_bytes));
1582        assert!(policy.public_caps_are_finite());
1583        assert!(!policy.grouped().has_hard_limits());
1584    }
1585
1586    #[test]
1587    fn admin_policy_is_broader_but_still_budgeted() {
1588        let max_rows = NonZeroU32::new(100).expect("test max rows is non-zero");
1589        let max_scanned = NonZeroU64::new(1_000).expect("test scan cap is non-zero");
1590        let max_bytes = NonZeroU32::new(65_536).expect("test max bytes is non-zero");
1591        let policy = QueryAdmissionPolicy::admin_ad_hoc(max_rows, max_scanned, max_bytes);
1592
1593        assert_eq!(policy.lane(), QueryAdmissionLane::AdminAdHoc);
1594        assert!(!policy.require_limit());
1595        assert!(!policy.require_index());
1596        assert!(policy.allow_full_scan());
1597        assert!(policy.allow_materialized_sort());
1598        assert_eq!(policy.max_scanned_rows(), Some(max_scanned));
1599        assert_eq!(policy.max_materialized_rows(), Some(max_rows));
1600    }
1601
1602    #[test]
1603    fn diagnostic_explain_lane_does_not_execute_rows() {
1604        let policy = QueryAdmissionPolicy::diagnostic_explain();
1605
1606        assert_eq!(policy.lane().as_str(), "diagnostic_explain");
1607        assert!(!policy.lane().executes_rows());
1608    }
1609
1610    #[test]
1611    fn grouped_policy_requires_group_and_memory_budgets() {
1612        let max_groups = NonZeroU32::new(8).expect("test group cap is non-zero");
1613        let max_bytes = NonZeroU32::new(4096).expect("test byte cap is non-zero");
1614        let policy = GroupedAdmissionPolicy::bounded(max_groups, max_bytes, None);
1615
1616        assert!(policy.has_hard_limits());
1617        assert_eq!(policy.max_groups(), Some(max_groups));
1618        assert_eq!(policy.max_group_bytes(), Some(max_bytes));
1619    }
1620
1621    #[test]
1622    fn only_proven_or_enforced_bounds_admit_public_reads() {
1623        assert!(QueryBoundKind::Exact.admits_public_read());
1624        assert!(QueryBoundKind::ConservativeUpperBound.admits_public_read());
1625        assert!(QueryBoundKind::EnforcedRuntimeCap.admits_public_read());
1626        assert!(!QueryBoundKind::EstimateOnly.admits_public_read());
1627        assert!(!QueryBoundKind::Unavailable.admits_public_read());
1628    }
1629
1630    #[test]
1631    fn access_kind_classifies_secondary_indexes_and_full_scans() {
1632        assert!(QueryAdmissionAccessKind::IndexPrefix.is_secondary_index());
1633        assert!(QueryAdmissionAccessKind::FullScan.is_full_scan());
1634        assert!(!QueryAdmissionAccessKind::ByKey.is_secondary_index());
1635    }
1636
1637    #[test]
1638    fn rejection_maps_to_stable_diagnostic() {
1639        let rejection = QueryAdmissionRejection::PublicQueryRequiresLimit;
1640        let diagnostic = rejection.diagnostic();
1641
1642        assert_eq!(
1643            rejection.error_code(),
1644            icydb_diagnostic_code::ErrorCode::QUERY_READ_PUBLIC_REQUIRES_LIMIT
1645        );
1646        assert_eq!(
1647            diagnostic.code(),
1648            icydb_diagnostic_code::DiagnosticCode::QueryReadAdmission
1649        );
1650    }
1651
1652    #[test]
1653    fn summaries_keep_decision_and_rejection_aligned() {
1654        let admitted = QueryAdmissionSummary::admitted(
1655            QueryAdmissionLane::PublicRead,
1656            QueryAdmissionAccessKind::ByKey,
1657        );
1658        let rejected = QueryAdmissionSummary::rejected(
1659            QueryAdmissionLane::PublicRead,
1660            QueryAdmissionAccessKind::FullScan,
1661            QueryAdmissionRejection::UnboundedFullScanRejected,
1662        );
1663
1664        assert_eq!(admitted.decision(), QueryAdmissionDecision::Admitted);
1665        assert_eq!(admitted.rejection(), None);
1666        assert_eq!(rejected.decision(), QueryAdmissionDecision::Rejected);
1667        assert_eq!(
1668            rejected.rejection(),
1669            Some(QueryAdmissionRejection::UnboundedFullScanRejected)
1670        );
1671    }
1672
1673    #[test]
1674    fn admission_summary_renders_stable_verbose_explain_block() {
1675        let summary = QueryAdmissionSummary::rejected(
1676            QueryAdmissionLane::PublicRead,
1677            QueryAdmissionAccessKind::FullScan,
1678            QueryAdmissionRejection::UnboundedFullScanRejected,
1679        );
1680
1681        let rendered = summary.render_text_block();
1682
1683        assert!(
1684            rendered.starts_with("admission:\n  lane=public_read\n  decision=rejected"),
1685            "admission block should start with stable lane and decision fields: {rendered}",
1686        );
1687        assert!(
1688            rendered.contains("\n  reason=unbounded_full_scan_rejected"),
1689            "admission block should include a stable rejection reason: {rendered}",
1690        );
1691        assert!(
1692            rendered.contains("\n  selected_access=full_scan"),
1693            "admission block should include the selected access class: {rendered}",
1694        );
1695        assert!(
1696            rendered.contains("\n  grouped=false"),
1697            "admission block should include grouped classification: {rendered}",
1698        );
1699    }
1700
1701    #[test]
1702    fn plan_summary_classifies_full_scan_without_overclaiming_bounds() {
1703        let plan = AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore);
1704
1705        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);
1706
1707        assert_eq!(summary.plan_shape(), QueryAdmissionPlanShape::ScalarRead);
1708        assert_eq!(
1709            summary.selected_access(),
1710            QueryAdmissionAccessKind::FullScan
1711        );
1712        assert_eq!(summary.selected_index(), None);
1713        assert_eq!(summary.limit(), None);
1714        assert_eq!(summary.offset(), Some(0));
1715        assert_eq!(summary.scan_bound(), None);
1716        assert_eq!(summary.scan_bound_kind(), QueryBoundKind::Unavailable);
1717        assert_eq!(summary.returned_row_bound(), None);
1718        assert_eq!(
1719            summary.returned_row_bound_kind(),
1720            QueryBoundKind::Unavailable
1721        );
1722        assert_eq!(
1723            summary.residual_filter(),
1724            QueryAdmissionResidualFilter::Absent
1725        );
1726        assert_eq!(summary.ordering(), QueryAdmissionOrdering::None);
1727    }
1728
1729    #[test]
1730    fn plan_summary_uses_point_lookup_and_limit_as_proven_bounds() {
1731        let mut plan =
1732            AccessPlannedQuery::new(AccessPath::ByKey(Value::Nat64(7)), MissingRowPolicy::Ignore);
1733        plan.scalar_plan_mut().page = Some(PageSpec {
1734            limit: Some(5),
1735            offset: 2,
1736        });
1737
1738        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);
1739
1740        assert_eq!(summary.selected_access(), QueryAdmissionAccessKind::ByKey);
1741        assert_eq!(summary.limit(), Some(5));
1742        assert_eq!(summary.offset(), Some(2));
1743        assert_eq!(summary.scan_bound(), Some(1));
1744        assert_eq!(summary.scan_bound_kind(), QueryBoundKind::Exact);
1745        assert_eq!(summary.returned_row_bound(), Some(5));
1746        assert_eq!(
1747            summary.returned_row_bound_kind(),
1748            QueryBoundKind::EnforcedRuntimeCap
1749        );
1750    }
1751
1752    #[test]
1753    fn plan_summary_preserves_selected_index_identity() {
1754        let plan = AccessPlannedQuery::new(
1755            AccessPath::IndexPrefix {
1756                index: SemanticIndexAccessContract::model_only_from_generated_index(
1757                    ADMISSION_INDEX,
1758                ),
1759                values: vec![Value::Text("alpha".to_string())],
1760            },
1761            MissingRowPolicy::Ignore,
1762        );
1763
1764        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);
1765
1766        assert_eq!(
1767            summary.selected_access(),
1768            QueryAdmissionAccessKind::IndexPrefix
1769        );
1770        assert_eq!(summary.selected_index(), Some("admission::tag"));
1771        assert_eq!(summary.scan_bound(), None);
1772        assert_eq!(summary.scan_bound_kind(), QueryBoundKind::Unavailable);
1773    }
1774
1775    #[test]
1776    fn plan_summary_classifies_residual_and_requested_ordering() {
1777        let mut plan =
1778            AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore);
1779        plan.scalar_plan_mut().predicate = Some(Predicate::eq(
1780            "tag".to_string(),
1781            Value::Text("alpha".to_string()),
1782        ));
1783        plan.scalar_plan_mut().order = Some(OrderSpec {
1784            fields: vec![OrderTerm::field("tag", OrderDirection::Asc)],
1785        });
1786
1787        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::AdminAdHoc, &plan);
1788
1789        assert_eq!(
1790            summary.residual_filter(),
1791            QueryAdmissionResidualFilter::Predicate
1792        );
1793        assert_eq!(summary.ordering(), QueryAdmissionOrdering::Requested);
1794        assert!(!summary.materialization().materialized_sort());
1795        assert_eq!(summary.materialization().materialized_rows(), None);
1796        assert_eq!(
1797            summary.materialization().row_bound_kind(),
1798            QueryBoundKind::Unavailable
1799        );
1800    }
1801
1802    #[test]
1803    fn plan_summary_carries_grouped_execution_budgets() {
1804        let grouped =
1805            AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore)
1806                .into_grouped_with_having_expr(
1807                    GroupSpec {
1808                        group_fields: vec![FieldSlot::from_test_slot(0, "tag")],
1809                        aggregates: vec![GroupAggregateSpec {
1810                            kind: AggregateKind::Count,
1811                            input_expr: None,
1812                            filter_expr: None,
1813                            distinct: false,
1814                        }],
1815                        execution: GroupedExecutionConfig::with_hard_limits(12, 4096),
1816                    },
1817                    Some(Expr::Field(FieldId::new("tag"))),
1818                );
1819
1820        let summary =
1821            QueryAdmissionSummary::from_plan(QueryAdmissionLane::DiagnosticExplain, &grouped);
1822        let grouped = summary
1823            .grouped()
1824            .expect("summary should include grouped facts");
1825
1826        assert_eq!(
1827            summary.plan_shape(),
1828            QueryAdmissionPlanShape::GroupedAggregate
1829        );
1830        assert_eq!(grouped.group_field_count(), 1);
1831        assert_eq!(grouped.aggregate_count(), 1);
1832        assert_eq!(grouped.distinct_aggregate_count(), 0);
1833        assert_eq!(grouped.max_groups(), 12);
1834        assert_eq!(grouped.max_group_bytes(), 4096);
1835        assert!(grouped.has_having_filter());
1836        assert_eq!(summary.returned_row_bound(), Some(12));
1837        assert_eq!(
1838            summary.returned_row_bound_kind(),
1839            QueryBoundKind::ConservativeUpperBound
1840        );
1841    }
1842
1843    #[test]
1844    fn plan_summary_reads_delete_window_without_executing_it() {
1845        let mut plan =
1846            AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore);
1847        plan.scalar_plan_mut().mode = QueryMode::Delete(DeleteSpec::new());
1848        plan.scalar_plan_mut().delete_limit = Some(DeleteLimitSpec {
1849            limit: Some(3),
1850            offset: 1,
1851        });
1852
1853        let summary =
1854            QueryAdmissionSummary::from_plan(QueryAdmissionLane::DiagnosticExplain, &plan);
1855
1856        assert_eq!(summary.plan_shape(), QueryAdmissionPlanShape::Delete);
1857        assert_eq!(summary.limit(), Some(3));
1858        assert_eq!(summary.offset(), Some(1));
1859        assert_eq!(summary.returned_row_bound(), Some(3));
1860    }
1861
1862    #[test]
1863    fn public_read_evaluation_rejects_missing_limit_before_access_shape() {
1864        let policy = public_read_policy();
1865        let summary = summary_for_index_prefix(None, 0);
1866
1867        let evaluated = policy.evaluate(summary);
1868
1869        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1870        assert_eq!(
1871            evaluated.rejection(),
1872            Some(QueryAdmissionRejection::PublicQueryRequiresLimit)
1873        );
1874    }
1875
1876    #[test]
1877    fn public_read_evaluation_rejects_full_scan_even_with_limit() {
1878        let policy = public_read_policy();
1879        let summary = summary_for_path(AccessPath::<Value>::FullScan, Some(5), 0);
1880
1881        let evaluated = policy.evaluate(summary);
1882
1883        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1884        assert_eq!(
1885            evaluated.rejection(),
1886            Some(QueryAdmissionRejection::UnboundedFullScanRejected)
1887        );
1888    }
1889
1890    #[test]
1891    fn public_read_evaluation_admits_indexed_bounded_scalar_read() {
1892        let policy = public_read_policy();
1893        let summary = summary_for_index_prefix(Some(5), 0);
1894
1895        let evaluated = policy.evaluate(summary);
1896
1897        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Admitted);
1898        assert_eq!(evaluated.rejection(), None);
1899    }
1900
1901    #[test]
1902    fn public_read_evaluation_admits_exact_primary_key_read() {
1903        let policy = public_read_policy();
1904        let summary = summary_for_path(
1905            AccessPath::ByKey(Value::Text("primary".to_string())),
1906            Some(1),
1907            0,
1908        );
1909
1910        let evaluated = policy.evaluate(summary);
1911
1912        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Admitted);
1913        assert_eq!(evaluated.scan_bound(), Some(1));
1914    }
1915
1916    #[test]
1917    fn public_read_evaluation_rejects_non_zero_offset() {
1918        let policy = public_read_policy();
1919        let summary = summary_for_index_prefix(Some(5), 1);
1920
1921        let evaluated = policy.evaluate(summary);
1922
1923        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1924        assert_eq!(
1925            evaluated.rejection(),
1926            Some(QueryAdmissionRejection::PublicQueryOffsetRejected)
1927        );
1928    }
1929
1930    #[test]
1931    fn public_read_evaluation_rejects_returned_row_cap_overflow() {
1932        let policy = public_read_policy();
1933        let summary = summary_for_index_prefix(Some(51), 0);
1934
1935        let evaluated = policy.evaluate(summary);
1936
1937        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1938        assert_eq!(
1939            evaluated.rejection(),
1940            Some(QueryAdmissionRejection::ReturnedRowBoundExceedsPolicy)
1941        );
1942    }
1943
1944    #[test]
1945    fn public_read_evaluation_rejects_unresolved_order_materialized_sort() {
1946        let policy = public_read_policy();
1947        let summary = summary_for_index_prefix(Some(5), 0);
1948        let returned_row_bound = summary.returned_row_bound();
1949        let returned_row_bound_kind = summary.returned_row_bound_kind();
1950        let summary = summary.with_materialization(QueryMaterializationSummary::sort(
1951            returned_row_bound,
1952            returned_row_bound_kind,
1953        ));
1954
1955        let evaluated = policy.evaluate(summary);
1956
1957        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1958        assert_eq!(
1959            evaluated.rejection(),
1960            Some(QueryAdmissionRejection::SortRequiresMaterialization)
1961        );
1962    }
1963
1964    #[test]
1965    fn public_read_evaluation_rejects_grouped_query_without_group_budgets() {
1966        let policy = public_read_policy();
1967        let summary = grouped_summary_for_index_prefix(12, 4096, false);
1968
1969        let evaluated = policy.evaluate(summary);
1970
1971        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1972        assert_eq!(
1973            evaluated.rejection(),
1974            Some(QueryAdmissionRejection::GroupedQueryRequiresLimits)
1975        );
1976    }
1977
1978    #[test]
1979    fn public_read_evaluation_admits_grouped_query_with_group_budgets_without_limit() {
1980        let policy = public_grouped_read_policy(None);
1981        let summary = grouped_summary_for_index_prefix(12, 4096, false);
1982
1983        let evaluated = policy.evaluate(summary);
1984
1985        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Admitted);
1986        assert_eq!(evaluated.limit(), None);
1987        assert_eq!(evaluated.returned_row_bound(), Some(12));
1988        assert_eq!(evaluated.rejection(), None);
1989    }
1990
1991    #[test]
1992    fn public_read_evaluation_rejects_grouped_query_above_policy_budget() {
1993        let policy = public_grouped_read_policy(None);
1994        let summary = grouped_summary_for_index_prefix(51, 4096, false);
1995
1996        let evaluated = policy.evaluate(summary);
1997
1998        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1999        assert_eq!(
2000            evaluated.rejection(),
2001            Some(QueryAdmissionRejection::GroupedQueryExceedsBudget)
2002        );
2003    }
2004
2005    #[test]
2006    fn public_read_evaluation_rejects_distinct_grouped_query_without_distinct_budget() {
2007        let policy = public_grouped_read_policy(None);
2008        let summary = grouped_summary_for_index_prefix(12, 4096, true);
2009
2010        let evaluated = policy.evaluate(summary);
2011
2012        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2013        assert_eq!(
2014            evaluated.rejection(),
2015            Some(QueryAdmissionRejection::GroupedQueryRequiresLimits)
2016        );
2017    }
2018
2019    #[test]
2020    fn diagnostic_explain_policy_rejects_row_execution() {
2021        let policy = QueryAdmissionPolicy::diagnostic_explain();
2022        let summary = summary_for_index_prefix(Some(5), 0);
2023
2024        let evaluated = policy.evaluate(summary);
2025
2026        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2027        assert_eq!(
2028            evaluated.rejection(),
2029            Some(QueryAdmissionRejection::DiagnosticLaneDoesNotExecute)
2030        );
2031    }
2032
2033    fn public_read_policy() -> QueryAdmissionPolicy {
2034        QueryAdmissionPolicy::public_read(
2035            NonZeroU32::new(50).expect("test public row cap is non-zero"),
2036            NonZeroU32::new(32_768).expect("test public byte cap is non-zero"),
2037        )
2038    }
2039
2040    fn public_grouped_read_policy(distinct_entries: Option<NonZeroU32>) -> QueryAdmissionPolicy {
2041        public_read_policy().with_grouped_policy(GroupedAdmissionPolicy::bounded(
2042            NonZeroU32::new(50).expect("test public group cap is non-zero"),
2043            NonZeroU32::new(8192).expect("test public group byte cap is non-zero"),
2044            distinct_entries,
2045        ))
2046    }
2047
2048    fn summary_for_index_prefix(limit: Option<u32>, offset: u32) -> QueryAdmissionSummary {
2049        summary_for_path(
2050            AccessPath::IndexPrefix {
2051                index: SemanticIndexAccessContract::model_only_from_generated_index(
2052                    ADMISSION_INDEX,
2053                ),
2054                values: vec![Value::Text("alpha".to_string())],
2055            },
2056            limit,
2057            offset,
2058        )
2059    }
2060
2061    fn summary_for_path(
2062        path: AccessPath<Value>,
2063        limit: Option<u32>,
2064        offset: u32,
2065    ) -> QueryAdmissionSummary {
2066        let mut plan = AccessPlannedQuery::new(path, MissingRowPolicy::Ignore);
2067        plan.scalar_plan_mut().page = Some(PageSpec { limit, offset });
2068
2069        QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan)
2070    }
2071
2072    fn grouped_summary_for_index_prefix(
2073        max_groups: u64,
2074        max_group_bytes: u64,
2075        distinct: bool,
2076    ) -> QueryAdmissionSummary {
2077        let grouped = AccessPlannedQuery::new(index_prefix_path(), MissingRowPolicy::Ignore)
2078            .into_grouped(GroupSpec {
2079                group_fields: vec![FieldSlot::from_test_slot(0, "tag")],
2080                aggregates: vec![GroupAggregateSpec {
2081                    kind: AggregateKind::Count,
2082                    input_expr: Some(Box::new(Expr::Field(FieldId::new("tag")))),
2083                    filter_expr: None,
2084                    distinct,
2085                }],
2086                execution: GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes),
2087            });
2088
2089        QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &grouped)
2090    }
2091
2092    fn index_prefix_path() -> AccessPath<Value> {
2093        AccessPath::IndexPrefix {
2094            index: SemanticIndexAccessContract::model_only_from_generated_index(ADMISSION_INDEX),
2095            values: vec![Value::Text("alpha".to_string())],
2096        }
2097    }
2098}