1mod aggregate;
8mod computed_projection;
9mod dispatch;
10mod explain;
11mod projection;
12mod surface;
13
14use crate::{
15 db::{
16 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, MissingRowPolicy,
17 PagedGroupedExecutionWithTrace, PersistedRow, Query, QueryError,
18 executor::EntityAuthority,
19 query::{
20 intent::StructuralQuery,
21 plan::{AccessPlannedQuery, VisibleIndexes},
22 },
23 sql::{
24 lowering::{
25 bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
26 prepare_sql_statement,
27 },
28 parser::{SqlStatement, parse_sql},
29 },
30 },
31 traits::{CanisterKind, EntityKind, EntityValue},
32};
33
34use crate::db::session::sql::aggregate::{
35 SqlAggregateSurface, parsed_requires_dedicated_sql_aggregate_lane,
36 unsupported_sql_aggregate_lane_message,
37};
38use crate::db::session::sql::surface::{
39 SqlSurface, session_sql_lane, sql_statement_route_from_statement, unsupported_sql_lane_message,
40};
41
42#[cfg(feature = "structural-read-metrics")]
43pub use crate::db::session::sql::projection::{
44 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
45};
46pub use crate::db::session::sql::surface::{
47 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
48};
49#[cfg(feature = "perf-attribution")]
50pub use crate::db::{
51 session::sql::dispatch::LoweredSqlDispatchExecutorAttribution,
52 session::sql::projection::SqlProjectionTextExecutorAttribution,
53};
54
/// SQL entry points that may be handed a computed text projection.
///
/// The variant only selects which rejection message is reported to the
/// caller; it carries no data of its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SqlComputedProjectionSurface {
    /// The `query_from_sql` entry point.
    QueryFrom,
    /// The `execute_sql` entry point.
    ExecuteSql,
    /// The `execute_sql_grouped` entry point (also passed by the grouped
    /// text-cursor variant).
    ExecuteSqlGrouped,
}
61
62const fn unsupported_sql_computed_projection_message(
63 surface: SqlComputedProjectionSurface,
64) -> &'static str {
65 match surface {
66 SqlComputedProjectionSurface::QueryFrom => {
67 "query_from_sql does not accept computed text projection; use execute_sql_dispatch(...)"
68 }
69 SqlComputedProjectionSurface::ExecuteSql => {
70 "execute_sql rejects computed text projection; use execute_sql_dispatch(...)"
71 }
72 SqlComputedProjectionSurface::ExecuteSqlGrouped => {
73 "execute_sql_grouped rejects computed text projection; use execute_sql_dispatch(...)"
74 }
75 }
76}
77
78const fn unsupported_sql_write_surface_message(
79 surface: SqlSurface,
80 statement: &SqlStatement,
81) -> &'static str {
82 match (surface, statement) {
83 (SqlSurface::QueryFrom, SqlStatement::Insert(_)) => {
84 "query_from_sql rejects INSERT; use execute_sql_dispatch(...)"
85 }
86 (SqlSurface::QueryFrom, SqlStatement::Update(_)) => {
87 "query_from_sql rejects UPDATE; use execute_sql_dispatch(...)"
88 }
89 (SqlSurface::ExecuteSql, SqlStatement::Insert(_)) => {
90 "execute_sql rejects INSERT; use execute_sql_dispatch(...)"
91 }
92 (SqlSurface::ExecuteSql, SqlStatement::Update(_)) => {
93 "execute_sql rejects UPDATE; use execute_sql_dispatch(...)"
94 }
95 (SqlSurface::ExecuteSqlGrouped, SqlStatement::Insert(_)) => {
96 "execute_sql_grouped rejects INSERT; use execute_sql_dispatch(...)"
97 }
98 (SqlSurface::ExecuteSqlGrouped, SqlStatement::Update(_)) => {
99 "execute_sql_grouped rejects UPDATE; use execute_sql_dispatch(...)"
100 }
101 (SqlSurface::Explain, SqlStatement::Insert(_) | SqlStatement::Update(_)) => {
102 "explain_sql requires EXPLAIN"
103 }
104 (
105 _,
106 SqlStatement::Select(_)
107 | SqlStatement::Delete(_)
108 | SqlStatement::Explain(_)
109 | SqlStatement::Describe(_)
110 | SqlStatement::ShowIndexes(_)
111 | SqlStatement::ShowColumns(_)
112 | SqlStatement::ShowEntities(_),
113 ) => unreachable!(),
114 }
115}
116
117const fn unsupported_sql_returning_surface_message(
118 surface: SqlSurface,
119 statement: &SqlStatement,
120) -> &'static str {
121 match (surface, statement) {
122 (SqlSurface::QueryFrom, SqlStatement::Delete(_)) => {
123 "query_from_sql rejects DELETE RETURNING; use execute_sql_dispatch(...)"
124 }
125 (SqlSurface::ExecuteSql, SqlStatement::Delete(_)) => {
126 "execute_sql rejects DELETE RETURNING; use execute_sql_dispatch(...)"
127 }
128 (SqlSurface::ExecuteSqlGrouped, SqlStatement::Delete(_)) => {
129 "execute_sql_grouped rejects DELETE RETURNING; use execute_sql_dispatch(...)"
130 }
131 (SqlSurface::Explain, SqlStatement::Delete(_)) => "explain_sql requires EXPLAIN",
132 (
133 _,
134 SqlStatement::Select(_)
135 | SqlStatement::Insert(_)
136 | SqlStatement::Update(_)
137 | SqlStatement::Explain(_)
138 | SqlStatement::Describe(_)
139 | SqlStatement::ShowIndexes(_)
140 | SqlStatement::ShowColumns(_)
141 | SqlStatement::ShowEntities(_),
142 ) => unreachable!(),
143 }
144}
145
146impl<C: CanisterKind> DbSession<C> {
147 pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
150 &self,
151 query: StructuralQuery,
152 authority: EntityAuthority,
153 ) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
154 let visible_indexes =
155 self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
156 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
157
158 Ok((visible_indexes, plan))
159 }
160
161 fn query_from_sql_parsed<E>(
164 parsed: &SqlParsedStatement,
165 lane_surface: SqlSurface,
166 computed_surface: SqlComputedProjectionSurface,
167 surface: SqlAggregateSurface,
168 ) -> Result<Query<E>, QueryError>
169 where
170 E: EntityKind<Canister = C>,
171 {
172 if matches!(
173 &parsed.statement,
174 SqlStatement::Insert(_) | SqlStatement::Update(_)
175 ) {
176 return Err(QueryError::unsupported_query(
177 unsupported_sql_write_surface_message(lane_surface, &parsed.statement),
178 ));
179 }
180 if matches!(&parsed.statement, SqlStatement::Delete(delete) if delete.returning.is_some()) {
181 return Err(QueryError::unsupported_query(
182 unsupported_sql_returning_surface_message(lane_surface, &parsed.statement),
183 ));
184 }
185
186 if computed_projection::computed_sql_projection_plan(&parsed.statement)?.is_some() {
187 return Err(QueryError::unsupported_query(
188 unsupported_sql_computed_projection_message(computed_surface),
189 ));
190 }
191
192 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
193 return Err(QueryError::unsupported_query(
194 unsupported_sql_aggregate_lane_message(surface),
195 ));
196 }
197
198 let lowered = lower_sql_command_from_prepared_statement(
199 parsed.prepare(E::MODEL.name())?,
200 E::MODEL.primary_key.name,
201 )
202 .map_err(QueryError::from_sql_lowering_error)?;
203 let lane = session_sql_lane(&lowered);
204 let Some(query) = lowered.query().cloned() else {
205 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
206 lane_surface,
207 lane,
208 )));
209 };
210 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
211 .map_err(QueryError::from_sql_lowering_error)?;
212
213 Ok(query)
214 }
215
216 fn grouped_query_from_computed_sql_projection_plan<E>(
219 plan: &computed_projection::SqlComputedProjectionPlan,
220 ) -> Result<Query<E>, QueryError>
221 where
222 E: EntityKind<Canister = C>,
223 {
224 let lowered = lower_sql_command_from_prepared_statement(
225 prepare_sql_statement(plan.cloned_base_statement(), E::MODEL.name())
226 .map_err(QueryError::from_sql_lowering_error)?,
227 E::MODEL.primary_key.name,
228 )
229 .map_err(QueryError::from_sql_lowering_error)?;
230 let Some(query) = lowered.query().cloned() else {
231 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
232 SqlSurface::ExecuteSqlGrouped,
233 session_sql_lane(&lowered),
234 )));
235 };
236 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
237 .map_err(QueryError::from_sql_lowering_error)?;
238 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
239
240 Ok(query)
241 }
242
243 pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
247 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
248 let route = sql_statement_route_from_statement(&statement);
249
250 Ok(SqlParsedStatement::new(statement, route))
251 }
252
253 pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
258 let parsed = self.parse_sql_statement(sql)?;
259
260 Ok(parsed.route().clone())
261 }
262
263 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
268 where
269 E: EntityKind<Canister = C>,
270 {
271 let parsed = self.parse_sql_statement(sql)?;
272
273 Self::query_from_sql_parsed::<E>(
274 &parsed,
275 SqlSurface::QueryFrom,
276 SqlComputedProjectionSurface::QueryFrom,
277 SqlAggregateSurface::QueryFrom,
278 )
279 }
280
281 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
283 where
284 E: PersistedRow<Canister = C> + EntityValue,
285 {
286 let parsed = self.parse_sql_statement(sql)?;
287 if matches!(&parsed.statement, SqlStatement::Delete(_)) {
288 return Err(QueryError::unsupported_query(
289 "execute_sql rejects DELETE; use execute_sql_dispatch(...) or delete::<E>()",
290 ));
291 }
292 let query = Self::query_from_sql_parsed::<E>(
293 &parsed,
294 SqlSurface::ExecuteSql,
295 SqlComputedProjectionSurface::ExecuteSql,
296 SqlAggregateSurface::ExecuteSql,
297 )?;
298 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Scalar)?;
299
300 self.execute_query(&query)
301 }
302
303 pub fn execute_sql_grouped<E>(
305 &self,
306 sql: &str,
307 cursor_token: Option<&str>,
308 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
309 where
310 E: PersistedRow<Canister = C> + EntityValue,
311 {
312 let parsed = self.parse_sql_statement(sql)?;
313
314 if matches!(&parsed.statement, SqlStatement::Delete(_)) {
315 return Err(QueryError::unsupported_query(
316 "execute_sql_grouped rejects DELETE; use execute_sql_dispatch(...)",
317 ));
318 }
319
320 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
321 if !plan.is_grouped() {
322 return Err(QueryError::unsupported_query(
323 unsupported_sql_computed_projection_message(
324 SqlComputedProjectionSurface::ExecuteSqlGrouped,
325 ),
326 ));
327 }
328
329 let query = Self::grouped_query_from_computed_sql_projection_plan::<E>(&plan)?;
330 let grouped = self.execute_grouped(&query, cursor_token)?;
331 let (rows, continuation_cursor, execution_trace) = grouped.into_parts();
332 let rows =
333 computed_projection::apply_computed_sql_projection_grouped_rows(rows, &plan)?;
334
335 return Ok(PagedGroupedExecutionWithTrace::new(
336 rows,
337 continuation_cursor,
338 execution_trace,
339 ));
340 }
341
342 let query = Self::query_from_sql_parsed::<E>(
343 &parsed,
344 SqlSurface::ExecuteSqlGrouped,
345 SqlComputedProjectionSurface::ExecuteSqlGrouped,
346 SqlAggregateSurface::ExecuteSqlGrouped,
347 )?;
348 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
349
350 self.execute_grouped(&query, cursor_token)
351 }
352
353 #[doc(hidden)]
355 pub fn execute_sql_grouped_text_cursor<E>(
356 &self,
357 sql: &str,
358 cursor_token: Option<&str>,
359 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
360 where
361 E: PersistedRow<Canister = C> + EntityValue,
362 {
363 let parsed = self.parse_sql_statement(sql)?;
364
365 if matches!(&parsed.statement, SqlStatement::Delete(_)) {
366 return Err(QueryError::unsupported_query(
367 "execute_sql_grouped rejects DELETE; use execute_sql_dispatch(...)",
368 ));
369 }
370
371 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
372 if !plan.is_grouped() {
373 return Err(QueryError::unsupported_query(
374 unsupported_sql_computed_projection_message(
375 SqlComputedProjectionSurface::ExecuteSqlGrouped,
376 ),
377 ));
378 }
379
380 let query = Self::grouped_query_from_computed_sql_projection_plan::<E>(&plan)?;
381 let (rows, continuation_cursor, execution_trace) =
382 self.execute_grouped_text_cursor(&query, cursor_token)?;
383 let rows =
384 computed_projection::apply_computed_sql_projection_grouped_rows(rows, &plan)?;
385
386 return Ok((rows, continuation_cursor, execution_trace));
387 }
388
389 let query = Self::query_from_sql_parsed::<E>(
390 &parsed,
391 SqlSurface::ExecuteSqlGrouped,
392 SqlComputedProjectionSurface::ExecuteSqlGrouped,
393 SqlAggregateSurface::ExecuteSqlGrouped,
394 )?;
395 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
396
397 self.execute_grouped_text_cursor(&query, cursor_token)
398 }
399}