1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 executor::{
14 EntityAuthority, execute_sql_projection_rows_for_canister,
15 execute_sql_projection_text_rows_for_canister,
16 },
17 identifiers_tail_match,
18 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
19 session::sql::{
20 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
21 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
22 computed_projection,
23 projection::{
24 SqlProjectionPayload, projection_labels_from_fields,
25 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
26 },
27 },
28 sql::lowering::{
29 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, bind_lowered_sql_query,
30 },
31 sql::parser::{
32 SqlAggregateCall, SqlAggregateKind, SqlProjection, SqlSelectItem, SqlStatement,
33 SqlTextFunction,
34 },
35 },
36 traits::{CanisterKind, EntityKind, EntityValue},
37};
38
39#[cfg(feature = "perf-attribution")]
40pub use lowered::LoweredSqlDispatchExecutorAttribution;
41
#[doc(hidden)]
/// Outcome of one generated query-surface SQL dispatch attempt.
///
/// Bundles the dispatch result with the entity metadata that was resolved
/// before execution. Attempts that end before an entity authority is selected
/// are built with an empty entity name and no explain order field.
pub struct GeneratedSqlDispatchAttempt {
    /// Model name of the resolved entity, or `""` when no entity was resolved.
    entity_name: &'static str,
    /// Primary-key field name; only populated when the statement route is `EXPLAIN`.
    explain_order_field: Option<&'static str>,
    /// Result of the dispatch itself.
    result: Result<SqlDispatchResult, QueryError>,
}
56
57impl GeneratedSqlDispatchAttempt {
58 const fn new(
60 entity_name: &'static str,
61 explain_order_field: Option<&'static str>,
62 result: Result<SqlDispatchResult, QueryError>,
63 ) -> Self {
64 Self {
65 entity_name,
66 explain_order_field,
67 result,
68 }
69 }
70
71 #[must_use]
73 pub const fn entity_name(&self) -> &'static str {
74 self.entity_name
75 }
76
77 #[must_use]
79 pub const fn explain_order_field(&self) -> Option<&'static str> {
80 self.explain_order_field
81 }
82
83 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
85 self.result
86 }
87}
88
/// Identifies which SQL execution surface is validating a query's grouping shape.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::sql) enum SqlGroupingSurface {
    /// Surface that only accepts queries without grouping.
    Scalar,
    /// Surface that only accepts queries with grouping.
    Grouped,
}
94
95const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
96 match surface {
97 SqlGroupingSurface::Scalar => {
98 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
99 }
100 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
101 }
102}
103
104fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
107 let sql_trimmed = sql.trim();
108 if sql_trimmed.is_empty() {
109 return Err(QueryError::unsupported_query(
110 "query endpoint requires a non-empty SQL string",
111 ));
112 }
113
114 Ok(sql_trimmed)
115}
116
117fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
120 let mut entities = Vec::with_capacity(authorities.len());
121
122 for authority in authorities {
123 entities.push(authority.model().name().to_string());
124 }
125
126 entities
127}
128
129fn sql_projection_labels_from_select_statement(
132 statement: &SqlStatement,
133) -> Result<Option<Vec<String>>, QueryError> {
134 let SqlStatement::Select(select) = statement else {
135 return Err(QueryError::invariant(
136 "SQL projection labels require SELECT statement shape",
137 ));
138 };
139 let SqlProjection::Items(items) = &select.projection else {
140 return Ok(None);
141 };
142
143 Ok(Some(
144 items
145 .iter()
146 .enumerate()
147 .map(|(index, item)| {
148 select
149 .projection_alias(index)
150 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
151 })
152 .collect(),
153 ))
154}
155
156fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
159 match item {
160 SqlSelectItem::Field(field) => field.clone(),
161 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
162 SqlSelectItem::TextFunction(call) => {
163 format!(
164 "{}({})",
165 grouped_sql_text_function_name(call.function),
166 call.field
167 )
168 }
169 }
170}
171
172fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
175 let SqlStatement::Select(select) = statement else {
176 return None;
177 };
178
179 select.projection_alias(0).map(str::to_string)
180}
181
182fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
184 let kind = match aggregate.kind {
185 SqlAggregateKind::Count => "COUNT",
186 SqlAggregateKind::Sum => "SUM",
187 SqlAggregateKind::Avg => "AVG",
188 SqlAggregateKind::Min => "MIN",
189 SqlAggregateKind::Max => "MAX",
190 };
191
192 match aggregate.field.as_deref() {
193 Some(field) => format!("{kind}({field})"),
194 None => format!("{kind}(*)"),
195 }
196}
197
198const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
201 match function {
202 SqlTextFunction::Trim => "TRIM",
203 SqlTextFunction::Ltrim => "LTRIM",
204 SqlTextFunction::Rtrim => "RTRIM",
205 SqlTextFunction::Lower => "LOWER",
206 SqlTextFunction::Upper => "UPPER",
207 SqlTextFunction::Length => "LENGTH",
208 SqlTextFunction::Left => "LEFT",
209 SqlTextFunction::Right => "RIGHT",
210 SqlTextFunction::StartsWith => "STARTS_WITH",
211 SqlTextFunction::EndsWith => "ENDS_WITH",
212 SqlTextFunction::Contains => "CONTAINS",
213 SqlTextFunction::Position => "POSITION",
214 SqlTextFunction::Replace => "REPLACE",
215 SqlTextFunction::Substring => "SUBSTRING",
216 }
217}
218
219fn authority_for_generated_sql_route(
221 route: &SqlStatementRoute,
222 authorities: &[EntityAuthority],
223) -> Result<EntityAuthority, QueryError> {
224 let sql_entity = route.entity();
225
226 for authority in authorities {
227 if identifiers_tail_match(sql_entity, authority.model().name()) {
228 return Ok(*authority);
229 }
230 }
231
232 Err(unsupported_generated_sql_entity_error(
233 sql_entity,
234 authorities,
235 ))
236}
237
238fn unsupported_generated_sql_entity_error(
241 entity_name: &str,
242 authorities: &[EntityAuthority],
243) -> QueryError {
244 let mut supported = String::new();
245
246 for (index, authority) in authorities.iter().enumerate() {
247 if index != 0 {
248 supported.push_str(", ");
249 }
250
251 supported.push_str(authority.model().name());
252 }
253
254 QueryError::unsupported_query(format!(
255 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
256 ))
257}
258
259impl<C: CanisterKind> DbSession<C> {
260 fn prepare_structural_sql_projection_execution(
263 &self,
264 query: StructuralQuery,
265 authority: EntityAuthority,
266 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
267 let (_, plan) =
270 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
271 let projection = plan.projection_spec(authority.model());
272 let columns = projection_labels_from_projection_spec(&projection);
273
274 Ok((columns, plan))
275 }
276
277 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
281 &self,
282 query: StructuralQuery,
283 authority: EntityAuthority,
284 ) -> Result<SqlProjectionPayload, QueryError> {
285 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
287
288 let projected =
291 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
292 .map_err(QueryError::execute)?;
293 let (rows, row_count) = projected.into_parts();
294
295 Ok(SqlProjectionPayload::new(columns, rows, row_count))
296 }
297
298 fn execute_structural_sql_projection_text(
302 &self,
303 query: StructuralQuery,
304 authority: EntityAuthority,
305 ) -> Result<SqlDispatchResult, QueryError> {
306 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
308
309 let projected =
312 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
313 .map_err(QueryError::execute)?;
314 let (rows, row_count) = projected.into_parts();
315
316 Ok(SqlDispatchResult::ProjectionText {
317 columns,
318 rows,
319 row_count,
320 })
321 }
322
323 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
327 where
328 E: PersistedRow<Canister = C> + EntityValue,
329 {
330 let plan = self
331 .compile_query_with_visible_indexes(query)?
332 .into_prepared_execution_plan();
333 let deleted = self
334 .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
335 .map_err(QueryError::execute)?;
336 let (rows, row_count) = deleted.into_parts();
337 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
338
339 Ok(SqlProjectionPayload::new(
340 projection_labels_from_fields(E::MODEL.fields()),
341 rows,
342 row_count,
343 )
344 .into_dispatch_result())
345 }
346
347 fn lowered_sql_query_dispatch_inputs_for_authority(
350 parsed: &SqlParsedStatement,
351 authority: EntityAuthority,
352 unsupported_message: &'static str,
353 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
354 let lowered = parsed.lower_query_lane_for_entity(
355 authority.model().name(),
356 authority.model().primary_key.name,
357 )?;
358 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
359 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
360 .transpose()?;
361 let query = lowered
362 .into_query()
363 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
364
365 Ok((query, projection_columns.flatten()))
366 }
367
368 fn dispatch_sql_query_route_for_authority(
372 &self,
373 parsed: &SqlParsedStatement,
374 authority: EntityAuthority,
375 unsupported_message: &'static str,
376 dispatch_select: impl FnOnce(
377 &Self,
378 LoweredSelectShape,
379 EntityAuthority,
380 bool,
381 Option<Vec<String>>,
382 ) -> Result<SqlDispatchResult, QueryError>,
383 dispatch_delete: impl FnOnce(
384 &Self,
385 LoweredBaseQueryShape,
386 EntityAuthority,
387 ) -> Result<SqlDispatchResult, QueryError>,
388 ) -> Result<SqlDispatchResult, QueryError> {
389 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
392 let command =
393 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
394
395 return self.execute_sql_aggregate_dispatch_for_authority(
396 command,
397 authority,
398 sql_aggregate_dispatch_label_override(&parsed.statement),
399 );
400 }
401
402 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
403 return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
404 }
405
406 let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
409 parsed,
410 authority,
411 unsupported_message,
412 )?;
413 let grouped_surface = query.has_grouping();
414
415 match query {
416 LoweredSqlQuery::Select(select) => {
417 dispatch_select(self, select, authority, grouped_surface, projection_columns)
418 }
419 LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
420 }
421 }
422
423 fn dispatch_sql_explain_route_for_authority(
427 &self,
428 parsed: &SqlParsedStatement,
429 authority: EntityAuthority,
430 ) -> Result<SqlDispatchResult, QueryError> {
431 if let Some((mode, plan)) =
434 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
435 {
436 return self
437 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
438 .map(SqlDispatchResult::Explain);
439 }
440
441 let lowered = parsed.lower_query_lane_for_entity(
444 authority.model().name(),
445 authority.model().primary_key.name,
446 )?;
447 if let Some(explain) =
448 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
449 {
450 return Ok(SqlDispatchResult::Explain(explain));
451 }
452
453 self.explain_lowered_sql_for_authority(&lowered, authority)
454 .map(SqlDispatchResult::Explain)
455 }
456
457 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
460 query: &Query<E>,
461 surface: SqlGroupingSurface,
462 ) -> Result<(), QueryError>
463 where
464 E: EntityKind,
465 {
466 match (surface, query.has_grouping()) {
467 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
468 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
469 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
470 ),
471 }
472 }
473
474 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
476 where
477 E: PersistedRow<Canister = C> + EntityValue,
478 {
479 let parsed = self.parse_sql_statement(sql)?;
480
481 self.execute_sql_dispatch_parsed::<E>(&parsed)
482 }
483
484 pub fn execute_sql_dispatch_parsed<E>(
486 &self,
487 parsed: &SqlParsedStatement,
488 ) -> Result<SqlDispatchResult, QueryError>
489 where
490 E: PersistedRow<Canister = C> + EntityValue,
491 {
492 match parsed.route() {
493 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
494 parsed,
495 EntityAuthority::for_type::<E>(),
496 "execute_sql_dispatch accepts SELECT or DELETE only",
497 |session, select, authority, grouped_surface, projection_columns| {
498 if grouped_surface {
499 let columns = projection_columns.ok_or_else(|| {
500 QueryError::unsupported_query(
501 "grouped SQL dispatch requires explicit grouped projection items",
502 )
503 })?;
504
505 return session.execute_lowered_sql_grouped_dispatch_select_core(
506 select, authority, columns,
507 );
508 }
509
510 let payload = session.execute_lowered_sql_projection_core(select, authority)?;
511 if let Some(columns) = projection_columns {
512 let (_, rows, row_count) = payload.into_parts();
513
514 return Ok(SqlProjectionPayload::new(columns, rows, row_count)
515 .into_dispatch_result());
516 }
517
518 Ok(payload.into_dispatch_result())
519 },
520 |session, delete, _authority| {
521 let typed_query = bind_lowered_sql_query::<E>(
522 LoweredSqlQuery::Delete(delete),
523 MissingRowPolicy::Ignore,
524 )
525 .map_err(QueryError::from_sql_lowering_error)?;
526
527 session.execute_typed_sql_delete(&typed_query)
528 },
529 ),
530 SqlStatementRoute::Explain { .. } => self
531 .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
532 SqlStatementRoute::Describe { .. } => {
533 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
534 }
535 SqlStatementRoute::ShowIndexes { .. } => {
536 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
537 }
538 SqlStatementRoute::ShowColumns { .. } => {
539 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
540 }
541 SqlStatementRoute::ShowEntities => {
542 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
543 }
544 }
545 }
546
547 #[doc(hidden)]
554 pub fn execute_generated_query_surface_dispatch_for_authority(
555 &self,
556 parsed: &SqlParsedStatement,
557 authority: EntityAuthority,
558 ) -> Result<SqlDispatchResult, QueryError> {
559 match parsed.route() {
560 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
561 parsed,
562 authority,
563 "generated SQL query surface requires query or EXPLAIN statement lanes",
564 |session, select, authority, grouped_surface, projection_columns| {
565 if grouped_surface {
566 let columns = projection_columns.ok_or_else(|| {
567 QueryError::unsupported_query(
568 "grouped SQL dispatch requires explicit grouped projection items",
569 )
570 })?;
571
572 return session
573 .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
574 }
575
576 let result =
577 session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
578 if let Some(columns) = projection_columns {
579 let SqlDispatchResult::ProjectionText {
580 rows, row_count, ..
581 } = result
582 else {
583 return Err(QueryError::invariant(
584 "generated scalar SQL dispatch text path must emit projection text rows",
585 ));
586 };
587
588 return Ok(SqlDispatchResult::ProjectionText {
589 columns,
590 rows,
591 row_count,
592 });
593 }
594
595 Ok(result)
596 },
597 |session, delete, authority| {
598 session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
599 },
600 ),
601 SqlStatementRoute::Explain { .. } => {
602 self.dispatch_sql_explain_route_for_authority(parsed, authority)
603 }
604 SqlStatementRoute::Describe { .. }
605 | SqlStatementRoute::ShowIndexes { .. }
606 | SqlStatementRoute::ShowColumns { .. }
607 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
608 "generated SQL query surface requires query or EXPLAIN statement lanes",
609 )),
610 }
611 }
612
613 #[doc(hidden)]
619 #[must_use]
620 pub fn execute_generated_query_surface_sql(
621 &self,
622 sql: &str,
623 authorities: &[EntityAuthority],
624 ) -> GeneratedSqlDispatchAttempt {
625 let sql_trimmed = match trim_generated_query_sql_input(sql) {
628 Ok(sql_trimmed) => sql_trimmed,
629 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
630 };
631 let parsed = match self.parse_sql_statement(sql_trimmed) {
632 Ok(parsed) => parsed,
633 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
634 };
635
636 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
639 return GeneratedSqlDispatchAttempt::new(
640 "",
641 None,
642 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
643 authorities,
644 ))),
645 );
646 }
647 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
648 Ok(authority) => authority,
649 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
650 };
651
652 let entity_name = authority.model().name();
656 let explain_order_field = parsed
657 .route()
658 .is_explain()
659 .then_some(authority.model().primary_key.name);
660 let result = match parsed.route() {
661 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
662 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
663 }
664 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
665 self.describe_entity_model(authority.model()),
666 )),
667 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
668 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
669 )),
670 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
671 self.show_columns_for_model(authority.model()),
672 )),
673 SqlStatementRoute::ShowEntities => unreachable!(
674 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
675 ),
676 };
677
678 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
679 }
680}