// samkhya_datafusion/optimizer_rule.rs

//! `SamkhyaOptimizerRule`: the DataFusion integration point for samkhya's
//! cardinality corrections.
//!
//! # Two-trait surface
//!
//! Against DataFusion 46.0, the rule implements **both** [`OptimizerRule`]
//! (logical) and [`PhysicalOptimizerRule`] (physical). Cardinality
//! information cannot be injected from a logical rule alone: DataFusion's
//! mainline planner does not call [`TableProvider::statistics()`] when it
//! constructs the leaf [`ExecutionPlan`] for a scan, and the
//! `LogicalPlan::TableScan` node has no slot to attach a `Statistics`
//! value. The only reliable hook is the physical layer, where scan execs
//! are wrapped in [`SamkhyaStatsExec`] so that their `statistics()`
//! returns samkhya-corrected values, which then propagate up through
//! filters, projections, and joins.
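//!
//! The propagation argument can be sketched with a toy plan tree. This is
//! a std-only illustration, not DataFusion code: `Node`, `Scan`,
//! `StatsOverride`, and `Filter` are hypothetical stand-ins for
//! `ExecutionPlan`, a scan exec, `SamkhyaStatsExec`, and a filter exec.
//! The fixed selectivity and the wrapper's fallback to its input when no
//! correction exists are assumptions made for the example.
//!
//! ```rust
//! /// Hypothetical stand-in for `ExecutionPlan::statistics()`: each node
//! /// reports an estimated row count, if it has one.
//! trait Node {
//!     fn num_rows(&self) -> Option<usize>;
//! }
//!
//! /// A scan with no estimate of its own, as when the planner never
//! /// consulted the table provider's statistics.
//! struct Scan;
//! impl Node for Scan {
//!     fn num_rows(&self) -> Option<usize> {
//!         None
//!     }
//! }
//!
//! /// Passthrough wrapper that overrides the reported statistics.
//! struct StatsOverride {
//!     inner: Box<dyn Node>,
//!     corrected_rows: Option<usize>,
//! }
//! impl Node for StatsOverride {
//!     fn num_rows(&self) -> Option<usize> {
//!         // Prefer the correction; with none available, report whatever
//!         // the wrapped node reports.
//!         self.corrected_rows.or_else(|| self.inner.num_rows())
//!     }
//! }
//!
//! /// A parent that derives its estimate from its child, so a corrected
//! /// leaf value flows upward without the parent knowing about samkhya.
//! struct Filter {
//!     input: Box<dyn Node>,
//!     selectivity: f64,
//! }
//! impl Node for Filter {
//!     fn num_rows(&self) -> Option<usize> {
//!         self.input
//!             .num_rows()
//!             .map(|n| (n as f64 * self.selectivity).round() as usize)
//!     }
//! }
//! ```
//!
//! A filter over a bare `Scan` has no estimate; the same filter over a
//! `StatsOverride` reporting 1 000 rows, at selectivity 0.1, reports 100.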
//!
//! ## Logical pass: observe-only
//!
//! The logical-side `rewrite` visits `TableScan` nodes and runs the
//! corrected-stats helper for each column of their projected schema. It
//! never mutates the plan (it always returns `Transformed::no`). The pass
//! is kept as a telemetry hook: it exercises the corrected-stats helper
//! end-to-end and gives downstream code a stable place in the optimizer
//! pipeline without changing the logical tree.
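//!
//! Because the rule returns `Some(ApplyOrder::BottomUp)`, DataFusion's
//! optimizer drives the traversal and calls `rewrite` once per node, so
//! the rule only needs to inspect the node it is handed. Walking the whole
//! subtree from inside `rewrite` would instead revisit every scan once per
//! ancestor. A std-only sketch of the difference; `Plan`,
//! `drive_bottom_up`, and the two counting functions are hypothetical and
//! only model the optimizer's driver loop:
//!
//! ```rust
//! /// Hypothetical miniature logical plan: scans are leaves, every other
//! /// node is an operator with children.
//! enum Plan {
//!     Scan,
//!     Op(Vec<Plan>),
//! }
//!
//! impl Plan {
//!     /// Pre-order visit of every node in this subtree, the job
//!     /// `TreeNode::apply` does for a real `LogicalPlan`.
//!     fn visit(&self, f: &mut dyn FnMut(&Plan)) {
//!         f(self);
//!         if let Plan::Op(children) = self {
//!             for child in children {
//!                 child.visit(f);
//!             }
//!         }
//!     }
//! }
//!
//! /// Stand-in for the optimizer's bottom-up driver: children first, then
//! /// the node itself, calling `rule` exactly once per node.
//! fn drive_bottom_up(plan: &Plan, rule: &mut dyn FnMut(&Plan)) {
//!     if let Plan::Op(children) = plan {
//!         for child in children {
//!             drive_bottom_up(child, rule);
//!         }
//!     }
//!     rule(plan);
//! }
//!
//! /// Per-node rule: looks only at the node it is handed.
//! fn scans_seen_per_node(plan: &Plan) -> usize {
//!     let mut seen = 0;
//!     drive_bottom_up(plan, &mut |node: &Plan| {
//!         if matches!(node, Plan::Scan) {
//!             seen += 1;
//!         }
//!     });
//!     seen
//! }
//!
//! /// Subtree-walking rule: every call re-walks everything below it.
//! fn scans_seen_walking_subtrees(plan: &Plan) -> usize {
//!     let mut seen = 0;
//!     drive_bottom_up(plan, &mut |node: &Plan| {
//!         node.visit(&mut |n: &Plan| {
//!             if matches!(n, Plan::Scan) {
//!                 seen += 1;
//!             }
//!         });
//!     });
//!     seen
//! }
//! ```
//!
//! For a plan with three scans under two operators, the per-node version
//! records 3 scan visits and the subtree-walking version records 8.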
//!
//! ## Physical pass: validating the injection
//!
//! The physical-side `optimize` walks the [`ExecutionPlan`] tree and
//! records how many [`SamkhyaStatsExec`] wrappers it observes. Those
//! wrappers were installed by [`SamkhyaTableProvider::scan`] when the
//! planner asked each table provider for its scan exec. The rule does not
//! need to add wrappers itself, because they are already in place by the
//! time `optimize` runs; it does validate the wiring, and it exposes a
//! `name()` so that [`SessionStateBuilder::with_physical_optimizer_rule`]
//! has something to register.
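//!
//! The counting walk relies on `Any`-based downcasting of trait objects,
//! the pattern that `ExecutionPlan::as_any` enables. A std-only sketch, in
//! which `Exec`, `ScanExec`, `OpExec`, and `StatsWrapper` are hypothetical
//! stand-ins for `ExecutionPlan`, a scan exec, an arbitrary operator, and
//! `SamkhyaStatsExec`:
//!
//! ```rust
//! use std::any::Any;
//! use std::sync::Arc;
//!
//! /// Hypothetical stand-in for `ExecutionPlan`: exposes `as_any` for
//! /// downcasting and its children for traversal.
//! trait Exec {
//!     fn as_any(&self) -> &dyn Any;
//!     fn children(&self) -> Vec<Arc<dyn Exec>>;
//! }
//!
//! struct ScanExec;
//! impl Exec for ScanExec {
//!     fn as_any(&self) -> &dyn Any {
//!         self
//!     }
//!     fn children(&self) -> Vec<Arc<dyn Exec>> {
//!         Vec::new()
//!     }
//! }
//!
//! /// Stand-in for `SamkhyaStatsExec`: a passthrough over one input.
//! struct StatsWrapper {
//!     input: Arc<dyn Exec>,
//! }
//! impl Exec for StatsWrapper {
//!     fn as_any(&self) -> &dyn Any {
//!         self
//!     }
//!     fn children(&self) -> Vec<Arc<dyn Exec>> {
//!         vec![Arc::clone(&self.input)]
//!     }
//! }
//!
//! /// Any other operator (join, union, ...) with arbitrary inputs.
//! struct OpExec {
//!     inputs: Vec<Arc<dyn Exec>>,
//! }
//! impl Exec for OpExec {
//!     fn as_any(&self) -> &dyn Any {
//!         self
//!     }
//!     fn children(&self) -> Vec<Arc<dyn Exec>> {
//!         self.inputs.clone()
//!     }
//! }
//!
//! /// Pre-order walk counting nodes whose concrete type is `StatsWrapper`.
//! fn count_wrappers(plan: &Arc<dyn Exec>) -> usize {
//!     let here = usize::from(plan.as_any().downcast_ref::<StatsWrapper>().is_some());
//!     here + plan.children().iter().map(count_wrappers).sum::<usize>()
//! }
//! ```
//!
//! The real rule uses DataFusion's `TreeNode::apply` for the walk; the
//! sketch recurses by hand only to stay dependency-free.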
//!
//! # Why the rule registers as a physical rule even though it doesn't mutate
//!
//! Registering the rule with the session is the integration ceremony. It
//! is the explicit, named contract that *samkhya is in the loop*, visible
//! in `EXPLAIN VERBOSE` output and in the
//! `SessionState::physical_optimizers()` slice. The rule's `name()`,
//! `samkhya_cardinality_correction`, exists for that telemetry. Whether or
//! not the rule rewrites the plan on a given query, its presence in the
//! optimizer chain is what an operator audits to confirm that samkhya is
//! wired in.
//!
//! This is the cold-start-safe posture that samkhya's design requires: the
//! rule itself never rewrites a plan, so it cannot make plans worse; any
//! improvement comes from the corrected statistics the scan wrappers report.
//!
//! [`OptimizerRule`]: datafusion::optimizer::OptimizerRule
//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::PhysicalOptimizerRule
//! [`TableProvider::statistics()`]: datafusion::datasource::TableProvider::statistics
//! [`ExecutionPlan`]: datafusion::physical_plan::ExecutionPlan
//! [`SamkhyaStatsExec`]: crate::physical_plan::SamkhyaStatsExec
//! [`SamkhyaTableProvider::scan`]: crate::SamkhyaTableProvider
//! [`SessionStateBuilder::with_physical_optimizer_rule`]: datafusion::execution::session_state::SessionStateBuilder::with_physical_optimizer_rule

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use datafusion::common::Result;
use datafusion::common::config::ConfigOptions;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use samkhya_core::stats::ColumnStats;

use crate::physical_plan::SamkhyaStatsExec;
use crate::stats_provider::to_datafusion_column_statistics;

/// DataFusion adapter rule that bridges samkhya's corrected statistics
/// into the optimizer.
///
/// Implements both [`OptimizerRule`] (observe-only logical pass) and
/// [`PhysicalOptimizerRule`] (validating physical pass). The physical pass
/// is the visible integration point; the scan wrapping that actually makes
/// `physical.statistics()?.num_rows` reflect samkhya's corrections is done
/// by [`crate::SamkhyaTableProvider`]'s `scan()` at plan-construction time.
///
/// Register it with both
/// [`SessionStateBuilder::with_optimizer_rule`] and
/// [`SessionStateBuilder::with_physical_optimizer_rule`].
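///
/// The wrapper counter lives on the rule instance, so a caller that wants
/// to read `samkhya_leaves_seen()` after planning should create one `Arc`,
/// register clones of it with both builders, and keep a handle. A std-only
/// sketch of that sharing pattern, in which `LogicalRule`, `PhysicalRule`,
/// `Rule`, and `Session` are hypothetical stand-ins for `OptimizerRule`,
/// `PhysicalOptimizerRule`, this type, and the session state:
///
/// ```rust
/// use std::sync::Arc;
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// /// Hypothetical stand-ins for the two DataFusion rule traits.
/// trait LogicalRule: Send + Sync {
///     fn name(&self) -> &str;
/// }
/// trait PhysicalRule: Send + Sync {
///     fn name(&self) -> &str;
///     /// Records the wrapper count of one pass (stands in for `optimize`).
///     fn observe(&self, wrappers_in_plan: usize);
/// }
///
/// /// One type implementing both traits, with a single shared counter.
/// #[derive(Default)]
/// struct Rule {
///     leaves_seen: AtomicUsize,
/// }
/// impl LogicalRule for Rule {
///     fn name(&self) -> &str {
///         "samkhya_cardinality_correction"
///     }
/// }
/// impl PhysicalRule for Rule {
///     fn name(&self) -> &str {
///         "samkhya_cardinality_correction"
///     }
///     fn observe(&self, wrappers_in_plan: usize) {
///         // Overwrite rather than accumulate: the count describes the
///         // most recent pass only.
///         self.leaves_seen.store(wrappers_in_plan, Ordering::SeqCst);
///     }
/// }
///
/// /// Hypothetical session holding rules as trait objects, mirroring how
/// /// `SessionStateBuilder::with_optimizer_rule` takes an
/// /// `Arc<dyn OptimizerRule + Send + Sync>`.
/// struct Session {
///     logical: Vec<Arc<dyn LogicalRule>>,
///     physical: Vec<Arc<dyn PhysicalRule>>,
/// }
///
/// /// Register one shared instance in both lists and return a handle to it.
/// fn register_shared() -> (Session, Arc<Rule>) {
///     let rule = Arc::new(Rule::default());
///     let logical: Arc<dyn LogicalRule> = rule.clone();
///     let physical: Arc<dyn PhysicalRule> = rule.clone();
///     let session = Session {
///         logical: vec![logical],
///         physical: vec![physical],
///     };
///     (session, rule)
/// }
/// ```
///
/// Calling `SamkhyaOptimizerRule::arc()` separately for each registration
/// would leave the caller without a handle to the instance whose counter
/// the physical pass updates.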
///
/// [`SessionStateBuilder::with_optimizer_rule`]: datafusion::execution::session_state::SessionStateBuilder::with_optimizer_rule
/// [`SessionStateBuilder::with_physical_optimizer_rule`]: datafusion::execution::session_state::SessionStateBuilder::with_physical_optimizer_rule
#[derive(Debug, Default)]
pub struct SamkhyaOptimizerRule {
    /// Number of `SamkhyaStatsExec` wrappers observed on the most recent
    /// physical-optimize pass. Exposed for tests and diagnostics; not part
    /// of the public optimization contract.
    samkhya_leaves_seen: AtomicUsize,
}

// `AtomicUsize` does not implement `Clone`, so `Clone` is written by hand:
// the clone starts from a snapshot of the current count and is updated
// independently afterwards.
impl Clone for SamkhyaOptimizerRule {
    fn clone(&self) -> Self {
        Self {
            samkhya_leaves_seen: AtomicUsize::new(self.samkhya_leaves_seen.load(Ordering::SeqCst)),
        }
    }
}

impl SamkhyaOptimizerRule {
    /// Create a new rule with zeroed counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Create a new rule wrapped in an `Arc`, ready for registration with
    /// `SessionStateBuilder`.
    ///
    /// Each call returns a distinct instance with its own counter.
    pub fn arc() -> Arc<Self> {
        Arc::new(Self::new())
    }

    /// Number of `SamkhyaStatsExec` wrappers seen by the most recent
    /// physical-optimize pass.
    ///
    /// Useful in tests to confirm that the physical pass actually observed
    /// the wrappers installed by `SamkhyaTableProvider::scan`.
    pub fn samkhya_leaves_seen(&self) -> usize {
        self.samkhya_leaves_seen.load(Ordering::SeqCst)
    }
}

// -----------------------------------------------------------------------
// Logical-plan side: observe-only walk.
// -----------------------------------------------------------------------

impl OptimizerRule for SamkhyaOptimizerRule {
    fn name(&self) -> &str {
        "samkhya_cardinality_correction"
    }

    fn apply_order(&self) -> Option<ApplyOrder> {
        Some(ApplyOrder::BottomUp)
    }

    fn supports_rewrite(&self) -> bool {
        true
    }

    fn rewrite(
        &self,
        plan: LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        // Observe-only: the actual injection happens at the physical
        // layer via `SamkhyaStatsExec`.
        //
        // Because `apply_order` is `Some(BottomUp)`, the optimizer already
        // calls `rewrite` once for every node in the plan, so only the node
        // handed in is inspected here. Walking the subtree from this
        // method (e.g. with `plan.apply`) would revisit each `TableScan`
        // once per ancestor.
        if let LogicalPlan::TableScan(scan) = &plan {
            let table = scan.table_name.to_string();
            let n_cols = scan.projected_schema.fields().len();
            for col_idx in 0..n_cols {
                // Exercise the corrected-stats helper and the DataFusion
                // conversion; the results are not attached to the plan.
                let corrected = compute_corrected_stats(&table, col_idx);
                let _df_stats = to_datafusion_column_statistics(&corrected);
            }
        }

        Ok(Transformed::no(plan))
    }
}

// -----------------------------------------------------------------------
// Physical-plan side: validating the injection path.
// -----------------------------------------------------------------------

impl PhysicalOptimizerRule for SamkhyaOptimizerRule {
    fn name(&self) -> &str {
        "samkhya_cardinality_correction"
    }

    fn schema_check(&self) -> bool {
        // The wrappers installed by `SamkhyaTableProvider::scan` are
        // passthrough: they change only the statistics, never the output
        // schema. Keep the schema invariant check on.
        true
    }

    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Walk the physical tree and count `SamkhyaStatsExec` wrappers.
        // The wrappers are installed up front by
        // `SamkhyaTableProvider::scan`; this pass validates the wiring
        // and exposes a count for diagnostics.
        //
        // We deliberately do not mutate the plan here. Re-planting
        // wrappers after the fact would either duplicate them (if done
        // naively) or require a registry of which scans are
        // samkhya-managed, because the logical-plan `TableScan` node
        // carries no link back to the `TableProvider` after
        // `source_as_provider`. Wrapping at scan time is the reliable
        // mechanism.
        let mut seen = 0usize;
        plan.apply(|node| {
            if node.as_any().downcast_ref::<SamkhyaStatsExec>().is_some() {
                seen += 1;
            }
            Ok(TreeNodeRecursion::Continue)
        })?;
        // Overwrite rather than accumulate: the counter describes the
        // most recent pass only.
        self.samkhya_leaves_seen.store(seen, Ordering::SeqCst);
        Ok(plan)
    }
}

/// Placeholder for the Puffin-backed cardinality-correction lookup.
///
/// In the production wiring this will:
/// 1. Resolve the table to its Iceberg/Parquet location.
/// 2. Locate the companion Puffin sidecar.
/// 3. Read the relevant blob (HLL / theta / bloom) for `col_idx`.
/// 4. Apply the LpBound-clamped, feedback-driven correction.
///
/// For the scaffold it returns fixed, typed stats so that the integration
/// surface compiles and runs end-to-end.
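///
/// Step 4 is not implemented yet. Purely as an illustration, one plausible
/// shape for a feedback-driven, bound-clamped row correction is sketched
/// here; the ratio-scaling rule, the clamp to at least one row, and the
/// `correct_row_estimate` name are assumptions of this sketch, not
/// samkhya's algorithm, and the upper bound is taken as an input rather
/// than derived from an LpBound computation.
///
/// ```rust
/// /// Hypothetical correction: scale the base estimate by the ratio of
/// /// observed to estimated rows from earlier runs, then clamp the result
/// /// into `[1, upper_bound]` so feedback can never exceed the bound.
/// fn correct_row_estimate(
///     base_estimate: u64,
///     feedback: Option<(u64, u64)>, // (observed rows, estimated rows)
///     upper_bound: u64,
/// ) -> u64 {
///     let scaled = match feedback {
///         Some((observed, estimated)) if estimated > 0 => {
///             (base_estimate as f64 * observed as f64 / estimated as f64).round() as u64
///         }
///         // No usable feedback (cold start): keep the base estimate.
///         _ => base_estimate,
///     };
///     // Never estimate zero rows, and never exceed the upper bound.
///     scaled.clamp(1, upper_bound.max(1))
/// }
/// ```
///
/// With the placeholder's values (1 000 rows, upper bound 10 000),
/// feedback showing a 3x underestimate yields 3 000 rows, while a 50x
/// underestimate is capped at 10 000.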
pub fn compute_corrected_stats(_table: &str, _col_idx: usize) -> ColumnStats {
    ColumnStats::new()
        .with_row_count(1_000)
        .with_distinct_count(100)
        .with_null_count(0)
        .with_upper_bound(10_000)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rule_has_stable_name() {
        let r = SamkhyaOptimizerRule::new();
        assert_eq!(
            <SamkhyaOptimizerRule as OptimizerRule>::name(&r),
            "samkhya_cardinality_correction"
        );
        assert_eq!(
            <SamkhyaOptimizerRule as PhysicalOptimizerRule>::name(&r),
            "samkhya_cardinality_correction"
        );
        assert!(r.supports_rewrite());
        assert!(matches!(r.apply_order(), Some(ApplyOrder::BottomUp)));
    }

    #[test]
    fn placeholder_stats_are_populated() {
        let s = compute_corrected_stats("t", 0);
        assert_eq!(s.row_count, Some(1_000));
        assert_eq!(s.distinct_count, Some(100));
        assert_eq!(s.upper_bound_rows, Some(10_000));
    }

    #[test]
    fn leaves_seen_starts_at_zero() {
        let r = SamkhyaOptimizerRule::new();
        assert_eq!(r.samkhya_leaves_seen(), 0);
    }
}