icydb-core 0.195.1

IcyDB — A schema-first typed query engine and persistence runtime for Internet Computer canisters
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
//! Module: db::executor
//! Responsibility: runtime execution boundaries for validated query plans.
//! Does not own: logical query semantics or persistence encoding policy.
//! Boundary: consumes query/access/cursor contracts and drives load/delete/aggregate runtime.

mod aggregate;
mod authority;
mod covering;
mod delete;
mod diagnostics;
pub(in crate::db) mod explain;
mod group;
mod index_prefix_cardinality;
mod kernel;
mod mutation;
mod order;
mod pipeline;
mod plan_metrics;
pub(super) mod planning;
mod prepared_execution_plan;
mod profiling;
pub(in crate::db) mod projection;
pub(in crate::db) use planning::route;
mod runtime_context;
mod scan;
mod stream;
pub(in crate::db) mod terminal;
#[cfg(test)]
mod tests;
mod traversal;
mod util;
mod window;

use crate::db::access::{
    LoweredAccessError, LoweredIndexPrefixSpec, LoweredIndexRangeSpec, LoweredIndexScanContract,
    LoweredKey, lower_access,
};

pub(in crate::db) use crate::db::access::{
    ExecutableAccessNode, ExecutableAccessPlan, ExecutionPathPayload,
};
pub(in crate::db) use aggregate::runtime::RuntimeGroupedRow;
#[cfg(feature = "diagnostics")]
pub(in crate::db::executor) use aggregate::runtime::{
    GroupedCountFoldMetrics, with_grouped_count_fold_metrics,
};
#[cfg(feature = "diagnostics")]
pub(in crate::db) use aggregate::{
    ScalarAggregateTerminalAttribution, with_scalar_aggregate_terminal_attribution,
};
pub(in crate::db) use aggregate::{
    ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryOutput,
    ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest,
};
#[cfg(feature = "sql")]
pub(in crate::db) use aggregate::{
    StructuralAggregateRequest, StructuralAggregateTerminal, StructuralAggregateTerminalKind,
};
pub use authority::EntityAuthority;
#[cfg(feature = "sql")]
pub(in crate::db::executor) use covering::resolve_covering_projection_components_from_lowered_specs;
pub(in crate::db::executor) use covering::{
    CoveringComponentValues, CoveringProjectionComponentRows, CoveringProjectionComponentWindow,
    covering_projection_scan_direction, covering_requires_row_presence_check,
    decode_single_covering_projection_pairs, decode_single_covering_projection_value,
    fold_covering_projection_component_rows_in_window, reorder_covering_projection_pairs,
    resolve_single_covering_projection_component_from_lowered_specs,
};
#[cfg(feature = "sql")]
pub(in crate::db::executor) use covering::{
    decode_covering_projection_component, decode_covering_projection_pairs,
    map_covering_projection_pairs,
};
pub(super) use delete::DeleteExecutor;
#[cfg(feature = "sql")]
pub(in crate::db) use delete::DeleteProjectionBounds;
pub(in crate::db) use diagnostics::ExecutionOptimization;
pub(in crate::db::executor) use diagnostics::ExecutionTrace;
#[cfg(all(test, feature = "sql-explain"))]
pub(in crate::db) use explain::assemble_load_execution_node_descriptor;
#[cfg(feature = "sql-explain")]
pub(in crate::db) use explain::{
    assemble_load_execution_node_descriptor_from_route_facts,
    freeze_load_execution_route_facts_for_authority,
};
#[cfg(feature = "sql")]
pub(in crate::db) use index_prefix_cardinality::lowered_index_prefix_cardinality_specs_from_plan;
pub(in crate::db) use index_prefix_cardinality::{
    LoweredIndexPrefixCardinalityPlan, exact_count_cardinality_prefixes_for_plan,
};
pub(in crate::db::executor) use index_prefix_cardinality::{
    expand_index_prefix_family_with_exact_child_prefixes, lowered_index_prefix_liveness,
    lowered_index_prefix_liveness_at_generation,
};
pub(in crate::db::executor) use kernel::ExecutionKernel;
pub use mutation::save::MutationMode;
pub(super) use mutation::save::SaveExecutor;
pub(in crate::db::executor) use order::{
    BoundedDirectOrderWindow, OrderReadableRow, apply_structural_order_window,
    apply_structural_order_window_to_data_rows, can_use_bounded_direct_order_collection,
    compare_orderable_row_with_boundary,
};
pub(super) use pipeline::contracts::LoadExecutor;
#[cfg(feature = "sql")]
pub(in crate::db) use pipeline::contracts::StructuralCursorPage;
pub(in crate::db) use pipeline::contracts::StructuralGroupedProjectionResult;
pub(in crate::db::executor) use pipeline::contracts::{
    AccessScanContinuationInput, AccessStreamBindings,
};
pub(in crate::db) use pipeline::contracts::{CursorPage, PageCursor};
#[cfg(feature = "diagnostics")]
pub(in crate::db) use pipeline::{
    GroupedCountAttribution, GroupedExecutePhaseAttribution, ScalarExecutePhaseAttribution,
};
pub(in crate::db::executor) use planning::continuation::{
    AccessWindow, ContinuationMode, GroupedContinuationContext, GroupedPaginationWindow,
    LoadCursorInput, LoadCursorResolver, PreparedLoadCursor, RouteContinuationPlan,
    ScalarContinuationContext,
};
pub(in crate::db::executor) use planning::preparation::ExecutionPreparation;
pub use planning::route::RouteExecutionMode;
pub(in crate::db::executor) use prepared_execution_plan::BytesByProjectionMode;
pub use prepared_execution_plan::ExecutionFamily;
pub(in crate::db) use prepared_execution_plan::PreparedExecutionPlan;
pub(in crate::db) use prepared_execution_plan::SharedPreparedExecutionPlan;
#[cfg(feature = "sql")]
pub(in crate::db::executor) use prepared_execution_plan::SharedPreparedProjectionRuntimeHandoff;
pub(in crate::db::executor) use prepared_execution_plan::{
    PreparedAggregatePlan, PreparedAggregateStreamingPlanHandoff, PreparedLoadPlan,
    PreparedScalarPlanCore, PreparedScalarRuntimeHandoff, classify_bytes_by_projection_mode,
};
pub(in crate::db::executor) use profiling::{
    ExecutionProfileStats, measure_execution_stats_phase, record_aggregation,
    record_key_stream_micros, record_key_stream_yield, record_ordering, record_projection,
    record_rows_after_predicate, with_execution_stats_capture,
};
#[cfg(feature = "sql")]
pub(in crate::db) use projection::CoveringProjectionMetricsRecorder;
#[cfg(all(test, feature = "sql"))]
pub(in crate::db) use projection::PreparedProjectionPlan;
#[cfg(feature = "sql")]
pub(in crate::db) use projection::ProjectionMaterializationMetricsRecorder;
#[cfg(all(test, feature = "sql"))]
pub(in crate::db) use projection::projection_eval_data_row_for_materialize_tests;
#[cfg(all(test, feature = "sql"))]
pub(in crate::db) use projection::projection_eval_row_layout_for_materialize_tests;
#[cfg(feature = "sql")]
pub(in crate::db) use projection::{
    StructuralProjectionRequest, execute_structural_projection_result,
};
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db) use projection::{
    current_pure_covering_decode_local_instructions,
    current_pure_covering_row_assembly_local_instructions,
};
pub(in crate::db) use runtime_context::{Context, StoreResolver};
#[cfg(feature = "diagnostics")]
pub use runtime_context::{RowCheckMetrics, with_row_check_metrics};
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) use runtime_context::{RowCheckMetrics, with_row_check_metrics};
pub(in crate::db::executor) use runtime_context::{
    read_owned_data_row_with_consistency_from_store,
    read_row_presence_with_consistency_from_data_store, record_row_check_covering_candidate_seen,
    record_row_check_index_entry_scanned, record_row_check_index_key_owned_entry,
    record_row_check_index_row_identity_decoded, record_row_check_row_emitted,
    sum_row_payload_bytes_from_ordered_key_stream_with_store,
    sum_row_payload_bytes_full_scan_window_with_store,
    sum_row_payload_bytes_key_range_window_with_store,
};
#[cfg(feature = "sql")]
pub(in crate::db::executor) use stream::access::PrimaryRangeKeyStream;
pub(in crate::db::executor) use stream::access::{
    ACCESS_SCAN_CHUNK_ENTRIES, AccessStreamExecutionPolicy, ExecutableAccess, IndexLeafOrderPolicy,
    IndexScan, PrimaryScan, TraversalRuntime, active_lowered_index_prefix_specs,
    apply_index_scan_chunk_progress, branch_stream_chunk_entries,
    index_predicate_rejects_prefix_components, index_stream_chunk_entries_for_remaining,
    index_stream_output_limit_for_chunk,
};
pub(in crate::db::executor) use stream::key::{
    KeyOrderComparator, KeyStreamLoopControl, OrderedKeyStream, OrderedKeyStreamBox,
    exact_output_key_count_hint, key_stream_budget_is_redundant,
    ordered_key_stream_from_materialized_keys,
};
pub(in crate::db::executor) use stream::{
    FlatMergeOrderedChild, FlatMergeSiblingSet, FlatMergeStream, PrefixSetExecutionShape,
    PrefixSetMergeSafety,
};
pub(in crate::db::executor) use terminal::RetainedSlotLayout;
#[cfg(feature = "diagnostics")]
pub(in crate::db) use terminal::{DirectDataRowPhaseAttribution, KernelRowPhaseAttribution};
#[cfg(feature = "diagnostics")]
pub use terminal::{ScalarMaterializationLaneMetrics, with_scalar_materialization_lane_metrics};
#[cfg(all(test, not(feature = "diagnostics")))]
pub(crate) use terminal::{
    ScalarMaterializationLaneMetrics, with_scalar_materialization_lane_metrics,
};
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub(in crate::db) use terminal::{
    with_direct_data_row_phase_attribution, with_kernel_row_phase_attribution,
};
pub(in crate::db::executor) use util::{
    apply_data_key_ordered_dedup_window, apply_offset_limit_window, saturating_row_len,
    saturating_u32_len,
};
pub(in crate::db::executor) use window::page_window_state;

