Skip to main content

icydb_core/db/query/
admission.rs

//! Module: db::query::admission
//! Responsibility: shared read-admission vocabulary for query surfaces.
//! Does not own: physical planning, executor runtime, or SQL/fluent lowering.
//! Boundary: describes policy, proven bounds, and stable rejection diagnostics.
5
6use std::fmt::Write as _;
7use std::num::{NonZeroU32, NonZeroU64};
8use std::ops::Bound;
9
10use crate::{
11    db::{
12        access::IndexBranchSetOrderedSuffix,
13        query::plan::{
14            AccessPlanProjection, AccessPlannedQuery, GroupPlan, QueryMode, ResidualFilterShape,
15            ScalarPlan, project_access_plan,
16        },
17    },
18    value::Value,
19};
20use icydb_diagnostic_code::{
21    Diagnostic, DiagnosticCode, DiagnosticDetail, ErrorCode, ErrorOrigin, QueryReadAdmissionCode,
22};
23
/// Default returned-row cap applied to ordinary bounded reads.
const DEFAULT_BOUNDED_READ_MAX_ROWS: u32 = 100;
/// Default response-size cap (in bytes) applied to ordinary bounded reads.
const DEFAULT_BOUNDED_READ_RESPONSE_BYTES: u32 = 128 * 1024;
/// Default cap on the number of groups a bounded grouped read may carry.
const DEFAULT_BOUNDED_READ_MAX_GROUPS: u32 = 100;
/// Default cap on bytes per group accumulator for bounded grouped reads.
const DEFAULT_BOUNDED_READ_MAX_GROUP_BYTES: u32 = 64 * 1024;
/// Default cap on distinct entries for distinct-style aggregates.
const DEFAULT_BOUNDED_READ_MAX_DISTINCT_ENTRIES: u32 = 1024;

/// Convert a default budget into a `NonZeroU32`.
///
/// A zero input cannot be represented, so it falls back to `NonZeroU32::MIN`
/// (1) instead of panicking. The function is `const` so the defaults above can
/// be turned into budgets at compile time; `Option::unwrap_or` is avoided
/// because the explicit `match` is what keeps this usable in const context.
const fn non_zero_default(value: u32) -> NonZeroU32 {
    match NonZeroU32::new(value) {
        Some(budget) => budget,
        None => NonZeroU32::MIN,
    }
}
33
/// Execution lane a query runs under, chosen by the public or internal caller surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionLane {
    /// Bounded, caller-facing read path used by generated canister query endpoints.
    PublicRead,
    /// Trusted/admin ad-hoc read path; the embedder supplies explicit budgets.
    AdminAdHoc,
    /// EXPLAIN-only path: describes planning and admission without executing rows.
    DiagnosticExplain,
    /// Test-only lane that lets local harnesses bypass production policy.
    DevTest,
}

impl QueryAdmissionLane {
    /// Stable lowercase label identifying this lane in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::DevTest => "dev_test",
            Self::DiagnosticExplain => "diagnostic_explain",
            Self::AdminAdHoc => "admin_ad_hoc",
            Self::PublicRead => "public_read",
        }
    }

    /// Whether queries on this lane may execute and hand back data rows.
    ///
    /// Only the EXPLAIN lane is non-executing.
    #[must_use]
    pub const fn executes_rows(self) -> bool {
        match self {
            Self::DiagnosticExplain => false,
            Self::PublicRead | Self::AdminAdHoc | Self::DevTest => true,
        }
    }
}
65
/// How trustworthy the bound handed to read admission is.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryBoundKind {
    /// The count is known exactly before execution.
    Exact,
    /// A conservative upper bound was proven before execution.
    ConservativeUpperBound,
    /// The executor enforces a cap at runtime while producing rows.
    EnforcedRuntimeCap,
    /// Only a planner estimate exists; not safe as public admission authority.
    EstimateOnly,
    /// No bound of any kind is available.
    Unavailable,
}

impl QueryBoundKind {
    /// Stable lowercase label identifying this bound quality in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Unavailable => "unavailable",
            Self::EstimateOnly => "estimate_only",
            Self::EnforcedRuntimeCap => "enforced_runtime_cap",
            Self::ConservativeUpperBound => "conservative_upper_bound",
            Self::Exact => "exact",
        }
    }

    /// Whether this bound quality counts as acceptable proof for public reads.
    ///
    /// Exact counts, proven upper bounds, and runtime-enforced caps qualify;
    /// estimates and missing bounds do not.
    #[must_use]
    pub const fn admits_public_read(self) -> bool {
        match self {
            Self::Exact | Self::ConservativeUpperBound | Self::EnforcedRuntimeCap => true,
            Self::EstimateOnly | Self::Unavailable => false,
        }
    }
}
103
/// Outcome of evaluating a plan against a read-admission policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionDecision {
    /// The lane policy allows the selected plan.
    Admitted,
    /// The selected plan is turned away before execution.
    Rejected,
}

impl QueryAdmissionDecision {
    /// Stable lowercase label identifying this decision in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Rejected => "rejected",
            Self::Admitted => "admitted",
        }
    }

    /// Whether the selected plan is allowed to execute.
    #[must_use]
    pub const fn is_admitted(self) -> bool {
        match self {
            Self::Admitted => true,
            Self::Rejected => false,
        }
    }
}
129
/// Coarse class of the selected access path, shared by admission and EXPLAIN.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionAccessKind {
    /// The access class has not been summarized yet.
    Unknown,
    /// A single direct primary-key lookup.
    ByKey,
    /// Several direct primary-key lookups.
    ByKeys,
    /// Access over a primary-key range.
    KeyRange,
    /// Access through a secondary-index prefix.
    IndexPrefix,
    /// Multi-lookup access through a secondary index.
    IndexMultiLookup,
    /// Branch-set access through a secondary index.
    IndexBranchSet,
    /// Access over a secondary-index range.
    IndexRange,
    /// Scan of the full entity.
    FullScan,
    /// Union of several access paths.
    Union,
    /// Intersection of several access paths.
    Intersection,
}

impl QueryAdmissionAccessKind {
    /// Stable lowercase label identifying this access class in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            // Not yet summarized.
            Self::Unknown => "unknown",
            // Primary-key access.
            Self::ByKey => "by_key",
            Self::ByKeys => "by_keys",
            Self::KeyRange => "key_range",
            // Secondary-index access.
            Self::IndexPrefix => "index_prefix",
            Self::IndexMultiLookup => "index_multi_lookup",
            Self::IndexBranchSet => "index_branch_set",
            Self::IndexRange => "index_range",
            // Scans and composites.
            Self::FullScan => "full_scan",
            Self::Union => "union",
            Self::Intersection => "intersection",
        }
    }

    /// Whether this access class is served by a secondary index.
    #[must_use]
    pub const fn is_secondary_index(self) -> bool {
        match self {
            Self::IndexPrefix
            | Self::IndexMultiLookup
            | Self::IndexBranchSet
            | Self::IndexRange => true,
            Self::Unknown
            | Self::ByKey
            | Self::ByKeys
            | Self::KeyRange
            | Self::FullScan
            | Self::Union
            | Self::Intersection => false,
        }
    }

    /// Whether this access class is a full entity scan.
    #[must_use]
    pub const fn is_full_scan(self) -> bool {
        match self {
            Self::FullScan => true,
            Self::Unknown
            | Self::ByKey
            | Self::ByKeys
            | Self::KeyRange
            | Self::IndexPrefix
            | Self::IndexMultiLookup
            | Self::IndexBranchSet
            | Self::IndexRange
            | Self::Union
            | Self::Intersection => false,
        }
    }
}
191
/// Coarse statement shape (scalar, grouped, or delete) seen by read admission.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionPlanShape {
    /// Scalar read; also covers projection-only and global-aggregate scalar plans.
    ScalarRead,
    /// Grouped aggregate read.
    GroupedAggregate,
    /// Delete, surfaced for diagnostics only; public read lanes must not execute it.
    Delete,
}

impl QueryAdmissionPlanShape {
    /// Stable lowercase label identifying this plan shape in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Delete => "delete",
            Self::GroupedAggregate => "grouped_aggregate",
            Self::ScalarRead => "scalar_read",
        }
    }
}
214
/// Shape of the residual filter left over after access planning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionResidualFilter {
    /// Access planning left no residual runtime filter.
    Absent,
    /// A predicate-native residual filter is left.
    Predicate,
    /// An expression-backed residual filter is left.
    Expression,
    /// Both the expression and the predicate residual forms remain available.
    ExpressionAndPredicate,
}

impl QueryAdmissionResidualFilter {
    /// Stable lowercase label identifying this residual shape in diagnostics.
    ///
    /// Note that `Absent` is labelled `"none"`.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::ExpressionAndPredicate => "expression_and_predicate",
            Self::Expression => "expression",
            Self::Predicate => "predicate",
            Self::Absent => "none",
        }
    }

    /// Whether the plan carries no residual runtime filter.
    #[must_use]
    pub const fn is_absent(self) -> bool {
        match self {
            Self::Absent => true,
            Self::Predicate | Self::Expression | Self::ExpressionAndPredicate => false,
        }
    }
}
246
/// State of the ORDER BY clause as seen by read admission.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionOrdering {
    /// The caller requested no visible ordering.
    None,
    /// Ordering was requested but is not yet resolved into executor slots.
    Requested,
    /// Ordering carries a planner-resolved executor contract.
    Resolved,
}

impl QueryAdmissionOrdering {
    /// Stable lowercase label identifying this ordering state in diagnostics.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Resolved => "resolved",
            Self::Requested => "requested",
            Self::None => "none",
        }
    }
}
269
/// Facts about a grouped query that read admission needs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueryAdmissionGroupedSummary {
    /// Number of GROUP BY fields.
    group_field_count: u32,
    /// Number of aggregate expressions.
    aggregate_count: u32,
    /// Number of aggregate expressions that keep DISTINCT state.
    distinct_aggregate_count: u32,
    /// Maximum group count for grouped execution.
    max_groups: u64,
    /// Maximum bytes per group accumulator for grouped execution.
    max_group_bytes: u64,
    /// Whether a HAVING residual expression is present.
    having_filter: bool,
}

impl QueryAdmissionGroupedSummary {
    /// Assemble a grouped admission summary from planner-owned grouped facts.
    ///
    /// Every argument is stored as-is; no validation is performed here.
    #[must_use]
    pub const fn new(
        group_field_count: u32,
        aggregate_count: u32,
        distinct_aggregate_count: u32,
        max_groups: u64,
        max_group_bytes: u64,
        having_filter: bool,
    ) -> Self {
        Self {
            having_filter,
            max_group_bytes,
            max_groups,
            distinct_aggregate_count,
            aggregate_count,
            group_field_count,
        }
    }

    /// Number of GROUP BY fields.
    #[must_use]
    pub const fn group_field_count(self) -> u32 {
        let Self {
            group_field_count, ..
        } = self;
        group_field_count
    }

