1mod computed;
7mod lowered;
8
9use crate::{
10 db::{
11 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
12 executor::{
13 EntityAuthority, execute_sql_projection_rows_for_canister,
14 execute_sql_projection_text_rows_for_canister,
15 },
16 identifiers_tail_match,
17 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
18 session::sql::{
19 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21 computed_projection,
22 projection::{
23 SqlProjectionPayload, projection_labels_from_fields,
24 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25 },
26 },
27 sql::lowering::{
28 LoweredSqlQuery, bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
29 },
30 sql::parser::{
31 SqlAggregateCall, SqlAggregateKind, SqlProjection, SqlSelectItem, SqlStatement,
32 SqlTextFunction,
33 },
34 },
35 traits::{CanisterKind, EntityKind, EntityValue},
36};
37
38#[cfg(feature = "perf-attribution")]
39pub use lowered::LoweredSqlDispatchExecutorAttribution;
40
/// Outcome of one generated query-surface SQL dispatch attempt.
///
/// Bundles the dispatch result with the metadata that was resolved while
/// routing the SQL text, so callers can inspect both.
#[doc(hidden)]
pub struct GeneratedSqlDispatchAttempt {
    // Name of the entity model the SQL resolved to. Empty when no single
    // entity authority was selected (input/parse failure, unknown entity, or
    // SHOW ENTITIES).
    entity_name: &'static str,
    // Primary-key field name of the resolved entity; set only for EXPLAIN
    // routes.
    explain_order_field: Option<&'static str>,
    // Dispatch payload, or the error that ended the attempt.
    result: Result<SqlDispatchResult, QueryError>,
}
55
impl GeneratedSqlDispatchAttempt {
    /// Assembles one attempt from its already-resolved parts.
    const fn new(
        entity_name: &'static str,
        explain_order_field: Option<&'static str>,
        result: Result<SqlDispatchResult, QueryError>,
    ) -> Self {
        Self {
            entity_name,
            explain_order_field,
            result,
        }
    }