///
/// ExecutionPlan
///
/// Canonical route-to-kernel execution contract for read execution.
/// This is route-owned policy output (mode, hints, fast-path ordering),
/// while `PreparedExecutionPlan` remains the validated query/lowered-spec container.
///
/// Executor-local alias for `planning::route::ExecutionRoutePlan`.
///

pub(in crate::db::executor) type ExecutionPlan = planning::route::ExecutionRoutePlan;

/// Report whether an initial scalar load of this plan would have to sort rows after access.
///
/// Returns `Ok(false)` without building a route plan when the plan is grouped,
/// is not a load, or carries no ORDER BY fields. Route-planning failures are
/// propagated to the caller.
pub(in crate::db) fn initial_read_plan_requires_materialized_sort(
    prepared_plan: &SharedPreparedExecutionPlan,
) -> Result<bool, InternalError> {
    // Grouped plans are answered before the execution mode is consulted.
    if prepared_plan.logical_plan().grouped_plan().is_some() {
        return Ok(false);
    }
    if !prepared_plan.mode().is_load() {
        return Ok(false);
    }

    // A missing ORDER BY, or one with no fields, leaves nothing to sort.
    let order_is_present = matches!(
        prepared_plan.logical_plan().scalar_plan().order.as_ref(),
        Some(order) if !order.fields.is_empty()
    );
    if !order_is_present {
        return Ok(false);
    }

    // Route the plan as a load that starts from the initial continuation context.
    let initial_continuation = ScalarContinuationContext::initial();
    let load_request = planning::route::RoutePlanRequest::Load {
        continuation: &initial_continuation,
        probe_fetch_hint: None,
        authority: Some(prepared_plan.authority()),
        load_terminal_fast_path: None,
    };
    let routed = planning::route::build_execution_route_plan(
        prepared_plan.logical_plan(),
        load_request,
    )?;

    let order_reason = routed.load_order_route_reason();
    Ok(matches!(
        order_reason,
        planning::route::LoadOrderRouteReason::RequiresMaterializedSort
    ))
}

