mod covering;
mod materialize;
mod render;
#[cfg(all(feature = "sql", feature = "diagnostics"))]
use crate::db::{
executor::{
EntityAuthority, projection::PreparedProjectionShape,
projection::prepare_projection_shape_from_plan,
},
query::plan::AccessPlannedQuery,
session::sql::projection::runtime::render::render_projected_sql_rows_text,
};
#[cfg(feature = "sql")]
use crate::{
db::{
Db,
executor::{
SharedPreparedExecutionPlan,
pipeline::execute_initial_scalar_retained_slot_page_for_canister,
},
session::sql::projection::runtime::{
covering::{
try_execute_covering_sql_projection_rows_for_canister,
try_execute_hybrid_covering_sql_projection_rows_for_canister,
},
materialize::{finalize_sql_projection_rows, project_structural_sql_projection_page},
},
},
error::InternalError,
traits::CanisterKind,
value::Value,
};
#[cfg(all(feature = "sql", feature = "diagnostics"))]
use std::cell::Cell;
#[allow(unused_imports)]
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) use crate::db::session::sql::projection::runtime::materialize::{
SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
};
#[allow(unused_imports)]
#[cfg(feature = "diagnostics")]
pub use crate::db::session::sql::projection::runtime::materialize::{
SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
};
#[cfg(feature = "sql")]
#[derive(Debug)]
pub(in crate::db) struct SqlProjectionRows {
rows: Vec<Vec<Value>>,
row_count: u32,
}
#[cfg(feature = "sql")]
impl SqlProjectionRows {
#[must_use]
pub(in crate::db) const fn new(rows: Vec<Vec<Value>>, row_count: u32) -> Self {
Self { rows, row_count }
}
#[must_use]
pub(in crate::db) fn into_parts(self) -> (Vec<Vec<Value>>, u32) {
(self.rows, self.row_count)
}
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SqlProjectionTextExecutorAttribution {
pub prepare_projection: u64,
pub scalar_runtime: u64,
pub materialize_projection: u64,
pub result_rows: u64,
pub total: u64,
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
std::thread_local! {
static PURE_COVERING_DECODE_LOCAL_INSTRUCTIONS: Cell<u64> = const { Cell::new(0) };
static PURE_COVERING_ROW_ASSEMBLY_LOCAL_INSTRUCTIONS: Cell<u64> = const { Cell::new(0) };
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db::session::sql::projection::runtime) fn record_pure_covering_decode_local_instructions(
delta: u64,
) {
if delta == 0 {
return;
}
PURE_COVERING_DECODE_LOCAL_INSTRUCTIONS.with(|counter| {
counter.set(counter.get().saturating_add(delta));
});
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db::session::sql::projection::runtime) fn record_pure_covering_row_assembly_local_instructions(
delta: u64,
) {
if delta == 0 {
return;
}
PURE_COVERING_ROW_ASSEMBLY_LOCAL_INSTRUCTIONS.with(|counter| {
counter.set(counter.get().saturating_add(delta));
});
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db) fn current_pure_covering_decode_local_instructions() -> u64 {
PURE_COVERING_DECODE_LOCAL_INSTRUCTIONS.with(Cell::get)
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db) fn current_pure_covering_row_assembly_local_instructions() -> u64 {
PURE_COVERING_ROW_ASSEMBLY_LOCAL_INSTRUCTIONS.with(Cell::get)
}
#[cfg(all(feature = "sql", feature = "diagnostics", target_arch = "wasm32"))]
fn read_local_instruction_counter() -> u64 {
canic_cdk::api::performance_counter(1)
}
#[cfg(all(feature = "sql", feature = "diagnostics", not(target_arch = "wasm32")))]
const fn read_local_instruction_counter() -> u64 {
0
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db::session::sql::projection::runtime) fn measure_structural_result<T, E>(
run: impl FnOnce() -> Result<T, E>,
) -> (u64, Result<T, E>) {
let start = read_local_instruction_counter();
let result = run();
let delta = read_local_instruction_counter().saturating_sub(start);
(delta, result)
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db) fn attribute_sql_projection_text_rows_for_canister<C>(
db: &Db<C>,
debug: bool,
authority: EntityAuthority,
plan: AccessPlannedQuery,
) -> Result<SqlProjectionTextExecutorAttribution, InternalError>
where
C: CanisterKind,
{
let row_layout = authority.row_layout();
let (prepare_projection_local_instructions, prepared_projection) =
measure_structural_result(|| {
Ok::<PreparedProjectionShape, InternalError>(prepare_projection_shape_from_plan(
authority.model(),
&plan,
))
});
let prepared_projection = prepared_projection?;
let runtime_plan = plan.clone();
let (scalar_runtime_local_instructions, page) = measure_structural_result(|| {
execute_initial_scalar_retained_slot_page_for_canister(db, debug, authority, runtime_plan)
});
let page = page?;
let (materialize_projection_local_instructions, rendered_rows) =
measure_structural_result(|| {
let projected =
project_structural_sql_projection_page(row_layout, &prepared_projection, page)?;
let projected = finalize_sql_projection_rows(&plan, projected)?;
Ok::<Vec<Vec<String>>, InternalError>(render_projected_sql_rows_text(projected))
});
let rendered_rows = rendered_rows?;
let (result_rows_local_instructions, row_count) = measure_structural_result(|| {
Ok::<u32, InternalError>(u32::try_from(rendered_rows.len()).unwrap_or(u32::MAX))
});
let _row_count = row_count?;
let total_local_instructions = prepare_projection_local_instructions
.saturating_add(scalar_runtime_local_instructions)
.saturating_add(materialize_projection_local_instructions)
.saturating_add(result_rows_local_instructions);
Ok(SqlProjectionTextExecutorAttribution {
prepare_projection: prepare_projection_local_instructions,
scalar_runtime: scalar_runtime_local_instructions,
materialize_projection: materialize_projection_local_instructions,
result_rows: result_rows_local_instructions,
total: total_local_instructions,
})
}
#[cfg(feature = "sql")]
pub(in crate::db) fn execute_sql_projection_rows_for_canister<C>(
db: &Db<C>,
debug: bool,
prepared_plan: SharedPreparedExecutionPlan,
) -> Result<SqlProjectionRows, InternalError>
where
C: CanisterKind,
{
let authority = prepared_plan.authority();
let plan = prepared_plan.logical_plan();
if let Some(projected) =
try_execute_covering_sql_projection_rows_for_canister(db, authority, plan)?
{
let projected = finalize_sql_projection_rows(plan, projected)?;
let row_count = u32::try_from(projected.len()).unwrap_or(u32::MAX);
return Ok(SqlProjectionRows::new(projected, row_count));
}
if let Some(projected) =
try_execute_hybrid_covering_sql_projection_rows_for_canister(db, authority, plan)?
{
let projected = finalize_sql_projection_rows(plan, projected)?;
let row_count = u32::try_from(projected.len()).unwrap_or(u32::MAX);
return Ok(SqlProjectionRows::new(projected, row_count));
}
let row_layout = authority.row_layout();
let prepared_projection = prepared_plan.prepared_projection_shape().ok_or_else(|| {
InternalError::query_executor_invariant(
"structural SQL projection execution requires one frozen scalar projection shape",
)
})?;
let page =
execute_initial_scalar_retained_slot_page_for_canister(db, debug, authority, plan.clone())?;
let projected = project_structural_sql_projection_page(row_layout, prepared_projection, page)?;
let projected = finalize_sql_projection_rows(plan, projected)?;
let row_count = u32::try_from(projected.len()).unwrap_or(u32::MAX);
Ok(SqlProjectionRows::new(projected, row_count))
}