icydb_core/db/session/sql/
mod.rs1mod aggregate;
8mod computed_projection;
9mod dispatch;
10mod explain;
11mod projection;
12mod surface;
13
14use crate::{
15 db::{
16 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, MissingRowPolicy,
17 PagedGroupedExecutionWithTrace, PersistedRow, Query, QueryError,
18 executor::EntityAuthority,
19 query::{
20 intent::StructuralQuery,
21 plan::{AccessPlannedQuery, VisibleIndexes},
22 },
23 sql::{
24 lowering::{
25 bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
26 prepare_sql_statement,
27 },
28 parser::{SqlStatement, parse_sql},
29 },
30 },
31 traits::{CanisterKind, EntityKind, EntityValue},
32};
33
34use crate::db::session::sql::aggregate::{
35 SqlAggregateSurface, parsed_requires_dedicated_sql_aggregate_lane,
36 unsupported_sql_aggregate_lane_message,
37};
38use crate::db::session::sql::surface::{
39 SqlSurface, session_sql_lane, sql_statement_route_from_statement, unsupported_sql_lane_message,
40};
41
42pub use crate::db::session::sql::surface::{
43 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
44};
45#[cfg(feature = "perf-attribution")]
46pub use crate::db::{
47 executor::SqlProjectionTextExecutorAttribution,
48 session::sql::dispatch::LoweredSqlDispatchExecutorAttribution,
49};
50
51#[derive(Clone, Copy, Debug, Eq, PartialEq)]
52enum SqlComputedProjectionSurface {
53 QueryFrom,
54 ExecuteSql,
55 ExecuteSqlGrouped,
56}
57
58const fn unsupported_sql_computed_projection_message(
59 surface: SqlComputedProjectionSurface,
60) -> &'static str {
61 match surface {
62 SqlComputedProjectionSurface::QueryFrom => {
63 "query_from_sql does not accept computed text projection; use execute_sql_dispatch(...)"
64 }
65 SqlComputedProjectionSurface::ExecuteSql => {
66 "execute_sql rejects computed text projection; use execute_sql_dispatch(...)"
67 }
68 SqlComputedProjectionSurface::ExecuteSqlGrouped => {
69 "execute_sql_grouped rejects computed text projection; use execute_sql_dispatch(...)"
70 }
71 }
72}
73
74impl<C: CanisterKind> DbSession<C> {
75 pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
78 &self,
79 query: StructuralQuery,
80 authority: EntityAuthority,
81 ) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
82 let visible_indexes =
83 self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
84 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
85
86 Ok((visible_indexes, plan))
87 }
88
89 fn query_from_sql_parsed<E>(
92 parsed: &SqlParsedStatement,
93 lane_surface: SqlSurface,
94 computed_surface: SqlComputedProjectionSurface,
95 surface: SqlAggregateSurface,
96 ) -> Result<Query<E>, QueryError>
97 where
98 E: EntityKind<Canister = C>,
99 {
100 if computed_projection::computed_sql_projection_plan(&parsed.statement)?.is_some() {
101 return Err(QueryError::unsupported_query(
102 unsupported_sql_computed_projection_message(computed_surface),
103 ));
104 }
105
106 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
107 return Err(QueryError::unsupported_query(
108 unsupported_sql_aggregate_lane_message(surface),
109 ));
110 }
111
112 let lowered = lower_sql_command_from_prepared_statement(
113 parsed.prepare(E::MODEL.name())?,
114 E::MODEL.primary_key.name,
115 )
116 .map_err(QueryError::from_sql_lowering_error)?;
117 let lane = session_sql_lane(&lowered);
118 let Some(query) = lowered.query().cloned() else {
119 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
120 lane_surface,
121 lane,
122 )));
123 };
124 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
125 .map_err(QueryError::from_sql_lowering_error)?;
126
127 Ok(query)
128 }
129
130 fn grouped_query_from_computed_sql_projection_plan<E>(
133 plan: &computed_projection::SqlComputedProjectionPlan,
134 ) -> Result<Query<E>, QueryError>
135 where
136 E: EntityKind<Canister = C>,
137 {
138 let lowered = lower_sql_command_from_prepared_statement(
139 prepare_sql_statement(plan.cloned_base_statement(), E::MODEL.name())
140 .map_err(QueryError::from_sql_lowering_error)?,
141 E::MODEL.primary_key.name,
142 )
143 .map_err(QueryError::from_sql_lowering_error)?;
144 let Some(query) = lowered.query().cloned() else {
145 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
146 SqlSurface::ExecuteSqlGrouped,
147 session_sql_lane(&lowered),
148 )));
149 };
150 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
151 .map_err(QueryError::from_sql_lowering_error)?;
152 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
153
154 Ok(query)
155 }
156
157 pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
161 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
162 let route = sql_statement_route_from_statement(&statement);
163
164 Ok(SqlParsedStatement::new(statement, route))
165 }
166
167 pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
172 let parsed = self.parse_sql_statement(sql)?;
173
174 Ok(parsed.route().clone())
175 }
176
177 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
182 where
183 E: EntityKind<Canister = C>,
184 {
185 let parsed = self.parse_sql_statement(sql)?;
186
187 Self::query_from_sql_parsed::<E>(
188 &parsed,
189 SqlSurface::QueryFrom,
190 SqlComputedProjectionSurface::QueryFrom,
191 SqlAggregateSurface::QueryFrom,
192 )
193 }
194
195 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
197 where
198 E: PersistedRow<Canister = C> + EntityValue,
199 {
200 let parsed = self.parse_sql_statement(sql)?;
201 let query = Self::query_from_sql_parsed::<E>(
202 &parsed,
203 SqlSurface::ExecuteSql,
204 SqlComputedProjectionSurface::ExecuteSql,
205 SqlAggregateSurface::ExecuteSql,
206 )?;
207 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Scalar)?;
208
209 self.execute_query(&query)
210 }
211
212 pub fn execute_sql_grouped<E>(
214 &self,
215 sql: &str,
216 cursor_token: Option<&str>,
217 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
218 where
219 E: PersistedRow<Canister = C> + EntityValue,
220 {
221 let parsed = self.parse_sql_statement(sql)?;
222
223 if matches!(&parsed.statement, SqlStatement::Delete(_)) {
224 return Err(QueryError::unsupported_query(
225 "execute_sql_grouped rejects DELETE; use execute_sql_dispatch(...)",
226 ));
227 }
228
229 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
230 if !plan.is_grouped() {
231 return Err(QueryError::unsupported_query(
232 unsupported_sql_computed_projection_message(
233 SqlComputedProjectionSurface::ExecuteSqlGrouped,
234 ),
235 ));
236 }
237
238 let query = Self::grouped_query_from_computed_sql_projection_plan::<E>(&plan)?;
239 let grouped = self.execute_grouped(&query, cursor_token)?;
240 let (rows, continuation_cursor, execution_trace) = grouped.into_parts();
241 let rows =
242 computed_projection::apply_computed_sql_projection_grouped_rows(rows, &plan)?;
243
244 return Ok(PagedGroupedExecutionWithTrace::new(
245 rows,
246 continuation_cursor,
247 execution_trace,
248 ));
249 }
250
251 let query = Self::query_from_sql_parsed::<E>(
252 &parsed,
253 SqlSurface::ExecuteSqlGrouped,
254 SqlComputedProjectionSurface::ExecuteSqlGrouped,
255 SqlAggregateSurface::ExecuteSqlGrouped,
256 )?;
257 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
258
259 self.execute_grouped(&query, cursor_token)
260 }
261
262 #[doc(hidden)]
264 pub fn execute_sql_grouped_text_cursor<E>(
265 &self,
266 sql: &str,
267 cursor_token: Option<&str>,
268 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
269 where
270 E: PersistedRow<Canister = C> + EntityValue,
271 {
272 let parsed = self.parse_sql_statement(sql)?;
273
274 if matches!(&parsed.statement, SqlStatement::Delete(_)) {
275 return Err(QueryError::unsupported_query(
276 "execute_sql_grouped rejects DELETE; use execute_sql_dispatch(...)",
277 ));
278 }
279
280 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
281 if !plan.is_grouped() {
282 return Err(QueryError::unsupported_query(
283 unsupported_sql_computed_projection_message(
284 SqlComputedProjectionSurface::ExecuteSqlGrouped,
285 ),
286 ));
287 }
288
289 let query = Self::grouped_query_from_computed_sql_projection_plan::<E>(&plan)?;
290 let (rows, continuation_cursor, execution_trace) =
291 self.execute_grouped_text_cursor(&query, cursor_token)?;
292 let rows =
293 computed_projection::apply_computed_sql_projection_grouped_rows(rows, &plan)?;
294
295 return Ok((rows, continuation_cursor, execution_trace));
296 }
297
298 let query = Self::query_from_sql_parsed::<E>(
299 &parsed,
300 SqlSurface::ExecuteSqlGrouped,
301 SqlComputedProjectionSurface::ExecuteSqlGrouped,
302 SqlAggregateSurface::ExecuteSqlGrouped,
303 )?;
304 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
305
306 self.execute_grouped_text_cursor(&query, cursor_token)
307 }
308}