/// Check `plan` against the structural entity `authority` at an executor boundary.
///
/// Delegates to `EntityAuthority::validate_executor_plan`; any validation error is
/// returned to the caller as-is.
pub(in crate::db::executor) fn validate_executor_plan_for_authority(
    authority: &EntityAuthority,
    plan: &AccessPlannedQuery,
) -> Result<(), InternalError> {
    authority.validate_executor_plan(plan)?;
    Ok(())
}

// Design notes:
// - SchemaInfo is the planner-visible schema (relational attributes). Executors may see
//   additional tuple payload not represented in SchemaInfo.
// - Unsupported or opaque values are treated as incomparable; executor validation may
//   skip type checks for these values.
// - ORDER BY is stable; incomparable values preserve input order.
// - Corruption indicates invalid persisted bytes or store mismatches; invariant violations
//   indicate executor/planner contract breaches.

#[cfg(test)]
use crate::{db::CompiledQuery, traits::EntityKind};
use crate::{
    db::{cursor::CursorPlanError, data::DecodedDataStoreKey, query::plan::AccessPlannedQuery},
    error::{ErrorClass, ErrorOrigin, InternalError},
};

///
/// ExecutorPlanError
///
/// Executor-owned plan-surface failures produced during runtime cursor validation.
/// Mapped to query-owned plan errors only at query/session boundaries.
///