    /// Number of aggregate expressions.
    #[must_use]
    pub const fn aggregate_count(self) -> u32 {
        let Self {
            aggregate_count, ..
        } = self;
        aggregate_count
    }

    /// Number of aggregate expressions that keep DISTINCT state.
    #[must_use]
    pub const fn distinct_aggregate_count(self) -> u32 {
        let Self {
            distinct_aggregate_count,
            ..
        } = self;
        distinct_aggregate_count
    }

    /// Maximum group count for grouped execution.
    #[must_use]
    pub const fn max_groups(self) -> u64 {
        let Self { max_groups, .. } = self;
        max_groups
    }

    /// Maximum bytes per group accumulator for grouped execution.
    #[must_use]
    pub const fn max_group_bytes(self) -> u64 {
        let Self {
            max_group_bytes, ..
        } = self;
        max_group_bytes
    }

    /// Whether the grouped plan carries a HAVING residual expression.
    #[must_use]
    pub const fn has_having_filter(self) -> bool {
        let Self { having_filter, .. } = self;
        having_filter
    }
}
338
339/// Grouped/aggregate read admission budgets.
340#[derive(Clone, Copy, Debug, Eq, PartialEq)]
341pub(in crate::db) struct GroupedAdmissionPolicy {
342    groups: Option<NonZeroU32>,
343    group_bytes: Option<NonZeroU32>,
344    distinct_entries: Option<NonZeroU32>,
345}
346
347impl GroupedAdmissionPolicy {
348    /// Build a policy that rejects grouped reads unless a later slice enables them.
349    #[must_use]
350    pub(in crate::db) const fn disabled() -> Self {
351        Self {
352            groups: None,
353            group_bytes: None,
354            distinct_entries: None,
355        }
356    }
357
358    /// Build a grouped policy with explicit group and memory budgets.
359    #[must_use]
360    pub(in crate::db) const fn bounded(
361        max_groups: NonZeroU32,
362        max_group_bytes: NonZeroU32,
363        max_distinct_entries: Option<NonZeroU32>,
364    ) -> Self {
365        Self {
366            groups: Some(max_groups),
367            group_bytes: Some(max_group_bytes),
368            distinct_entries: max_distinct_entries,
369        }
370    }
371
372    /// Build the default grouped budget used by ordinary typed/fluent reads.
373    ///
374    /// Grouped query execution still needs matching query-owned hard limits
375    /// via `grouped_limits(...)`; this policy defines the maximum values those
376    /// limits may carry on the default read path.
377    #[must_use]
378    pub(in crate::db) fn default_bounded_read() -> Self {
379        Self::bounded(
380            non_zero_default(DEFAULT_BOUNDED_READ_MAX_GROUPS),
381            non_zero_default(DEFAULT_BOUNDED_READ_MAX_GROUP_BYTES),
382            Some(non_zero_default(DEFAULT_BOUNDED_READ_MAX_DISTINCT_ENTRIES)),
383        )
384    }
385
386    /// Return the maximum allowed output groups.
387    #[must_use]
388    pub(in crate::db) const fn max_groups(&self) -> Option<NonZeroU32> {
389        self.groups
390    }
391
392    /// Return the maximum allowed bytes per group accumulator.
393    #[must_use]
394    pub(in crate::db) const fn max_group_bytes(&self) -> Option<NonZeroU32> {
395        self.group_bytes
396    }
397
398    /// Return the maximum allowed distinct entries for distinct-style aggregates.
399    #[must_use]
400    pub(in crate::db) const fn max_distinct_entries(&self) -> Option<NonZeroU32> {
401        self.distinct_entries
402    }
403
404    /// Return whether grouped execution has the minimum hard budgets admission needs.
405    #[must_use]
406    #[cfg(test)]
407    pub(in crate::db) const fn has_hard_limits(&self) -> bool {
408        self.groups.is_some() && self.group_bytes.is_some()
409    }
410
411    /// Project this admission policy into grouped execution caps.
412    #[must_use]
413    #[cfg(all(test, feature = "sql"))]
414    pub(in crate::db) const fn execution_config(
415        &self,
416    ) -> Option<crate::db::query::plan::GroupedExecutionConfig> {
417        match (self.groups, self.group_bytes) {
418            (Some(groups), Some(group_bytes)) => Some(
419                crate::db::query::plan::GroupedExecutionConfig::with_hard_limits(
420                    groups.get() as u64,
421                    group_bytes.get() as u64,
422                ),
423            ),
424            _ => None,
425        }
426    }
427}
428
/// Whether a query surface demands a caller-visible LIMIT.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimitRequirement {
    /// The surface requires a LIMIT.
    Required,
    /// The surface does not require a LIMIT.
    Optional,
}

/// Whether the selected plan must use an index-backed access path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IndexRequirement {
    /// An index-backed path is mandatory.
    Required,
    /// Any access path is acceptable.
    Optional,
}

/// Whether a full entity scan may execute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FullScanPolicy {
    /// Full scans may execute.
    Allow,
    /// Full scans are turned away.
    Reject,
}

/// Whether materialized ORDER BY execution is permitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MaterializedSortPolicy {
    /// Materialized sorts may execute.
    Allow,
    /// Materialized sorts are turned away.
    Reject,
}

