icydb_core/db/executor/
load.rs1use crate::{
2 db::{
3 Db,
4 executor::{
5 plan::{record_plan_metrics, set_rows_from_len},
6 trace::{QueryTraceSink, TraceExecutorKind, TracePhase, start_plan_trace},
7 },
8 query::plan::ExecutablePlan,
9 response::Response,
10 },
11 error::{ErrorClass, ErrorOrigin, InternalError},
12 obs::sink::{self, ExecKind, MetricsEvent, Span},
13 traits::EntityKind,
14};
15use std::{collections::HashMap, hash::Hash, marker::PhantomData};
16
17#[derive(Clone)]
22pub struct LoadExecutor<E: EntityKind> {
23 db: Db<E::Canister>,
24 debug: bool,
25 trace: Option<&'static dyn QueryTraceSink>,
26 _marker: PhantomData<E>,
27}
28
29impl<E: EntityKind> LoadExecutor<E> {
30 #[must_use]
35 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
36 Self {
37 db,
38 debug,
39 trace: None,
40 _marker: PhantomData,
41 }
42 }
43
44 #[must_use]
45 #[allow(dead_code)]
46 pub(crate) const fn with_trace_sink(
47 mut self,
48 sink: Option<&'static dyn QueryTraceSink>,
49 ) -> Self {
50 self.trace = sink;
51 self
52 }
53
54 fn debug_log(&self, s: impl Into<String>) {
55 if self.debug {
56 println!("{}", s.into());
57 }
58 }
59
60 pub fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
66 if !plan.mode().is_load() {
67 return Err(InternalError::new(
68 ErrorClass::Unsupported,
69 ErrorOrigin::Query,
70 "load executor requires load plans".to_string(),
71 ));
72 }
73 let trace = start_plan_trace(self.trace, TraceExecutorKind::Load, &plan);
74 let result = (|| {
75 let mut span = Span::<E>::new(ExecKind::Load);
76 let plan = plan.into_inner();
77
78 self.debug_log(format!("🧭 Executing plan on {}", E::PATH));
79
80 let ctx = self.db.context::<E>();
81 record_plan_metrics(&plan.access);
82
83 let data_rows = ctx.rows_from_access_plan(&plan.access, plan.consistency)?;
85 sink::record(MetricsEvent::RowsScanned {
86 entity_path: E::PATH,
87 rows_scanned: data_rows.len() as u64,
88 });
89
90 self.debug_log(format!(
91 "📦 Scanned {} data rows before deserialization",
92 data_rows.len()
93 ));
94
95 let mut rows = ctx.deserialize_rows(data_rows)?;
97 let access_rows = rows.len();
98 self.debug_log(format!(
99 "🧩 Deserialized {} entities before filtering",
100 rows.len()
101 ));
102
103 let stats = plan.apply_post_access::<E, _>(&mut rows)?;
105 if stats.filtered {
106 self.debug_log(format!(
107 "🔎 Applied predicate -> {} entities remaining",
108 rows.len()
109 ));
110 }
111 if stats.ordered {
112 self.debug_log("↕️ Applied order spec");
113 }
114 if stats.paged {
115 self.debug_log(format!("📏 Applied pagination -> {} entities", rows.len()));
116 }
117
118 if let Some(trace) = trace.as_ref() {
120 let to_u64 = |len| u64::try_from(len).unwrap_or(u64::MAX);
121 trace.phase(TracePhase::Access, to_u64(access_rows));
122 trace.phase(TracePhase::Filter, to_u64(stats.rows_after_filter));
123 trace.phase(TracePhase::Order, to_u64(stats.rows_after_order));
124 trace.phase(TracePhase::Page, to_u64(stats.rows_after_page));
125 }
126
127 set_rows_from_len(&mut span, rows.len());
128 self.debug_log(format!("✅ query complete -> {} final rows", rows.len()));
129
130 Ok(Response(rows))
131 })();
132
133 if let Some(trace) = trace {
134 match &result {
135 Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
136 Err(err) => trace.error(err),
137 }
138 }
139
140 result
141 }
142
143 pub fn require_one(&self, plan: ExecutablePlan<E>) -> Result<(), InternalError> {
145 self.execute(plan)?.require_one()
146 }
147
148 pub fn count(&self, plan: ExecutablePlan<E>) -> Result<u32, InternalError> {
150 Ok(self.execute(plan)?.count())
151 }
152
153 pub fn group_count_by<K, F>(
162 &self,
163 plan: ExecutablePlan<E>,
164 key_fn: F,
165 ) -> Result<HashMap<K, u32>, InternalError>
166 where
167 K: Eq + Hash,
168 F: Fn(&E) -> K,
169 {
170 let entities = self.execute(plan)?.entities();
171
172 let mut counts = HashMap::new();
173 for e in entities {
174 *counts.entry(key_fn(&e)).or_insert(0) += 1;
175 }
176
177 Ok(counts)
178 }
179}