#[derive(Debug)]
pub(in crate::db) enum ExecutorPlanError {
    /// Cursor plan failure, held behind a `Box`.
    Cursor(Box<CursorPlanError>),
}

impl ExecutorPlanError {
    /// Build the executor plan error wrapping the cursor continuation invariant violation.
    ///
    /// Every other named constructor in this impl currently resolves to this error.
    pub(in crate::db::executor) fn continuation_cursor_invariant() -> Self {
        Self::Cursor(Box::new(CursorPlanError::continuation_cursor_invariant()))
    }

    /// Error for a continuation cursor presented to a plan that is not a load plan.
    pub(in crate::db::executor) fn continuation_cursor_requires_load_plan() -> Self {
        Self::continuation_cursor_invariant()
    }

    /// Error for grouped cursor preparation requested on a logical plan
    /// that is not grouped.
    pub(in crate::db::executor) fn grouped_cursor_preparation_requires_grouped_plan() -> Self {
        Self::continuation_cursor_invariant()
    }

    /// Error for grouped cursor revalidation requested on a logical plan
    /// that is not grouped.
    pub(in crate::db::executor) fn grouped_cursor_revalidation_requires_grouped_plan() -> Self {
        Self::continuation_cursor_invariant()
    }

    /// Error for grouped boundary-arity access requested on a logical plan
    /// that is not grouped.
    pub(in crate::db::executor) fn grouped_cursor_boundary_arity_requires_grouped_plan() -> Self {
        Self::continuation_cursor_invariant()
    }

    /// Error for a continuation contract requested from a plan that is not a load plan.
    pub(in crate::db::executor) fn continuation_contract_requires_load_plan() -> Self {
        Self::continuation_cursor_invariant()
    }

