Skip to main content

icydb_core/db/session/sql/
aggregate.rs

//! Module: db::session::sql::aggregate
//! Responsibility: executing reduced SQL global aggregate `SELECT` statements on `DbSession`.
//! Does not own: SQL lowering or aggregate execution itself; both are delegated to existing boundaries.
//! Boundary: exposes the SQL aggregate entrypoint while keeping per-terminal dispatch helpers internal.
5
6use crate::{
7    db::{
8        DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
9        executor::{ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryRequest},
10        query::plan::{AggregateKind, FieldSlot},
11        session::sql::explain::resolve_sql_aggregate_target_slot,
12        sql::lowering::{SqlGlobalAggregateTerminal, compile_sql_global_aggregate_command},
13    },
14    traits::{CanisterKind, EntityValue},
15    types::Id,
16    value::Value,
17};
18
19impl<C: CanisterKind> DbSession<C> {
20    /// Execute one reduced SQL global aggregate `SELECT` statement.
21    ///
22    /// This entrypoint is intentionally constrained to one aggregate terminal
23    /// shape per statement and preserves existing terminal semantics.
24    pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
25    where
26        E: PersistedRow<Canister = C> + EntityValue,
27    {
28        // First lower the SQL surface onto the existing single-terminal
29        // aggregate command authority so execution never has to rediscover the
30        // accepted aggregate shape family.
31        let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
32            .map_err(QueryError::from_sql_lowering_error)?;
33
34        // Then dispatch each accepted terminal onto the existing load/query
35        // boundaries instead of reopening aggregate execution ownership here.
36        match command.terminal() {
37            SqlGlobalAggregateTerminal::CountRows => self
38                .execute_load_query_with(command.query(), |load, plan| {
39                    load.execute_scalar_terminal_request(
40                        plan,
41                        crate::db::executor::ScalarTerminalBoundaryRequest::Count,
42                    )?
43                    .into_count()
44                })
45                .map(|count| Value::Uint(u64::from(count))),
46            SqlGlobalAggregateTerminal::CountField(field) => {
47                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
48
49                self.execute_load_query_with(command.query(), |load, plan| {
50                    load.execute_scalar_projection_boundary(
51                        plan,
52                        target_slot,
53                        ScalarProjectionBoundaryRequest::Values,
54                    )?
55                    .into_values()
56                })
57                .map(|values| {
58                    let count = values
59                        .into_iter()
60                        .filter(|value| !matches!(value, Value::Null))
61                        .count();
62
63                    Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
64                })
65            }
66            SqlGlobalAggregateTerminal::SumField(field) => {
67                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
68
69                self.execute_load_query_with(command.query(), |load, plan| {
70                    load.execute_numeric_field_boundary(
71                        plan,
72                        target_slot,
73                        ScalarNumericFieldBoundaryRequest::Sum,
74                    )
75                })
76                .map(|value| value.map_or(Value::Null, Value::Decimal))
77            }
78            SqlGlobalAggregateTerminal::AvgField(field) => {
79                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
80
81                self.execute_load_query_with(command.query(), |load, plan| {
82                    load.execute_numeric_field_boundary(
83                        plan,
84                        target_slot,
85                        ScalarNumericFieldBoundaryRequest::Avg,
86                    )
87                })
88                .map(|value| value.map_or(Value::Null, Value::Decimal))
89            }
90            SqlGlobalAggregateTerminal::MinField(field) => self
91                .execute_ranked_sql_aggregate_field::<E>(
92                    command.query(),
93                    field,
94                    AggregateKind::Min,
95                ),
96            SqlGlobalAggregateTerminal::MaxField(field) => self
97                .execute_ranked_sql_aggregate_field::<E>(
98                    command.query(),
99                    field,
100                    AggregateKind::Max,
101                ),
102        }
103    }
104
105    // Execute one ranked field aggregate by resolving the winning id first and
106    // then reading the projected field through the typed load surface.
107    fn execute_ranked_sql_aggregate_field<E>(
108        &self,
109        query: &Query<E>,
110        field: &str,
111        kind: AggregateKind,
112    ) -> Result<Value, QueryError>
113    where
114        E: PersistedRow<Canister = C> + EntityValue,
115    {
116        let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
117        let matched_id = self.execute_ranked_sql_aggregate_id(query, target_slot, kind)?;
118
119        match matched_id {
120            Some(id) => self
121                .load::<E>()
122                .by_id(id)
123                .first_value_by(field)
124                .map(|value| value.unwrap_or(Value::Null)),
125            None => Ok(Value::Null),
126        }
127    }
128
129    // Resolve the id selected by one ranked aggregate terminal through the
130    // shared scalar terminal boundary before any field-value load occurs.
131    fn execute_ranked_sql_aggregate_id<E>(
132        &self,
133        query: &Query<E>,
134        target_slot: FieldSlot,
135        kind: AggregateKind,
136    ) -> Result<Option<Id<E>>, QueryError>
137    where
138        E: PersistedRow<Canister = C> + EntityValue,
139    {
140        if !kind.is_extrema() {
141            return Err(QueryError::invariant(
142                "ranked SQL aggregate id helper only supports MIN/MAX",
143            ));
144        }
145
146        self.execute_load_query_with(query, |load, plan| {
147            load.execute_scalar_terminal_request(
148                plan,
149                crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
150                    kind,
151                    target_field: target_slot,
152                },
153            )?
154            .into_id()
155        })
156    }
157}