1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 executor::{
14 EntityAuthority, execute_sql_projection_rows_for_canister,
15 execute_sql_projection_text_rows_for_canister,
16 },
17 identifiers_tail_match,
18 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
19 session::sql::{
20 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
21 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
22 computed_projection,
23 projection::{
24 SqlProjectionPayload, projection_labels_from_fields,
25 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
26 },
27 },
28 sql::lowering::{
29 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, bind_lowered_sql_query,
30 },
31 sql::parser::{
32 SqlAggregateCall, SqlAggregateKind, SqlProjection, SqlSelectItem, SqlStatement,
33 SqlTextFunction,
34 },
35 },
36 traits::{CanisterKind, EntityKind, EntityValue},
37};
38
39#[cfg(feature = "perf-attribution")]
40pub use lowered::LoweredSqlDispatchExecutorAttribution;
41
42#[doc(hidden)]
51pub struct GeneratedSqlDispatchAttempt {
52 entity_name: &'static str,
53 explain_order_field: Option<&'static str>,
54 result: Result<SqlDispatchResult, QueryError>,
55}
56
57impl GeneratedSqlDispatchAttempt {
58 const fn new(
60 entity_name: &'static str,
61 explain_order_field: Option<&'static str>,
62 result: Result<SqlDispatchResult, QueryError>,
63 ) -> Self {
64 Self {
65 entity_name,
66 explain_order_field,
67 result,
68 }
69 }
70
71 #[must_use]
73 pub const fn entity_name(&self) -> &'static str {
74 self.entity_name
75 }
76
77 #[must_use]
79 pub const fn explain_order_field(&self) -> Option<&'static str> {
80 self.explain_order_field
81 }
82
83 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
85 self.result
86 }
87}
88
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
90pub(in crate::db::session::sql) enum SqlGroupingSurface {
91 Scalar,
92 Grouped,
93}
94
95const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
96 match surface {
97 SqlGroupingSurface::Scalar => {
98 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
99 }
100 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
101 }
102}
103
104fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
107 let sql_trimmed = sql.trim();
108 if sql_trimmed.is_empty() {
109 return Err(QueryError::unsupported_query(
110 "query endpoint requires a non-empty SQL string",
111 ));
112 }
113
114 Ok(sql_trimmed)
115}
116
117fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
120 let mut entities = Vec::with_capacity(authorities.len());
121
122 for authority in authorities {
123 entities.push(authority.model().name().to_string());
124 }
125
126 entities
127}
128
129fn grouped_sql_projection_labels_from_statement(
131 statement: &SqlStatement,
132) -> Result<Vec<String>, QueryError> {
133 let SqlStatement::Select(select) = statement else {
134 return Err(QueryError::invariant(
135 "grouped SQL projection labels require SELECT statement shape",
136 ));
137 };
138 let SqlProjection::Items(items) = &select.projection else {
139 return Err(QueryError::unsupported_query(
140 "grouped SQL dispatch requires explicit grouped projection items",
141 ));
142 };
143
144 Ok(items
145 .iter()
146 .map(grouped_sql_projection_item_label)
147 .collect())
148}
149
150fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
153 match item {
154 SqlSelectItem::Field(field) => field.clone(),
155 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
156 SqlSelectItem::TextFunction(call) => {
157 format!(
158 "{}({})",
159 grouped_sql_text_function_name(call.function),
160 call.field
161 )
162 }
163 }
164}
165
166fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
168 let kind = match aggregate.kind {
169 SqlAggregateKind::Count => "COUNT",
170 SqlAggregateKind::Sum => "SUM",
171 SqlAggregateKind::Avg => "AVG",
172 SqlAggregateKind::Min => "MIN",
173 SqlAggregateKind::Max => "MAX",
174 };
175
176 match aggregate.field.as_deref() {
177 Some(field) => format!("{kind}({field})"),
178 None => format!("{kind}(*)"),
179 }
180}
181
182const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
185 match function {
186 SqlTextFunction::Trim => "TRIM",
187 SqlTextFunction::Ltrim => "LTRIM",
188 SqlTextFunction::Rtrim => "RTRIM",
189 SqlTextFunction::Lower => "LOWER",
190 SqlTextFunction::Upper => "UPPER",
191 SqlTextFunction::Length => "LENGTH",
192 SqlTextFunction::Left => "LEFT",
193 SqlTextFunction::Right => "RIGHT",
194 SqlTextFunction::StartsWith => "STARTS_WITH",
195 SqlTextFunction::EndsWith => "ENDS_WITH",
196 SqlTextFunction::Contains => "CONTAINS",
197 SqlTextFunction::Position => "POSITION",
198 SqlTextFunction::Replace => "REPLACE",
199 SqlTextFunction::Substring => "SUBSTRING",
200 }
201}
202
203fn authority_for_generated_sql_route(
205 route: &SqlStatementRoute,
206 authorities: &[EntityAuthority],
207) -> Result<EntityAuthority, QueryError> {
208 let sql_entity = route.entity();
209
210 for authority in authorities {
211 if identifiers_tail_match(sql_entity, authority.model().name()) {
212 return Ok(*authority);
213 }
214 }
215
216 Err(unsupported_generated_sql_entity_error(
217 sql_entity,
218 authorities,
219 ))
220}
221
222fn unsupported_generated_sql_entity_error(
225 entity_name: &str,
226 authorities: &[EntityAuthority],
227) -> QueryError {
228 let mut supported = String::new();
229
230 for (index, authority) in authorities.iter().enumerate() {
231 if index != 0 {
232 supported.push_str(", ");
233 }
234
235 supported.push_str(authority.model().name());
236 }
237
238 QueryError::unsupported_query(format!(
239 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
240 ))
241}
242
243impl<C: CanisterKind> DbSession<C> {
244 fn prepare_structural_sql_projection_execution(
247 &self,
248 query: StructuralQuery,
249 authority: EntityAuthority,
250 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
251 let (_, plan) =
254 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
255 let projection = plan.projection_spec(authority.model());
256 let columns = projection_labels_from_projection_spec(&projection);
257
258 Ok((columns, plan))
259 }
260
261 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
265 &self,
266 query: StructuralQuery,
267 authority: EntityAuthority,
268 ) -> Result<SqlProjectionPayload, QueryError> {
269 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
271
272 let projected =
275 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
276 .map_err(QueryError::execute)?;
277 let (rows, row_count) = projected.into_parts();
278
279 Ok(SqlProjectionPayload::new(columns, rows, row_count))
280 }
281
282 fn execute_structural_sql_projection_text(
286 &self,
287 query: StructuralQuery,
288 authority: EntityAuthority,
289 ) -> Result<SqlDispatchResult, QueryError> {
290 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
292
293 let projected =
296 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
297 .map_err(QueryError::execute)?;
298 let (rows, row_count) = projected.into_parts();
299
300 Ok(SqlDispatchResult::ProjectionText {
301 columns,
302 rows,
303 row_count,
304 })
305 }
306
307 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
311 where
312 E: PersistedRow<Canister = C> + EntityValue,
313 {
314 let plan = self
315 .compile_query_with_visible_indexes(query)?
316 .into_executable();
317 let deleted = self
318 .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
319 .map_err(QueryError::execute)?;
320 let (rows, row_count) = deleted.into_parts();
321 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
322
323 Ok(SqlProjectionPayload::new(
324 projection_labels_from_fields(E::MODEL.fields()),
325 rows,
326 row_count,
327 )
328 .into_dispatch_result())
329 }
330
331 fn lowered_sql_query_dispatch_inputs_for_authority(
334 parsed: &SqlParsedStatement,
335 authority: EntityAuthority,
336 unsupported_message: &'static str,
337 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
338 let lowered = parsed.lower_query_lane_for_entity(
339 authority.model().name(),
340 authority.model().primary_key.name,
341 )?;
342 let grouped_columns = lowered
343 .query()
344 .filter(|query| query.has_grouping())
345 .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
346 .transpose()?;
347 let query = lowered
348 .into_query()
349 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
350
351 Ok((query, grouped_columns))
352 }
353
354 fn dispatch_sql_query_route_for_authority(
358 &self,
359 parsed: &SqlParsedStatement,
360 authority: EntityAuthority,
361 unsupported_message: &'static str,
362 dispatch_select: impl FnOnce(
363 &Self,
364 LoweredSelectShape,
365 EntityAuthority,
366 Option<Vec<String>>,
367 ) -> Result<SqlDispatchResult, QueryError>,
368 dispatch_delete: impl FnOnce(
369 &Self,
370 LoweredBaseQueryShape,
371 EntityAuthority,
372 ) -> Result<SqlDispatchResult, QueryError>,
373 ) -> Result<SqlDispatchResult, QueryError> {
374 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
377 let command =
378 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
379
380 return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
381 }
382
383 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
384 return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
385 }
386
387 let (query, grouped_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
390 parsed,
391 authority,
392 unsupported_message,
393 )?;
394
395 match query {
396 LoweredSqlQuery::Select(select) => {
397 dispatch_select(self, select, authority, grouped_columns)
398 }
399 LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
400 }
401 }
402
403 fn dispatch_sql_explain_route_for_authority(
407 &self,
408 parsed: &SqlParsedStatement,
409 authority: EntityAuthority,
410 ) -> Result<SqlDispatchResult, QueryError> {
411 if let Some((mode, plan)) =
414 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
415 {
416 return self
417 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
418 .map(SqlDispatchResult::Explain);
419 }
420
421 let lowered = parsed.lower_query_lane_for_entity(
424 authority.model().name(),
425 authority.model().primary_key.name,
426 )?;
427 if let Some(explain) =
428 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
429 {
430 return Ok(SqlDispatchResult::Explain(explain));
431 }
432
433 self.explain_lowered_sql_for_authority(&lowered, authority)
434 .map(SqlDispatchResult::Explain)
435 }
436
437 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
440 query: &Query<E>,
441 surface: SqlGroupingSurface,
442 ) -> Result<(), QueryError>
443 where
444 E: EntityKind,
445 {
446 match (surface, query.has_grouping()) {
447 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
448 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
449 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
450 ),
451 }
452 }
453
454 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
456 where
457 E: PersistedRow<Canister = C> + EntityValue,
458 {
459 let parsed = self.parse_sql_statement(sql)?;
460
461 self.execute_sql_dispatch_parsed::<E>(&parsed)
462 }
463
464 pub fn execute_sql_dispatch_parsed<E>(
466 &self,
467 parsed: &SqlParsedStatement,
468 ) -> Result<SqlDispatchResult, QueryError>
469 where
470 E: PersistedRow<Canister = C> + EntityValue,
471 {
472 match parsed.route() {
473 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
474 parsed,
475 EntityAuthority::for_type::<E>(),
476 "execute_sql_dispatch accepts SELECT or DELETE only",
477 |session, select, authority, grouped_columns| match grouped_columns {
478 Some(columns) => session.execute_lowered_sql_grouped_dispatch_select_core(
479 select, authority, columns,
480 ),
481 None => session
482 .execute_lowered_sql_projection_core(select, authority)
483 .map(SqlProjectionPayload::into_dispatch_result),
484 },
485 |session, delete, _authority| {
486 let typed_query = bind_lowered_sql_query::<E>(
487 LoweredSqlQuery::Delete(delete),
488 MissingRowPolicy::Ignore,
489 )
490 .map_err(QueryError::from_sql_lowering_error)?;
491
492 session.execute_typed_sql_delete(&typed_query)
493 },
494 ),
495 SqlStatementRoute::Explain { .. } => self
496 .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
497 SqlStatementRoute::Describe { .. } => {
498 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
499 }
500 SqlStatementRoute::ShowIndexes { .. } => {
501 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
502 }
503 SqlStatementRoute::ShowColumns { .. } => {
504 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
505 }
506 SqlStatementRoute::ShowEntities => {
507 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
508 }
509 }
510 }
511
512 #[doc(hidden)]
519 pub fn execute_generated_query_surface_dispatch_for_authority(
520 &self,
521 parsed: &SqlParsedStatement,
522 authority: EntityAuthority,
523 ) -> Result<SqlDispatchResult, QueryError> {
524 match parsed.route() {
525 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
526 parsed,
527 authority,
528 "generated SQL query surface requires query or EXPLAIN statement lanes",
529 |session, select, authority, grouped_columns| match grouped_columns {
530 Some(columns) => session.execute_lowered_sql_grouped_dispatch_select_core(
531 select, authority, columns,
532 ),
533 None => {
534 session.execute_lowered_sql_dispatch_select_text_core(select, authority)
535 }
536 },
537 |session, delete, authority| {
538 session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
539 },
540 ),
541 SqlStatementRoute::Explain { .. } => {
542 self.dispatch_sql_explain_route_for_authority(parsed, authority)
543 }
544 SqlStatementRoute::Describe { .. }
545 | SqlStatementRoute::ShowIndexes { .. }
546 | SqlStatementRoute::ShowColumns { .. }
547 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
548 "generated SQL query surface requires query or EXPLAIN statement lanes",
549 )),
550 }
551 }
552
553 #[doc(hidden)]
559 #[must_use]
560 pub fn execute_generated_query_surface_sql(
561 &self,
562 sql: &str,
563 authorities: &[EntityAuthority],
564 ) -> GeneratedSqlDispatchAttempt {
565 let sql_trimmed = match trim_generated_query_sql_input(sql) {
568 Ok(sql_trimmed) => sql_trimmed,
569 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
570 };
571 let parsed = match self.parse_sql_statement(sql_trimmed) {
572 Ok(parsed) => parsed,
573 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
574 };
575
576 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
579 return GeneratedSqlDispatchAttempt::new(
580 "",
581 None,
582 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
583 authorities,
584 ))),
585 );
586 }
587 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
588 Ok(authority) => authority,
589 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
590 };
591
592 let entity_name = authority.model().name();
596 let explain_order_field = parsed
597 .route()
598 .is_explain()
599 .then_some(authority.model().primary_key.name);
600 let result = match parsed.route() {
601 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
602 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
603 }
604 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
605 self.describe_entity_model(authority.model()),
606 )),
607 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
608 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
609 )),
610 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
611 self.show_columns_for_model(authority.model()),
612 )),
613 SqlStatementRoute::ShowEntities => unreachable!(
614 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
615 ),
616 };
617
618 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
619 }
620}