/// Whether OFFSET execution is permitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OffsetPolicy {
    /// Any OFFSET may execute.
    Allow,
    /// A non-zero OFFSET is turned away.
    RejectNonZero,
}
458
459/// Read-admission policy attached to one query surface.
460#[derive(Clone, Debug, Eq, PartialEq)]
461pub(in crate::db) struct QueryAdmissionPolicy {
462    lane: QueryAdmissionLane,
463    limit_requirement: LimitRequirement,
464    max_returned_rows: Option<NonZeroU32>,
465    max_scanned_rows: Option<NonZeroU64>,
466    max_response_bytes: Option<NonZeroU32>,
467    index_requirement: IndexRequirement,
468    offset_policy: OffsetPolicy,
469    full_scan_policy: FullScanPolicy,
470    materialized_sort_policy: MaterializedSortPolicy,
471    max_materialized_rows: Option<NonZeroU32>,
472    max_projection_columns: Option<NonZeroU32>,
473    grouped: GroupedAdmissionPolicy,
474}
475
476impl QueryAdmissionPolicy {
477    /// Build the safe default policy for caller-facing bounded read endpoints.
478    #[must_use]
479    pub(in crate::db) const fn public_read(
480        max_returned_rows: NonZeroU32,
481        max_response_bytes: NonZeroU32,
482    ) -> Self {
483        Self {
484            lane: QueryAdmissionLane::PublicRead,
485            limit_requirement: LimitRequirement::Required,
486            max_returned_rows: Some(max_returned_rows),
487            max_scanned_rows: None,
488            max_response_bytes: Some(max_response_bytes),
489            index_requirement: IndexRequirement::Required,
490            offset_policy: OffsetPolicy::RejectNonZero,
491            full_scan_policy: FullScanPolicy::Reject,
492            materialized_sort_policy: MaterializedSortPolicy::Reject,
493            max_materialized_rows: None,
494            max_projection_columns: None,
495            grouped: GroupedAdmissionPolicy::disabled(),
496        }
497    }
498
499    /// Build the default bounded policy used by ordinary typed/fluent reads.
500    ///
501    /// The policy rejects unindexed full scans, non-zero offsets, materialized
502    /// sorts, and queries without a proven row bound. Callers that intentionally
503    /// need a broader read must use an explicitly trusted execution method or
504    /// evaluate their own policy before executing.
505    #[must_use]
506    pub(in crate::db) fn default_bounded_read() -> Self {
507        Self::public_read(
508            non_zero_default(DEFAULT_BOUNDED_READ_MAX_ROWS),
509            non_zero_default(DEFAULT_BOUNDED_READ_RESPONSE_BYTES),
510        )
511        .with_grouped_policy(GroupedAdmissionPolicy::default_bounded_read())
512    }
513
514    /// Return this policy with explicit grouped execution budgets attached.
515    ///
516    /// Public read policies still reject grouped queries unless the selected
517    /// plan is executed with matching group-count and per-group byte caps.
518    #[must_use]
519    pub(in crate::db) const fn with_grouped_policy(
520        mut self,
521        grouped: GroupedAdmissionPolicy,
522    ) -> Self {
523        self.grouped = grouped;
524        self
525    }
526
527    /// Build a trusted ad-hoc policy with explicit execution budgets.
528    #[must_use]
529    #[cfg(test)]
530    pub(in crate::db) const fn admin_ad_hoc(
531        max_returned_rows: NonZeroU32,
532        max_scanned_rows: NonZeroU64,
533        max_response_bytes: NonZeroU32,
534    ) -> Self {
535        Self {
536            lane: QueryAdmissionLane::AdminAdHoc,
537            limit_requirement: LimitRequirement::Optional,
538            max_returned_rows: Some(max_returned_rows),
539            max_scanned_rows: Some(max_scanned_rows),
540            max_response_bytes: Some(max_response_bytes),
541            index_requirement: IndexRequirement::Optional,
542            offset_policy: OffsetPolicy::Allow,
543            full_scan_policy: FullScanPolicy::Allow,
544            materialized_sort_policy: MaterializedSortPolicy::Allow,
545            max_materialized_rows: Some(max_returned_rows),
546            max_projection_columns: None,
547            grouped: GroupedAdmissionPolicy::disabled(),
548        }
549    }
550
551    /// Build an EXPLAIN-only policy that cannot execute rows.
552    #[must_use]
553    pub(in crate::db) const fn diagnostic_explain() -> Self {
554        Self {
555            lane: QueryAdmissionLane::DiagnosticExplain,
556            limit_requirement: LimitRequirement::Optional,
557            max_returned_rows: None,
558            max_scanned_rows: None,
559            max_response_bytes: None,
560            index_requirement: IndexRequirement::Optional,
561            offset_policy: OffsetPolicy::Allow,
562            full_scan_policy: FullScanPolicy::Allow,
563            materialized_sort_policy: MaterializedSortPolicy::Allow,
564            max_materialized_rows: None,
565            max_projection_columns: None,
566            grouped: GroupedAdmissionPolicy::disabled(),
567        }
568    }
569
570    /// Return the lane this policy governs.
571    #[must_use]
572    pub(in crate::db) const fn lane(&self) -> QueryAdmissionLane {
573        self.lane
574    }
575
576    /// Return whether the surface requires caller-visible LIMIT.
577    #[must_use]
578    pub(in crate::db) const fn require_limit(&self) -> bool {
579        matches!(self.limit_requirement, LimitRequirement::Required)
580    }
581
582    /// Return the maximum rows that may be returned.
583    #[must_use]
584    #[cfg(test)]
585    pub(in crate::db) const fn max_returned_rows(&self) -> Option<NonZeroU32> {
586        self.max_returned_rows
587    }
588
589    /// Return the maximum rows that may be scanned.
590    #[must_use]
591    #[cfg(test)]
592    pub(in crate::db) const fn max_scanned_rows(&self) -> Option<NonZeroU64> {
593        self.max_scanned_rows
594    }
595
596    /// Return the maximum response bytes.
597    #[must_use]
598    #[cfg(test)]
599    pub(in crate::db) const fn max_response_bytes(&self) -> Option<NonZeroU32> {
600        self.max_response_bytes
601    }
602
603    /// Return whether the selected plan must use an index-backed path.
604    #[must_use]
605    pub(in crate::db) const fn require_index(&self) -> bool {
606        matches!(self.index_requirement, IndexRequirement::Required)
607    }
608
609    /// Return whether this surface rejects non-zero OFFSET execution.
610    #[must_use]
611    pub(in crate::db) const fn reject_non_zero_offset(&self) -> bool {
612        matches!(self.offset_policy, OffsetPolicy::RejectNonZero)
613    }
614
615    /// Return whether a full entity scan may execute.
616    #[must_use]
617    pub(in crate::db) const fn allow_full_scan(&self) -> bool {
618        matches!(self.full_scan_policy, FullScanPolicy::Allow)
619    }
620
621    /// Return whether this surface permits materialized ORDER BY execution.
622    #[must_use]
623    pub(in crate::db) const fn allow_materialized_sort(&self) -> bool {
624        matches!(self.materialized_sort_policy, MaterializedSortPolicy::Allow)
625    }
626
627    /// Return the maximum rows that may be materialized for sort/projection work.
628    #[must_use]
629    #[cfg(test)]
630    pub(in crate::db) const fn max_materialized_rows(&self) -> Option<NonZeroU32> {
631        self.max_materialized_rows
632    }
633
634    /// Return grouped/aggregate budgets.
635    #[must_use]
636    #[cfg(test)]
637    pub(in crate::db) const fn grouped(&self) -> GroupedAdmissionPolicy {
638        self.grouped
639    }
640
641    /// Return whether public-read construction kept the mandatory finite caps.
642    #[must_use]
643    #[cfg(test)]
644    pub(in crate::db) const fn public_caps_are_finite(&self) -> bool {
645        !matches!(self.lane, QueryAdmissionLane::PublicRead)
646            || (self.max_returned_rows.is_some() && self.max_response_bytes.is_some())
647    }
648
649    /// Apply this policy to one already-summarized plan.
650    #[must_use]
651    pub(in crate::db) fn evaluate(
652        &self,
653        mut summary: QueryAdmissionSummary,
654    ) -> QueryAdmissionSummary {
655        summary.lane = self.lane;
656
657        match self.rejection_for_summary(&summary) {
658            Some(rejection) => summary.reject(rejection),
659            None => summary.admit(),
660        }
661    }
662
663    fn rejection_for_summary(
664        &self,
665        summary: &QueryAdmissionSummary,
666    ) -> Option<QueryAdmissionRejection> {
667        if !self.lane.executes_rows() {
668            return Some(QueryAdmissionRejection::DiagnosticLaneDoesNotExecute);
669        }
670
671        if matches!(summary.plan_shape(), QueryAdmissionPlanShape::Delete) {
672            return Some(QueryAdmissionRejection::UnsupportedStatementForQueryLane);
673        }
674
675        if let Some(rejection) = self.grouped_rejection(summary) {
676            return Some(rejection);
677        }
678
679        if !self.allow_full_scan() && summary.selected_access().is_full_scan() {
680            return Some(QueryAdmissionRejection::UnboundedFullScanRejected);
681        }
682
683        if self.require_index()
684            && !access_satisfies_index_requirement(summary.selected_access(), summary.scan_bound())
685        {
686            return Some(QueryAdmissionRejection::PublicQueryRequiresIndex);
687        }
688
689        if self.require_limit()
690            && summary.limit().is_none()
691            && summary.grouped().is_none()
692            && !summary.returned_row_bound_kind().admits_public_read()
693        {
694            return Some(QueryAdmissionRejection::PublicQueryRequiresLimit);
695        }
696
697        if self.reject_non_zero_offset() && summary.offset().unwrap_or_default() != 0 {
698            return Some(QueryAdmissionRejection::PublicQueryOffsetRejected);
699        }
700
701        if let Some(rejection) = self.returned_row_bound_rejection(summary) {
702            return Some(rejection);
703        }
704
705        if let Some(rejection) = self.scan_bound_rejection(summary) {
706            return Some(rejection);
707        }
708
709        self.materialization_rejection(summary)
710    }
711
712    fn grouped_rejection(
713        &self,
714        summary: &QueryAdmissionSummary,
715    ) -> Option<QueryAdmissionRejection> {
716        let grouped = summary.grouped()?;
717        let Some(max_groups) = self.grouped.max_groups() else {
718            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
719        };
720        let Some(max_group_bytes) = self.grouped.max_group_bytes() else {
721            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
722        };
723
724        if grouped.max_groups() == u64::MAX || grouped.max_group_bytes() == u64::MAX {
725            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
726        }
727
728        if grouped.max_groups() > u64::from(max_groups.get())
729            || grouped.max_group_bytes() > u64::from(max_group_bytes.get())
730        {
731            return Some(QueryAdmissionRejection::GroupedQueryExceedsBudget);
732        }
733
734        if grouped.distinct_aggregate_count() > 0 && self.grouped.max_distinct_entries().is_none() {
735            return Some(QueryAdmissionRejection::GroupedQueryRequiresLimits);
736        }
737
738        None
739    }
740
741    fn returned_row_bound_rejection(
742        &self,
743        summary: &QueryAdmissionSummary,
744    ) -> Option<QueryAdmissionRejection> {
745        let max_returned_rows = self.max_returned_rows?;
746
747        if matches!(
748            summary.returned_row_bound_kind(),
749            QueryBoundKind::EstimateOnly
750        ) {
751            return Some(QueryAdmissionRejection::EstimatedOnlyBoundRejected);
752        }
753
754        if !summary.returned_row_bound_kind().admits_public_read() {
755            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
756        }
757
758        let Some(returned_row_bound) = summary.returned_row_bound() else {
759            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
760        };
761
762        if returned_row_bound > max_returned_rows.get() {
763            return Some(QueryAdmissionRejection::ReturnedRowBoundExceedsPolicy);
764        }
765
766        None
767    }
768
769    fn scan_bound_rejection(
770        &self,
771        summary: &QueryAdmissionSummary,
772    ) -> Option<QueryAdmissionRejection> {
773        let max_scanned_rows = self.max_scanned_rows?;
774
775        if matches!(summary.scan_bound_kind(), QueryBoundKind::EstimateOnly) {
776            return Some(QueryAdmissionRejection::EstimatedOnlyBoundRejected);
777        }
778
779        if !summary.scan_bound_kind().admits_public_read() {
780            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
781        }
782
783        let Some(scan_bound) = summary.scan_bound() else {
784            return Some(QueryAdmissionRejection::ScanBoundUnavailable);
785        };
786
787        if scan_bound > max_scanned_rows.get() {
788            return Some(QueryAdmissionRejection::ScanBoundExceedsPolicy);
789        }
790
791        None
792    }
793
794    fn materialization_rejection(
795        &self,
796        summary: &QueryAdmissionSummary,
797    ) -> Option<QueryAdmissionRejection> {
798        if !self.allow_materialized_sort() && summary.materialization().materialized_sort() {
799            return Some(QueryAdmissionRejection::SortRequiresMaterialization);
800        }
801
802        let max_materialized_rows = self.max_materialized_rows?;
803        let materialized_rows = summary.materialization().materialized_rows()?;
804
805        if materialized_rows > max_materialized_rows.get() {
806            Some(QueryAdmissionRejection::MaterializationExceedsBudget)
807        } else {
808            None
809        }
810    }
811}
812
813/// Materialization facts relevant to read admission.
814#[derive(Clone, Copy, Debug, Eq, PartialEq)]
815pub struct QueryMaterializationSummary {
816    materialized_sort: bool,
817    materialized_rows: Option<u32>,
818    row_bound_kind: QueryBoundKind,
819}
820
821impl QueryMaterializationSummary {
822    /// Build a summary for a plan that does not materialize rows for sorting.
823    #[must_use]
824    pub const fn none() -> Self {
825        Self {
826            materialized_sort: false,
827            materialized_rows: None,
828            row_bound_kind: QueryBoundKind::Unavailable,
829        }
830    }
831
832    /// Build a summary for a plan that materializes rows for sorting.
833    #[must_use]
834    pub const fn sort(materialized_rows: Option<u32>, row_bound_kind: QueryBoundKind) -> Self {
835        Self {
836            materialized_sort: true,
837            materialized_rows,
838            row_bound_kind,
839        }
840    }
841
842    /// Return whether the plan materializes rows for sorting.
843    #[must_use]
844    pub const fn materialized_sort(&self) -> bool {
845        self.materialized_sort
846    }
847
848    /// Return the row materialization bound, if known.
849    #[must_use]
850    pub const fn materialized_rows(&self) -> Option<u32> {
851        self.materialized_rows
852    }
853
854    /// Return the quality of the materialization row bound.
855    #[must_use]
856    pub const fn row_bound_kind(&self) -> QueryBoundKind {
857        self.row_bound_kind
858    }
859}
860
/// Stable reason a plan was turned away by read admission.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryAdmissionRejection {
    /// A public read was issued without the explicit LIMIT it requires.
    PublicQueryRequiresLimit,
    /// A public read lacks the proven index-backed access path it requires.
    PublicQueryRequiresIndex,
    /// The selected plan is an unbounded full scan.
    UnboundedFullScanRejected,
    /// The policy requires a bound, but none was available.
    ScanBoundUnavailable,
    /// The proven scan bound is larger than the policy allows.
    ScanBoundExceedsPolicy,
    /// The policy requires proof, but only an estimate was available.
    EstimatedOnlyBoundRejected,
    /// ORDER BY would require materializing rows.
    SortRequiresMaterialization,
    /// Materialization is larger than the policy allows.
    MaterializationExceedsBudget,
    /// Projection bytes may be larger than the response budget.
    ProjectionResponseMayExceedLimit,
    /// A grouped read lacks explicit group and memory budgets.
    GroupedQueryRequiresLimits,
    /// Grouped read planning is larger than the policy allows.
    GroupedQueryExceedsBudget,
    /// The lane is diagnostic and does not execute rows.
    DiagnosticLaneDoesNotExecute,
    /// Introspection is disabled for the selected lane.
    IntrospectionDisabledForLane,
    /// The selected lane does not support this statement shape.
    UnsupportedStatementForQueryLane,
    /// Public read endpoints do not permit non-zero OFFSET execution.
    PublicQueryOffsetRejected,
    /// The returned-row bound is larger than the selected policy allows.
    ReturnedRowBoundExceedsPolicy,
}
897
898impl QueryAdmissionRejection {
899    /// Return a stable lowercase diagnostic label for this rejection.
900    #[must_use]
901    pub const fn as_str(self) -> &'static str {
902        match self {
903            Self::PublicQueryRequiresLimit => "public_query_requires_limit",
904            Self::PublicQueryRequiresIndex => "public_query_requires_index",
905            Self::UnboundedFullScanRejected => "unbounded_full_scan_rejected",
906            Self::ScanBoundUnavailable => "scan_bound_unavailable",
907            Self::ScanBoundExceedsPolicy => "scan_bound_exceeds_policy",
908            Self::EstimatedOnlyBoundRejected => "estimated_only_bound_rejected",
909            Self::SortRequiresMaterialization => "sort_requires_materialization",
910            Self::MaterializationExceedsBudget => "materialization_exceeds_budget",
911            Self::ProjectionResponseMayExceedLimit => "projection_response_may_exceed_limit",
912            Self::GroupedQueryRequiresLimits => "grouped_query_requires_limits",
913            Self::GroupedQueryExceedsBudget => "grouped_query_exceeds_budget",
914            Self::DiagnosticLaneDoesNotExecute => "diagnostic_lane_does_not_execute",
915            Self::IntrospectionDisabledForLane => "introspection_disabled_for_lane",
916            Self::UnsupportedStatementForQueryLane => "unsupported_statement_for_query_lane",
917            Self::PublicQueryOffsetRejected => "public_query_offset_rejected",
918            Self::ReturnedRowBoundExceedsPolicy => "returned_row_bound_exceeds_policy",
919        }
920    }
921
922    /// Return the compact diagnostic detail code for this rejection.
923    #[must_use]
924    pub const fn code(self) -> QueryReadAdmissionCode {
925        match self {
926            Self::PublicQueryRequiresLimit => QueryReadAdmissionCode::PublicQueryRequiresLimit,
927            Self::PublicQueryRequiresIndex => QueryReadAdmissionCode::PublicQueryRequiresIndex,
928            Self::UnboundedFullScanRejected => QueryReadAdmissionCode::UnboundedFullScanRejected,
929            Self::ScanBoundUnavailable => QueryReadAdmissionCode::ScanBoundUnavailable,
930            Self::ScanBoundExceedsPolicy => QueryReadAdmissionCode::ScanBoundExceedsPolicy,
931            Self::EstimatedOnlyBoundRejected => QueryReadAdmissionCode::EstimatedOnlyBoundRejected,
932            Self::SortRequiresMaterialization => {
933                QueryReadAdmissionCode::SortRequiresMaterialization
934            }
935            Self::MaterializationExceedsBudget => {
936                QueryReadAdmissionCode::MaterializationExceedsBudget
937            }
938            Self::ProjectionResponseMayExceedLimit => {
939                QueryReadAdmissionCode::ProjectionResponseMayExceedLimit
940            }
941            Self::GroupedQueryRequiresLimits => QueryReadAdmissionCode::GroupedQueryRequiresLimits,
942            Self::GroupedQueryExceedsBudget => QueryReadAdmissionCode::GroupedQueryExceedsBudget,
943            Self::DiagnosticLaneDoesNotExecute => {
944                QueryReadAdmissionCode::DiagnosticLaneDoesNotExecute
945            }
946            Self::IntrospectionDisabledForLane => {
947                QueryReadAdmissionCode::IntrospectionDisabledForLane
948            }
949            Self::UnsupportedStatementForQueryLane => {
950                QueryReadAdmissionCode::UnsupportedStatementForQueryLane
951            }
952            Self::PublicQueryOffsetRejected => QueryReadAdmissionCode::PublicQueryOffsetRejected,
953            Self::ReturnedRowBoundExceedsPolicy => {
954                QueryReadAdmissionCode::ReturnedRowBoundExceedsPolicy
955            }
956        }
957    }
958
959    /// Return a compact diagnostic payload for this rejection.
960    #[must_use]
961    pub const fn diagnostic(self) -> Diagnostic {
962        Diagnostic::new(
963            DiagnosticCode::QueryReadAdmission,
964            ErrorOrigin::Query,
965            Some(DiagnosticDetail::QueryReadAdmission {
966                reason: self.code(),
967            }),
968        )
969    }
970
971    /// Return the public wire code for this rejection.
972    #[must_use]
973    pub const fn error_code(self) -> ErrorCode {
974        self.diagnostic().error_code()
975    }
976}
977
/// Read-admission result and plan facts for diagnostics and EXPLAIN.
///
/// All fields are private; each one is exposed through a same-named accessor.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct QueryAdmissionSummary {
    /// Admission lane the summary was built for.
    lane: QueryAdmissionLane,
    /// Final admit/reject decision.
    decision: QueryAdmissionDecision,
    /// Scalar/grouped statement shape.
    plan_shape: QueryAdmissionPlanShape,
    /// Selected access class.
    selected_access: QueryAdmissionAccessKind,
    /// Selected index name, if one exists.
    selected_index: Option<String>,
    /// Caller-visible LIMIT, if present.
    limit: Option<u32>,
    /// Caller-visible OFFSET, if present.
    offset: Option<u32>,
    /// Scan bound, if known.
    scan_bound: Option<u64>,
    /// Quality of the scan bound.
    scan_bound_kind: QueryBoundKind,
    /// Returned-row bound, if known.
    returned_row_bound: Option<u32>,
    /// Quality of the returned-row bound.
    returned_row_bound_kind: QueryBoundKind,
    /// Response-byte bound, if known.
    response_byte_bound: Option<u32>,
    /// Quality of the response-byte bound.
    response_byte_bound_kind: QueryBoundKind,
    /// Post-access residual filter facts.
    residual_filter: QueryAdmissionResidualFilter,
    /// ORDER BY facts.
    ordering: QueryAdmissionOrdering,
    /// Grouped query facts; `None` for non-grouped plans.
    grouped: Option<QueryAdmissionGroupedSummary>,
    /// Materialization facts.
    materialization: QueryMaterializationSummary,
    /// Rejection reason; `Some` only when the decision is rejected.
    rejection: Option<QueryAdmissionRejection>,
}
1000
1001impl QueryAdmissionSummary {
1002    /// Build an admitted summary with unknown bound details.
1003    #[must_use]
1004    pub const fn admitted(
1005        lane: QueryAdmissionLane,
1006        selected_access: QueryAdmissionAccessKind,
1007    ) -> Self {
1008        Self {
1009            lane,
1010            decision: QueryAdmissionDecision::Admitted,
1011            plan_shape: QueryAdmissionPlanShape::ScalarRead,
1012            selected_access,
1013            selected_index: None,
1014            limit: None,
1015            offset: None,
1016            scan_bound: None,
1017            scan_bound_kind: QueryBoundKind::Unavailable,
1018            returned_row_bound: None,
1019            returned_row_bound_kind: QueryBoundKind::Unavailable,
1020            response_byte_bound: None,
1021            response_byte_bound_kind: QueryBoundKind::Unavailable,
1022            residual_filter: QueryAdmissionResidualFilter::Absent,
1023            ordering: QueryAdmissionOrdering::None,
1024            grouped: None,
1025            materialization: QueryMaterializationSummary::none(),
1026            rejection: None,
1027        }
1028    }
1029
1030    /// Build a rejected summary with unknown bound details.
1031    #[must_use]
1032    pub const fn rejected(
1033        lane: QueryAdmissionLane,
1034        selected_access: QueryAdmissionAccessKind,
1035        rejection: QueryAdmissionRejection,
1036    ) -> Self {
1037        Self {
1038            lane,
1039            decision: QueryAdmissionDecision::Rejected,
1040            plan_shape: QueryAdmissionPlanShape::ScalarRead,
1041            selected_access,
1042            selected_index: None,
1043            limit: None,
1044            offset: None,
1045            scan_bound: None,
1046            scan_bound_kind: QueryBoundKind::Unavailable,
1047            returned_row_bound: None,
1048            returned_row_bound_kind: QueryBoundKind::Unavailable,
1049            response_byte_bound: None,
1050            response_byte_bound_kind: QueryBoundKind::Unavailable,
1051            residual_filter: QueryAdmissionResidualFilter::Absent,
1052            ordering: QueryAdmissionOrdering::None,
1053            grouped: None,
1054            materialization: QueryMaterializationSummary::none(),
1055            rejection: Some(rejection),
1056        }
1057    }
1058
1059    /// Build one admitted summary from the already-selected access plan.
1060    #[must_use]
1061    pub(in crate::db) fn from_plan(lane: QueryAdmissionLane, plan: &AccessPlannedQuery) -> Self {
1062        let access = summarize_access_plan(plan);
1063        let grouped = plan.grouped_plan().map(summarize_grouped_plan);
1064        let (limit, offset) = scalar_limit_and_offset(plan.scalar_plan());
1065        let (mut returned_row_bound, mut returned_row_bound_kind) =
1066            returned_row_bound_from_plan(limit, grouped);
1067        if returned_row_bound.is_none() && grouped.is_none() {
1068            (returned_row_bound, returned_row_bound_kind) =
1069                returned_row_bound_from_exact_access(&access);
1070        }
1071        let scan_bound_kind = access.scan_bound_kind();
1072        Self {
1073            lane,
1074            decision: QueryAdmissionDecision::Admitted,
1075            plan_shape: plan_shape(plan),
1076            selected_access: access.kind,
1077            selected_index: access.selected_index,
1078            limit,
1079            offset: Some(offset),
1080            scan_bound: access.exact_scan_bound,
1081            scan_bound_kind,
1082            returned_row_bound,
1083            returned_row_bound_kind,
1084            response_byte_bound: None,
1085            response_byte_bound_kind: QueryBoundKind::Unavailable,
1086            residual_filter: admission_residual_filter(plan.residual_filter_shape()),
1087            ordering: admission_ordering(plan),
1088            grouped,
1089            materialization: QueryMaterializationSummary::none(),
1090            rejection: None,
1091        }
1092    }
1093
1094    const fn admit(mut self) -> Self {
1095        self.decision = QueryAdmissionDecision::Admitted;
1096        self.rejection = None;
1097        self
1098    }
1099
1100    const fn reject(mut self, rejection: QueryAdmissionRejection) -> Self {
1101        self.decision = QueryAdmissionDecision::Rejected;
1102        self.rejection = Some(rejection);
1103        self
1104    }
1105
1106    /// Return the admission lane.
1107    #[must_use]
1108    pub const fn lane(&self) -> QueryAdmissionLane {
1109        self.lane
1110    }
1111
1112    /// Return the final decision.
1113    #[must_use]
1114    pub const fn decision(&self) -> QueryAdmissionDecision {
1115        self.decision
1116    }
1117
1118    /// Return the scalar/grouped statement shape.
1119    #[must_use]
1120    pub const fn plan_shape(&self) -> QueryAdmissionPlanShape {
1121        self.plan_shape
1122    }
1123
1124    /// Return the selected access class.
1125    #[must_use]
1126    pub const fn selected_access(&self) -> QueryAdmissionAccessKind {
1127        self.selected_access
1128    }
1129
1130    /// Return the selected index name, if one exists.
1131    #[must_use]
1132    pub fn selected_index(&self) -> Option<&str> {
1133        self.selected_index.as_deref()
1134    }
1135
1136    /// Return the caller-visible LIMIT, if present.
1137    #[must_use]
1138    pub const fn limit(&self) -> Option<u32> {
1139        self.limit
1140    }
1141
1142    /// Return the caller-visible OFFSET, if present.
1143    #[must_use]
1144    pub const fn offset(&self) -> Option<u32> {
1145        self.offset
1146    }
1147
1148    /// Return the scan bound, if known.
1149    #[must_use]
1150    pub const fn scan_bound(&self) -> Option<u64> {
1151        self.scan_bound
1152    }
1153
1154    /// Return the quality of the scan bound.
1155    #[must_use]
1156    pub const fn scan_bound_kind(&self) -> QueryBoundKind {
1157        self.scan_bound_kind
1158    }
1159
1160    /// Return the returned-row bound, if known.
1161    #[must_use]
1162    pub const fn returned_row_bound(&self) -> Option<u32> {
1163        self.returned_row_bound
1164    }
1165
1166    /// Return the quality of the returned-row bound.
1167    #[must_use]
1168    pub const fn returned_row_bound_kind(&self) -> QueryBoundKind {
1169        self.returned_row_bound_kind
1170    }
1171
1172    /// Return the response-byte bound, if known.
1173    #[must_use]
1174    pub const fn response_byte_bound(&self) -> Option<u32> {
1175        self.response_byte_bound
1176    }
1177
1178    /// Return the quality of the response-byte bound.
1179    #[must_use]
1180    pub const fn response_byte_bound_kind(&self) -> QueryBoundKind {
1181        self.response_byte_bound_kind
1182    }
1183
1184    /// Return post-access residual filter facts.
1185    #[must_use]
1186    pub const fn residual_filter(&self) -> QueryAdmissionResidualFilter {
1187        self.residual_filter
1188    }
1189
1190    /// Return ORDER BY facts.
1191    #[must_use]
1192    pub const fn ordering(&self) -> QueryAdmissionOrdering {
1193        self.ordering
1194    }
1195
1196    /// Return grouped query facts, if this is a grouped plan.
1197    #[must_use]
1198    pub const fn grouped(&self) -> Option<QueryAdmissionGroupedSummary> {
1199        self.grouped
1200    }
1201
1202    /// Return materialization facts.
1203    #[must_use]
1204    pub const fn materialization(&self) -> QueryMaterializationSummary {
1205        self.materialization
1206    }
1207
1208    /// Return a copy of this summary with route-derived materialization facts attached.
1209    #[must_use]
1210    #[cfg_attr(not(feature = "sql"), allow(dead_code))]
1211    pub(in crate::db) const fn with_materialization(
1212        mut self,
1213        materialization: QueryMaterializationSummary,
1214    ) -> Self {
1215        self.materialization = materialization;
1216        self
1217    }
1218
1219    /// Return the rejection reason, when the decision is rejected.
1220    #[must_use]
1221    pub const fn rejection(&self) -> Option<QueryAdmissionRejection> {
1222        self.rejection
1223    }
1224
1225    /// Render this summary as a stable top-level verbose EXPLAIN block.
1226    #[must_use]
1227    pub(in crate::db) fn render_text_block(&self) -> String {
1228        let mut out = String::from("admission:");
1229        push_text_field(&mut out, "lane", self.lane().as_str());
1230        push_text_field(&mut out, "decision", self.decision().as_str());
1231        push_text_field(
1232            &mut out,
1233            "reason",
1234            self.rejection()
1235                .map_or("none", QueryAdmissionRejection::as_str),
1236        );
1237        push_text_field(&mut out, "plan_shape", self.plan_shape().as_str());
1238        push_text_field(&mut out, "selected_access", self.selected_access().as_str());
1239        push_text_field(
1240            &mut out,
1241            "selected_index",
1242            self.selected_index().unwrap_or("none"),
1243        );
1244        push_text_option_u32(&mut out, "limit", self.limit());
1245        push_text_option_u32(&mut out, "offset", self.offset());
1246        push_text_option_u64(&mut out, "scan_bound", self.scan_bound());
1247        push_text_field(&mut out, "scan_bound_kind", self.scan_bound_kind().as_str());
1248        push_text_option_u32(&mut out, "returned_row_bound", self.returned_row_bound());
1249        push_text_field(
1250            &mut out,
1251            "returned_row_bound_kind",
1252            self.returned_row_bound_kind().as_str(),
1253        );
1254        push_text_option_u32(&mut out, "response_byte_bound", self.response_byte_bound());
1255        push_text_field(
1256            &mut out,
1257            "response_byte_bound_kind",
1258            self.response_byte_bound_kind().as_str(),
1259        );
1260        push_text_field(&mut out, "residual_filter", self.residual_filter().as_str());
1261        push_text_field(&mut out, "ordering", self.ordering().as_str());
1262        push_text_bool(
1263            &mut out,
1264            "materialized_sort",
1265            self.materialization().materialized_sort(),
1266        );
1267        push_text_option_u32(
1268            &mut out,
1269            "materialized_rows",
1270            self.materialization().materialized_rows(),
1271        );
1272        push_text_field(
1273            &mut out,
1274            "materialized_row_bound_kind",
1275            self.materialization().row_bound_kind().as_str(),
1276        );
1277
1278        if let Some(grouped) = self.grouped() {
1279            push_text_bool(&mut out, "grouped", true);
1280            push_text_u64(
1281                &mut out,
1282                "group_field_count",
1283                u64::from(grouped.group_field_count()),
1284            );
1285            push_text_u64(
1286                &mut out,
1287                "aggregate_count",
1288                u64::from(grouped.aggregate_count()),
1289            );
1290            push_text_u64(
1291                &mut out,
1292                "distinct_aggregate_count",
1293                u64::from(grouped.distinct_aggregate_count()),
1294            );
1295            push_text_u64(&mut out, "max_groups", grouped.max_groups());
1296            push_text_u64(&mut out, "max_group_bytes", grouped.max_group_bytes());
1297            push_text_bool(&mut out, "having_filter", grouped.has_having_filter());
1298        } else {
1299            push_text_bool(&mut out, "grouped", false);
1300        }
1301
1302        out
1303    }
1304}
1305
/// Append one `key=value` line to `out`, preceded by a newline and a two-space indent.
fn push_text_field(out: &mut String, key: &str, value: &str) {
    out.extend(["\n  ", key, "=", value]);
}
1313
1314fn push_text_bool(out: &mut String, key: &str, value: bool) {
1315    push_text_field(out, key, if value { "true" } else { "false" });
1316}
1317
/// Append one `key=<decimal value>` line to `out`, preceded by a newline and a
/// two-space indent.
fn push_text_u64(out: &mut String, key: &str, value: u64) {
    // Formatting into a `String` does not fail, so the `fmt::Result` is discarded.
    let _ = write!(out, "\n  {key}={value}");
}
1325
1326fn push_text_option_u32(out: &mut String, key: &str, value: Option<u32>) {
1327    match value {
1328        Some(value) => push_text_u64(out, key, u64::from(value)),
1329        None => push_text_field(out, key, "none"),
1330    }
1331}
1332
1333fn push_text_option_u64(out: &mut String, key: &str, value: Option<u64>) {
1334    match value {
1335        Some(value) => push_text_u64(out, key, value),
1336        None => push_text_field(out, key, "none"),
1337    }
1338}
1339
// Keep the staged extractor live before admission enforcement calls it directly.
// Coercing `from_plan` to a function pointer in an anonymous const references it
// (presumably to keep dead-code lints quiet) and also pins its signature, so a
// signature change fails to compile here.
const _: fn(QueryAdmissionLane, &AccessPlannedQuery) -> QueryAdmissionSummary =
    QueryAdmissionSummary::from_plan;
