Skip to main content

icydb_core/db/query/
admission.rs

1//! Module: db::query::admission
2//! Responsibility: shared read-admission vocabulary for query surfaces.
3//! Does not own: physical planning, executor runtime, or SQL/fluent lowering.
4//! Boundary: describes policy, proven bounds, and stable rejection diagnostics.
5
6use std::fmt::Write as _;
7use std::num::{NonZeroU32, NonZeroU64};
8use std::ops::Bound;
9
10use crate::{
11    db::{
12        access::IndexBranchSetOrderedSuffix,
13        query::plan::{
14            AccessPlanProjection, AccessPlannedQuery, GroupPlan, QueryMode, ResidualFilterShape,
15            ScalarPlan, project_access_plan,
16        },
17    },
18    value::Value,
19};
20use icydb_diagnostic_code::{
21    Diagnostic, DiagnosticCode, DiagnosticDetail, ErrorCode, ErrorOrigin, QueryReadAdmissionCode,
22};
23
/// Returned-row cap applied by `QueryAdmissionPolicy::default_bounded_read`.
const DEFAULT_BOUNDED_READ_MAX_ROWS: u32 = 100;
/// Response-byte cap (128 KiB) applied by `QueryAdmissionPolicy::default_bounded_read`.
const DEFAULT_BOUNDED_READ_RESPONSE_BYTES: u32 = 128 * 1024;
/// Output-group cap applied by `GroupedAdmissionPolicy::default_bounded_read`.
const DEFAULT_BOUNDED_READ_MAX_GROUPS: u32 = 100;
/// Per-group accumulator byte cap (64 KiB) applied by
/// `GroupedAdmissionPolicy::default_bounded_read`.
const DEFAULT_BOUNDED_READ_MAX_GROUP_BYTES: u32 = 64 * 1024;
/// Distinct-entry cap for distinct-style aggregates applied by
/// `GroupedAdmissionPolicy::default_bounded_read`.
const DEFAULT_BOUNDED_READ_MAX_DISTINCT_ENTRIES: u32 = 1024;
29
/// Turn a default budget constant into a non-zero budget.
///
/// A zero input is clamped up to `NonZeroU32::MIN` (one) rather than panicking.
fn non_zero_default(value: u32) -> NonZeroU32 {
    match NonZeroU32::new(value) {
        Some(budget) => budget,
        None => NonZeroU32::MIN,
    }
}
33
/// Execution lane that a public or internal caller surface runs a query on.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryAdmissionLane {
    /// Bounded, caller-facing read path used by generated canister query endpoints.
    PublicRead,
    /// Trusted/admin ad-hoc read path; the embedder supplies explicit budgets.
    AdminAdHoc,
    /// EXPLAIN-only path: planning and admission are described, no rows execute.
    DiagnosticExplain,
    /// Test-only lane for local harnesses that must bypass production policy.
    DevTest,
}

impl QueryAdmissionLane {
    /// Stable lowercase label naming this lane in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::DevTest => "dev_test",
            Self::DiagnosticExplain => "diagnostic_explain",
            Self::AdminAdHoc => "admin_ad_hoc",
            Self::PublicRead => "public_read",
        }
    }

    /// Whether queries on this lane may execute and return data rows.
    ///
    /// Every lane except `DiagnosticExplain` executes rows.
    #[must_use]
    pub const fn executes_rows(self) -> bool {
        match self {
            Self::DiagnosticExplain => false,
            Self::PublicRead | Self::AdminAdHoc | Self::DevTest => true,
        }
    }
}
65
/// How trustworthy the bound handed to read admission is.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryBoundKind {
    /// The count is known exactly before execution starts.
    Exact,
    /// A conservative upper bound that was proven before execution.
    ConservativeUpperBound,
    /// A cap the executor enforces at runtime while it produces rows.
    EnforcedRuntimeCap,
    /// A planner estimate only; it is not safe as public admission authority.
    EstimateOnly,
    /// No bound of any kind is available.
    Unavailable,
}

impl QueryBoundKind {
    /// Stable lowercase label naming this bound quality in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Unavailable => "unavailable",
            Self::EstimateOnly => "estimate_only",
            Self::EnforcedRuntimeCap => "enforced_runtime_cap",
            Self::ConservativeUpperBound => "conservative_upper_bound",
            Self::Exact => "exact",
        }
    }

    /// Whether a bound of this kind is acceptable proof for public reads.
    ///
    /// Exact counts, proven upper bounds, and enforced runtime caps qualify;
    /// estimates and missing bounds do not.
    #[must_use]
    pub const fn admits_public_read(self) -> bool {
        match self {
            Self::Exact | Self::ConservativeUpperBound | Self::EnforcedRuntimeCap => true,
            Self::EstimateOnly | Self::Unavailable => false,
        }
    }
}
103
/// Outcome of read admission for one selected plan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryAdmissionDecision {
    /// The lane policy allows the selected plan.
    Admitted,
    /// The selected plan is turned away before execution.
    Rejected,
}

impl QueryAdmissionDecision {
    /// Stable lowercase label naming this decision in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Rejected => "rejected",
            Self::Admitted => "admitted",
        }
    }

    /// Whether the selected plan may execute.
    #[must_use]
    pub const fn is_admitted(self) -> bool {
        match self {
            Self::Admitted => true,
            Self::Rejected => false,
        }
    }
}
129
/// Coarse class of the selected access path, shared by admission and EXPLAIN.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryAdmissionAccessKind {
    /// The access class has not been summarized yet.
    Unknown,
    /// A single direct primary-key lookup.
    ByKey,
    /// Several direct primary-key lookups.
    ByKeys,
    /// Access over a primary-key range.
    KeyRange,
    /// Access over a secondary-index prefix.
    IndexPrefix,
    /// Multi-lookup access over a secondary index.
    IndexMultiLookup,
    /// Branch-set access over a secondary index.
    IndexBranchSet,
    /// Access over a secondary-index range.
    IndexRange,
    /// Scan of the full entity.
    FullScan,
    /// Union of several access paths.
    Union,
    /// Intersection of several access paths.
    Intersection,
}

impl QueryAdmissionAccessKind {
    /// Stable lowercase label naming this access class in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            // Primary-key access.
            Self::ByKey => "by_key",
            Self::ByKeys => "by_keys",
            Self::KeyRange => "key_range",
            // Secondary-index access.
            Self::IndexPrefix => "index_prefix",
            Self::IndexMultiLookup => "index_multi_lookup",
            Self::IndexBranchSet => "index_branch_set",
            Self::IndexRange => "index_range",
            // Composite access.
            Self::Union => "union",
            Self::Intersection => "intersection",
            // Everything else.
            Self::FullScan => "full_scan",
            Self::Unknown => "unknown",
        }
    }

    /// Whether this access class is backed by a secondary index.
    #[must_use]
    pub const fn is_secondary_index(self) -> bool {
        match self {
            Self::IndexPrefix
            | Self::IndexMultiLookup
            | Self::IndexBranchSet
            | Self::IndexRange => true,
            Self::Unknown
            | Self::ByKey
            | Self::ByKeys
            | Self::KeyRange
            | Self::FullScan
            | Self::Union
            | Self::Intersection => false,
        }
    }

    /// Whether this access class is a full entity scan.
    #[must_use]
    pub const fn is_full_scan(self) -> bool {
        match self {
            Self::FullScan => true,
            Self::Unknown
            | Self::ByKey
            | Self::ByKeys
            | Self::KeyRange
            | Self::IndexPrefix
            | Self::IndexMultiLookup
            | Self::IndexBranchSet
            | Self::IndexRange
            | Self::Union
            | Self::Intersection => false,
        }
    }
}
191
/// Coarse statement shape (scalar, grouped, or delete) seen by read admission.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryAdmissionPlanShape {
    /// Scalar read, which also covers projection-only and global-aggregate scalar plans.
    ScalarRead,
    /// Grouped aggregate read.
    GroupedAggregate,
    /// Delete, surfaced for diagnostics only; public read lanes must not execute it.
    Delete,
}

impl QueryAdmissionPlanShape {
    /// Stable lowercase label naming this plan shape in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Delete => "delete",
            Self::GroupedAggregate => "grouped_aggregate",
            Self::ScalarRead => "scalar_read",
        }
    }
}
214
/// Shape of the residual filter left after access planning, as admission sees it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryAdmissionResidualFilter {
    /// Access planning left no residual runtime filter.
    Absent,
    /// A predicate-native residual filter is left.
    Predicate,
    /// An expression-backed residual filter is left.
    Expression,
    /// Both the expression and the predicate residual forms remain available.
    ExpressionAndPredicate,
}

impl QueryAdmissionResidualFilter {
    /// Stable lowercase label naming this residual shape in diagnostics.
    ///
    /// Note that `Absent` is labelled `"none"`.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::ExpressionAndPredicate => "expression_and_predicate",
            Self::Expression => "expression",
            Self::Predicate => "predicate",
            Self::Absent => "none",
        }
    }

    /// Whether no residual runtime filter remains.
    #[must_use]
    pub const fn is_absent(self) -> bool {
        match self {
            Self::Absent => true,
            Self::Predicate | Self::Expression | Self::ExpressionAndPredicate => false,
        }
    }
}
246
/// State of ORDER BY handling, as read admission sees it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryAdmissionOrdering {
    /// The caller requested no visible ordering.
    None,
    /// Ordering was requested but is not yet resolved into executor slots.
    Requested,
    /// Ordering carries a planner-resolved executor contract.
    Resolved,
}

impl QueryAdmissionOrdering {
    /// Stable lowercase label naming this ordering state in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Resolved => "resolved",
            Self::Requested => "requested",
            Self::None => "none",
        }
    }
}
269
/// Facts about a grouped query that read admission inspects.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct QueryAdmissionGroupedSummary {
    /// Number of GROUP BY fields.
    group_field_count: u32,
    /// Number of aggregate expressions.
    aggregate_count: u32,
    /// Number of aggregate expressions that carry DISTINCT state.
    distinct_aggregate_count: u32,
    /// Maximum group count for grouped execution.
    max_groups: u64,
    /// Maximum bytes per group accumulator for grouped execution.
    max_group_bytes: u64,
    /// Whether a HAVING residual expression is present.
    having_filter: bool,
}

impl QueryAdmissionGroupedSummary {
    /// Assemble a grouped admission summary from planner-owned grouped facts.
    ///
    /// Values are stored verbatim; no validation is performed here.
    #[must_use]
    pub const fn new(
        group_field_count: u32,
        aggregate_count: u32,
        distinct_aggregate_count: u32,
        max_groups: u64,
        max_group_bytes: u64,
        having_filter: bool,
    ) -> Self {
        Self {
            // Shape of the grouped statement.
            group_field_count,
            aggregate_count,
            distinct_aggregate_count,
            having_filter,
            // Execution caps carried by the plan.
            max_groups,
            max_group_bytes,
        }
    }

