spg-engine 7.34.2

Execution engine for SPG: glues spg-sql parsing to spg-storage. Foreign keys, joins, vectors, cold tier.
Documentation
//! Table-maintenance executors: `ANALYZE` (re-stat) and
//! `COMPACT COLD SEGMENTS` (cold-segment merge). Split out of
//! `lib.rs` (cut 19) as a verbatim move; beyond visibility tweaks,
//! the only edit is reuniting the `exec_compact_cold_segments` doc,
//! whose first half had drifted above `set_session_param`.

use alloc::string::{String, ToString};
use alloc::vec::Vec;

use spg_storage::{ColumnSchema, CompactReport, DataType, IndexKind, Row, StorageError, Value};

use crate::{
    COMPACTION_TARGET_DEFAULT_BYTES, Engine, EngineError, QueryResult, canonical_value_repr,
    is_internal_table_name, sort_values_for_histogram, statistics,
};

impl Engine {
    /// v6.2.0 — `ANALYZE [<table>]` runtime. Bare `ANALYZE` walks
    /// every user table; `ANALYZE <name>` re-stats one. For each
    /// target table, single-pass scan + per-column histogram +
    /// `null_frac` + `n_distinct`. Replaces the table's prior
    /// stats; resets the modified-row counter.
    ///
    /// v6.2.0 doesn't sample — it scans the full table. v6.2.x
    /// can add reservoir sampling at the > 100 K-row mark; not a
    /// scope blocker for the current commit since rows ≤ 100 K
    /// analyse in milliseconds.
    pub(crate) fn exec_analyze(
        &mut self,
        target: Option<&str>,
    ) -> Result<QueryResult, EngineError> {
        let names: Vec<String> = if let Some(name) = target {
            // Verify the table exists; surface a clear error if not.
            if self.catalog.get(name).is_none() {
                return Err(EngineError::Storage(StorageError::TableNotFound {
                    name: name.to_string(),
                }));
            }
            alloc::vec![name.to_string()]
        } else {
            self.catalog
                .table_names()
                .into_iter()
                .filter(|n| !is_internal_table_name(n))
                .collect()
        };
        let mut analysed = 0usize;
        for table_name in &names {
            self.analyze_one_table(table_name)?;
            analysed += 1;
        }
        // v6.3.1 — plan cache invalidation. Bump stats version so
        // future lookups see the new generation, and selectively
        // evict every plan whose `source_tables` overlap with the
        // ANALYZE target set. Bare ANALYZE (all tables) clears the
        // whole cache.
        if analysed > 0 {
            self.statistics.bump_version();
            if target.is_some() {
                for t in &names {
                    self.plan_cache.evict_referencing(t);
                }
            } else {
                self.plan_cache.clear();
            }
        }
        Ok(QueryResult::CommandOk {
            affected: analysed,
            modified_catalog: true,
        })
    }

    /// Walk a single table's rows once and (re-)populate per-column
    /// stats. Drops the existing stats for `table` first so columns
    /// that have been DROP-ed between ANALYZEs don't leave stale
    /// rows.
    fn analyze_one_table(&mut self, table_name: &str) -> Result<(), EngineError> {
        let table = self.catalog.get(table_name).ok_or_else(|| {
            EngineError::Storage(StorageError::TableNotFound {
                name: table_name.to_string(),
            })
        })?;
        let schema = table.schema().clone();
        let row_count = table.rows().len();
        // For each column, collect (sorted) non-NULL textual values
        // + count NULLs; then ask `statistics::build_histogram` to
        // produce the 101 bounds and `estimate_n_distinct` the
        // distinct count.
        self.statistics.clear_table(table_name);
        for (col_pos, col_schema) in schema.columns.iter().enumerate() {
            // v6.2.0 skip: vector columns have their own stats
            // shape (HNSW graph topology). v6.2 deliberation #1.
            if matches!(col_schema.ty, DataType::Vector { .. }) {
                continue;
            }
            let mut non_null_values: Vec<Value> = Vec::with_capacity(row_count);
            let mut nulls: u64 = 0;
            for row in table.rows() {
                match row.values.get(col_pos) {
                    Some(Value::Null) | None => nulls += 1,
                    Some(v) => non_null_values.push(v.clone()),
                }
            }
            // Sort by type-aware ordering (Int as int, Text as
            // lex, etc.) so histogram bounds reflect the column's
            // natural order — not lexicographic on the string
            // representation, which would put "9" after "49".
            non_null_values.sort_by(|a, b| sort_values_for_histogram(a, b));
            let non_null: Vec<String> = non_null_values.iter().map(canonical_value_repr).collect();
            let null_frac = if row_count == 0 {
                0.0
            } else {
                #[allow(clippy::cast_precision_loss)]
                let f = nulls as f32 / row_count as f32;
                f
            };
            let n_distinct = statistics::estimate_n_distinct(&non_null);
            let histogram_bounds = statistics::build_histogram(&non_null);
            self.statistics.set(
                table_name.to_string(),
                col_schema.name.clone(),
                statistics::ColumnStats {
                    null_frac,
                    n_distinct,
                    histogram_bounds,
                },
            );
        }
        self.statistics.reset_modified(table_name);
        // v6.7.0 — refresh the per-table cold_rows cache. Walk the
        // BTree indices and count Cold locators (MAX across
        // indices); store the result on the table. Surfaced via
        // `spg_statistic.cold_row_count` (new column) and
        // `spg_stat_segment.table_name` (new column).
        let cold_count = {
            let table = self
                .active_catalog()
                .get(table_name)
                .expect("table still present");
            table.count_cold_locators()
        };
        let table_mut = self
            .active_catalog_mut()
            .get_mut(table_name)
            .expect("table still present");
        table_mut.set_cold_row_count(cold_count);
        Ok(())
    }