1343
1344const fn access_satisfies_index_requirement(
1345    kind: QueryAdmissionAccessKind,
1346    scan_bound: Option<u64>,
1347) -> bool {
1348    kind.is_secondary_index()
1349        || matches!(
1350            (kind, scan_bound),
1351            (
1352                QueryAdmissionAccessKind::ByKey | QueryAdmissionAccessKind::ByKeys,
1353                Some(_)
1354            )
1355        )
1356}
1357
// Stateless `AccessPlanProjection` visitor that folds an access plan into one
// `AdmissionAccessSummary`.
struct AdmissionAccessProjection;
1359
// Access facts extracted from a plan for admission: the access class, the index
// it uses (if any), and the exact scan bound when one can be read off the plan.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AdmissionAccessSummary {
    // Access class of the (root) access node.
    kind: QueryAdmissionAccessKind,
    // Index name; populated only for secondary-index access paths.
    selected_index: Option<String>,
    // Exact number of keys looked up; populated only for `ByKey`/`ByKeys`.
    exact_scan_bound: Option<u64>,
}
1366
1367impl AdmissionAccessSummary {
1368    const fn non_index(kind: QueryAdmissionAccessKind, exact_scan_bound: Option<u64>) -> Self {
1369        Self {
1370            kind,
1371            selected_index: None,
1372            exact_scan_bound,
1373        }
1374    }
1375
1376    fn secondary_index(kind: QueryAdmissionAccessKind, index_name: &str) -> Self {
1377        Self {
1378            kind,
1379            selected_index: Some(index_name.to_string()),
1380            exact_scan_bound: None,
1381        }
1382    }
1383
1384    const fn composite(kind: QueryAdmissionAccessKind) -> Self {
1385        Self {
1386            kind,
1387            selected_index: None,
1388            exact_scan_bound: None,
1389        }
1390    }
1391
1392    const fn scan_bound_kind(&self) -> QueryBoundKind {
1393        if self.exact_scan_bound.is_some() {
1394            QueryBoundKind::Exact
1395        } else {
1396            QueryBoundKind::Unavailable
1397        }
1398    }
1399}
1400
1401impl AccessPlanProjection<Value> for AdmissionAccessProjection {
1402    type Output = AdmissionAccessSummary;
1403
1404    fn by_key(&mut self, _key: &Value) -> Self::Output {
1405        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::ByKey, Some(1))
1406    }
1407
1408    fn by_keys(&mut self, keys: &[Value]) -> Self::Output {
1409        AdmissionAccessSummary::non_index(
1410            QueryAdmissionAccessKind::ByKeys,
1411            Some(u64::try_from(keys.len()).unwrap_or(u64::MAX)),
1412        )
1413    }
1414
1415    fn key_range(&mut self, _start: &Value, _end: &Value) -> Self::Output {
1416        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::KeyRange, None)
1417    }
1418
1419    fn index_prefix(
1420        &mut self,
1421        index_name: &str,
1422        _index_fields: &[String],
1423        _prefix_len: usize,
1424        _values: &[Value],
1425    ) -> Self::Output {
1426        AdmissionAccessSummary::secondary_index(QueryAdmissionAccessKind::IndexPrefix, index_name)
1427    }
1428
1429    fn index_multi_lookup(
1430        &mut self,
1431        index_name: &str,
1432        _index_fields: &[String],
1433        _values: &[Value],
1434    ) -> Self::Output {
1435        AdmissionAccessSummary::secondary_index(
1436            QueryAdmissionAccessKind::IndexMultiLookup,
1437            index_name,
1438        )
1439    }
1440
1441    fn index_branch_set(
1442        &mut self,
1443        index_name: &str,
1444        _index_fields: &[String],
1445        _fixed_values: &[Value],
1446        _branch_values: &[Value],
1447        _ordered_suffix: IndexBranchSetOrderedSuffix,
1448    ) -> Self::Output {
1449        AdmissionAccessSummary::secondary_index(
1450            QueryAdmissionAccessKind::IndexBranchSet,
1451            index_name,
1452        )
1453    }
1454
1455    fn index_range(
1456        &mut self,
1457        index_name: &str,
1458        _index_fields: &[String],
1459        _prefix_len: usize,
1460        _prefix: &[Value],
1461        _lower: &Bound<Value>,
1462        _upper: &Bound<Value>,
1463    ) -> Self::Output {
1464        AdmissionAccessSummary::secondary_index(QueryAdmissionAccessKind::IndexRange, index_name)
1465    }
1466
1467    fn full_scan(&mut self) -> Self::Output {
1468        AdmissionAccessSummary::non_index(QueryAdmissionAccessKind::FullScan, None)
1469    }
1470
1471    fn union(&mut self, _children: Vec<Self::Output>) -> Self::Output {
1472        AdmissionAccessSummary::composite(QueryAdmissionAccessKind::Union)
1473    }
1474
1475    fn intersection(&mut self, _children: Vec<Self::Output>) -> Self::Output {
1476        AdmissionAccessSummary::composite(QueryAdmissionAccessKind::Intersection)
1477    }
1478}
1479
1480fn summarize_access_plan(plan: &AccessPlannedQuery) -> AdmissionAccessSummary {
1481    project_access_plan(&plan.access, &mut AdmissionAccessProjection)
1482}
1483
1484fn summarize_grouped_plan(plan: &GroupPlan) -> QueryAdmissionGroupedSummary {
1485    QueryAdmissionGroupedSummary::new(
1486        u32::try_from(plan.group.group_fields.len()).unwrap_or(u32::MAX),
1487        u32::try_from(plan.group.aggregates.len()).unwrap_or(u32::MAX),
1488        u32::try_from(
1489            plan.group
1490                .aggregates
1491                .iter()
1492                .filter(|aggregate| aggregate.distinct)
1493                .count(),
1494        )
1495        .unwrap_or(u32::MAX),
1496        plan.group.execution.max_groups(),
1497        plan.group.execution.max_group_bytes(),
1498        plan.having_expr.is_some(),
1499    )
1500}
1501
1502const fn scalar_limit_and_offset(plan: &ScalarPlan) -> (Option<u32>, u32) {
1503    match plan.mode {
1504        QueryMode::Load(load) => match &plan.page {
1505            Some(page) => (page.limit, page.offset),
1506            None => (load.limit(), load.offset()),
1507        },
1508        QueryMode::Delete(delete) => match plan.delete_limit {
1509            Some(delete_limit) => (delete_limit.limit, delete_limit.offset),
1510            None => (delete.limit(), delete.offset()),
1511        },
1512    }
1513}
1514
1515fn returned_row_bound_from_plan(
1516    limit: Option<u32>,
1517    grouped: Option<QueryAdmissionGroupedSummary>,
1518) -> (Option<u32>, QueryBoundKind) {
1519    if let Some(limit) = limit {
1520        return (Some(limit), QueryBoundKind::EnforcedRuntimeCap);
1521    }
1522
1523    let Some(grouped) = grouped else {
1524        return (None, QueryBoundKind::Unavailable);
1525    };
1526    if grouped.max_groups() == u64::MAX {
1527        return (None, QueryBoundKind::Unavailable);
1528    }
1529
1530    (
1531        Some(u32::try_from(grouped.max_groups()).unwrap_or(u32::MAX)),
1532        QueryBoundKind::ConservativeUpperBound,
1533    )
1534}
1535
1536fn returned_row_bound_from_exact_access(
1537    access: &AdmissionAccessSummary,
1538) -> (Option<u32>, QueryBoundKind) {
1539    match (access.kind, access.exact_scan_bound) {
1540        (QueryAdmissionAccessKind::ByKey | QueryAdmissionAccessKind::ByKeys, Some(bound)) => (
1541            Some(clamp_u64_to_u32(bound)),
1542            QueryBoundKind::ConservativeUpperBound,
1543        ),
1544        _ => (None, QueryBoundKind::Unavailable),
1545    }
1546}
1547
// Narrow a `u64` to `u32`, saturating at `u32::MAX` instead of truncating.
fn clamp_u64_to_u32(value: u64) -> u32 {
    match u32::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}