    /// Number of GROUP BY fields.
    #[must_use]
    pub const fn group_field_count(self) -> u32 {
        self.group_field_count
    }

    /// Number of aggregate expressions.
    #[must_use]
    pub const fn aggregate_count(self) -> u32 {
        self.aggregate_count
    }

    /// Number of aggregate expressions that carry DISTINCT state.
    #[must_use]
    pub const fn distinct_aggregate_count(self) -> u32 {
        self.distinct_aggregate_count
    }

    /// Maximum group count for grouped execution.
    #[must_use]
    pub const fn max_groups(self) -> u64 {
        self.max_groups
    }

    /// Maximum bytes per group accumulator for grouped execution.
    #[must_use]
    pub const fn max_group_bytes(self) -> u64 {
        self.max_group_bytes
    }

    /// Whether the grouped plan carries a HAVING residual expression.
    #[must_use]
    pub const fn has_having_filter(self) -> bool {
        self.having_filter
    }
}
338
339/// Grouped/aggregate read admission budgets.
340#[derive(Clone, Copy, Debug, Eq, PartialEq)]
341pub(in crate::db) struct GroupedAdmissionPolicy {
342    groups: Option<NonZeroU32>,
343    group_bytes: Option<NonZeroU32>,
344    distinct_entries: Option<NonZeroU32>,
345}
346
347impl GroupedAdmissionPolicy {
348    /// Build a policy that rejects grouped reads unless a later slice enables them.
349    #[must_use]
350    pub(in crate::db) const fn disabled() -> Self {
351        Self {
352            groups: None,
353            group_bytes: None,
354            distinct_entries: None,
355        }
356    }
357
358    /// Build a grouped policy with explicit group and memory budgets.
359    #[must_use]
360    pub(in crate::db) const fn bounded(
361        max_groups: NonZeroU32,
362        max_group_bytes: NonZeroU32,
363        max_distinct_entries: Option<NonZeroU32>,
364    ) -> Self {
365        Self {
366            groups: Some(max_groups),
367            group_bytes: Some(max_group_bytes),
368            distinct_entries: max_distinct_entries,
369        }
370    }
371
372    /// Build the default grouped budget used by ordinary typed/fluent reads.
373    ///
374    /// Grouped query execution still needs matching query-owned hard limits
375    /// via `grouped_limits(...)`; this policy defines the maximum values those
376    /// limits may carry on the default read path.
377    #[must_use]
378    pub(in crate::db) fn default_bounded_read() -> Self {
379        Self::bounded(
380            non_zero_default(DEFAULT_BOUNDED_READ_MAX_GROUPS),
381            non_zero_default(DEFAULT_BOUNDED_READ_MAX_GROUP_BYTES),
382            Some(non_zero_default(DEFAULT_BOUNDED_READ_MAX_DISTINCT_ENTRIES)),
383        )
384    }
385
386    /// Return the maximum allowed output groups.
387    #[must_use]
388    pub(in crate::db) const fn max_groups(&self) -> Option<NonZeroU32> {
389        self.groups
390    }
391
392    /// Return the maximum allowed bytes per group accumulator.
393    #[must_use]
394    pub(in crate::db) const fn max_group_bytes(&self) -> Option<NonZeroU32> {
395        self.group_bytes
396    }
397
398    /// Return the maximum allowed distinct entries for distinct-style aggregates.
399    #[must_use]
400    pub(in crate::db) const fn max_distinct_entries(&self) -> Option<NonZeroU32> {
401        self.distinct_entries
402    }
403
404    /// Return whether grouped execution has the minimum hard budgets admission needs.
405    #[must_use]
406    #[cfg(test)]
407    pub(in crate::db) const fn has_hard_limits(&self) -> bool {
408        self.groups.is_some() && self.group_bytes.is_some()
409    }
410
411    /// Project this admission policy into grouped execution caps.
412    #[must_use]
413    #[cfg(all(test, feature = "sql"))]
414    pub(in crate::db) const fn execution_config(
415        &self,
416    ) -> Option<crate::db::query::plan::GroupedExecutionConfig> {
417        match (self.groups, self.group_bytes) {
418            (Some(groups), Some(group_bytes)) => Some(
419                crate::db::query::plan::GroupedExecutionConfig::with_hard_limits(
420                    groups.get() as u64,
421                    group_bytes.get() as u64,
422                ),
423            ),
424            _ => None,
425        }
426    }
427}
428
/// Whether a policy insists on a caller-visible LIMIT
/// (read through `QueryAdmissionPolicy::require_limit`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum LimitRequirement {
    /// A caller-visible LIMIT is mandatory.
    Required,
    /// LIMIT may be omitted.
    Optional,
}
434
/// Whether a policy insists on an index-backed access path
/// (read through `QueryAdmissionPolicy::require_index`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum IndexRequirement {
    /// The selected plan must use an index-backed path.
    Required,
    /// The selected plan need not be index-backed.
    Optional,
}
440
/// Whether a policy lets a full entity scan execute
/// (read through `QueryAdmissionPolicy::allow_full_scan`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum FullScanPolicy {
    /// A full entity scan may execute.
    Allow,
    /// A plan whose selected access is a full scan is rejected.
    Reject,
}
446
/// Whether a policy permits materialized ORDER BY execution
/// (read through `QueryAdmissionPolicy::allow_materialized_sort`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MaterializedSortPolicy {
    /// Plans that materialize rows for sorting may execute.
    Allow,
    /// Plans that materialize rows for sorting are rejected.
    Reject,
}
452
/// Whether a policy permits OFFSET execution
/// (read through `QueryAdmissionPolicy::reject_non_zero_offset`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum OffsetPolicy {
    /// Any OFFSET is accepted.
    Allow,
    /// An OFFSET other than zero is rejected; an absent or zero OFFSET passes.
    RejectNonZero,
}
458
/// Read-admission policy attached to one query surface.
///
/// `evaluate` applies the policy to an already-summarized plan and reports
/// the first rejection it finds, if any.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct QueryAdmissionPolicy {
    /// Lane this policy governs; `evaluate` stamps it onto the summary.
    lane: QueryAdmissionLane,
    /// Whether a caller-visible LIMIT is mandatory.
    limit_requirement: LimitRequirement,
    /// Cap on the returned-row bound; `None` skips the returned-row bound check.
    max_returned_rows: Option<NonZeroU32>,
    /// Cap on the scan bound; `None` skips the scan bound check.
    max_scanned_rows: Option<NonZeroU64>,
    /// Maximum response bytes, when capped.
    max_response_bytes: Option<NonZeroU32>,
    /// Whether the selected plan must use an index-backed path.
    index_requirement: IndexRequirement,
    /// Whether non-zero OFFSET execution is rejected.
    offset_policy: OffsetPolicy,
    /// Whether a full entity scan may execute.
    full_scan_policy: FullScanPolicy,
    /// Whether materialized ORDER BY execution is permitted.
    materialized_sort_policy: MaterializedSortPolicy,
    /// Cap on rows materialized for sort/projection work; `None` skips that budget check.
    max_materialized_rows: Option<NonZeroU32>,
    /// Optional projection-column cap.
    /// NOTE(review): no check in the visible impl reads this field — confirm where it is enforced.
    max_projection_columns: Option<NonZeroU32>,
    /// Grouped/aggregate budgets consulted when the summarized plan is grouped.
    grouped: GroupedAdmissionPolicy,
}
475
476impl QueryAdmissionPolicy {
477    /// Build the safe default policy for caller-facing bounded read endpoints.
478    #[must_use]
479    pub(in crate::db) const fn public_read(
480        max_returned_rows: NonZeroU32,
481        max_response_bytes: NonZeroU32,
482    ) -> Self {
483        Self {
484            lane: QueryAdmissionLane::PublicRead,
485            limit_requirement: LimitRequirement::Required,
486            max_returned_rows: Some(max_returned_rows),
487            max_scanned_rows: None,
488            max_response_bytes: Some(max_response_bytes),
489            index_requirement: IndexRequirement::Required,
490            offset_policy: OffsetPolicy::RejectNonZero,
491            full_scan_policy: FullScanPolicy::Reject,
492            materialized_sort_policy: MaterializedSortPolicy::Reject,
493            max_materialized_rows: None,
494            max_projection_columns: None,
495            grouped: GroupedAdmissionPolicy::disabled(),
496        }
497    }
498
499    /// Build the default bounded policy used by ordinary typed/fluent reads.
500    ///
501    /// The policy rejects unindexed full scans, non-zero offsets, materialized
502    /// sorts, and queries without a proven row bound. Callers that intentionally
503    /// need a broader read must use an explicitly trusted execution method or
504    /// evaluate their own policy before executing.
505    #[must_use]
506    pub(in crate::db) fn default_bounded_read() -> Self {
507        Self::public_read(
508            non_zero_default(DEFAULT_BOUNDED_READ_MAX_ROWS),
509            non_zero_default(DEFAULT_BOUNDED_READ_RESPONSE_BYTES),
510        )
511        .with_grouped_policy(GroupedAdmissionPolicy::default_bounded_read())
512    }
513
514    /// Return this policy with explicit grouped execution budgets attached.
515    ///
516    /// Public read policies still reject grouped queries unless the selected
517    /// plan is executed with matching group-count and per-group byte caps.
518    #[must_use]
519    pub(in crate::db) const fn with_grouped_policy(
520        mut self,
521        grouped: GroupedAdmissionPolicy,
522    ) -> Self {
523        self.grouped = grouped;
524        self
525    }
526
527    /// Build a trusted ad-hoc policy with explicit execution budgets.
528    #[must_use]
529    #[cfg(test)]
530    pub(in crate::db) const fn admin_ad_hoc(
531        max_returned_rows: NonZeroU32,
532        max_scanned_rows: NonZeroU64,
533        max_response_bytes: NonZeroU32,
534    ) -> Self {
535        Self {
536            lane: QueryAdmissionLane::AdminAdHoc,
537            limit_requirement: LimitRequirement::Optional,
538            max_returned_rows: Some(max_returned_rows),
539            max_scanned_rows: Some(max_scanned_rows),
540            max_response_bytes: Some(max_response_bytes),
541            index_requirement: IndexRequirement::Optional,
542            offset_policy: OffsetPolicy::Allow,
543            full_scan_policy: FullScanPolicy::Allow,
544            materialized_sort_policy: MaterializedSortPolicy::Allow,
545            max_materialized_rows: Some(max_returned_rows),
546            max_projection_columns: None,
547            grouped: GroupedAdmissionPolicy::disabled(),
548        }
549    }
550
551    /// Build an EXPLAIN-only policy that cannot execute rows.
552    #[must_use]
553    pub(in crate::db) const fn diagnostic_explain() -> Self {
554        Self {
555            lane: QueryAdmissionLane::DiagnosticExplain,
556            limit_requirement: LimitRequirement::Optional,
557            max_returned_rows: None,
558            max_scanned_rows: None,
559            max_response_bytes: None,
560            index_requirement: IndexRequirement::Optional,
561            offset_policy: OffsetPolicy::Allow,
562            full_scan_policy: FullScanPolicy::Allow,
563            materialized_sort_policy: MaterializedSortPolicy::Allow,
564            max_materialized_rows: None,
565            max_projection_columns: None,
566            grouped: GroupedAdmissionPolicy::disabled(),
567        }
568    }
569
570    /// Return the lane this policy governs.
571    #[must_use]
572    pub(in crate::db) const fn lane(&self) -> QueryAdmissionLane {
573        self.lane
574    }
575
576    /// Return whether the surface requires caller-visible LIMIT.
577    #[must_use]
578    pub(in crate::db) const fn require_limit(&self) -> bool {
579        matches!(self.limit_requirement, LimitRequirement::Required)
580    }
581
582    /// Return the maximum rows that may be returned.
583    #[must_use]
584    #[cfg(test)]
585    pub(in crate::db) const fn max_returned_rows(&self) -> Option<NonZeroU32> {
586        self.max_returned_rows
587    }
588
589    /// Return the maximum rows that may be scanned.
590    #[must_use]
591    #[cfg(test)]
592    pub(in crate::db) const fn max_scanned_rows(&self) -> Option<NonZeroU64> {
593        self.max_scanned_rows
594    }
595
596    /// Return the maximum response bytes.
597    #[must_use]
598    #[cfg(test)]
599    pub(in crate::db) const fn max_response_bytes(&self) -> Option<NonZeroU32> {
600        self.max_response_bytes
601    }
602
603    /// Return whether the selected plan must use an index-backed path.
604    #[must_use]
605    pub(in crate::db) const fn require_index(&self) -> bool {
606        matches!(self.index_requirement, IndexRequirement::Required)
607    }
608
609    /// Return whether this surface rejects non-zero OFFSET execution.
610    #[must_use]
611    pub(in crate::db) const fn reject_non_zero_offset(&self) -> bool {
612        matches!(self.offset_policy, OffsetPolicy::RejectNonZero)
613    }
614
615    /// Return whether a full entity scan may execute.
616    #[must_use]
617    pub(in crate::db) const fn allow_full_scan(&self) -> bool {
618        matches!(self.full_scan_policy, FullScanPolicy::Allow)
619    }
620
621    /// Return whether this surface permits materialized ORDER BY execution.
622    #[must_use]
623    pub(in crate::db) const fn allow_materialized_sort(&self) -> bool {
624        matches!(self.materialized_sort_policy, MaterializedSortPolicy::Allow)
625    }
626
627    /// Return the maximum rows that may be materialized for sort/projection work.
628    #[must_use]
629    #[cfg(test)]
630    pub(in crate::db) const fn max_materialized_rows(&self) -> Option<NonZeroU32> {
631        self.max_materialized_rows
632    }
633
634    /// Return grouped/aggregate budgets.
635    #[must_use]
636    #[cfg(test)]
637    pub(in crate::db) const fn grouped(&self) -> GroupedAdmissionPolicy {
638        self.grouped
639    }
640
641    /// Return whether public-read construction kept the mandatory finite caps.
642    #[must_use]
643    #[cfg(test)]
644    pub(in crate::db) const fn public_caps_are_finite(&self) -> bool {
645        !matches!(self.lane, QueryAdmissionLane::PublicRead)
646            || (self.max_returned_rows.is_some() && self.max_response_bytes.is_some())
647    }
648
649    /// Apply this policy to one already-summarized plan.
650    #[must_use]
651    pub(in crate::db) fn evaluate(
652        &self,
653        mut summary: QueryAdmissionSummary,
654    ) -> QueryAdmissionSummary {
655        summary.lane = self.lane;
656
657        match self.rejection_for_summary(&summary) {
658            Some(rejection) => summary.reject(rejection),
659            None => summary.admit(),
660        }
661    }
662
663    fn rejection_for_summary(
664        &self,
665        summary: &QueryAdmissionSummary,
666    ) -> Option<QueryAdmissionRejection> {
667        if !self.lane.executes_rows() {
668            return Some(QueryAdmissionRejection::DiagnosticLaneDoesNotExecute);
669        }
670
671        if matches!(summary.plan_shape(), QueryAdmissionPlanShape::Delete) {
672            return Some(QueryAdmissionRejection::UnsupportedStatementForQueryLane);
673        }
674
675        if let Some(rejection) = self.grouped_rejection(summary) {
676            return Some(rejection);
677        }
678
679        if !self.allow_full_scan() && summary.selected_access().is_full_scan() {
680            return Some(QueryAdmissionRejection::UnboundedFullScanRejected);
681        }
682
683        if self.require_index()
684            && !access_satisfies_index_requirement(summary.selected_access(), summary.scan_bound())
685        {
686            return Some(QueryAdmissionRejection::PublicQueryRequiresIndex);
687        }
688
689        if self.require_limit() && summary.limit().is_none() && summary.grouped().is_none() {
690            return Some(QueryAdmissionRejection::PublicQueryRequiresLimit);
691        }
692
693        if self.reject_non_zero_offset() && summary.offset().unwrap_or_default() != 0 {
694            return Some(QueryAdmissionRejection::PublicQueryOffsetRejected);
695        }
696
697        if let Some(rejection) = self.returned_row_bound_rejection(summary) {
698            return Some(rejection);
699        }
700
701        if let Some(rejection) = self.scan_bound_rejection(summary) {
702            return Some(rejection);
703        }
704
705        self.materialization_rejection(summary)
706    }
707
708    fn grouped_rejection(
709        &self,
710        summary: &QueryAdmissionSummary,
711    ) -> Option<QueryAdmissionRejection> {
712        let grouped = summary.grouped()?;
713        let Some(max_groups) = self.grouped.max_groups() else {
714            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
715        };
716        let Some(max_group_bytes) = self.grouped.max_group_bytes() else {
717            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
718        };
719
720        if grouped.max_groups() == u64::MAX || grouped.max_group_bytes() == u64::MAX {
721            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
722        }
723
724        if grouped.max_groups() > u64::from(max_groups.get())
725            || grouped.max_group_bytes() > u64::from(max_group_bytes.get())
726        {
727            return Some(QueryAdmissionRejection::GroupedQueryExceedsBudget);
728        }
729
730        if grouped.distinct_aggregate_count() > 0 && self.grouped.max_distinct_entries().is_none() {
731            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
732        }
733
734        None
735    }
736
737    fn returned_row_bound_rejection(
738        &self,
739        summary: &QueryAdmissionSummary,
740    ) -> Option<QueryAdmissionRejection> {
741        let max_returned_rows = self.max_returned_rows?;
742
743        if matches!(
744            summary.returned_row_bound_kind(),
745            QueryBoundKind::EstimateOnly
746        ) {
747            return Some(QueryAdmissionRejection::EstimatedOnlyBoundRejected);
748        }
749
750        if !summary.returned_row_bound_kind().admits_public_read() {
751            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
752        }
753
754        let Some(returned_row_bound) = summary.returned_row_bound() else {
755            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
756        };
757
758        if returned_row_bound > max_returned_rows.get() {
759            return Some(QueryAdmissionRejection::ReturnedRowBoundExceedsPolicy);
760        }
761
762        None
763    }
764
765    fn scan_bound_rejection(
766        &self,
767        summary: &QueryAdmissionSummary,
768    ) -> Option<QueryAdmissionRejection> {
769        let max_scanned_rows = self.max_scanned_rows?;
770
771        if matches!(summary.scan_bound_kind(), QueryBoundKind::EstimateOnly) {
772            return Some(QueryAdmissionRejection::EstimatedOnlyBoundRejected);
773        }
774
775        if !summary.scan_bound_kind().admits_public_read() {
776            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
777        }
778
779        let Some(scan_bound) = summary.scan_bound() else {
780            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
781        };
782
783        if scan_bound > max_scanned_rows.get() {
784            return Some(QueryAdmissionRejection::ScanBoundExceedsPolicy);
785        }
786
787        None
788    }
789
790    fn materialization_rejection(
791        &self,
792        summary: &QueryAdmissionSummary,
793    ) -> Option<QueryAdmissionRejection> {
794        if !self.allow_materialized_sort() && summary.materialization().materialized_sort() {
795            return Some(QueryAdmissionRejection::SortRequiresMaterialization);
796        }
797
798        let max_materialized_rows = self.max_materialized_rows?;
799        let materialized_rows = summary.materialization().materialized_rows()?;
800
801        if materialized_rows > max_materialized_rows.get() {
802            Some(QueryAdmissionRejection::MaterializationExceedsBudget)
803        } else {
804            None
805        }
806    }
807}
808
809/// Materialization facts relevant to read admission.
810#[derive(Clone, Copy, Debug, Eq, PartialEq)]
811pub struct QueryMaterializationSummary {
812    materialized_sort: bool,
813    materialized_rows: Option<u32>,
814    row_bound_kind: QueryBoundKind,
815}
816
817impl QueryMaterializationSummary {
818    /// Build a summary for a plan that does not materialize rows for sorting.
819    #[must_use]
820    pub const fn none() -> Self {
821        Self {
822            materialized_sort: false,
823            materialized_rows: None,
824            row_bound_kind: QueryBoundKind::Unavailable,
825        }
826    }
827
828    /// Build a summary for a plan that materializes rows for sorting.
829    #[must_use]
830    pub const fn sort(materialized_rows: Option<u32>, row_bound_kind: QueryBoundKind) -> Self {
831        Self {
832            materialized_sort: true,
833            materialized_rows,
834            row_bound_kind,
835        }
836    }
837
838    /// Return whether the plan materializes rows for sorting.
839    #[must_use]
840    pub const fn materialized_sort(&self) -> bool {
841        self.materialized_sort
842    }
843
844    /// Return the row materialization bound, if known.
845    #[must_use]
846    pub const fn materialized_rows(&self) -> Option<u32> {
847        self.materialized_rows
848    }
849
850    /// Return the quality of the materialization row bound.
851    #[must_use]
852    pub const fn row_bound_kind(&self) -> QueryBoundKind {
853        self.row_bound_kind
854    }
855}
856
/// Stable read-admission rejection reason.
///
/// Each variant has a stable lowercase label (`as_str`) and a compact
/// diagnostic detail code (`code`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryAdmissionRejection {
    /// The policy requires a caller-visible LIMIT and the non-grouped plan has none.
    PublicQueryRequiresLimit,
    /// The policy requires an index-backed access path and the selected access
    /// does not satisfy that requirement.
    PublicQueryRequiresIndex,
    /// The selected access is a full entity scan and the policy does not allow one.
    UnboundedFullScanRejected,
    /// A bound the policy needs is missing or is not of a kind accepted as proof.
    /// Reported for the returned-row bound as well as the scan bound.
    ScanBoundUnavailable,
    /// The proven scan bound exceeds the policy's scanned-row cap.
    ScanBoundExceedsPolicy,
    /// Only an estimate was available for a policy that requires proof.
    EstimatedOnlyBoundRejected,
    /// ORDER BY requires materializing rows and the policy forbids materialized sorts.
    SortRequiresMaterialization,
    /// The materialized-row bound exceeds the policy's materialization cap.
    MaterializationExceedsBudget,
    /// Projection bytes may exceed the response budget.
    ProjectionResponseMayExceedLimit,
    /// A grouped read lacks the explicit group, memory, or distinct-entry budgets it needs.
    GroupedQueryRequiresLimits,
    /// The grouped plan's group or per-group byte caps exceed the policy's budgets.
    GroupedQueryExceedsBudget,
    /// Diagnostic lanes do not execute rows.
    DiagnosticLaneDoesNotExecute,
    /// Introspection is disabled for the selected lane.
    IntrospectionDisabledForLane,
    /// The statement shape (for example a delete) is not supported by the selected lane.
    UnsupportedStatementForQueryLane,
    /// Public read endpoints do not permit non-zero OFFSET execution.
    PublicQueryOffsetRejected,
    /// The returned-row bound exceeds the policy's returned-row cap.
    ReturnedRowBoundExceedsPolicy,
}
893
894impl QueryAdmissionRejection {
895    /// Return a stable lowercase diagnostic label for this rejection.
896    #[must_use]
897    pub const fn as_str(self) -> &'static str {
898        match self {
899            Self::PublicQueryRequiresLimit => "public_query_requires_limit",
900            Self::PublicQueryRequiresIndex => "public_query_requires_index",
901            Self::UnboundedFullScanRejected => "unbounded_full_scan_rejected",
902            Self::ScanBoundUnavailable => "scan_bound_unavailable",
903            Self::ScanBoundExceedsPolicy => "scan_bound_exceeds_policy",
904            Self::EstimatedOnlyBoundRejected => "estimated_only_bound_rejected",
905            Self::SortRequiresMaterialization => "sort_requires_materialization",
906            Self::MaterializationExceedsBudget => "materialization_exceeds_budget",
907            Self::ProjectionResponseMayExceedLimit => "projection_response_may_exceed_limit",
908            Self::GroupedQueryRequiresLimits => "grouped_query_requires_limits",
909            Self::GroupedQueryExceedsBudget => "grouped_query_exceeds_budget",
910            Self::DiagnosticLaneDoesNotExecute => "diagnostic_lane_does_not_execute",
911            Self::IntrospectionDisabledForLane => "introspection_disabled_for_lane",
912            Self::UnsupportedStatementForQueryLane => "unsupported_statement_for_query_lane",
913            Self::PublicQueryOffsetRejected => "public_query_offset_rejected",
914            Self::ReturnedRowBoundExceedsPolicy => "returned_row_bound_exceeds_policy",
915        }
916    }
917
918    /// Return the compact diagnostic detail code for this rejection.
919    #[must_use]
920    pub const fn code(self) -> QueryReadAdmissionCode {
921        match self {
922            Self::PublicQueryRequiresLimit => QueryReadAdmissionCode::PublicQueryRequiresLimit,
923            Self::PublicQueryRequiresIndex => QueryReadAdmissionCode::PublicQueryRequiresIndex,
924            Self::UnboundedFullScanRejected => QueryReadAdmissionCode::UnboundedFullScanRejected,
925            Self::ScanBoundUnavailable => QueryReadAdmissionCode::ScanBoundUnavailable,
926            Self::ScanBoundExceedsPolicy => QueryReadAdmissionCode::ScanBoundExceedsPolicy,
927            Self::EstimatedOnlyBoundRejected => QueryReadAdmissionCode::EstimatedOnlyBoundRejected,
928            Self::SortRequiresMaterialization => {
929                QueryReadAdmissionCode::SortRequiresMaterialization
930            }
931            Self::MaterializationExceedsBudget => {
932                QueryReadAdmissionCode::MaterializationExceedsBudget
933            }
934            Self::ProjectionResponseMayExceedLimit => {
935                QueryReadAdmissionCode::ProjectionResponseMayExceedLimit
936            }
937            Self::GroupedQueryRequiresLimits => QueryReadAdmissionCode::GroupedQueryRequiresLimits,
938            Self::GroupedQueryExceedsBudget => QueryReadAdmissionCode::GroupedQueryExceedsBudget,
939            Self::DiagnosticLaneDoesNotExecute => {
940                QueryReadAdmissionCode::DiagnosticLaneDoesNotExecute
941            }
942            Self::IntrospectionDisabledForLane => {
943                QueryReadAdmissionCode::IntrospectionDisabledForLane
944            }
945            Self::UnsupportedStatementForQueryLane => {
946                QueryReadAdmissionCode::UnsupportedStatementForQueryLane
947            }
948            Self::PublicQueryOffsetRejected => QueryReadAdmissionCode::PublicQueryOffsetRejected,
949            Self::ReturnedRowBoundExceedsPolicy => {
950                QueryReadAdmissionCode::ReturnedRowBoundExceedsPolicy
951            }
952        }
953    }
954
955    /// Return a compact diagnostic payload for this rejection.
956    #[must_use]
957    pub const fn diagnostic(self) -> Diagnostic {
958        Diagnostic::new(
959            DiagnosticCode::QueryReadAdmission,
960            ErrorOrigin::Query,
961            Some(DiagnosticDetail::QueryReadAdmission {
962                reason: self.code(),
963            }),
964        )
965    }
966
967    /// Return the public wire code for this rejection.
968    #[must_use]
969    pub const fn error_code(self) -> ErrorCode {
970        self.diagnostic().error_code()
971    }
972}
973
/// Read-admission result and plan facts for diagnostics and EXPLAIN.
///
/// Bundles the admission decision with the facts about the selected plan
/// (access class, paging, bounds, filtering, ordering, grouping, and
/// materialization) that are rendered into the verbose EXPLAIN block.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct QueryAdmissionSummary {
    // Lane the query was evaluated under.
    lane: QueryAdmissionLane,
    // Final admitted/rejected decision; kept in step with `rejection`.
    decision: QueryAdmissionDecision,
    // Scalar/grouped statement shape.
    plan_shape: QueryAdmissionPlanShape,
    // Class of the selected access path.
    selected_access: QueryAdmissionAccessKind,
    // Name of the selected index, if one exists.
    selected_index: Option<String>,
    // Caller-visible LIMIT, if present.
    limit: Option<u32>,
    // Caller-visible OFFSET, if present.
    offset: Option<u32>,
    // Scan bound, if known, and the quality of that bound.
    scan_bound: Option<u64>,
    scan_bound_kind: QueryBoundKind,
    // Returned-row bound, if known, and the quality of that bound.
    returned_row_bound: Option<u32>,
    returned_row_bound_kind: QueryBoundKind,
    // Response-byte bound, if known, and the quality of that bound.
    response_byte_bound: Option<u32>,
    response_byte_bound_kind: QueryBoundKind,
    // Post-access residual filter facts.
    residual_filter: QueryAdmissionResidualFilter,
    // ORDER BY facts.
    ordering: QueryAdmissionOrdering,
    // Grouped query facts; `None` for non-grouped plans.
    grouped: Option<QueryAdmissionGroupedSummary>,
    // Materialization facts.
    materialization: QueryMaterializationSummary,
    // Rejection reason; `Some` only when the decision is rejected.
    rejection: Option<QueryAdmissionRejection>,
}
996
997impl QueryAdmissionSummary {
998    /// Build an admitted summary with unknown bound details.
999    #[must_use]
1000    pub const fn admitted(
1001        lane: QueryAdmissionLane,
1002        selected_access: QueryAdmissionAccessKind,
1003    ) -> Self {
1004        Self {
1005            lane,
1006            decision: QueryAdmissionDecision::Admitted,
1007            plan_shape: QueryAdmissionPlanShape::ScalarRead,
1008            selected_access,
1009            selected_index: None,
1010            limit: None,
1011            offset: None,
1012            scan_bound: None,
1013            scan_bound_kind: QueryBoundKind::Unavailable,
1014            returned_row_bound: None,
1015            returned_row_bound_kind: QueryBoundKind::Unavailable,
1016            response_byte_bound: None,
1017            response_byte_bound_kind: QueryBoundKind::Unavailable,
1018            residual_filter: QueryAdmissionResidualFilter::Absent,
1019            ordering: QueryAdmissionOrdering::None,
1020            grouped: None,
1021            materialization: QueryMaterializationSummary::none(),
1022            rejection: None,
1023        }
1024    }
1025
1026    /// Build a rejected summary with unknown bound details.
1027    #[must_use]
1028    pub const fn rejected(
1029        lane: QueryAdmissionLane,
1030        selected_access: QueryAdmissionAccessKind,
1031        rejection: QueryAdmissionRejection,
1032    ) -> Self {
1033        Self {
1034            lane,
1035            decision: QueryAdmissionDecision::Rejected,
1036            plan_shape: QueryAdmissionPlanShape::ScalarRead,
1037            selected_access,
1038            selected_index: None,
1039            limit: None,
1040            offset: None,
1041            scan_bound: None,
1042            scan_bound_kind: QueryBoundKind::Unavailable,
1043            returned_row_bound: None,
1044            returned_row_bound_kind: QueryBoundKind::Unavailable,
1045            response_byte_bound: None,
1046            response_byte_bound_kind: QueryBoundKind::Unavailable,
1047            residual_filter: QueryAdmissionResidualFilter::Absent,
1048            ordering: QueryAdmissionOrdering::None,
1049            grouped: None,
1050            materialization: QueryMaterializationSummary::none(),
1051            rejection: Some(rejection),
1052        }
1053    }
1054
1055    /// Build one admitted summary from the already-selected access plan.
1056    #[must_use]
1057    pub(in crate::db) fn from_plan(lane: QueryAdmissionLane, plan: &AccessPlannedQuery) -> Self {
1058        let access = summarize_access_plan(plan);
1059        let grouped = plan.grouped_plan().map(summarize_grouped_plan);
1060        let (limit, offset) = scalar_limit_and_offset(plan.scalar_plan());
1061        let (returned_row_bound, returned_row_bound_kind) =
1062            returned_row_bound_from_plan(limit, grouped);
1063        let scan_bound_kind = access.scan_bound_kind();
1064        Self {
1065            lane,
1066            decision: QueryAdmissionDecision::Admitted,
1067            plan_shape: plan_shape(plan),
1068            selected_access: access.kind,
1069            selected_index: access.selected_index,
1070            limit,
1071            offset: Some(offset),
1072            scan_bound: access.exact_scan_bound,
1073            scan_bound_kind,
1074            returned_row_bound,
1075            returned_row_bound_kind,
1076            response_byte_bound: None,
1077            response_byte_bound_kind: QueryBoundKind::Unavailable,
1078            residual_filter: admission_residual_filter(plan.residual_filter_shape()),
1079            ordering: admission_ordering(plan),
1080            grouped,
1081            materialization: QueryMaterializationSummary::none(),
1082            rejection: None,
1083        }
1084    }
1085
1086    const fn admit(mut self) -> Self {
1087        self.decision = QueryAdmissionDecision::Admitted;
1088        self.rejection = None;
1089        self
1090    }
1091
1092    const fn reject(mut self, rejection: QueryAdmissionRejection) -> Self {
1093        self.decision = QueryAdmissionDecision::Rejected;
1094        self.rejection = Some(rejection);
1095        self
1096    }
1097
1098    /// Return the admission lane.
1099    #[must_use]
1100    pub const fn lane(&self) -> QueryAdmissionLane {
1101        self.lane
1102    }
1103
1104    /// Return the final decision.
1105    #[must_use]
1106    pub const fn decision(&self) -> QueryAdmissionDecision {
1107        self.decision
1108    }
1109
1110    /// Return the scalar/grouped statement shape.
1111    #[must_use]
1112    pub const fn plan_shape(&self) -> QueryAdmissionPlanShape {
1113        self.plan_shape
1114    }
1115
1116    /// Return the selected access class.
1117    #[must_use]
1118    pub const fn selected_access(&self) -> QueryAdmissionAccessKind {
1119        self.selected_access
1120    }
1121
1122    /// Return the selected index name, if one exists.
1123    #[must_use]
1124    pub fn selected_index(&self) -> Option<&str> {
1125        self.selected_index.as_deref()
1126    }
1127
1128    /// Return the caller-visible LIMIT, if present.
1129    #[must_use]
1130    pub const fn limit(&self) -> Option<u32> {
1131        self.limit
1132    }
1133
1134    /// Return the caller-visible OFFSET, if present.
1135    #[must_use]
1136    pub const fn offset(&self) -> Option<u32> {
1137        self.offset
1138    }
1139
1140    /// Return the scan bound, if known.
1141    #[must_use]
1142    pub const fn scan_bound(&self) -> Option<u64> {
1143        self.scan_bound
1144    }
1145
1146    /// Return the quality of the scan bound.
1147    #[must_use]
1148    pub const fn scan_bound_kind(&self) -> QueryBoundKind {
1149        self.scan_bound_kind
1150    }
1151
1152    /// Return the returned-row bound, if known.
1153    #[must_use]
1154    pub const fn returned_row_bound(&self) -> Option<u32> {
1155        self.returned_row_bound
1156    }
1157
1158    /// Return the quality of the returned-row bound.
1159    #[must_use]
1160    pub const fn returned_row_bound_kind(&self) -> QueryBoundKind {
1161        self.returned_row_bound_kind
1162    }
1163
1164    /// Return the response-byte bound, if known.
1165    #[must_use]
1166    pub const fn response_byte_bound(&self) -> Option<u32> {
1167        self.response_byte_bound
1168    }
1169
1170    /// Return the quality of the response-byte bound.
1171    #[must_use]
1172    pub const fn response_byte_bound_kind(&self) -> QueryBoundKind {
1173        self.response_byte_bound_kind
1174    }
1175
1176    /// Return post-access residual filter facts.
1177    #[must_use]
1178    pub const fn residual_filter(&self) -> QueryAdmissionResidualFilter {
1179        self.residual_filter
1180    }
1181
1182    /// Return ORDER BY facts.
1183    #[must_use]
1184    pub const fn ordering(&self) -> QueryAdmissionOrdering {
1185        self.ordering
1186    }
1187
1188    /// Return grouped query facts, if this is a grouped plan.
1189    #[must_use]
1190    pub const fn grouped(&self) -> Option<QueryAdmissionGroupedSummary> {
1191        self.grouped
1192    }
1193
1194    /// Return materialization facts.
1195    #[must_use]
1196    pub const fn materialization(&self) -> QueryMaterializationSummary {
1197        self.materialization
1198    }
1199
1200    /// Return a copy of this summary with route-derived materialization facts attached.
1201    #[must_use]
1202    #[cfg_attr(not(feature = "sql"), allow(dead_code))]
1203    pub(in crate::db) const fn with_materialization(
1204        mut self,
1205        materialization: QueryMaterializationSummary,
1206    ) -> Self {
1207        self.materialization = materialization;
1208        self
1209    }
1210
1211    /// Return the rejection reason, when the decision is rejected.
1212    #[must_use]
1213    pub const fn rejection(&self) -> Option<QueryAdmissionRejection> {
1214        self.rejection
1215    }
1216
1217    /// Render this summary as a stable top-level verbose EXPLAIN block.
1218    #[must_use]
1219    pub(in crate::db) fn render_text_block(&self) -> String {
1220        let mut out = String::from("admission:");
1221        push_text_field(&mut out, "lane", self.lane().as_str());
1222        push_text_field(&mut out, "decision", self.decision().as_str());
1223        push_text_field(
1224            &mut out,
1225            "reason",
1226            self.rejection()
1227                .map_or("none", QueryAdmissionRejection::as_str),
1228        );
1229        push_text_field(&mut out, "plan_shape", self.plan_shape().as_str());
1230        push_text_field(&mut out, "selected_access", self.selected_access().as_str());
1231        push_text_field(
1232            &mut out,
1233            "selected_index",
1234            self.selected_index().unwrap_or("none"),
1235        );
1236        push_text_option_u32(&mut out, "limit", self.limit());
1237        push_text_option_u32(&mut out, "offset", self.offset());
1238        push_text_option_u64(&mut out, "scan_bound", self.scan_bound());
1239        push_text_field(&mut out, "scan_bound_kind", self.scan_bound_kind().as_str());
1240        push_text_option_u32(&mut out, "returned_row_bound", self.returned_row_bound());
1241        push_text_field(
1242            &mut out,
1243            "returned_row_bound_kind",
1244            self.returned_row_bound_kind().as_str(),
1245        );
1246        push_text_option_u32(&mut out, "response_byte_bound", self.response_byte_bound());
1247        push_text_field(
1248            &mut out,
1249            "response_byte_bound_kind",
1250            self.response_byte_bound_kind().as_str(),
1251        );
1252        push_text_field(&mut out, "residual_filter", self.residual_filter().as_str());
1253        push_text_field(&mut out, "ordering", self.ordering().as_str());
1254        push_text_bool(
1255            &mut out,
1256            "materialized_sort",
1257            self.materialization().materialized_sort(),
1258        );
1259        push_text_option_u32(
1260            &mut out,
1261            "materialized_rows",
1262            self.materialization().materialized_rows(),
1263        );
1264        push_text_field(
1265            &mut out,
1266            "materialized_row_bound_kind",
1267            self.materialization().row_bound_kind().as_str(),
1268        );
1269
1270        if let Some(grouped) = self.grouped() {
1271            push_text_bool(&mut out, "grouped", true);
1272            push_text_u64(
1273                &mut out,
1274                "group_field_count",
1275                u64::from(grouped.group_field_count()),
1276            );
1277            push_text_u64(
1278                &mut out,
1279                "aggregate_count",
1280                u64::from(grouped.aggregate_count()),
1281            );
1282            push_text_u64(
1283                &mut out,
1284                "distinct_aggregate_count",
1285                u64::from(grouped.distinct_aggregate_count()),
1286            );
1287            push_text_u64(&mut out, "max_groups", grouped.max_groups());
1288            push_text_u64(&mut out, "max_group_bytes", grouped.max_group_bytes());
1289            push_text_bool(&mut out, "having_filter", grouped.has_having_filter());
1290        } else {
1291            push_text_bool(&mut out, "grouped", false);
1292        }
1293
1294        out
1295    }
1296}
1297
/// Append one `key=value` line to `out`, preceded by a newline and a two-space indent.
fn push_text_field(out: &mut String, key: &str, value: &str) {
    out.extend(["\n  ", key, "=", value]);
}
1305
1306fn push_text_bool(out: &mut String, key: &str, value: bool) {
1307    push_text_field(out, key, if value { "true" } else { "false" });
1308}
1309
/// Append a numeric `key=value` line to `out`, preceded by a newline and a two-space indent.
fn push_text_u64(out: &mut String, key: &str, value: u64) {
    // Formatting into a `String` does not fail, so the result is discarded.
    let _ = write!(out, "\n  {key}={value}");
}
1317
1318fn push_text_option_u32(out: &mut String, key: &str, value: Option<u32>) {
1319    match value {
1320        Some(value) => push_text_u64(out, key, u64::from(value)),
1321        None => push_text_field(out, key, "none"),
1322    }
1323}
1324
1325fn push_text_option_u64(out: &mut String, key: &str, value: Option<u64>) {
1326    match value {
1327        Some(value) => push_text_u64(out, key, value),
1328        None => push_text_field(out, key, "none"),
1329    }
1330}
1331
// Bind `from_plan` to a typed function pointer in an unnamed constant so the
// staged extractor stays referenced, with its signature checked here, until
// admission enforcement calls it directly.
const _: fn(QueryAdmissionLane, &AccessPlannedQuery) -> QueryAdmissionSummary =
    QueryAdmissionSummary::from_plan;