    /// v6.7.3 — `COMPACT COLD SEGMENTS` runtime path. Drives the
    /// engine-layer compaction shim with the default
    /// 4 MiB segment-size threshold. spg-server intercepts the
    /// SQL before it reaches the engine on a server build —
    /// it reads `SPG_COMPACTION_TARGET_SEGMENT_BYTES`, calls
    /// `Engine::compact_cold_segments_with_target` directly with
    /// the env value, and persists every merged segment to
    /// `<db>.spg/segments/`. This arm only fires for engine-only
    /// callers (spg-embedded, lib tests); in that mode merged
    /// segments live in memory and are dropped at process exit.
    pub(crate) fn exec_compact_cold_segments(&mut self) -> Result<QueryResult, EngineError> {
        let target = COMPACTION_TARGET_DEFAULT_BYTES;
        let reports = self.compact_cold_segments_with_target(target)?;
        let columns = alloc::vec![
            ColumnSchema::new("table_name", DataType::Text, false),
            ColumnSchema::new("index_name", DataType::Text, false),
            ColumnSchema::new("sources_merged", DataType::BigInt, false),
            ColumnSchema::new("merged_segment_id", DataType::BigInt, false),
            ColumnSchema::new("merged_rows", DataType::BigInt, false),
            ColumnSchema::new("deleted_rows_pruned", DataType::BigInt, false),
            ColumnSchema::new("bytes_reclaimed_estimate", DataType::BigInt, false),
        ];
        let rows: Vec<Row> = reports
            .into_iter()
            .map(|(tname, iname, report)| {
                Row::new(alloc::vec![
                    Value::Text(tname),
                    Value::Text(iname),
                    Value::BigInt(i64::try_from(report.sources.len()).unwrap_or(i64::MAX)),
                    Value::BigInt(i64::from(report.merged_segment_id.unwrap_or(0))),
                    Value::BigInt(i64::try_from(report.merged_rows).unwrap_or(i64::MAX)),
                    Value::BigInt(i64::try_from(report.deleted_rows_pruned).unwrap_or(i64::MAX),),
                    Value::BigInt(
                        i64::try_from(report.bytes_reclaimed_estimate).unwrap_or(i64::MAX),
                    ),
                ])
            })
            .collect();
        Ok(QueryResult::Rows { columns, rows })
    }

    /// v6.7.3 — public shim around `Catalog::compact_cold_segments`
    /// driving every BTree index on every user table. Returns one
    /// `(table, index, report)` triple for each merge that
    /// actually happened (no-op (table, index) pairs are filtered
    /// out so callers can size persist-side work to the live
    /// merges). Caller is responsible for persisting each
    /// `report.merged_segment_bytes` and updating the on-disk
    /// segment registry; engine layer is no_std and never
    /// touches disk.
    ///
    /// Marks every touched table's cached `cold_row_count` stale
    /// — compaction GC'd some shadowed rows, so the count must be
    /// re-derived on the next ANALYZE.
    pub fn compact_cold_segments_with_target(
        &mut self,
        target_segment_bytes: u64,
    ) -> Result<Vec<(String, String, CompactReport)>, EngineError> {
        let table_names = self.active_catalog().table_names();
        let mut reports: Vec<(String, String, CompactReport)> = Vec::new();
        for tname in table_names {
            if is_internal_table_name(&tname) {
                continue;
            }
            let idx_names: Vec<String> = {
                let Some(t) = self.active_catalog().get(&tname) else {
                    continue;
                };
                t.indices()
                    .iter()
                    .filter(|i| matches!(i.kind, IndexKind::BTree(_)))
                    .map(|i| i.name.clone())
                    .collect()
            };
            for iname in idx_names {
                let report = self
                    .active_catalog_mut()
                    .compact_cold_segments(&tname, &iname, target_segment_bytes)
                    .map_err(EngineError::Storage)?;
                if report.merged_segment_id.is_some() {
                    if let Some(t) = self.active_catalog_mut().get_mut(&tname) {
                        t.mark_cold_row_count_stale();
                    }
                    reports.push((tname.clone(), iname, report));
                }
            }
        }
        Ok(reports)
    }
}