1551
1552const fn admission_residual_filter(shape: ResidualFilterShape) -> QueryAdmissionResidualFilter {
1553    match shape {
1554        ResidualFilterShape::Absent => QueryAdmissionResidualFilter::Absent,
1555        ResidualFilterShape::Predicate => QueryAdmissionResidualFilter::Predicate,
1556        ResidualFilterShape::Expression => QueryAdmissionResidualFilter::Expression,
1557        ResidualFilterShape::ExpressionAndPredicate => {
1558            QueryAdmissionResidualFilter::ExpressionAndPredicate
1559        }
1560    }
1561}
1562
1563fn admission_ordering(plan: &AccessPlannedQuery) -> QueryAdmissionOrdering {
1564    if plan.scalar_plan().order.is_none() {
1565        return QueryAdmissionOrdering::None;
1566    }
1567
1568    if plan.resolved_order().is_some() {
1569        QueryAdmissionOrdering::Resolved
1570    } else {
1571        QueryAdmissionOrdering::Requested
1572    }
1573}
1574
1575const fn plan_shape(plan: &AccessPlannedQuery) -> QueryAdmissionPlanShape {
1576    if plan.grouped_plan().is_some() {
1577        return QueryAdmissionPlanShape::GroupedAggregate;
1578    }
1579
1580    match plan.scalar_plan().mode {
1581        QueryMode::Load(_) => QueryAdmissionPlanShape::ScalarRead,
1582        QueryMode::Delete(_) => QueryAdmissionPlanShape::Delete,
1583    }
1584}
1585
1586#[cfg(test)]
1587mod tests {
1588    use std::num::{NonZeroU32, NonZeroU64};
1589
1590    use crate::{
1591        db::{
1592            access::{AccessPath, SemanticIndexAccessContract},
1593            predicate::{MissingRowPolicy, Predicate},
1594            query::plan::{
1595                AccessPlannedQuery, AggregateKind, DeleteLimitSpec, DeleteSpec, FieldSlot,
1596                GroupAggregateSpec, GroupSpec, GroupedExecutionConfig, OrderDirection, OrderSpec,
1597                OrderTerm, PageSpec, QueryMode,
1598                expr::{Expr, FieldId},
1599            },
1600        },
1601        model::index::IndexModel,
1602        value::Value,
1603    };
1604
1605    use super::{
1606        GroupedAdmissionPolicy, QueryAdmissionAccessKind, QueryAdmissionDecision,
1607        QueryAdmissionLane, QueryAdmissionOrdering, QueryAdmissionPlanShape, QueryAdmissionPolicy,
1608        QueryAdmissionRejection, QueryAdmissionResidualFilter, QueryAdmissionSummary,
1609        QueryBoundKind, QueryMaterializationSummary,
1610    };
1611
1612    const ADMISSION_INDEX_FIELDS: [&str; 1] = ["tag"];
1613    const ADMISSION_INDEX: IndexModel = IndexModel::generated(
1614        "admission::tag",
1615        "admission::tag_store",
1616        &ADMISSION_INDEX_FIELDS,
1617        false,
1618    );
1619
1620    #[test]
1621    fn public_read_policy_has_safe_finite_defaults() {
1622        let max_rows = NonZeroU32::new(50).expect("test max rows is non-zero");
1623        let max_bytes = NonZeroU32::new(32_768).expect("test max bytes is non-zero");
1624        let policy = QueryAdmissionPolicy::public_read(max_rows, max_bytes);
1625
1626        assert_eq!(policy.lane(), QueryAdmissionLane::PublicRead);
1627        assert!(policy.require_limit());
1628        assert!(policy.require_index());
1629        assert!(policy.reject_non_zero_offset());
1630        assert!(!policy.allow_full_scan());
1631        assert!(!policy.allow_materialized_sort());
1632        assert_eq!(policy.max_returned_rows(), Some(max_rows));
1633        assert_eq!(policy.max_response_bytes(), Some(max_bytes));
1634        assert!(policy.public_caps_are_finite());
1635        assert!(!policy.grouped().has_hard_limits());
1636    }
1637
1638    #[test]
1639    fn admin_policy_is_broader_but_still_budgeted() {
1640        let max_rows = NonZeroU32::new(100).expect("test max rows is non-zero");
1641        let max_scanned = NonZeroU64::new(1_000).expect("test scan cap is non-zero");
1642        let max_bytes = NonZeroU32::new(65_536).expect("test max bytes is non-zero");
1643        let policy = QueryAdmissionPolicy::admin_ad_hoc(max_rows, max_scanned, max_bytes);
1644
1645        assert_eq!(policy.lane(), QueryAdmissionLane::AdminAdHoc);
1646        assert!(!policy.require_limit());
1647        assert!(!policy.require_index());
1648        assert!(policy.allow_full_scan());
1649        assert!(policy.allow_materialized_sort());
1650        assert_eq!(policy.max_scanned_rows(), Some(max_scanned));
1651        assert_eq!(policy.max_materialized_rows(), Some(max_rows));
1652    }
1653
1654    #[test]
1655    fn diagnostic_explain_lane_does_not_execute_rows() {
1656        let policy = QueryAdmissionPolicy::diagnostic_explain();
1657
1658        assert_eq!(policy.lane().as_str(), "diagnostic_explain");
1659        assert!(!policy.lane().executes_rows());
1660    }
1661
1662    #[test]
1663    fn grouped_policy_requires_group_and_memory_budgets() {
1664        let max_groups = NonZeroU32::new(8).expect("test group cap is non-zero");
1665        let max_bytes = NonZeroU32::new(4096).expect("test byte cap is non-zero");
1666        let policy = GroupedAdmissionPolicy::bounded(max_groups, max_bytes, None);
1667
1668        assert!(policy.has_hard_limits());
1669        assert_eq!(policy.max_groups(), Some(max_groups));
1670        assert_eq!(policy.max_group_bytes(), Some(max_bytes));
1671    }
1672
1673    #[test]
1674    fn only_proven_or_enforced_bounds_admit_public_reads() {
1675        assert!(QueryBoundKind::Exact.admits_public_read());
1676        assert!(QueryBoundKind::ConservativeUpperBound.admits_public_read());
1677        assert!(QueryBoundKind::EnforcedRuntimeCap.admits_public_read());
1678        assert!(!QueryBoundKind::EstimateOnly.admits_public_read());
1679        assert!(!QueryBoundKind::Unavailable.admits_public_read());
1680    }
1681
1682    #[test]
1683    fn access_kind_classifies_secondary_indexes_and_full_scans() {
1684        assert!(QueryAdmissionAccessKind::IndexPrefix.is_secondary_index());
1685        assert!(QueryAdmissionAccessKind::FullScan.is_full_scan());
1686        assert!(!QueryAdmissionAccessKind::ByKey.is_secondary_index());
1687    }
1688
1689    #[test]
1690    fn rejection_maps_to_stable_diagnostic() {
1691        let rejection = QueryAdmissionRejection::PublicQueryRequiresLimit;
1692        let diagnostic = rejection.diagnostic();
1693
1694        assert_eq!(
1695            rejection.error_code(),
1696            icydb_diagnostic_code::ErrorCode::QUERY_READ_PUBLIC_REQUIRES_LIMIT
1697        );
1698        assert_eq!(
1699            diagnostic.code(),
1700            icydb_diagnostic_code::DiagnosticCode::QueryReadAdmission
1701        );
1702    }
1703
1704    #[test]
1705    fn summaries_keep_decision_and_rejection_aligned() {
1706        let admitted = QueryAdmissionSummary::admitted(
1707            QueryAdmissionLane::PublicRead,
1708            QueryAdmissionAccessKind::ByKey,
1709        );
1710        let rejected = QueryAdmissionSummary::rejected(
1711            QueryAdmissionLane::PublicRead,
1712            QueryAdmissionAccessKind::FullScan,
1713            QueryAdmissionRejection::UnboundedFullScanRejected,
1714        );
1715
1716        assert_eq!(admitted.decision(), QueryAdmissionDecision::Admitted);
1717        assert_eq!(admitted.rejection(), None);
1718        assert_eq!(rejected.decision(), QueryAdmissionDecision::Rejected);
1719        assert_eq!(
1720            rejected.rejection(),
1721            Some(QueryAdmissionRejection::UnboundedFullScanRejected)
1722        );
1723    }
1724
1725    #[test]
1726    fn admission_summary_renders_stable_verbose_explain_block() {
1727        let summary = QueryAdmissionSummary::rejected(
1728            QueryAdmissionLane::PublicRead,
1729            QueryAdmissionAccessKind::FullScan,
1730            QueryAdmissionRejection::UnboundedFullScanRejected,
1731        );
1732
1733        let rendered = summary.render_text_block();
1734
1735        assert!(
1736            rendered.starts_with("admission:\n  lane=public_read\n  decision=rejected"),
1737            "admission block should start with stable lane and decision fields: {rendered}",
1738        );
1739        assert!(
1740            rendered.contains("\n  reason=unbounded_full_scan_rejected"),
1741            "admission block should include a stable rejection reason: {rendered}",
1742        );
1743        assert!(
1744            rendered.contains("\n  selected_access=full_scan"),
1745            "admission block should include the selected access class: {rendered}",
1746        );
1747        assert!(
1748            rendered.contains("\n  grouped=false"),
1749            "admission block should include grouped classification: {rendered}",
1750        );
1751    }
1752
1753    #[test]
1754    fn plan_summary_classifies_full_scan_without_overclaiming_bounds() {
1755        let plan = AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore);
1756
1757        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);
1758
1759        assert_eq!(summary.plan_shape(), QueryAdmissionPlanShape::ScalarRead);
1760        assert_eq!(
1761            summary.selected_access(),
1762            QueryAdmissionAccessKind::FullScan
1763        );
1764        assert_eq!(summary.selected_index(), None);
1765        assert_eq!(summary.limit(), None);
1766        assert_eq!(summary.offset(), Some(0));
1767        assert_eq!(summary.scan_bound(), None);
1768        assert_eq!(summary.scan_bound_kind(), QueryBoundKind::Unavailable);
1769        assert_eq!(summary.returned_row_bound(), None);
1770        assert_eq!(
1771            summary.returned_row_bound_kind(),
1772            QueryBoundKind::Unavailable
1773        );
1774        assert_eq!(
1775            summary.residual_filter(),
1776            QueryAdmissionResidualFilter::Absent
1777        );
1778        assert_eq!(summary.ordering(), QueryAdmissionOrdering::None);
1779    }
1780
1781    #[test]
1782    fn plan_summary_uses_point_lookup_and_limit_as_proven_bounds() {
1783        let mut plan =
1784            AccessPlannedQuery::new(AccessPath::ByKey(Value::Nat64(7)), MissingRowPolicy::Ignore);
1785        plan.scalar_plan_mut().page = Some(PageSpec {
1786            limit: Some(5),
1787            offset: 2,
1788        });
1789
1790        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);
1791
1792        assert_eq!(summary.selected_access(), QueryAdmissionAccessKind::ByKey);
1793        assert_eq!(summary.limit(), Some(5));
1794        assert_eq!(summary.offset(), Some(2));
1795        assert_eq!(summary.scan_bound(), Some(1));
1796        assert_eq!(summary.scan_bound_kind(), QueryBoundKind::Exact);
1797        assert_eq!(summary.returned_row_bound(), Some(5));
1798        assert_eq!(
1799            summary.returned_row_bound_kind(),
1800            QueryBoundKind::EnforcedRuntimeCap
1801        );
1802    }
1803
1804    #[test]
1805    fn plan_summary_uses_exact_primary_key_access_as_returned_row_bound_without_limit() {
1806        let plan =
1807            AccessPlannedQuery::new(AccessPath::ByKey(Value::Nat64(7)), MissingRowPolicy::Ignore);
1808
1809        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);
1810
1811        assert_eq!(summary.selected_access(), QueryAdmissionAccessKind::ByKey);
1812        assert_eq!(summary.limit(), None);
1813        assert_eq!(summary.scan_bound(), Some(1));
1814        assert_eq!(summary.scan_bound_kind(), QueryBoundKind::Exact);
1815        assert_eq!(summary.returned_row_bound(), Some(1));
1816        assert_eq!(
1817            summary.returned_row_bound_kind(),
1818            QueryBoundKind::ConservativeUpperBound
1819        );
1820    }
1821
1822    #[test]
1823    fn plan_summary_uses_exact_primary_key_set_as_returned_row_bound_without_limit() {
1824        let plan = AccessPlannedQuery::new(
1825            AccessPath::ByKeys(vec![Value::Nat64(7), Value::Nat64(8)]),
1826            MissingRowPolicy::Ignore,
1827        );
1828
1829        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);
1830
1831        assert_eq!(summary.selected_access(), QueryAdmissionAccessKind::ByKeys);
1832        assert_eq!(summary.limit(), None);
1833        assert_eq!(summary.scan_bound(), Some(2));
1834        assert_eq!(summary.scan_bound_kind(), QueryBoundKind::Exact);
1835        assert_eq!(summary.returned_row_bound(), Some(2));
1836        assert_eq!(
1837            summary.returned_row_bound_kind(),
1838            QueryBoundKind::ConservativeUpperBound
1839        );
1840    }
1841
1842    #[test]
1843    fn plan_summary_preserves_selected_index_identity() {
1844        let plan = AccessPlannedQuery::new(
1845            AccessPath::IndexPrefix {
1846                index: SemanticIndexAccessContract::model_only_from_generated_index(
1847                    ADMISSION_INDEX,
1848                ),
1849                values: vec![Value::Text("alpha".to_string())],
1850            },
1851            MissingRowPolicy::Ignore,
1852        );
1853
1854        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan);
1855
1856        assert_eq!(
1857            summary.selected_access(),
1858            QueryAdmissionAccessKind::IndexPrefix
1859        );
1860        assert_eq!(summary.selected_index(), Some("admission::tag"));
1861        assert_eq!(summary.scan_bound(), None);
1862        assert_eq!(summary.scan_bound_kind(), QueryBoundKind::Unavailable);
1863    }
1864
1865    #[test]
1866    fn plan_summary_classifies_residual_and_requested_ordering() {
1867        let mut plan =
1868            AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore);
1869        plan.scalar_plan_mut().predicate = Some(Predicate::eq(
1870            "tag".to_string(),
1871            Value::Text("alpha".to_string()),
1872        ));
1873        plan.scalar_plan_mut().order = Some(OrderSpec {
1874            fields: vec![OrderTerm::field("tag", OrderDirection::Asc)],
1875        });
1876
1877        let summary = QueryAdmissionSummary::from_plan(QueryAdmissionLane::AdminAdHoc, &plan);
1878
1879        assert_eq!(
1880            summary.residual_filter(),
1881            QueryAdmissionResidualFilter::Predicate
1882        );
1883        assert_eq!(summary.ordering(), QueryAdmissionOrdering::Requested);
1884        assert!(!summary.materialization().materialized_sort());
1885        assert_eq!(summary.materialization().materialized_rows(), None);
1886        assert_eq!(
1887            summary.materialization().row_bound_kind(),
1888            QueryBoundKind::Unavailable
1889        );
1890    }
1891
1892    #[test]
1893    fn plan_summary_carries_grouped_execution_budgets() {
1894        let grouped =
1895            AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore)
1896                .into_grouped_with_having_expr(
1897                    GroupSpec {
1898                        group_fields: vec![FieldSlot::from_test_slot(0, "tag")],
1899                        aggregates: vec![GroupAggregateSpec {
1900                            kind: AggregateKind::Count,
1901                            input_expr: None,
1902                            filter_expr: None,
1903                            distinct: false,
1904                        }],
1905                        execution: GroupedExecutionConfig::with_hard_limits(12, 4096),
1906                    },
1907                    Some(Expr::Field(FieldId::new("tag"))),
1908                );
1909
1910        let summary =
1911            QueryAdmissionSummary::from_plan(QueryAdmissionLane::DiagnosticExplain, &grouped);
1912        let grouped = summary
1913            .grouped()
1914            .expect("summary should include grouped facts");
1915
1916        assert_eq!(
1917            summary.plan_shape(),
1918            QueryAdmissionPlanShape::GroupedAggregate
1919        );
1920        assert_eq!(grouped.group_field_count(), 1);
1921        assert_eq!(grouped.aggregate_count(), 1);
1922        assert_eq!(grouped.distinct_aggregate_count(), 0);
1923        assert_eq!(grouped.max_groups(), 12);
1924        assert_eq!(grouped.max_group_bytes(), 4096);
1925        assert!(grouped.has_having_filter());
1926        assert_eq!(summary.returned_row_bound(), Some(12));
1927        assert_eq!(
1928            summary.returned_row_bound_kind(),
1929            QueryBoundKind::ConservativeUpperBound
1930        );
1931    }
1932
1933    #[test]
1934    fn plan_summary_reads_delete_window_without_executing_it() {
1935        let mut plan =
1936            AccessPlannedQuery::new(AccessPath::<Value>::FullScan, MissingRowPolicy::Ignore);
1937        plan.scalar_plan_mut().mode = QueryMode::Delete(DeleteSpec::new());
1938        plan.scalar_plan_mut().delete_limit = Some(DeleteLimitSpec {
1939            limit: Some(3),
1940            offset: 1,
1941        });
1942
1943        let summary =
1944            QueryAdmissionSummary::from_plan(QueryAdmissionLane::DiagnosticExplain, &plan);
1945
1946        assert_eq!(summary.plan_shape(), QueryAdmissionPlanShape::Delete);
1947        assert_eq!(summary.limit(), Some(3));
1948        assert_eq!(summary.offset(), Some(1));
1949        assert_eq!(summary.returned_row_bound(), Some(3));
1950    }
1951
1952    #[test]
1953    fn public_read_evaluation_rejects_missing_limit_before_access_shape() {
1954        let policy = public_read_policy();
1955        let summary = summary_for_index_prefix(None, 0);
1956
1957        let evaluated = policy.evaluate(summary);
1958
1959        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1960        assert_eq!(
1961            evaluated.rejection(),
1962            Some(QueryAdmissionRejection::PublicQueryRequiresLimit)
1963        );
1964    }
1965
1966    #[test]
1967    fn public_read_evaluation_rejects_full_scan_even_with_limit() {
1968        let policy = public_read_policy();
1969        let summary = summary_for_path(AccessPath::<Value>::FullScan, Some(5), 0);
1970
1971        let evaluated = policy.evaluate(summary);
1972
1973        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
1974        assert_eq!(
1975            evaluated.rejection(),
1976            Some(QueryAdmissionRejection::UnboundedFullScanRejected)
1977        );
1978    }
1979
1980    #[test]
1981    fn public_read_evaluation_admits_indexed_bounded_scalar_read() {
1982        let policy = public_read_policy();
1983        let summary = summary_for_index_prefix(Some(5), 0);
1984
1985        let evaluated = policy.evaluate(summary);
1986
1987        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Admitted);
1988        assert_eq!(evaluated.rejection(), None);
1989    }
1990
1991    #[test]
1992    fn public_read_evaluation_admits_exact_primary_key_read() {
1993        let policy = public_read_policy();
1994        let summary = summary_for_path(
1995            AccessPath::ByKey(Value::Text("primary".to_string())),
1996            None,
1997            0,
1998        );
1999
2000        let evaluated = policy.evaluate(summary);
2001
2002        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Admitted);
2003        assert_eq!(evaluated.limit(), None);
2004        assert_eq!(evaluated.scan_bound(), Some(1));
2005        assert_eq!(evaluated.returned_row_bound(), Some(1));
2006    }
2007
2008    #[test]
2009    fn public_read_evaluation_rejects_primary_key_set_above_returned_row_policy() {
2010        let policy = public_read_policy();
2011        let keys = (0..=50).map(Value::Nat64).collect();
2012        let summary = summary_for_path(AccessPath::ByKeys(keys), None, 0);
2013
2014        let evaluated = policy.evaluate(summary);
2015
2016        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2017        assert_eq!(
2018            evaluated.rejection(),
2019            Some(QueryAdmissionRejection::ReturnedRowBoundExceedsPolicy)
2020        );
2021    }
2022
2023    #[test]
2024    fn public_read_evaluation_rejects_non_zero_offset() {
2025        let policy = public_read_policy();
2026        let summary = summary_for_index_prefix(Some(5), 1);
2027
2028        let evaluated = policy.evaluate(summary);
2029
2030        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2031        assert_eq!(
2032            evaluated.rejection(),
2033            Some(QueryAdmissionRejection::PublicQueryOffsetRejected)
2034        );
2035    }
2036
2037    #[test]
2038    fn public_read_evaluation_rejects_returned_row_cap_overflow() {
2039        let policy = public_read_policy();
2040        let summary = summary_for_index_prefix(Some(51), 0);
2041
2042        let evaluated = policy.evaluate(summary);
2043
2044        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2045        assert_eq!(
2046            evaluated.rejection(),
2047            Some(QueryAdmissionRejection::ReturnedRowBoundExceedsPolicy)
2048        );
2049    }
2050
2051    #[test]
2052    fn public_read_evaluation_rejects_unresolved_order_materialized_sort() {
2053        let policy = public_read_policy();
2054        let summary = summary_for_index_prefix(Some(5), 0);
2055        let returned_row_bound = summary.returned_row_bound();
2056        let returned_row_bound_kind = summary.returned_row_bound_kind();
2057        let summary = summary.with_materialization(QueryMaterializationSummary::sort(
2058            returned_row_bound,
2059            returned_row_bound_kind,
2060        ));
2061
2062        let evaluated = policy.evaluate(summary);
2063
2064        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2065        assert_eq!(
2066            evaluated.rejection(),
2067            Some(QueryAdmissionRejection::SortRequiresMaterialization)
2068        );
2069    }
2070
2071    #[test]
2072    fn public_read_evaluation_rejects_grouped_query_without_group_budgets() {
2073        let policy = public_read_policy();
2074        let summary = grouped_summary_for_index_prefix(12, 4096, false);
2075
2076        let evaluated = policy.evaluate(summary);
2077
2078        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2079        assert_eq!(
2080            evaluated.rejection(),
2081            Some(QueryAdmissionRejection::GroupedQueryRequiresLimits)
2082        );
2083    }
2084
2085    #[test]
2086    fn public_read_evaluation_admits_grouped_query_with_group_budgets_without_limit() {
2087        let policy = public_grouped_read_policy(None);
2088        let summary = grouped_summary_for_index_prefix(12, 4096, false);
2089
2090        let evaluated = policy.evaluate(summary);
2091
2092        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Admitted);
2093        assert_eq!(evaluated.limit(), None);
2094        assert_eq!(evaluated.returned_row_bound(), Some(12));
2095        assert_eq!(evaluated.rejection(), None);
2096    }
2097
2098    #[test]
2099    fn public_read_evaluation_rejects_grouped_query_above_policy_budget() {
2100        let policy = public_grouped_read_policy(None);
2101        let summary = grouped_summary_for_index_prefix(51, 4096, false);
2102
2103        let evaluated = policy.evaluate(summary);
2104
2105        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2106        assert_eq!(
2107            evaluated.rejection(),
2108            Some(QueryAdmissionRejection::GroupedQueryExceedsBudget)
2109        );
2110    }
2111
2112    #[test]
2113    fn public_read_evaluation_rejects_distinct_grouped_query_without_distinct_budget() {
2114        let policy = public_grouped_read_policy(None);
2115        let summary = grouped_summary_for_index_prefix(12, 4096, true);
2116
2117        let evaluated = policy.evaluate(summary);
2118
2119        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2120        assert_eq!(
2121            evaluated.rejection(),
2122            Some(QueryAdmissionRejection::GroupedQueryRequiresLimits)
2123        );
2124    }
2125
2126    #[test]
2127    fn diagnostic_explain_policy_rejects_row_execution() {
2128        let policy = QueryAdmissionPolicy::diagnostic_explain();
2129        let summary = summary_for_index_prefix(Some(5), 0);
2130
2131        let evaluated = policy.evaluate(summary);
2132
2133        assert_eq!(evaluated.decision(), QueryAdmissionDecision::Rejected);
2134        assert_eq!(
2135            evaluated.rejection(),
2136            Some(QueryAdmissionRejection::DiagnosticLaneDoesNotExecute)
2137        );
2138    }
2139
2140    fn public_read_policy() -> QueryAdmissionPolicy {
2141        QueryAdmissionPolicy::public_read(
2142            NonZeroU32::new(50).expect("test public row cap is non-zero"),
2143            NonZeroU32::new(32_768).expect("test public byte cap is non-zero"),
2144        )
2145    }
2146
2147    fn public_grouped_read_policy(distinct_entries: Option<NonZeroU32>) -> QueryAdmissionPolicy {
2148        public_read_policy().with_grouped_policy(GroupedAdmissionPolicy::bounded(
2149            NonZeroU32::new(50).expect("test public group cap is non-zero"),
2150            NonZeroU32::new(8192).expect("test public group byte cap is non-zero"),
2151            distinct_entries,
2152        ))
2153    }
2154
2155    fn summary_for_index_prefix(limit: Option<u32>, offset: u32) -> QueryAdmissionSummary {
2156        summary_for_path(
2157            AccessPath::IndexPrefix {
2158                index: SemanticIndexAccessContract::model_only_from_generated_index(
2159                    ADMISSION_INDEX,
2160                ),
2161                values: vec![Value::Text("alpha".to_string())],
2162            },
2163            limit,
2164            offset,
2165        )
2166    }
2167
2168    fn summary_for_path(
2169        path: AccessPath<Value>,
2170        limit: Option<u32>,
2171        offset: u32,
2172    ) -> QueryAdmissionSummary {
2173        let mut plan = AccessPlannedQuery::new(path, MissingRowPolicy::Ignore);
2174        plan.scalar_plan_mut().page = Some(PageSpec { limit, offset });
2175
2176        QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &plan)
2177    }
2178
2179    fn grouped_summary_for_index_prefix(
2180        max_groups: u64,
2181        max_group_bytes: u64,
2182        distinct: bool,
2183    ) -> QueryAdmissionSummary {
2184        let grouped = AccessPlannedQuery::new(index_prefix_path(), MissingRowPolicy::Ignore)
2185            .into_grouped(GroupSpec {
2186                group_fields: vec![FieldSlot::from_test_slot(0, "tag")],
2187                aggregates: vec![GroupAggregateSpec {
2188                    kind: AggregateKind::Count,
2189                    input_expr: Some(Box::new(Expr::Field(FieldId::new("tag")))),
2190                    filter_expr: None,
2191                    distinct,
2192                }],
2193                execution: GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes),
2194            });
2195
2196        QueryAdmissionSummary::from_plan(QueryAdmissionLane::PublicRead, &grouped)
2197    }
2198
2199    fn index_prefix_path() -> AccessPath<Value> {
2200        AccessPath::IndexPrefix {
2201            index: SemanticIndexAccessContract::model_only_from_generated_index(ADMISSION_INDEX),
2202            values: vec![Value::Text("alpha".to_string())],
2203        }
2204    }
2205}