icydb_core/db/session/sql/
aggregate.rs1use crate::{
7 db::{
8 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
9 executor::{ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryRequest},
10 query::plan::{AggregateKind, FieldSlot},
11 session::sql::explain::resolve_sql_aggregate_target_slot,
12 sql::lowering::{SqlGlobalAggregateTerminal, compile_sql_global_aggregate_command},
13 },
14 traits::{CanisterKind, EntityValue},
15 types::Id,
16 value::Value,
17};
18
19impl<C: CanisterKind> DbSession<C> {
20 pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
25 where
26 E: PersistedRow<Canister = C> + EntityValue,
27 {
28 let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
32 .map_err(QueryError::from_sql_lowering_error)?;
33
34 match command.terminal() {
37 SqlGlobalAggregateTerminal::CountRows => self
38 .execute_load_query_with(command.query(), |load, plan| {
39 load.execute_scalar_terminal_request(
40 plan,
41 crate::db::executor::ScalarTerminalBoundaryRequest::Count,
42 )?
43 .into_count()
44 })
45 .map(|count| Value::Uint(u64::from(count))),
46 SqlGlobalAggregateTerminal::CountField(field) => {
47 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
48
49 self.execute_load_query_with(command.query(), |load, plan| {
50 load.execute_scalar_projection_boundary(
51 plan,
52 target_slot,
53 ScalarProjectionBoundaryRequest::Values,
54 )?
55 .into_values()
56 })
57 .map(|values| {
58 let count = values
59 .into_iter()
60 .filter(|value| !matches!(value, Value::Null))
61 .count();
62
63 Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
64 })
65 }
66 SqlGlobalAggregateTerminal::SumField(field) => {
67 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
68
69 self.execute_load_query_with(command.query(), |load, plan| {
70 load.execute_numeric_field_boundary(
71 plan,
72 target_slot,
73 ScalarNumericFieldBoundaryRequest::Sum,
74 )
75 })
76 .map(|value| value.map_or(Value::Null, Value::Decimal))
77 }
78 SqlGlobalAggregateTerminal::AvgField(field) => {
79 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
80
81 self.execute_load_query_with(command.query(), |load, plan| {
82 load.execute_numeric_field_boundary(
83 plan,
84 target_slot,
85 ScalarNumericFieldBoundaryRequest::Avg,
86 )
87 })
88 .map(|value| value.map_or(Value::Null, Value::Decimal))
89 }
90 SqlGlobalAggregateTerminal::MinField(field) => self
91 .execute_ranked_sql_aggregate_field::<E>(
92 command.query(),
93 field,
94 AggregateKind::Min,
95 ),
96 SqlGlobalAggregateTerminal::MaxField(field) => self
97 .execute_ranked_sql_aggregate_field::<E>(
98 command.query(),
99 field,
100 AggregateKind::Max,
101 ),
102 }
103 }
104
105 fn execute_ranked_sql_aggregate_field<E>(
108 &self,
109 query: &Query<E>,
110 field: &str,
111 kind: AggregateKind,
112 ) -> Result<Value, QueryError>
113 where
114 E: PersistedRow<Canister = C> + EntityValue,
115 {
116 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
117 let matched_id = self.execute_ranked_sql_aggregate_id(query, target_slot, kind)?;
118
119 match matched_id {
120 Some(id) => self
121 .load::<E>()
122 .by_id(id)
123 .first_value_by(field)
124 .map(|value| value.unwrap_or(Value::Null)),
125 None => Ok(Value::Null),
126 }
127 }
128
129 fn execute_ranked_sql_aggregate_id<E>(
132 &self,
133 query: &Query<E>,
134 target_slot: FieldSlot,
135 kind: AggregateKind,
136 ) -> Result<Option<Id<E>>, QueryError>
137 where
138 E: PersistedRow<Canister = C> + EntityValue,
139 {
140 if !kind.is_extrema() {
141 return Err(QueryError::invariant(
142 "ranked SQL aggregate id helper only supports MIN/MAX",
143 ));
144 }
145
146 self.execute_load_query_with(query, |load, plan| {
147 load.execute_scalar_terminal_request(
148 plan,
149 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
150 kind,
151 target_field: target_slot,
152 },
153 )?
154 .into_id()
155 })
156 }
157}