    /// Error for load execution descriptor access on a prepared execution plan
    /// that is not a load plan. Compiled for tests only.
    #[cfg(test)]
    pub(in crate::db::executor) fn load_execution_descriptor_requires_load_plan() -> Self {
        Self::continuation_cursor_invariant()
    }

    /// Error for a lowered index-prefix spec that is invalid.
    pub(in crate::db::executor) fn lowered_index_prefix_spec_invalid() -> Self {
        Self::continuation_cursor_invariant()
    }

    /// Error for a lowered index-range spec that is invalid.
    pub(in crate::db::executor) fn lowered_index_range_spec_invalid() -> Self {
        Self::continuation_cursor_invariant()
    }

    /// Convert this executor plan error into the runtime `InternalError` taxonomy.
    pub(in crate::db::executor) fn into_internal_error(self) -> InternalError {
        // The enum has a single variant, so the destructuring is irrefutable.
        let Self::Cursor(cursor_err) = self;
        cursor_err.into_internal_error()
    }
}

impl From<CursorPlanError> for ExecutorPlanError {
    /// Box the cursor plan error and wrap it in the `Cursor` variant.
    fn from(err: CursorPlanError) -> Self {
        let boxed = Box::from(err);
        Self::Cursor(boxed)
    }
}

///
/// ExecutorError
///
/// Executor-owned runtime failure taxonomy for execution boundaries.
/// Keeps conflict vs corruption classification explicit for internal mapping.
/// User-shape validation failures remain plan-layer errors.
///

#[derive(Debug)]
pub(in crate::db::executor) enum ExecutorError {
    /// Corruption failure tagged with the origin it is attributed to.
    Corruption { origin: ErrorOrigin },

    /// Key-conflict failure; classified as `ErrorClass::Conflict` with `ErrorOrigin::Store`.
    KeyExists,
}

impl ExecutorError {
    // Map this error onto its internal error class.
    pub(in crate::db::executor) const fn class(&self) -> ErrorClass {
        match self {
            Self::Corruption { .. } => ErrorClass::Corruption,
            Self::KeyExists => ErrorClass::Conflict,
        }
    }

    // Report the origin of this error; key conflicts are always store-origin.
    pub(in crate::db::executor) const fn origin(&self) -> ErrorOrigin {
        match *self {
            Self::Corruption { origin } => origin,
            Self::KeyExists => ErrorOrigin::Store,
        }
    }

    // Build a corruption error attributed to the given origin.
    pub(in crate::db::executor) const fn corruption(origin: ErrorOrigin) -> Self {
        Self::Corruption { origin }
    }

    // Build the canonical corruption error attributed to the store.
    pub(in crate::db::executor) const fn store_corruption() -> Self {
        Self::Corruption {
            origin: ErrorOrigin::Store,
        }
    }

    // Build the canonical missing-row error: a store-origin corruption.
    // The key is accepted but not recorded in the error.
    pub(in crate::db::executor) const fn missing_row(_key: &DecodedDataStoreKey) -> Self {
        Self::corruption(ErrorOrigin::Store)
    }

    // Build the canonical persisted-row invariant-violation error: a store-origin
    // corruption. The data key is accepted but not recorded in the error.
    pub(in crate::db::executor) const fn persisted_row_invariant_violation(
        _data_key: &DecodedDataStoreKey,
    ) -> Self {
        Self::corruption(ErrorOrigin::Store)
    }
}

impl From<ExecutorError> for InternalError {
    // Classify the executor failure by its error class and origin.
    fn from(err: ExecutorError) -> Self {
        let class = err.class();
        let origin = err.origin();
        Self::classified(class, origin)
    }
}

#[cfg(test)]
impl<E: EntityKind> From<CompiledQuery<E>> for PreparedExecutionPlan<E> {
    // Test-only conversion: take the compiled query's plan and wrap it in a
    // prepared execution plan.
    fn from(value: CompiledQuery<E>) -> Self {
        let plan = value.into_plan();
        Self::new(plan)
    }
}