1mod computed;
7mod lowered;
8
9use crate::{
10 db::{
11 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
12 executor::{
13 EntityAuthority, execute_sql_projection_rows_for_canister,
14 execute_sql_projection_text_rows_for_canister,
15 },
16 identifiers_tail_match,
17 query::intent::StructuralQuery,
18 session::sql::{
19 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21 computed_projection,
22 projection::{
23 SqlProjectionPayload, projection_labels_from_fields,
24 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25 },
26 },
27 sql::lowering::{
28 LoweredSqlQuery, bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
29 },
30 sql::parser::{
31 SqlAggregateCall, SqlAggregateKind, SqlProjection, SqlSelectItem, SqlStatement,
32 SqlTextFunction,
33 },
34 },
35 traits::{CanisterKind, EntityKind, EntityValue},
36};
37
38#[cfg(feature = "perf-attribution")]
39pub use lowered::LoweredSqlDispatchExecutorAttribution;
40
41#[doc(hidden)]
50pub struct GeneratedSqlDispatchAttempt {
51 entity_name: &'static str,
52 explain_order_field: Option<&'static str>,
53 result: Result<SqlDispatchResult, QueryError>,
54}
55
56impl GeneratedSqlDispatchAttempt {
57 const fn new(
59 entity_name: &'static str,
60 explain_order_field: Option<&'static str>,
61 result: Result<SqlDispatchResult, QueryError>,
62 ) -> Self {
63 Self {
64 entity_name,
65 explain_order_field,
66 result,
67 }
68 }
69
70 #[must_use]
72 pub const fn entity_name(&self) -> &'static str {
73 self.entity_name
74 }
75
76 #[must_use]
78 pub const fn explain_order_field(&self) -> Option<&'static str> {
79 self.explain_order_field
80 }
81
82 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
84 self.result
85 }
86}
87
88#[derive(Clone, Copy, Debug, Eq, PartialEq)]
89pub(in crate::db::session::sql) enum SqlGroupingSurface {
90 Scalar,
91 Grouped,
92}
93
94const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
95 match surface {
96 SqlGroupingSurface::Scalar => {
97 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
98 }
99 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
100 }
101}
102
103fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
106 let sql_trimmed = sql.trim();
107 if sql_trimmed.is_empty() {
108 return Err(QueryError::unsupported_query(
109 "query endpoint requires a non-empty SQL string",
110 ));
111 }
112
113 Ok(sql_trimmed)
114}
115
116fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
119 let mut entities = Vec::with_capacity(authorities.len());
120
121 for authority in authorities {
122 entities.push(authority.model().name().to_string());
123 }
124
125 entities
126}
127
128fn grouped_sql_projection_labels_from_statement(
130 statement: &SqlStatement,
131) -> Result<Vec<String>, QueryError> {
132 let SqlStatement::Select(select) = statement else {
133 return Err(QueryError::invariant(
134 "grouped SQL projection labels require SELECT statement shape",
135 ));
136 };
137 let SqlProjection::Items(items) = &select.projection else {
138 return Err(QueryError::unsupported_query(
139 "grouped SQL dispatch requires explicit grouped projection items",
140 ));
141 };
142
143 Ok(items
144 .iter()
145 .map(grouped_sql_projection_item_label)
146 .collect())
147}
148
149fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
152 match item {
153 SqlSelectItem::Field(field) => field.clone(),
154 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
155 SqlSelectItem::TextFunction(call) => {
156 format!(
157 "{}({})",
158 grouped_sql_text_function_name(call.function),
159 call.field
160 )
161 }
162 }
163}
164
165fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
167 let kind = match aggregate.kind {
168 SqlAggregateKind::Count => "COUNT",
169 SqlAggregateKind::Sum => "SUM",
170 SqlAggregateKind::Avg => "AVG",
171 SqlAggregateKind::Min => "MIN",
172 SqlAggregateKind::Max => "MAX",
173 };
174
175 match aggregate.field.as_deref() {
176 Some(field) => format!("{kind}({field})"),
177 None => format!("{kind}(*)"),
178 }
179}
180
181const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
184 match function {
185 SqlTextFunction::Trim => "TRIM",
186 SqlTextFunction::Ltrim => "LTRIM",
187 SqlTextFunction::Rtrim => "RTRIM",
188 SqlTextFunction::Lower => "LOWER",
189 SqlTextFunction::Upper => "UPPER",
190 SqlTextFunction::Length => "LENGTH",
191 SqlTextFunction::Left => "LEFT",
192 SqlTextFunction::Right => "RIGHT",
193 SqlTextFunction::StartsWith => "STARTS_WITH",
194 SqlTextFunction::EndsWith => "ENDS_WITH",
195 SqlTextFunction::Contains => "CONTAINS",
196 SqlTextFunction::Position => "POSITION",
197 SqlTextFunction::Replace => "REPLACE",
198 SqlTextFunction::Substring => "SUBSTRING",
199 }
200}
201
202fn authority_for_generated_sql_route(
204 route: &SqlStatementRoute,
205 authorities: &[EntityAuthority],
206) -> Result<EntityAuthority, QueryError> {
207 let sql_entity = route.entity();
208
209 for authority in authorities {
210 if identifiers_tail_match(sql_entity, authority.model().name()) {
211 return Ok(*authority);
212 }
213 }
214
215 Err(unsupported_generated_sql_entity_error(
216 sql_entity,
217 authorities,
218 ))
219}
220
221fn unsupported_generated_sql_entity_error(
224 entity_name: &str,
225 authorities: &[EntityAuthority],
226) -> QueryError {
227 let mut supported = String::new();
228
229 for (index, authority) in authorities.iter().enumerate() {
230 if index != 0 {
231 supported.push_str(", ");
232 }
233
234 supported.push_str(authority.model().name());
235 }
236
237 QueryError::unsupported_query(format!(
238 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
239 ))
240}
241
242impl<C: CanisterKind> DbSession<C> {
243 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
247 &self,
248 query: StructuralQuery,
249 authority: EntityAuthority,
250 ) -> Result<SqlProjectionPayload, QueryError> {
251 let (_, plan) =
254 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
255 let projection = plan.projection_spec(authority.model());
256 let columns = projection_labels_from_projection_spec(&projection);
257
258 let projected =
261 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
262 .map_err(QueryError::execute)?;
263 let (rows, row_count) = projected.into_parts();
264
265 Ok(SqlProjectionPayload::new(columns, rows, row_count))
266 }
267
268 fn execute_structural_sql_projection_text(
272 &self,
273 query: StructuralQuery,
274 authority: EntityAuthority,
275 ) -> Result<SqlDispatchResult, QueryError> {
276 let (_, plan) =
279 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
280 let projection = plan.projection_spec(authority.model());
281 let columns = projection_labels_from_projection_spec(&projection);
282
283 let projected =
286 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
287 .map_err(QueryError::execute)?;
288 let (rows, row_count) = projected.into_parts();
289
290 Ok(SqlDispatchResult::ProjectionText {
291 columns,
292 rows,
293 row_count,
294 })
295 }
296
297 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
301 where
302 E: PersistedRow<Canister = C> + EntityValue,
303 {
304 let plan = self
305 .compile_query_with_visible_indexes(query)?
306 .into_executable();
307 let deleted = self
308 .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
309 .map_err(QueryError::execute)?;
310 let (rows, row_count) = deleted.into_parts();
311 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
312
313 Ok(SqlProjectionPayload::new(
314 projection_labels_from_fields(E::MODEL.fields()),
315 rows,
316 row_count,
317 )
318 .into_dispatch_result())
319 }
320
321 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
324 query: &Query<E>,
325 surface: SqlGroupingSurface,
326 ) -> Result<(), QueryError>
327 where
328 E: EntityKind,
329 {
330 match (surface, query.has_grouping()) {
331 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
332 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
333 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
334 ),
335 }
336 }
337
338 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
340 where
341 E: PersistedRow<Canister = C> + EntityValue,
342 {
343 let parsed = self.parse_sql_statement(sql)?;
344
345 self.execute_sql_dispatch_parsed::<E>(&parsed)
346 }
347
348 pub fn execute_sql_dispatch_parsed<E>(
350 &self,
351 parsed: &SqlParsedStatement,
352 ) -> Result<SqlDispatchResult, QueryError>
353 where
354 E: PersistedRow<Canister = C> + EntityValue,
355 {
356 match parsed.route() {
357 SqlStatementRoute::Query { .. } => {
358 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
359 let authority = EntityAuthority::for_type::<E>();
360 let command =
361 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
362
363 return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
364 }
365
366 if let Some(plan) =
367 computed_projection::computed_sql_projection_plan(&parsed.statement)?
368 {
369 return self.execute_computed_sql_projection_dispatch::<E>(plan);
370 }
371
372 let lowered = parsed
376 .lower_query_lane_for_entity(E::MODEL.name(), E::MODEL.primary_key.name)?;
377 let grouped_columns = lowered
378 .query()
379 .filter(|query| query.has_grouping())
380 .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
381 .transpose()?;
382
383 match lowered.into_query() {
387 Some(LoweredSqlQuery::Select(select)) => match grouped_columns {
388 Some(columns) => self.execute_lowered_sql_grouped_dispatch_select_core(
389 select,
390 EntityAuthority::for_type::<E>(),
391 columns,
392 ),
393 None => self
394 .execute_lowered_sql_projection_core(
395 select,
396 EntityAuthority::for_type::<E>(),
397 )
398 .map(SqlProjectionPayload::into_dispatch_result),
399 },
400 Some(LoweredSqlQuery::Delete(delete)) => {
401 let typed_query = bind_lowered_sql_query::<E>(
402 LoweredSqlQuery::Delete(delete),
403 MissingRowPolicy::Ignore,
404 )
405 .map_err(QueryError::from_sql_lowering_error)?;
406
407 self.execute_typed_sql_delete(&typed_query)
408 }
409 None => Err(QueryError::unsupported_query(
410 "execute_sql_dispatch accepts SELECT or DELETE only",
411 )),
412 }
413 }
414 SqlStatementRoute::Explain { .. } => {
415 if let Some((mode, plan)) =
416 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
417 {
418 return self
419 .explain_computed_sql_projection_dispatch::<E>(mode, plan)
420 .map(SqlDispatchResult::Explain);
421 }
422
423 let lowered = lower_sql_command_from_prepared_statement(
424 parsed.prepare(E::MODEL.name())?,
425 E::MODEL.primary_key.name,
426 )
427 .map_err(QueryError::from_sql_lowering_error)?;
428 if let Some(explain) = self.explain_lowered_sql_execution_for_authority(
429 &lowered,
430 EntityAuthority::for_type::<E>(),
431 )? {
432 return Ok(SqlDispatchResult::Explain(explain));
433 }
434
435 self.explain_lowered_sql_for_authority(&lowered, EntityAuthority::for_type::<E>())
436 .map(SqlDispatchResult::Explain)
437 }
438 SqlStatementRoute::Describe { .. } => {
439 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
440 }
441 SqlStatementRoute::ShowIndexes { .. } => {
442 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
443 }
444 SqlStatementRoute::ShowColumns { .. } => {
445 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
446 }
447 SqlStatementRoute::ShowEntities => {
448 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
449 }
450 }
451 }
452
453 #[doc(hidden)]
460 pub fn execute_generated_query_surface_dispatch_for_authority(
461 &self,
462 parsed: &SqlParsedStatement,
463 authority: EntityAuthority,
464 ) -> Result<SqlDispatchResult, QueryError> {
465 match parsed.route() {
466 SqlStatementRoute::Query { .. } => {
467 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
468 let command =
469 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
470
471 return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
472 }
473
474 if let Some(plan) =
475 computed_projection::computed_sql_projection_plan(&parsed.statement)?
476 {
477 return self
478 .execute_computed_sql_projection_dispatch_for_authority(plan, authority);
479 }
480
481 let lowered = parsed.lower_query_lane_for_entity(
482 authority.model().name(),
483 authority.model().primary_key.name,
484 )?;
485 let grouped_columns = lowered
486 .query()
487 .filter(|query| query.has_grouping())
488 .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
489 .transpose()?;
490
491 match lowered.into_query() {
492 Some(LoweredSqlQuery::Select(select)) => match grouped_columns {
493 Some(columns) => self.execute_lowered_sql_grouped_dispatch_select_core(
494 select, authority, columns,
495 ),
496 None => {
497 self.execute_lowered_sql_dispatch_select_text_core(select, authority)
498 }
499 },
500 Some(LoweredSqlQuery::Delete(delete)) => {
501 self.execute_lowered_sql_dispatch_delete_core(&delete, authority)
502 }
503 None => Err(QueryError::unsupported_query(
504 "generated SQL query surface requires query or EXPLAIN statement lanes",
505 )),
506 }
507 }
508 SqlStatementRoute::Explain { .. } => {
509 if let Some((mode, plan)) =
510 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
511 {
512 return self
513 .explain_computed_sql_projection_dispatch_for_authority(
514 mode, plan, authority,
515 )
516 .map(SqlDispatchResult::Explain);
517 }
518
519 let lowered = parsed.lower_query_lane_for_entity(
520 authority.model().name(),
521 authority.model().primary_key.name,
522 )?;
523 if let Some(explain) =
524 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
525 {
526 return Ok(SqlDispatchResult::Explain(explain));
527 }
528
529 self.explain_lowered_sql_for_authority(&lowered, authority)
530 .map(SqlDispatchResult::Explain)
531 }
532 SqlStatementRoute::Describe { .. }
533 | SqlStatementRoute::ShowIndexes { .. }
534 | SqlStatementRoute::ShowColumns { .. }
535 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
536 "generated SQL query surface requires query or EXPLAIN statement lanes",
537 )),
538 }
539 }
540
541 #[doc(hidden)]
547 #[must_use]
548 pub fn execute_generated_query_surface_sql(
549 &self,
550 sql: &str,
551 authorities: &[EntityAuthority],
552 ) -> GeneratedSqlDispatchAttempt {
553 let sql_trimmed = match trim_generated_query_sql_input(sql) {
556 Ok(sql_trimmed) => sql_trimmed,
557 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
558 };
559 let parsed = match self.parse_sql_statement(sql_trimmed) {
560 Ok(parsed) => parsed,
561 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
562 };
563
564 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
567 return GeneratedSqlDispatchAttempt::new(
568 "",
569 None,
570 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
571 authorities,
572 ))),
573 );
574 }
575 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
576 Ok(authority) => authority,
577 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
578 };
579
580 let entity_name = authority.model().name();
584 let explain_order_field = parsed
585 .route()
586 .is_explain()
587 .then_some(authority.model().primary_key.name);
588 let result = match parsed.route() {
589 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
590 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
591 }
592 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
593 self.describe_entity_model(authority.model()),
594 )),
595 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
596 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
597 )),
598 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
599 self.show_columns_for_model(authority.model()),
600 )),
601 SqlStatementRoute::ShowEntities => unreachable!(
602 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
603 ),
604 };
605
606 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
607 }
608}