icydb_core/db/session/sql/
mod.rs1mod aggregate;
8mod computed_projection;
9mod dispatch;
10mod explain;
11mod projection;
12mod surface;
13
14use crate::{
15 db::{
16 DbSession, EntityResponse, GroupedTextCursorPageWithTrace, MissingRowPolicy,
17 PagedGroupedExecutionWithTrace, PersistedRow, Query, QueryError,
18 executor::EntityAuthority,
19 query::{
20 intent::StructuralQuery,
21 plan::{AccessPlannedQuery, VisibleIndexes},
22 },
23 sql::{
24 lowering::{
25 bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
26 prepare_sql_statement,
27 },
28 parser::{SqlStatement, parse_sql},
29 },
30 },
31 traits::{CanisterKind, EntityKind, EntityValue},
32};
33
34use crate::db::session::sql::aggregate::{
35 SqlAggregateSurface, parsed_requires_dedicated_sql_aggregate_lane,
36 unsupported_sql_aggregate_lane_message,
37};
38use crate::db::session::sql::surface::{
39 SqlSurface, session_sql_lane, sql_statement_route_from_statement, unsupported_sql_lane_message,
40};
41
42#[cfg(feature = "structural-read-metrics")]
43pub use crate::db::session::sql::projection::{
44 SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
45};
46pub use crate::db::session::sql::surface::{
47 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
48};
49#[cfg(feature = "perf-attribution")]
50pub use crate::db::{
51 session::sql::dispatch::LoweredSqlDispatchExecutorAttribution,
52 session::sql::projection::SqlProjectionTextExecutorAttribution,
53};
54
55#[derive(Clone, Copy, Debug, Eq, PartialEq)]
56enum SqlComputedProjectionSurface {
57 QueryFrom,
58 ExecuteSql,
59 ExecuteSqlGrouped,
60}
61
62const fn unsupported_sql_computed_projection_message(
63 surface: SqlComputedProjectionSurface,
64) -> &'static str {
65 match surface {
66 SqlComputedProjectionSurface::QueryFrom => {
67 "query_from_sql does not accept computed text projection; use execute_sql_dispatch(...)"
68 }
69 SqlComputedProjectionSurface::ExecuteSql => {
70 "execute_sql rejects computed text projection; use execute_sql_dispatch(...)"
71 }
72 SqlComputedProjectionSurface::ExecuteSqlGrouped => {
73 "execute_sql_grouped rejects computed text projection; use execute_sql_dispatch(...)"
74 }
75 }
76}
77
78const fn unsupported_sql_write_surface_message(
79 surface: SqlSurface,
80 statement: &SqlStatement,
81) -> &'static str {
82 match (surface, statement) {
83 (SqlSurface::QueryFrom, SqlStatement::Insert(_)) => {
84 "query_from_sql rejects INSERT; use execute_sql_dispatch(...)"
85 }
86 (SqlSurface::QueryFrom, SqlStatement::Update(_)) => {
87 "query_from_sql rejects UPDATE; use execute_sql_dispatch(...)"
88 }
89 (SqlSurface::ExecuteSql, SqlStatement::Insert(_)) => {
90 "execute_sql rejects INSERT; use execute_sql_dispatch(...)"
91 }
92 (SqlSurface::ExecuteSql, SqlStatement::Update(_)) => {
93 "execute_sql rejects UPDATE; use execute_sql_dispatch(...)"
94 }
95 (SqlSurface::ExecuteSqlGrouped, SqlStatement::Insert(_)) => {
96 "execute_sql_grouped rejects INSERT; use execute_sql_dispatch(...)"
97 }
98 (SqlSurface::ExecuteSqlGrouped, SqlStatement::Update(_)) => {
99 "execute_sql_grouped rejects UPDATE; use execute_sql_dispatch(...)"
100 }
101 (SqlSurface::Explain, SqlStatement::Insert(_) | SqlStatement::Update(_)) => {
102 "explain_sql requires EXPLAIN"
103 }
104 (
105 _,
106 SqlStatement::Select(_)
107 | SqlStatement::Delete(_)
108 | SqlStatement::Explain(_)
109 | SqlStatement::Describe(_)
110 | SqlStatement::ShowIndexes(_)
111 | SqlStatement::ShowColumns(_)
112 | SqlStatement::ShowEntities(_),
113 ) => unreachable!(),
114 }
115}
116
117impl<C: CanisterKind> DbSession<C> {
118 pub(in crate::db::session::sql) fn build_structural_plan_with_visible_indexes_for_authority(
121 &self,
122 query: StructuralQuery,
123 authority: EntityAuthority,
124 ) -> Result<(VisibleIndexes<'_>, AccessPlannedQuery), QueryError> {
125 let visible_indexes =
126 self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
127 let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
128
129 Ok((visible_indexes, plan))
130 }
131
132 fn query_from_sql_parsed<E>(
135 parsed: &SqlParsedStatement,
136 lane_surface: SqlSurface,
137 computed_surface: SqlComputedProjectionSurface,
138 surface: SqlAggregateSurface,
139 ) -> Result<Query<E>, QueryError>
140 where
141 E: EntityKind<Canister = C>,
142 {
143 if matches!(
144 &parsed.statement,
145 SqlStatement::Insert(_) | SqlStatement::Update(_)
146 ) {
147 return Err(QueryError::unsupported_query(
148 unsupported_sql_write_surface_message(lane_surface, &parsed.statement),
149 ));
150 }
151
152 if computed_projection::computed_sql_projection_plan(&parsed.statement)?.is_some() {
153 return Err(QueryError::unsupported_query(
154 unsupported_sql_computed_projection_message(computed_surface),
155 ));
156 }
157
158 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
159 return Err(QueryError::unsupported_query(
160 unsupported_sql_aggregate_lane_message(surface),
161 ));
162 }
163
164 let lowered = lower_sql_command_from_prepared_statement(
165 parsed.prepare(E::MODEL.name())?,
166 E::MODEL.primary_key.name,
167 )
168 .map_err(QueryError::from_sql_lowering_error)?;
169 let lane = session_sql_lane(&lowered);
170 let Some(query) = lowered.query().cloned() else {
171 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
172 lane_surface,
173 lane,
174 )));
175 };
176 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
177 .map_err(QueryError::from_sql_lowering_error)?;
178
179 Ok(query)
180 }
181
182 fn grouped_query_from_computed_sql_projection_plan<E>(
185 plan: &computed_projection::SqlComputedProjectionPlan,
186 ) -> Result<Query<E>, QueryError>
187 where
188 E: EntityKind<Canister = C>,
189 {
190 let lowered = lower_sql_command_from_prepared_statement(
191 prepare_sql_statement(plan.cloned_base_statement(), E::MODEL.name())
192 .map_err(QueryError::from_sql_lowering_error)?,
193 E::MODEL.primary_key.name,
194 )
195 .map_err(QueryError::from_sql_lowering_error)?;
196 let Some(query) = lowered.query().cloned() else {
197 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
198 SqlSurface::ExecuteSqlGrouped,
199 session_sql_lane(&lowered),
200 )));
201 };
202 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
203 .map_err(QueryError::from_sql_lowering_error)?;
204 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
205
206 Ok(query)
207 }
208
209 pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
213 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
214 let route = sql_statement_route_from_statement(&statement);
215
216 Ok(SqlParsedStatement::new(statement, route))
217 }
218
219 pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
224 let parsed = self.parse_sql_statement(sql)?;
225
226 Ok(parsed.route().clone())
227 }
228
229 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
234 where
235 E: EntityKind<Canister = C>,
236 {
237 let parsed = self.parse_sql_statement(sql)?;
238
239 Self::query_from_sql_parsed::<E>(
240 &parsed,
241 SqlSurface::QueryFrom,
242 SqlComputedProjectionSurface::QueryFrom,
243 SqlAggregateSurface::QueryFrom,
244 )
245 }
246
247 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
249 where
250 E: PersistedRow<Canister = C> + EntityValue,
251 {
252 let parsed = self.parse_sql_statement(sql)?;
253 let query = Self::query_from_sql_parsed::<E>(
254 &parsed,
255 SqlSurface::ExecuteSql,
256 SqlComputedProjectionSurface::ExecuteSql,
257 SqlAggregateSurface::ExecuteSql,
258 )?;
259 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Scalar)?;
260
261 self.execute_query(&query)
262 }
263
264 pub fn execute_sql_grouped<E>(
266 &self,
267 sql: &str,
268 cursor_token: Option<&str>,
269 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
270 where
271 E: PersistedRow<Canister = C> + EntityValue,
272 {
273 let parsed = self.parse_sql_statement(sql)?;
274
275 if matches!(&parsed.statement, SqlStatement::Delete(_)) {
276 return Err(QueryError::unsupported_query(
277 "execute_sql_grouped rejects DELETE; use execute_sql_dispatch(...)",
278 ));
279 }
280
281 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
282 if !plan.is_grouped() {
283 return Err(QueryError::unsupported_query(
284 unsupported_sql_computed_projection_message(
285 SqlComputedProjectionSurface::ExecuteSqlGrouped,
286 ),
287 ));
288 }
289
290 let query = Self::grouped_query_from_computed_sql_projection_plan::<E>(&plan)?;
291 let grouped = self.execute_grouped(&query, cursor_token)?;
292 let (rows, continuation_cursor, execution_trace) = grouped.into_parts();
293 let rows =
294 computed_projection::apply_computed_sql_projection_grouped_rows(rows, &plan)?;
295
296 return Ok(PagedGroupedExecutionWithTrace::new(
297 rows,
298 continuation_cursor,
299 execution_trace,
300 ));
301 }
302
303 let query = Self::query_from_sql_parsed::<E>(
304 &parsed,
305 SqlSurface::ExecuteSqlGrouped,
306 SqlComputedProjectionSurface::ExecuteSqlGrouped,
307 SqlAggregateSurface::ExecuteSqlGrouped,
308 )?;
309 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
310
311 self.execute_grouped(&query, cursor_token)
312 }
313
314 #[doc(hidden)]
316 pub fn execute_sql_grouped_text_cursor<E>(
317 &self,
318 sql: &str,
319 cursor_token: Option<&str>,
320 ) -> Result<GroupedTextCursorPageWithTrace, QueryError>
321 where
322 E: PersistedRow<Canister = C> + EntityValue,
323 {
324 let parsed = self.parse_sql_statement(sql)?;
325
326 if matches!(&parsed.statement, SqlStatement::Delete(_)) {
327 return Err(QueryError::unsupported_query(
328 "execute_sql_grouped rejects DELETE; use execute_sql_dispatch(...)",
329 ));
330 }
331
332 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
333 if !plan.is_grouped() {
334 return Err(QueryError::unsupported_query(
335 unsupported_sql_computed_projection_message(
336 SqlComputedProjectionSurface::ExecuteSqlGrouped,
337 ),
338 ));
339 }
340
341 let query = Self::grouped_query_from_computed_sql_projection_plan::<E>(&plan)?;
342 let (rows, continuation_cursor, execution_trace) =
343 self.execute_grouped_text_cursor(&query, cursor_token)?;
344 let rows =
345 computed_projection::apply_computed_sql_projection_grouped_rows(rows, &plan)?;
346
347 return Ok((rows, continuation_cursor, execution_trace));
348 }
349
350 let query = Self::query_from_sql_parsed::<E>(
351 &parsed,
352 SqlSurface::ExecuteSqlGrouped,
353 SqlComputedProjectionSurface::ExecuteSqlGrouped,
354 SqlAggregateSurface::ExecuteSqlGrouped,
355 )?;
356 Self::ensure_sql_query_grouping(&query, dispatch::SqlGroupingSurface::Grouped)?;
357
358 self.execute_grouped_text_cursor(&query, cursor_token)
359 }
360}