1335
1336const fn access_satisfies_index_requirement(
1337    kind: QueryAdmissionAccessKind,
1338    scan_bound: Option<u64>,
1339) -> bool {
1340    kind.is_secondary_index()
1341        || matches!(
1342            (kind, scan_bound),
1343            (
1344                QueryAdmissionAccessKind::ByKey | QueryAdmissionAccessKind::ByKeys,
1345                Some(_)
1346            )
1347        )
1348}
1349
// Stateless projection that folds an access plan into an `AdmissionAccessSummary`.
struct AdmissionAccessProjection;
1351
// Access facts extracted from one access plan for admission purposes.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AdmissionAccessSummary {
    // Access class of the plan node.
    kind: QueryAdmissionAccessKind,
    // Index name; set only by the secondary-index constructor.
    selected_index: Option<String>,
    // Exact scan bound, when one is known; `None` otherwise.
    exact_scan_bound: Option<u64>,
}
1358
1359impl AdmissionAccessSummary {
1360    const fn non_index(kind: QueryAdmissionAccessKind, exact_scan_bound: Option<u64>) -> Self {
1361        Self {
1362            kind,
1363            selected_index: None,
1364            exact_scan_bound,
1365        }
1366    }
1367
1368    fn secondary_index(kind: QueryAdmissionAccessKind, index_name: &str) -> Self {
1369        Self {
1370            kind,
1371            selected_index: Some(index_name.to_string()),
1372            exact_scan_bound: None,
1373        }
1374    }
1375
1376    const fn composite(kind: QueryAdmissionAccessKind) -> Self {
1377        Self {
1378            kind,
1379            selected_index: None,
1380            exact_scan_bound: None,
1381        }
1382    }
1383
1384    const fn scan_bound_kind(&self) -> QueryBoundKind {
1385        if self.exact_scan_bound.is_some() {
1386            QueryBoundKind::Exact
1387        } else {
1388            QueryBoundKind::Unavailable
1389        }
1390    }
1391}
1392
1393impl AccessPlanProjection<Value> for AdmissionAccessProjection {
1394    type Output = AdmissionAccessSummary;
1395
1396    fn by_key(&mut self, _key: &Value) -> Self::Output {
1397        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::ByKey, Some(1))
1398    }
1399
1400    fn by_keys(&mut self, keys: &[Value]) -> Self::Output {
1401        AdmissionAccessSummary::non_index(
1402            QueryAdmissionAccessKind::ByKeys,
1403            Some(u64::try_from(keys.len()).unwrap_or(u64::MAX)),
1404        )
1405    }
1406
1407    fn key_range(&mut self, _start: &Value, _end: &Value) -> Self::Output {
1408        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::KeyRange, None)
1409    }
1410
1411    fn index_prefix(
1412        &mut self,
1413        index_name: &str,
1414        _index_fields: &[String],
1415        _prefix_len: usize,
1416        _values: &[Value],
1417    ) -> Self::Output {
1418        AdmissionAccessSummary::secondary_index(QueryAdmissionAccessKind::IndexPrefix, index_name)
1419    }
1420
1421    fn index_multi_lookup(
1422        &mut self,
1423        index_name: &str,
1424        _index_fields: &[String],
1425        _values: &[Value],
1426    ) -> Self::Output {
1427        AdmissionAccessSummary::secondary_index(
1428            QueryAdmissionAccessKind::IndexMultiLookup,
1429            index_name,
1430        )
1431    }
1432
1433    fn index_branch_set(
1434        &mut self,
1435        index_name: &str,
1436        _index_fields: &[String],
1437        _fixed_values: &[Value],
1438        _branch_values: &[Value],
1439        _ordered_suffix: IndexBranchSetOrderedSuffix,
1440    ) -> Self::Output {
1441        AdmissionAccessSummary::secondary_index(
1442            QueryAdmissionAccessKind::IndexBranchSet,
1443            index_name,
1444        )
1445    }
1446
1447    fn index_range(
1448        &mut self,
1449        index_name: &str,
1450        _index_fields: &[String],
1451        _prefix_len: usize,
1452        _prefix: &[Value],
1453        _lower: &Bound<Value>,
1454        _upper: &Bound<Value>,
1455    ) -> Self::Output {
1456        AdmissionAccessSummary::secondary_index(QueryAdmissionAccessKind::IndexRange, index_name)
1457    }
1458
1459    fn full_scan(&mut self) -> Self::Output {
1460        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::FullScan, None)
1461    }
1462
1463    fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
1464        AdmissionAccessSummary::composite(QueryAdmissionAccessKind::Union)
1465    }
1466
1467    fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
1468        AdmissionAccessSummary::composite(QueryAdmissionAccessKind::Intersection)
1469    }
1470}
1471
1472fn summarize_access_plan(plan: &AccessPlannedQuery) -> AdmissionAccessSummary {
1473    project_access_plan(&plan.access, &mut AdmissionAccessProjection)
1474}
1475
1476fn summarize_grouped_plan(plan: &GroupPlan) -> QueryAdmissionGroupedSummary {
1477    QueryAdmissionGroupedSummary::new(
1478        u32::try_from(plan.group.group_fields.len()).unwrap_or(u32::MAX),
1479        u32::try_from(plan.group.aggregates.len()).unwrap_or(u32::MAX),
1480        u32::try_from(
1481            plan.group
1482                .aggregates
1483                .iter()
1484                .filter(|aggregate| aggregate.distinct)
1485                .count(),
1486        )
1487        .unwrap_or(u32::MAX),
1488        plan.group.execution.max_groups(),
1489        plan.group.execution.max_group_bytes(),
1490        plan.having_expr.is_some(),
1491    )
1492}
1493
1494const fn scalar_limit_and_offset(plan: &ScalarPlan) -> (Option<u32>, u32) {
1495    match plan.mode {
1496        QueryMode::Load(load) => match &plan.page {
1497            Some(page) => (page.limit, page.offset),
1498            None => (load.limit(), load.offset()),
1499        },
1500        QueryMode::Delete(delete) => match plan.delete_limit {
1501            Some(delete_limit) => (delete_limit.limit, delete_limit.offset),
1502            None => (delete.limit(), delete.offset()),
1503        },
1504    }
1505}
1506
1507fn returned_row_bound_from_plan(
1508    limit: Option<u32>,
1509    grouped: Option<QueryAdmissionGroupedSummary>,
1510) -> (Option<u32>, QueryBoundKind) {
1511    if let Some(limit) = limit {
1512        return (Some(limit), QueryBoundKind::EnforcedRuntimeCap);
1513    }
1514
1515    let Some(grouped) = grouped else {
1516        return (None, QueryBoundKind::Unavailable);
1517    };
1518    if grouped.max_groups() == u64::MAX {
1519        return (None, QueryBoundKind::Unavailable);
1520    }
1521
1522    (
1523        Some(u32::try_from(grouped.max_groups()).unwrap_or(u32::MAX)),
1524        QueryBoundKind::ConservativeUpperBound,
1525    )
1526}
1527
1528const fn admission_residual_filter(shape: ResidualFilterShape) -> QueryAdmissionResidualFilter {
1529    match shape {
1530        ResidualFilterShape::Absent => QueryAdmissionResidualFilter::Absent,
1531        ResidualFilterShape::Predicate => QueryAdmissionResidualFilter::Predicate,
1532        ResidualFilterShape::Expression => QueryAdmissionResidualFilter::Expression,
1533        ResidualFilterShape::ExpressionAndPredicate => {
1534            QueryAdmissionResidualFilter::ExpressionAndPredicate
1535        }
1536    }
1537}
1538
1539fn admission_ordering(plan: &AccessPlannedQuery) -> QueryAdmissionOrdering {
1540    if plan.scalar_plan().order.is_none() {
1541        return QueryAdmissionOrdering::None;
1542    }
1543
1544    if plan.resolved_order().is_some() {
1545        QueryAdmissionOrdering::Resolved
1546    } else {
1547        QueryAdmissionOrdering::Requested
1548    }
1549}
1550
1551const fn plan_shape(plan: &AccessPlannedQuery) -> QueryAdmissionPlanShape {
1552    if plan.grouped_plan().is_some() {
1553        return QueryAdmissionPlanShape::GroupedAggregate;
1554    }
1555
1556    match plan.scalar_plan().mode {
1557        QueryMode::Load(_) => QueryAdmissionPlanShape::ScalarRead,
1558        QueryMode::Delete(_) => QueryAdmissionPlanShape::Delete,
1559    }
1560}
1561
1562#[cfg(test)]
1563mod tests {
1564    use std::num::{NonZeroU32, NonZeroU64};
1565
1566    use crate::{
1567        db::{
1568            access::{AccessPath, SemanticIndexAccessContract},
1569            predicate::{MissingRowPolicy, Predicate},
1570            query::plan::{
1571                AccessPlannedQuery, AggregateKind, DeleteLimitSpec, DeleteSpec, FieldSlot,
1572                GroupAggregateSpec, GroupSpec, GroupedExecutionConfig, OrderDirection, OrderSpec,
1573                OrderTerm, PageSpec, QueryMode,
1574                expr::{Expr, FieldId},
1575            },
1576        },
1577        model::index::IndexModel,
1578        value::Value,
1579    };
1580
1581    use super::{
1582        GroupedAdmissionPolicy, QueryAdmissionAccessKind, QueryAdmissionDecision,
1583        QueryAdmissionLane, QueryAdmissionOrdering, QueryAdmissionPlanShape, QueryAdmissionPolicy,
1584        QueryAdmissionRejection, QueryAdmissionResidualFilter, QueryAdmissionSummary,
1585        QueryBoundKind, QueryMaterializationSummary,
1586    };
1587
1588    const ADMISSION_INDEX_FIELDS: [&str; 1] = ["tag"];
1589    const ADMISSION_INDEX: IndexModel = IndexModel::generated(
1590        "admission::tag",
1591        "admission::tag_store",
1592        &ADMISSION_INDEX_FIELDS,
1593        false,
1594    );
1595
1596    #[test]
1597    fn public_read_policy_has_safe_finite_defaults() {
1598        let max_rows = NonZeroU32::new(50).expect("test max rows is non-zero");
1599        let max_bytes = NonZeroU32::new(32_768).expect("test max bytes is non-zero");
1600        let policy = QueryAdmissionPolicy::public_read(max_rows, max_bytes);
1601
1602        assert_eq!(policy.lane(), QueryAdmissionLane::PublicRead);
1603        assert!(policy.require_limit());
1604        assert!(policy.require_index());
1605        assert!(policy.reject_non_zero_offset());
1606        assert!(!policy.allow_full_scan());
1607        assert!(!policy.allow_materialized_sort());
1608        assert_eq!(policy.max_returned_rows(), Some(max_rows));
1609        assert_eq!(policy.max_response_bytes(), Some(max_bytes));
1610        assert!(policy.public_caps_are_finite());
1611        assert!(!policy.grouped().has_hard_limits());
1612    }
1613
1614    #[test]
1615    fn admin_policy_is_broader_but_still_budgeted() {
1616        let max_rows = NonZeroU32::new(100).expect("test max rows is non-zero");
1617        let max_scanned = NonZeroU64::new(1_000).expect("test scan cap is non-zero");
1618        let max_bytes = NonZeroU32::new(65_536).expect("test max bytes is non-zero");
1619        let policy = QueryAdmissionPolicy::admin_ad_hoc(max_rows, max_scanned, max_bytes);
1620
1621        assert_eq!(policy.lane(), QueryAdmissionLane::AdminAdHoc);
1622        assert!(!policy.require_limit());
1623        assert!(!policy.require_index());
1624        assert!(policy.allow_full_scan());
1625        assert!(policy.allow_materialized_sort());
1626        assert_eq!(policy.max_scanned_rows(), Some(max_scanned));
1627        assert_eq!(policy.max_materialized_rows(), Some(max_rows));
1628    }
1629
1630    #[test]
1631    fn diagnostic_explain_lane_does_not_execute_rows() {
1632        let policy = QueryAdmissionPolicy::diagnostic_explain();
1633
1634        assert_eq!(policy.lane().as_str(), "diagnostic_explain");
1635        assert!(!policy.lane().executes_rows());
1636    }
1637
1638    #[test]
1639    fn grouped_policy_requires_group_and_memory_budgets() {
1640        let max_groups = NonZeroU32::new(8).expect("test group cap is non-zero");
1641        let max_bytes = NonZeroU32::new(4096).expect("test byte cap is non-zero");
1642        let policy = GroupedAdmissionPolicy::bounded(max_groups, max_bytes, None);
1643
1644        assert!(policy.has_hard_limits());
1645        assert_eq!(policy.max_groups(), Some(max_groups));
1646        assert_eq!(policy.max_group_bytes(), Some(max_bytes));
1647    }
1648
1649    #[test]
1650    fn only_proven_or_enforced_bounds_admit_public_reads() {
1651        assert!(QueryBoundKind::Exact.admits_public_read());
1652        assert!(QueryBoundKind::ConservativeUpperBound.admits_public_read());
1653        assert!(QueryBoundKind::EnforcedRuntimeCap.admits_public_read());
1654        assert!(!QueryBoundKind::EstimateOnly.admits_public_read());
1655        assert!(!QueryBoundKind::Unavailable.admits_public_read());
1656    }
1657
1658    #[test]
1659    fn access_kind_classifies_secondary_indexes_and_full_scans() {
1660        assert!(QueryAdmissionAccessKind::IndexPrefix.is_secondary_index());
1661        assert!(QueryAdmissionAccessKind::FullScan.is_full_scan());
1662        assert!(!QueryAdmissionAccessKind::ByKey.is_secondary_index());
1663    }
1664
1665    #[test]
1666    fn rejection_maps_to_stable_diagnostic() {
1667        let rejection = QueryAdmissionRejection::PublicQueryRequiresLimit;
1668        let diagnostic = rejection.diagnostic();
1669
1670        assert_eq!(
1671            rejection.error_code(),
1672            icydb_diagnostic_code::ErrorCode::QUERY_READ_PUBLIC_REQUIRES_LIMIT
1673        );
1674        assert_eq!(
1675            diagnostic.code(),
1676            icydb_diagnostic_code::DiagnosticCode::QueryReadAdmission
1677        );
1678    }
1679
1680    #[test]
1681    fn summaries_keep_decision_and_rejection_aligned() {
1682        let admitted = QueryAdmissionSummary::admitted(
1683            QueryAdmissionLane::PublicRead,
1684            QueryAdmissionAccessKind::ByKey,
1685        );
1686        let rejected = QueryAdmissionSummary::rejected(
1687            QueryAdmissionLane::PublicRead,
1688            QueryAdmissionAccessKind::FullScan,
1689            QueryAdmissionRejection::UnboundedFullScanRejected,
1690        );
1691
1692        assert_eq!(admitted.decision(), QueryAdmissionDecision::Admitted);
1693        assert_eq!(admitted.rejection(), None);
1694        assert_eq!(rejected.decision(), QueryAdmissionDecision::Rejected);
1695        assert_eq!(
1696            rejected.rejection(),
1697            Some(QueryAdmissionRejection::UnboundedFullScanRejected)
1698        );
1699    }
1700
/// Renders a rejected full-scan summary as text and checks the stable
/// fragments: the leading lane/decision header plus the reason, selected
/// access, and grouped lines.
#[test]
fn admission_summary_renders_stable_verbose_explain_block() {
    let rejected_scan = QueryAdmissionSummary::rejected(
        QueryAdmissionLane::PublicRead,
        QueryAdmissionAccessKind::FullScan,
        QueryAdmissionRejection::UnboundedFullScanRejected,
    );
    let rendered = rejected_scan.render_text_block();

    // Fragments the rendered block is required to carry.
    let header = "admission:\n  lane=public_read\n  decision=rejected";
    let reason_line = "\n  reason=unbounded_full_scan_rejected";
    let access_line = "\n  selected_access=full_scan";
    let grouped_line = "\n  grouped=false";

    assert!(
        rendered.starts_with(header),
        "admission block should start with stable lane and decision fields: {rendered}",
    );
    assert!(
        rendered.contains(reason_line),
        "admission block should include a stable rejection reason: {rendered}",
    );
    assert!(
        rendered.contains(access_line),
        "admission block should include the selected access class: {rendered}",
    );
    assert!(
        rendered.contains(grouped_line),
        "admission block should include grouped classification: {rendered}",
    );
}
1728
/// A bare full-scan plan summarizes as a scalar read with no selected index,
/// no limit, a zero offset, unavailable scan/returned-row bounds, no residual
/// filter, and no ordering.
#[test]
fn plan_summary_classifies_full_scan_without_overclaiming_bounds() {
    let path = AccessPath::<Value>::FullScan;
    let plan = AccessPlannedQuery::new(path, MissingRowPolicy::Ignore);
    let facts = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);

    // Shape and access classification.
    assert_eq!(facts.plan_shape(), QueryAdmissionPlanShape::ScalarRead);
    assert_eq!(facts.selected_access(), QueryAdmissionAccessKind::FullScan);
    assert_eq!(facts.selected_index(), None);

    // Paging window.
    assert_eq!(facts.limit(), None);
    assert_eq!(facts.offset(), Some(0));

    // No bound may be claimed for either scanning or returned rows.
    assert_eq!(facts.scan_bound(), None);
    assert_eq!(facts.scan_bound_kind(), QueryBoundKind::Unavailable);
    assert_eq!(facts.returned_row_bound(), None);
    assert_eq!(facts.returned_row_bound_kind(), QueryBoundKind::Unavailable);

    // Nothing was filtered or ordered.
    assert_eq!(facts.residual_filter(), QueryAdmissionResidualFilter::Absent);
    assert_eq!(facts.ordering(), QueryAdmissionOrdering::None);
}
1756
/// A by-key plan paged with limit 5 / offset 2 reports an exact scan bound of
/// one row and a returned-row bound of five enforced as a runtime cap.
#[test]
fn plan_summary_uses_point_lookup_and_limit_as_proven_bounds() {
    let window = PageSpec {
        limit: Some(5),
        offset: 2,
    };
    let lookup = AccessPath::ByKey(Value::Nat64(7));
    let mut plan = AccessPlannedQuery::new(lookup, MissingRowPolicy::Ignore);
    plan.scalar_plan_mut().page = Some(window);

    let facts = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);

    assert_eq!(facts.selected_access(), QueryAdmissionAccessKind::ByKey);
    assert_eq!(facts.limit(), Some(5));
    assert_eq!(facts.offset(), Some(2));
    assert_eq!(facts.scan_bound(), Some(1));
    assert_eq!(facts.scan_bound_kind(), QueryBoundKind::Exact);
    assert_eq!(facts.returned_row_bound(), Some(5));
    assert_eq!(
        facts.returned_row_bound_kind(),
        QueryBoundKind::EnforcedRuntimeCap
    );
}
1779
/// An index-prefix plan must surface the selected index name in its summary
/// while leaving the scan bound unavailable.
#[test]
fn plan_summary_preserves_selected_index_identity() {
    let index = SemanticIndexAccessContract::model_only_from_generated_index(ADMISSION_INDEX);
    let values = vec![Value::Text("alpha".to_string())];
    let plan = AccessPlannedQuery::new(
        AccessPath::IndexPrefix { index, values },
        MissingRowPolicy::Ignore,
    );

    let facts = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);

    assert_eq!(
        facts.selected_access(),
        QueryAdmissionAccessKind::IndexPrefix
    );
    assert_eq!(facts.selected_index(), Some("admission::tag"));
    assert_eq!(facts.scan_bound(), None);
    assert_eq!(facts.scan_bound_kind(), QueryBoundKind::Unavailable);
}
1802
/// A full-scan plan with a predicate and an order spec summarizes with a
/// predicate residual filter and requested ordering, and reports no
/// materialized sort, no materialized row count, and an unavailable
/// materialization row bound.
#[test]
fn plan_summary_classifies_residual_and_requested_ordering() {
    let tag_is_alpha = Predicate::eq("tag".to_string(), Value::Text("alpha".to_string()));
    let by_tag_ascending = OrderSpec {
        fields: vec![OrderTerm::field("tag", OrderDirection::Asc)],
    };

    let mut plan =
        AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore);
    plan.scalar_plan_mut().predicate = Some(tag_is_alpha);
    plan.scalar_plan_mut().order = Some(by_tag_ascending);

    let facts = QueryAdmissionSummary::from_plan(QueryAdmissionLane::AdminAdHoc, &plan);

    assert_eq!(
        facts.residual_filter(),
        QueryAdmissionResidualFilter::Predicate
    );
    assert_eq!(facts.ordering(), QueryAdmissionOrdering::Requested);

    // Materialization facts as reported for this plan.
    assert!(!facts.materialization().materialized_sort());
    assert_eq!(facts.materialization().materialized_rows(), None);
    assert_eq!(
        facts.materialization().row_bound_kind(),
        QueryBoundKind::Unavailable
    );
}
1829
/// A grouped plan with hard limits (12 groups, 4096 bytes) and a HAVING
/// expression exposes those budgets through the grouped facts, and reports
/// the group cap as a conservative upper bound on returned rows.
#[test]
fn plan_summary_carries_grouped_execution_budgets() {
    let count_rows = GroupAggregateSpec {
        kind: AggregateKind::Count,
        input_expr: None,
        filter_expr: None,
        distinct: false,
    };
    let spec = GroupSpec {
        group_fields: vec![FieldSlot::from_test_slot(0, "tag")],
        aggregates: vec![count_rows],
        execution: GroupedExecutionConfig::with_hard_limits(12, 4096),
    };
    let having = Some(Expr::Field(FieldId::new("tag")));

    let grouped_plan =
        AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore)
            .into_grouped_with_having_expr(spec, having);

    let facts =
        QueryAdmissionSummary::from_plan(QueryAdmissionLane::DiagnosticExplain, &grouped_plan);
    let group_facts = facts
        .grouped()
        .expect("summary should include grouped facts");

    assert_eq!(
        facts.plan_shape(),
        QueryAdmissionPlanShape::GroupedAggregate
    );

    // Grouped facts mirror the spec that built the plan.
    assert_eq!(group_facts.group_field_count(), 1);
    assert_eq!(group_facts.aggregate_count(), 1);
    assert_eq!(group_facts.distinct_aggregate_count(), 0);
    assert_eq!(group_facts.max_groups(), 12);
    assert_eq!(group_facts.max_group_bytes(), 4096);
    assert!(group_facts.has_having_filter());

    // The returned-row bound equals the configured group cap.
    assert_eq!(facts.returned_row_bound(), Some(12));
    assert_eq!(
        facts.returned_row_bound_kind(),
        QueryBoundKind::ConservativeUpperBound
    );
}
1870
/// A delete-mode plan with a delete window (limit 3, offset 1) summarizes
/// with the delete plan shape and reports that window as limit, offset, and
/// returned-row bound.
#[test]
fn plan_summary_reads_delete_window_without_executing_it() {
    let delete_window = DeleteLimitSpec {
        limit: Some(3),
        offset: 1,
    };

    let mut plan =
        AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore);
    plan.scalar_plan_mut().mode = QueryMode::Delete(DeleteSpec::new());
    plan.scalar_plan_mut().delete_limit = Some(delete_window);

    let facts = QueryAdmissionSummary::from_plan(QueryAdmissionLane::DiagnosticExplain, &plan);

    assert_eq!(facts.plan_shape(), QueryAdmissionPlanShape::Delete);
    assert_eq!(facts.limit(), Some(3));
    assert_eq!(facts.offset(), Some(1));
    assert_eq!(facts.returned_row_bound(), Some(3));
}
1889
/// An index-prefix read without a limit is rejected by the public-read
/// policy with the "requires limit" rejection.
#[test]
fn public_read_evaluation_rejects_missing_limit_before_access_shape() {
    let unlimited = summary_for_index_prefix(None, 0);
    let policy = public_read_policy();

    let verdict = policy.evaluate(unlimited);

    let expected_rejection = Some(QueryAdmissionRejection::PublicQueryRequiresLimit);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
1903
/// A full scan stays rejected under the public-read policy even when the
/// query carries a limit.
#[test]
fn public_read_evaluation_rejects_full_scan_even_with_limit() {
    let limited_scan = summary_for_path(AccessPath::<Value>::FullScan, Some(5), 0);
    let policy = public_read_policy();

    let verdict = policy.evaluate(limited_scan);

    let expected_rejection = Some(QueryAdmissionRejection::UnboundedFullScanRejected);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
1917
/// An index-prefix read with limit 5 and zero offset is admitted by the
/// public-read policy and carries no rejection.
#[test]
fn public_read_evaluation_admits_indexed_bounded_scalar_read() {
    let bounded_read = summary_for_index_prefix(Some(5), 0);
    let policy = public_read_policy();

    let verdict = policy.evaluate(bounded_read);

    assert_eq!(verdict.decision(), QueryAdmissionDecision::Admitted);
    assert_eq!(verdict.rejection(), None);
}
1928
/// A by-key read with limit 1 is admitted by the public-read policy and keeps
/// its scan bound of one row after evaluation.
#[test]
fn public_read_evaluation_admits_exact_primary_key_read() {
    let key = Value::Text("primary".to_string());
    let point_read = summary_for_path(AccessPath::ByKey(key), Some(1), 0);
    let policy = public_read_policy();

    let verdict = policy.evaluate(point_read);

    assert_eq!(verdict.decision(), QueryAdmissionDecision::Admitted);
    assert_eq!(verdict.scan_bound(), Some(1));
}
1943
/// An otherwise admissible index-prefix read is rejected by the public-read
/// policy once it carries a non-zero offset.
#[test]
fn public_read_evaluation_rejects_non_zero_offset() {
    let offset_read = summary_for_index_prefix(Some(5), 1);
    let policy = public_read_policy();

    let verdict = policy.evaluate(offset_read);

    let expected_rejection = Some(QueryAdmissionRejection::PublicQueryOffsetRejected);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
1957
/// A limit of 51 exceeds the 50-row cap of the test public-read policy, so
/// the read is rejected for its returned-row bound.
#[test]
fn public_read_evaluation_rejects_returned_row_cap_overflow() {
    let oversized_read = summary_for_index_prefix(Some(51), 0);
    let policy = public_read_policy();

    let verdict = policy.evaluate(oversized_read);

    let expected_rejection = Some(QueryAdmissionRejection::ReturnedRowBoundExceedsPolicy);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
1971
/// Attaching a sort materialization to an otherwise admissible index-prefix
/// read makes the public-read policy reject it.
#[test]
fn public_read_evaluation_rejects_unresolved_order_materialized_sort() {
    let base = summary_for_index_prefix(Some(5), 0);

    // Read the bounds first: `with_materialization` is called on `base` afterwards.
    let row_bound = base.returned_row_bound();
    let row_bound_kind = base.returned_row_bound_kind();
    let sort = QueryMaterializationSummary::sort(row_bound, row_bound_kind);
    let sorted_read = base.with_materialization(sort);

    let policy = public_read_policy();
    let verdict = policy.evaluate(sorted_read);

    let expected_rejection = Some(QueryAdmissionRejection::SortRequiresMaterialization);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
1991
/// The plain public-read policy (no grouped policy attached) rejects a
/// grouped query with the "requires limits" rejection.
#[test]
fn public_read_evaluation_rejects_grouped_query_without_group_budgets() {
    let grouped_read = grouped_summary_for_index_prefix(12, 4096, false);
    let policy = public_read_policy();

    let verdict = policy.evaluate(grouped_read);

    let expected_rejection = Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
2005
/// With a grouped policy attached, a grouped query inside the group budgets
/// is admitted without a limit; its returned-row bound equals the query's
/// group cap of 12.
#[test]
fn public_read_evaluation_admits_grouped_query_with_group_budgets_without_limit() {
    let grouped_read = grouped_summary_for_index_prefix(12, 4096, false);
    let policy = public_grouped_read_policy(None);

    let verdict = policy.evaluate(grouped_read);

    assert_eq!(verdict.decision(), QueryAdmissionDecision::Admitted);
    assert_eq!(verdict.rejection(), None);
    assert_eq!(verdict.limit(), None);
    assert_eq!(verdict.returned_row_bound(), Some(12));
}
2018
/// A grouped query asking for 51 groups exceeds the 50-group cap of the test
/// grouped policy and is rejected as over budget.
#[test]
fn public_read_evaluation_rejects_grouped_query_above_policy_budget() {
    let over_budget = grouped_summary_for_index_prefix(51, 4096, false);
    let policy = public_grouped_read_policy(None);

    let verdict = policy.evaluate(over_budget);

    let expected_rejection = Some(QueryAdmissionRejection::GroupedQueryExceedsBudget);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
2032
/// A grouped query with a distinct aggregate is rejected when the grouped
/// policy was built without a distinct-entries budget.
#[test]
fn public_read_evaluation_rejects_distinct_grouped_query_without_distinct_budget() {
    let distinct_read = grouped_summary_for_index_prefix(12, 4096, true);
    let policy = public_grouped_read_policy(None);

    let verdict = policy.evaluate(distinct_read);

    let expected_rejection = Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
2046
/// The diagnostic-explain policy rejects even a bounded index-prefix read,
/// reporting that the diagnostic lane does not execute.
#[test]
fn diagnostic_explain_policy_rejects_row_execution() {
    let bounded_read = summary_for_index_prefix(Some(5), 0);
    let explain_only = QueryAdmissionPolicy::diagnostic_explain();

    let verdict = explain_only.evaluate(bounded_read);

    let expected_rejection = Some(QueryAdmissionRejection::DiagnosticLaneDoesNotExecute);
    assert_eq!(verdict.decision(), QueryAdmissionDecision::Rejected);
    assert_eq!(verdict.rejection(), expected_rejection);
}
2060
2061    fn public_read_policy() -> QueryAdmissionPolicy {
2062        QueryAdmissionPolicy::public_read(
2063            NonZeroU32::new(50).expect("test public row cap is non-zero"),
2064            NonZeroU32::new(32_768).expect("test public byte cap is non-zero"),
2065        )
2066    }
2067
2068    fn public_grouped_read_policy(distinct_entries: Option<NonZeroU32>) -> QueryAdmissionPolicy {
2069        public_read_policy().with_grouped_policy(GroupedAdmissionPolicy::bounded(
2070            NonZeroU32::new(50).expect("test public group cap is non-zero"),
2071            NonZeroU32::new(8192).expect("test public group byte cap is non-zero"),
2072            distinct_entries,
2073        ))
2074    }
2075
2076    fn summary_for_index_prefix(limit: Option<u32>, offset: u32) -> QueryAdmissionSummary {
2077        summary_for_path(
2078            AccessPath::IndexPrefix {
2079                index: SemanticIndexAccessContract::model_only_from_generated_index(
2080                    ADMISSION_INDEX,
2081                ),
2082                values: vec![Value::Text("alpha".to_string())],
2083            },
2084            limit,
2085            offset,
2086        )
2087    }
2088
2089    fn summary_for_path(
2090        path: AccessPath<Value>,
2091        limit: Option<u32>,
2092        offset: u32,
2093    ) -> QueryAdmissionSummary {
2094        let mut plan = AccessPlannedQuery::new(path, MissingRowPolicy::Ignore);
2095        plan.scalar_plan_mut().page = Some(PageSpec { limit, offset });
2096
2097        QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan)
2098    }
2099
2100    fn grouped_summary_for_index_prefix(
2101        max_groups: u64,
2102        max_group_bytes: u64,
2103        distinct: bool,
2104    ) -> QueryAdmissionSummary {
2105        let grouped = AccessPlannedQuery::new(index_prefix_path(), MissingRowPolicy::Ignore)
2106            .into_grouped(GroupSpec {
2107                group_fields: vec![FieldSlot::from_test_slot(0, "tag")],
2108                aggregates: vec![GroupAggregateSpec {
2109                    kind: AggregateKind::Count,
2110                    input_expr: Some(Box::new(Expr::Field(FieldId::new("tag")))),
2111                    filter_expr: None,
2112                    distinct,
2113                }],
2114                execution: GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes),
2115            });
2116
2117        QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &grouped)
2118    }
2119
2120    fn index_prefix_path() -> AccessPath<Value> {
2121        AccessPath::IndexPrefix {
2122            index: SemanticIndexAccessContract::model_only_from_generated_index(ADMISSION_INDEX),
2123            values: vec![Value::Text("alpha".to_string())],
2124        }
2125    }
2126}