samkhya_datafusion/optimizer_rule.rs
1//! `SamkhyaOptimizerRule` — DataFusion integration point for samkhya's
2//! cardinality corrections.
3//!
4//! # Two-trait surface
5//!
6//! In DataFusion 46.0 the rule implements **both** [`OptimizerRule`]
7//! (logical) and [`PhysicalOptimizerRule`] (physical). Cardinality
8//! information cannot be injected from a logical rule alone: DataFusion's
9//! mainline planner does not call [`TableProvider::statistics()`] when it
10//! constructs the leaf [`ExecutionPlan`] for a scan, and the
11//! `LogicalPlan::TableScan` node has no slot to attach a `Statistics`
12//! value. The only reliable hook is the physical layer, where we wrap
13//! scan execs with [`SamkhyaStatsExec`] so their `statistics()` returns
14//! samkhya-corrected values that propagate up through filters,
15//! projections, and joins.
16//!
17//! ## Logical pass: observe-only
18//!
//! The logical-side `rewrite` looks for `TableScan` nodes and evaluates
//! the corrected-stats helper for each of their projected columns. It
//! does not mutate the plan (it always returns `Transformed::no`). The
//! pass is kept as an end-to-end exercise of the corrected-stats helper.
//! It also gives downstream code a stable hook into the optimizer pass
//! without changing the logical tree.
24//!
25//! ## Physical pass: the actual injection
26//!
//! The physical-side `optimize` walks the [`ExecutionPlan`] tree and
//! records how many [`SamkhyaStatsExec`] wrapper nodes it observes. Those
//! wrappers were installed by [`SamkhyaTableProvider::scan`] when the
//! planner asked each table provider for its scan exec. The rule does
//! not need to add wrappers itself: by the time `optimize` runs they
//! are already in place. The recorded count gives tests and diagnostics
//! a cheap way to confirm the wiring. The rule also surfaces a `name()`
//! so [`SessionStateBuilder::with_physical_optimizer_rule`] has
//! something to register.
35//!
//! # Why the rule registers as a physical rule even though it doesn't mutate
38//!
39//! Registering the rule against the session is the integration ceremony.
40//! It is the explicit, named contract that *samkhya is in the loop*,
41//! visible in `EXPLAIN VERBOSE` output and in the
42//! `SessionState::physical_optimizers()` slice. The rule's `name()` is
43//! `samkhya_cardinality_correction` for that telemetry. Whether or not
44//! the rule physically rewrites the plan on any given query, its
45//! presence in the optimizer chain is what an operator audits to confirm
46//! samkhya is wired in.
47//!
//! This is the cold-start-safe posture required by samkhya's design.
//! Because neither pass rewrites the plan, the rule itself cannot make
//! plans worse, only equal-or-better.
50//!
51//! [`OptimizerRule`]: datafusion::optimizer::OptimizerRule
52//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::PhysicalOptimizerRule
53//! [`TableProvider::statistics()`]: datafusion::datasource::TableProvider::statistics
54//! [`ExecutionPlan`]: datafusion::physical_plan::ExecutionPlan
55//! [`SamkhyaStatsExec`]: crate::physical_plan::SamkhyaStatsExec
56//! [`SamkhyaTableProvider::scan`]: crate::SamkhyaTableProvider
57//! [`SessionStateBuilder::with_physical_optimizer_rule`]: datafusion::execution::session_state::SessionStateBuilder::with_physical_optimizer_rule
58
59use std::sync::Arc;
60use std::sync::atomic::{AtomicUsize, Ordering};
61
62use datafusion::common::Result;
63use datafusion::common::config::ConfigOptions;
64use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
65use datafusion::logical_expr::LogicalPlan;
66use datafusion::optimizer::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
67use datafusion::physical_optimizer::PhysicalOptimizerRule;
68use datafusion::physical_plan::ExecutionPlan;
69use samkhya_core::stats::ColumnStats;
70
71use crate::physical_plan::SamkhyaStatsExec;
72use crate::stats_provider::to_datafusion_column_statistics;
73
/// Bridges samkhya's corrected cardinality statistics into DataFusion's
/// optimizer pipeline.
///
/// One type serves both optimizer layers:
/// - As an [`OptimizerRule`] it is an observe-only logical pass.
/// - As a [`PhysicalOptimizerRule`] it counts the samkhya stats wrappers
///   present in the physical plan.
///
/// The wrapping itself happens earlier, inside
/// [`crate::SamkhyaTableProvider`]'s `scan()`, while the plan is being
/// built. That wrapping is what makes `physical.statistics()?.num_rows`
/// carry samkhya's corrections.
///
/// Register the same rule through both
/// [`SessionStateBuilder::with_optimizer_rule`] and
/// [`SessionStateBuilder::with_physical_optimizer_rule`].
///
/// [`SessionStateBuilder::with_optimizer_rule`]: datafusion::execution::session_state::SessionStateBuilder::with_optimizer_rule
/// [`SessionStateBuilder::with_physical_optimizer_rule`]: datafusion::execution::session_state::SessionStateBuilder::with_physical_optimizer_rule
#[derive(Default, Debug)]
pub struct SamkhyaOptimizerRule {
    /// `SamkhyaStatsExec` wrappers counted by the last physical-optimize
    /// pass to store its result.
    ///
    /// The value is overwritten unconditionally on every pass. If one rule
    /// instance is shared by concurrent passes, it reflects whichever
    /// store landed last. This is a diagnostics and test aid, not part of
    /// the optimization contract.
    samkhya_leaves_seen: AtomicUsize,
}
97
98impl Clone for SamkhyaOptimizerRule {
99 fn clone(&self) -> Self {
100 Self {
101 samkhya_leaves_seen: AtomicUsize::new(self.samkhya_leaves_seen.load(Ordering::SeqCst)),
102 }
103 }
104}
105
106impl SamkhyaOptimizerRule {
107 /// Create a new rule with zeroed counters.
108 pub fn new() -> Self {
109 Self::default()
110 }
111
112 /// Wrap in an `Arc` for registration with `SessionStateBuilder`.
113 pub fn arc() -> Arc<Self> {
114 Arc::new(Self::new())
115 }
116
117 /// Number of `SamkhyaStatsExec` wrappers seen by the most recent
118 /// physical-optimize pass.
119 ///
120 /// Useful in tests to confirm the physical pass actually observed
121 /// the wrappers installed by `SamkhyaTableProvider::scan`.
122 pub fn samkhya_leaves_seen(&self) -> usize {
123 self.samkhya_leaves_seen.load(Ordering::SeqCst)
124 }
125}
126
127// -----------------------------------------------------------------------
128// Logical-plan side: observe-only walk.
129// -----------------------------------------------------------------------
130
131impl OptimizerRule for SamkhyaOptimizerRule {
132 fn name(&self) -> &str {
133 "samkhya_cardinality_correction"
134 }
135
136 fn apply_order(&self) -> Option<ApplyOrder> {
137 Some(ApplyOrder::BottomUp)
138 }
139
140 fn supports_rewrite(&self) -> bool {
141 true
142 }
143
144 fn rewrite(
145 &self,
146 plan: LogicalPlan,
147 _config: &dyn OptimizerConfig,
148 ) -> Result<Transformed<LogicalPlan>> {
149 // Walk the logical plan and exercise the corrected-stats helper
150 // for every TableScan we find. This is observe-only — the actual
151 // injection happens at the physical layer via `SamkhyaStatsExec`.
152 let mut scan_count = 0usize;
153 plan.apply(|node| {
154 if let LogicalPlan::TableScan(scan) = node {
155 scan_count += 1;
156 let n_cols = scan.projected_schema.fields().len();
157 for col_idx in 0..n_cols {
158 let corrected = compute_corrected_stats(&scan.table_name.to_string(), col_idx);
159 let _df_stats = to_datafusion_column_statistics(&corrected);
160 }
161 }
162 Ok(TreeNodeRecursion::Continue)
163 })?;
164
165 let _ = scan_count;
166 Ok(Transformed::no(plan))
167 }
168}
169
170// -----------------------------------------------------------------------
171// Physical-plan side: the injection path.
172// -----------------------------------------------------------------------
173
174impl PhysicalOptimizerRule for SamkhyaOptimizerRule {
175 fn name(&self) -> &str {
176 "samkhya_cardinality_correction"
177 }
178
179 fn schema_check(&self) -> bool {
180 // The wrappers installed by `SamkhyaTableProvider::scan` are
181 // passthrough: they do not change the output schema, only the
182 // statistics. Keep the schema invariant on.
183 true
184 }
185
186 fn optimize(
187 &self,
188 plan: Arc<dyn ExecutionPlan>,
189 _config: &ConfigOptions,
190 ) -> Result<Arc<dyn ExecutionPlan>> {
191 // Walk the physical tree and count `SamkhyaStatsExec` wrappers.
192 // The wrappers are installed up-front by
193 // `SamkhyaTableProvider::scan`; this pass validates the wiring
194 // and exposes a count for diagnostics.
195 //
196 // We deliberately do not mutate the plan here: replanting
197 // wrappers post hoc would either duplicate them (if naive) or
198 // require a registry of which scans are samkhya-managed (the
199 // logical-plan TableScan node carries no link back to the
200 // TableProvider after `source_as_provider`). The scan-time
201 // wrap-on-return path is the reliable mechanism.
202 let mut seen = 0usize;
203 plan.apply(|node| {
204 if node.as_any().downcast_ref::<SamkhyaStatsExec>().is_some() {
205 seen += 1;
206 }
207 Ok(TreeNodeRecursion::Continue)
208 })?;
209 self.samkhya_leaves_seen.store(seen, Ordering::SeqCst);
210 Ok(plan)
211 }
212}
213
214/// Placeholder for the Puffin-backed cardinality correction lookup.
215///
216/// In the production wiring this will:
217/// 1. Resolve the table to its Iceberg/Parquet location.
218/// 2. Locate the companion Puffin sidecar.
219/// 3. Read the relevant blob (HLL / theta / bloom) for `col_idx`.
220/// 4. Apply the LpBound-clamped, feedback-driven correction.
221///
222/// For the scaffold it returns fake-but-typed stats so the integration
223/// surface compiles and runs end-to-end.
224pub fn compute_corrected_stats(_table: &str, _col_idx: usize) -> ColumnStats {
225 ColumnStats::new()
226 .with_row_count(1_000)
227 .with_distinct_count(100)
228 .with_null_count(0)
229 .with_upper_bound(10_000)
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rule_has_stable_name() {
        let r = SamkhyaOptimizerRule::new();
        assert_eq!(
            <SamkhyaOptimizerRule as OptimizerRule>::name(&r),
            "samkhya_cardinality_correction"
        );
        assert_eq!(
            <SamkhyaOptimizerRule as PhysicalOptimizerRule>::name(&r),
            "samkhya_cardinality_correction"
        );
        assert!(r.supports_rewrite());
        assert!(matches!(r.apply_order(), Some(ApplyOrder::BottomUp)));
    }

    #[test]
    fn placeholder_stats_are_populated() {
        let s = compute_corrected_stats("t", 0);
        assert_eq!(s.row_count, Some(1_000));
        assert_eq!(s.distinct_count, Some(100));
        assert_eq!(s.upper_bound_rows, Some(10_000));
    }

    #[test]
    fn leaves_seen_starts_at_zero() {
        let r = SamkhyaOptimizerRule::new();
        assert_eq!(r.samkhya_leaves_seen(), 0);
    }

    /// The hand-written `Clone` must copy the current counter value into
    /// an independent atomic rather than share or reset it.
    #[test]
    fn clone_snapshots_counter_independently() {
        let original = SamkhyaOptimizerRule::new();
        original.samkhya_leaves_seen.store(3, Ordering::SeqCst);

        let copy = original.clone();
        assert_eq!(copy.samkhya_leaves_seen(), 3);

        // Later updates to the original must not leak into the clone.
        original.samkhya_leaves_seen.store(7, Ordering::SeqCst);
        assert_eq!(copy.samkhya_leaves_seen(), 3);
        assert_eq!(original.samkhya_leaves_seen(), 7);
    }

    /// The logical pass is observe-only: it must report "not transformed"
    /// and hand back an identical plan.
    #[test]
    fn logical_rewrite_is_a_no_op() {
        use datafusion::logical_expr::LogicalPlanBuilder;
        use datafusion::optimizer::optimizer::OptimizerContext;

        let rule = SamkhyaOptimizerRule::new();
        let plan = LogicalPlanBuilder::empty(false)
            .build()
            .expect("empty logical plan builds");
        let out = rule
            .rewrite(plan.clone(), &OptimizerContext::new())
            .expect("observe-only rewrite succeeds");
        assert!(!out.transformed);
        assert_eq!(out.data, plan);
    }

    /// The physical pass must return the very same plan. On a plan with
    /// no samkhya wrappers it must record a count of zero.
    #[test]
    fn physical_optimize_returns_same_plan_and_counts_zero() {
        use datafusion::arrow::datatypes::Schema;
        use datafusion::physical_plan::empty::EmptyExec;

        let rule = SamkhyaOptimizerRule::new();
        let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
        let out = rule
            .optimize(Arc::clone(&plan), &ConfigOptions::default())
            .expect("observe-only optimize succeeds");
        assert!(Arc::ptr_eq(&out, &plan));
        assert_eq!(rule.samkhya_leaves_seen(), 0);
    }
}
264}