Skip to main content

icydb_core/db/executor/
load.rs

1use crate::{
2    db::{
3        Db,
4        executor::{
5            plan::{record_plan_metrics, set_rows_from_len},
6            trace::{QueryTraceSink, TraceExecutorKind, TracePhase, start_plan_trace},
7        },
8        query::plan::ExecutablePlan,
9        response::Response,
10    },
11    error::{ErrorClass, ErrorOrigin, InternalError},
12    obs::sink::{self, ExecKind, MetricsEvent, Span},
13    traits::EntityKind,
14};
15use std::{collections::HashMap, hash::Hash, marker::PhantomData};
16
17///
18/// LoadExecutor
19///
20
21#[derive(Clone)]
22pub struct LoadExecutor<E: EntityKind> {
23    db: Db<E::Canister>,
24    debug: bool,
25    trace: Option<&'static dyn QueryTraceSink>,
26    _marker: PhantomData<E>,
27}
28
29impl<E: EntityKind> LoadExecutor<E> {
30    // ======================================================================
31    // Construction & diagnostics
32    // ======================================================================
33
34    #[must_use]
35    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
36        Self {
37            db,
38            debug,
39            trace: None,
40            _marker: PhantomData,
41        }
42    }
43
44    #[must_use]
45    #[allow(dead_code)]
46    pub(crate) const fn with_trace_sink(
47        mut self,
48        sink: Option<&'static dyn QueryTraceSink>,
49    ) -> Self {
50        self.trace = sink;
51        self
52    }
53
54    fn debug_log(&self, s: impl Into<String>) {
55        if self.debug {
56            println!("{}", s.into());
57        }
58    }
59
60    // ======================================================================
61    // Execution
62    // ======================================================================
63
64    /// Execute an executor-ready plan directly (no planner inference).
65    pub fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
66        if !plan.mode().is_load() {
67            return Err(InternalError::new(
68                ErrorClass::Unsupported,
69                ErrorOrigin::Query,
70                "load executor requires load plans".to_string(),
71            ));
72        }
73        let trace = start_plan_trace(self.trace, TraceExecutorKind::Load, &plan);
74        let result = (|| {
75            let mut span = Span::<E>::new(ExecKind::Load);
76            let plan = plan.into_inner();
77
78            self.debug_log(format!("🧭 Executing plan on {}", E::PATH));
79
80            let ctx = self.db.context::<E>();
81            record_plan_metrics(&plan.access);
82
83            // Access phase: resolve candidate rows from the store.
84            let data_rows = ctx.rows_from_access_plan(&plan.access, plan.consistency)?;
85            sink::record(MetricsEvent::RowsScanned {
86                entity_path: E::PATH,
87                rows_scanned: data_rows.len() as u64,
88            });
89
90            self.debug_log(format!(
91                "📦 Scanned {} data rows before deserialization",
92                data_rows.len()
93            ));
94
95            // Decode rows into entities before post-access filtering.
96            let mut rows = ctx.deserialize_rows(data_rows)?;
97            let access_rows = rows.len();
98            self.debug_log(format!(
99                "🧩 Deserialized {} entities before filtering",
100                rows.len()
101            ));
102
103            // Post-access phase: filter, order, and paginate.
104            let stats = plan.apply_post_access::<E, _>(&mut rows)?;
105            if stats.filtered {
106                self.debug_log(format!(
107                    "🔎 Applied predicate -> {} entities remaining",
108                    rows.len()
109                ));
110            }
111            if stats.ordered {
112                self.debug_log("↕️ Applied order spec");
113            }
114            if stats.paged {
115                self.debug_log(format!("📏 Applied pagination -> {} entities", rows.len()));
116            }
117
118            // Emit per-phase counts after the pipeline completes successfully.
119            if let Some(trace) = trace.as_ref() {
120                let to_u64 = |len| u64::try_from(len).unwrap_or(u64::MAX);
121                trace.phase(TracePhase::Access, to_u64(access_rows));
122                trace.phase(TracePhase::Filter, to_u64(stats.rows_after_filter));
123                trace.phase(TracePhase::Order, to_u64(stats.rows_after_order));
124                trace.phase(TracePhase::Page, to_u64(stats.rows_after_page));
125            }
126
127            set_rows_from_len(&mut span, rows.len());
128            self.debug_log(format!("✅ query complete -> {} final rows", rows.len()));
129
130            Ok(Response(rows))
131        })();
132
133        if let Some(trace) = trace {
134            match &result {
135                Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
136                Err(err) => trace.error(err),
137            }
138        }
139
140        result
141    }
142
143    /// Execute a plan and require exactly one row.
144    pub fn require_one(&self, plan: ExecutablePlan<E>) -> Result<(), InternalError> {
145        self.execute(plan)?.require_one()
146    }
147
148    /// Count rows matching a plan.
149    pub fn count(&self, plan: ExecutablePlan<E>) -> Result<u32, InternalError> {
150        Ok(self.execute(plan)?.count())
151    }
152
153    // ======================================================================
154    // Aggregations
155    // ======================================================================
156
157    /// Group rows matching a plan and count them by a derived key.
158    ///
159    /// This is intentionally implemented on the executor (not Response)
160    /// so it can later avoid full deserialization.
161    pub fn group_count_by<K, F>(
162        &self,
163        plan: ExecutablePlan<E>,
164        key_fn: F,
165    ) -> Result<HashMap<K, u32>, InternalError>
166    where
167        K: Eq + Hash,
168        F: Fn(&E) -> K,
169    {
170        let entities = self.execute(plan)?.entities();
171
172        let mut counts = HashMap::new();
173        for e in entities {
174            *counts.entry(key_fn(&e)).or_insert(0) += 1;
175        }
176
177        Ok(counts)
178    }
179}