    /// Returns the entity name recorded for this attempt (may be empty).
    #[must_use]
    pub const fn entity_name(&self) -> &'static str {
        self.entity_name
    }

    /// Returns the EXPLAIN order field recorded for this attempt, if any.
    #[must_use]
    pub const fn explain_order_field(&self) -> Option<&'static str> {
        self.explain_order_field
    }

    /// Consumes the attempt and yields its dispatch result.
    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
        self.result
    }
}
87
/// Identifies which SQL execution surface is validating a query's grouping
/// shape.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::sql) enum SqlGroupingSurface {
    /// Scalar surface: only queries without grouping are accepted.
    Scalar,
    /// Grouped surface: only queries with grouping are accepted.
    Grouped,
}
93
94const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
95 match surface {
96 SqlGroupingSurface::Scalar => {
97 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
98 }
99 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
100 }
101}
102
103fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
106 let sql_trimmed = sql.trim();
107 if sql_trimmed.is_empty() {
108 return Err(QueryError::unsupported_query(
109 "query endpoint requires a non-empty SQL string",
110 ));
111 }
112
113 Ok(sql_trimmed)
114}
115
116fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
119 let mut entities = Vec::with_capacity(authorities.len());
120
121 for authority in authorities {
122 entities.push(authority.model().name().to_string());
123 }
124
125 entities
126}
127
128fn grouped_sql_projection_labels_from_statement(
130 statement: &SqlStatement,
131) -> Result<Vec<String>, QueryError> {
132 let SqlStatement::Select(select) = statement else {
133 return Err(QueryError::invariant(
134 "grouped SQL projection labels require SELECT statement shape",
135 ));
136 };
137 let SqlProjection::Items(items) = &select.projection else {
138 return Err(QueryError::unsupported_query(
139 "grouped SQL dispatch requires explicit grouped projection items",
140 ));
141 };
142
143 Ok(items
144 .iter()
145 .map(grouped_sql_projection_item_label)
146 .collect())
147}
148
149fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
152 match item {
153 SqlSelectItem::Field(field) => field.clone(),
154 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
155 SqlSelectItem::TextFunction(call) => {
156 format!(
157 "{}({})",
158 grouped_sql_text_function_name(call.function),
159 call.field
160 )
161 }
162 }
163}
164
165fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
167 let kind = match aggregate.kind {
168 SqlAggregateKind::Count => "COUNT",
169 SqlAggregateKind::Sum => "SUM",
170 SqlAggregateKind::Avg => "AVG",
171 SqlAggregateKind::Min => "MIN",
172 SqlAggregateKind::Max => "MAX",
173 };
174
175 match aggregate.field.as_deref() {
176 Some(field) => format!("{kind}({field})"),
177 None => format!("{kind}(*)"),
178 }
179}
180
/// Maps a text function to the upper-case name used when rendering
/// projection labels.
///
/// The match is exhaustive on purpose: adding a `SqlTextFunction` variant
/// fails compilation here until a label is chosen for it.
const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
    match function {
        SqlTextFunction::Trim => "TRIM",
        SqlTextFunction::Ltrim => "LTRIM",
        SqlTextFunction::Rtrim => "RTRIM",
        SqlTextFunction::Lower => "LOWER",
        SqlTextFunction::Upper => "UPPER",
        SqlTextFunction::Length => "LENGTH",
        SqlTextFunction::Left => "LEFT",
        SqlTextFunction::Right => "RIGHT",
        SqlTextFunction::StartsWith => "STARTS_WITH",
        SqlTextFunction::EndsWith => "ENDS_WITH",
        SqlTextFunction::Contains => "CONTAINS",
        SqlTextFunction::Position => "POSITION",
        SqlTextFunction::Replace => "REPLACE",
        SqlTextFunction::Substring => "SUBSTRING",
    }
}
201
202fn authority_for_generated_sql_route(
204 route: &SqlStatementRoute,
205 authorities: &[EntityAuthority],
206) -> Result<EntityAuthority, QueryError> {
207 let sql_entity = route.entity();
208
209 for authority in authorities {
210 if identifiers_tail_match(sql_entity, authority.model().name()) {
211 return Ok(*authority);
212 }
213 }
214
215 Err(unsupported_generated_sql_entity_error(
216 sql_entity,
217 authorities,
218 ))
219}
220
221fn unsupported_generated_sql_entity_error(
224 entity_name: &str,
225 authorities: &[EntityAuthority],
226) -> QueryError {
227 let mut supported = String::new();
228
229 for (index, authority) in authorities.iter().enumerate() {
230 if index != 0 {
231 supported.push_str(", ");
232 }
233
234 supported.push_str(authority.model().name());
235 }
236
237 QueryError::unsupported_query(format!(
238 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
239 ))
240}
241
242impl<C: CanisterKind> DbSession<C> {
243 fn prepare_structural_sql_projection_execution(
246 &self,
247 query: StructuralQuery,
248 authority: EntityAuthority,
249 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
250 let (_, plan) =
253 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
254 let projection = plan.projection_spec(authority.model());
255 let columns = projection_labels_from_projection_spec(&projection);
256
257 Ok((columns, plan))
258 }
259
260 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
264 &self,
265 query: StructuralQuery,
266 authority: EntityAuthority,
267 ) -> Result<SqlProjectionPayload, QueryError> {
268 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
270
271 let projected =
274 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
275 .map_err(QueryError::execute)?;
276 let (rows, row_count) = projected.into_parts();
277
278 Ok(SqlProjectionPayload::new(columns, rows, row_count))
279 }
280
281 fn execute_structural_sql_projection_text(
285 &self,
286 query: StructuralQuery,
287 authority: EntityAuthority,
288 ) -> Result<SqlDispatchResult, QueryError> {
289 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
291
292 let projected =
295 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
296 .map_err(QueryError::execute)?;
297 let (rows, row_count) = projected.into_parts();
298
299 Ok(SqlDispatchResult::ProjectionText {
300 columns,
301 rows,
302 row_count,
303 })
304 }
305
306 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
310 where
311 E: PersistedRow<Canister = C> + EntityValue,
312 {
313 let plan = self
314 .compile_query_with_visible_indexes(query)?
315 .into_executable();
316 let deleted = self
317 .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
318 .map_err(QueryError::execute)?;
319 let (rows, row_count) = deleted.into_parts();
320 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
321
322 Ok(SqlProjectionPayload::new(
323 projection_labels_from_fields(E::MODEL.fields()),
324 rows,
325 row_count,
326 )
327 .into_dispatch_result())
328 }
329
330 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
333 query: &Query<E>,
334 surface: SqlGroupingSurface,
335 ) -> Result<(), QueryError>
336 where
337 E: EntityKind,
338 {
339 match (surface, query.has_grouping()) {
340 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
341 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
342 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
343 ),
344 }
345 }
346
347 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
349 where
350 E: PersistedRow<Canister = C> + EntityValue,
351 {
352 let parsed = self.parse_sql_statement(sql)?;
353
354 self.execute_sql_dispatch_parsed::<E>(&parsed)
355 }
356
357 pub fn execute_sql_dispatch_parsed<E>(
359 &self,
360 parsed: &SqlParsedStatement,
361 ) -> Result<SqlDispatchResult, QueryError>
362 where
363 E: PersistedRow<Canister = C> + EntityValue,
364 {
365 match parsed.route() {
366 SqlStatementRoute::Query { .. } => {
367 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
368 let authority = EntityAuthority::for_type::<E>();
369 let command =
370 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
371
372 return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
373 }
374
375 if let Some(plan) =
376 computed_projection::computed_sql_projection_plan(&parsed.statement)?
377 {
378 return self.execute_computed_sql_projection_dispatch::<E>(plan);
379 }
380
381 let lowered = parsed
385 .lower_query_lane_for_entity(E::MODEL.name(), E::MODEL.primary_key.name)?;
386 let grouped_columns = lowered
387 .query()
388 .filter(|query| query.has_grouping())
389 .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
390 .transpose()?;
391
392 match lowered.into_query() {
396 Some(LoweredSqlQuery::Select(select)) => match grouped_columns {
397 Some(columns) => self.execute_lowered_sql_grouped_dispatch_select_core(
398 select,
399 EntityAuthority::for_type::<E>(),
400 columns,
401 ),
402 None => self
403 .execute_lowered_sql_projection_core(
404 select,
405 EntityAuthority::for_type::<E>(),
406 )
407 .map(SqlProjectionPayload::into_dispatch_result),
408 },
409 Some(LoweredSqlQuery::Delete(delete)) => {
410 let typed_query = bind_lowered_sql_query::<E>(
411 LoweredSqlQuery::Delete(delete),
412 MissingRowPolicy::Ignore,
413 )
414 .map_err(QueryError::from_sql_lowering_error)?;
415
416 self.execute_typed_sql_delete(&typed_query)
417 }
418 None => Err(QueryError::unsupported_query(
419 "execute_sql_dispatch accepts SELECT or DELETE only",
420 )),
421 }
422 }
423 SqlStatementRoute::Explain { .. } => {
424 if let Some((mode, plan)) =
425 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
426 {
427 return self
428 .explain_computed_sql_projection_dispatch::<E>(mode, plan)
429 .map(SqlDispatchResult::Explain);
430 }
431
432 let lowered = lower_sql_command_from_prepared_statement(
433 parsed.prepare(E::MODEL.name())?,
434 E::MODEL.primary_key.name,
435 )
436 .map_err(QueryError::from_sql_lowering_error)?;
437 if let Some(explain) = self.explain_lowered_sql_execution_for_authority(
438 &lowered,
439 EntityAuthority::for_type::<E>(),
440 )? {
441 return Ok(SqlDispatchResult::Explain(explain));
442 }
443
444 self.explain_lowered_sql_for_authority(&lowered, EntityAuthority::for_type::<E>())
445 .map(SqlDispatchResult::Explain)
446 }
447 SqlStatementRoute::Describe { .. } => {
448 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
449 }
450 SqlStatementRoute::ShowIndexes { .. } => {
451 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
452 }
453 SqlStatementRoute::ShowColumns { .. } => {
454 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
455 }
456 SqlStatementRoute::ShowEntities => {
457 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
458 }
459 }
460 }
461
462 #[doc(hidden)]
469 pub fn execute_generated_query_surface_dispatch_for_authority(
470 &self,
471 parsed: &SqlParsedStatement,
472 authority: EntityAuthority,
473 ) -> Result<SqlDispatchResult, QueryError> {
474 match parsed.route() {
475 SqlStatementRoute::Query { .. } => {
476 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
477 let command =
478 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
479
480 return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
481 }
482
483 if let Some(plan) =
484 computed_projection::computed_sql_projection_plan(&parsed.statement)?
485 {
486 return self
487 .execute_computed_sql_projection_dispatch_for_authority(plan, authority);
488 }
489
490 let lowered = parsed.lower_query_lane_for_entity(
491 authority.model().name(),
492 authority.model().primary_key.name,
493 )?;
494 let grouped_columns = lowered
495 .query()
496 .filter(|query| query.has_grouping())
497 .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
498 .transpose()?;
499
500 match lowered.into_query() {
501 Some(LoweredSqlQuery::Select(select)) => match grouped_columns {
502 Some(columns) => self.execute_lowered_sql_grouped_dispatch_select_core(
503 select, authority, columns,
504 ),
505 None => {
506 self.execute_lowered_sql_dispatch_select_text_core(select, authority)
507 }
508 },
509 Some(LoweredSqlQuery::Delete(delete)) => {
510 self.execute_lowered_sql_dispatch_delete_core(&delete, authority)
511 }
512 None => Err(QueryError::unsupported_query(
513 "generated SQL query surface requires query or EXPLAIN statement lanes",
514 )),
515 }
516 }
517 SqlStatementRoute::Explain { .. } => {
518 if let Some((mode, plan)) =
519 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
520 {
521 return self
522 .explain_computed_sql_projection_dispatch_for_authority(
523 mode, plan, authority,
524 )
525 .map(SqlDispatchResult::Explain);
526 }
527
528 let lowered = parsed.lower_query_lane_for_entity(
529 authority.model().name(),
530 authority.model().primary_key.name,
531 )?;
532 if let Some(explain) =
533 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
534 {
535 return Ok(SqlDispatchResult::Explain(explain));
536 }
537
538 self.explain_lowered_sql_for_authority(&lowered, authority)
539 .map(SqlDispatchResult::Explain)
540 }
541 SqlStatementRoute::Describe { .. }
542 | SqlStatementRoute::ShowIndexes { .. }
543 | SqlStatementRoute::ShowColumns { .. }
544 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
545 "generated SQL query surface requires query or EXPLAIN statement lanes",
546 )),
547 }
548 }
549
550 #[doc(hidden)]
556 #[must_use]
557 pub fn execute_generated_query_surface_sql(
558 &self,
559 sql: &str,
560 authorities: &[EntityAuthority],
561 ) -> GeneratedSqlDispatchAttempt {
562 let sql_trimmed = match trim_generated_query_sql_input(sql) {
565 Ok(sql_trimmed) => sql_trimmed,
566 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
567 };
568 let parsed = match self.parse_sql_statement(sql_trimmed) {
569 Ok(parsed) => parsed,
570 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
571 };
572
573 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
576 return GeneratedSqlDispatchAttempt::new(
577 "",
578 None,
579 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
580 authorities,
581 ))),
582 );
583 }
584 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
585 Ok(authority) => authority,
586 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
587 };
588
589 let entity_name = authority.model().name();
593 let explain_order_field = parsed
594 .route()
595 .is_explain()
596 .then_some(authority.model().primary_key.name);
597 let result = match parsed.route() {
598 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
599 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
600 }
601 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
602 self.describe_entity_model(authority.model()),
603 )),
604 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
605 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
606 )),
607 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
608 self.show_columns_for_model(authority.model()),
609 )),
610 SqlStatementRoute::ShowEntities => unreachable!(
611 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
612 ),
613 };
614
615 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
616 }
617}