Skip to main content

icydb_core/db/executor/
trace.rs

1//! Executor query tracing boundary.
2//!
3//! Tracing is optional, injected by the caller, and must not affect execution semantics.
4
5use crate::{
6    db::query::plan::{AccessPath, AccessPlan, ExecutablePlan, PlanFingerprint},
7    error::{ErrorClass, ErrorOrigin, InternalError},
8    traits::EntityKind,
9};
10use sha2::{Digest, Sha256};
11
12///
13/// QueryTraceSink
14///
15
16pub trait QueryTraceSink: Send + Sync {
17    fn on_event(&self, event: QueryTraceEvent);
18}
19
///
/// TraceExecutorKind
///
/// Executor tag (load, save, or delete) carried by every trace event.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraceExecutorKind {
    Load,
    Save,
    Delete,
}
30
///
/// TraceAccess
///
/// Compact, copyable summary of a plan's access shape as reported in trace events.
/// All counts are saturated at `u32::MAX` by the conversion helpers in this module.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraceAccess {
    /// Mirrors `AccessPath::ByKey`.
    ByKey,
    /// Mirrors `AccessPath::ByKeys`; `count` is the number of keys.
    ByKeys { count: u32 },
    /// Mirrors `AccessPath::KeyRange`.
    KeyRange,
    /// Mirrors `AccessPath::IndexPrefix`; `name` is the index name and
    /// `prefix_len` the number of prefix values.
    IndexPrefix { name: &'static str, prefix_len: u32 },
    /// Mirrors `AccessPath::FullScan`.
    FullScan,
    /// Mirrors `AccessPlan::Union`; `branches` is the number of children.
    Union { branches: u32 },
    /// Mirrors `AccessPlan::Intersection`; `branches` is the number of children.
    Intersection { branches: u32 },
}
45
///
/// TracePhase
///
/// Stage marker attached to `QueryTraceEvent::Phase` row counts.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TracePhase {
    /// Reported with the access-stage row count.
    Access,
    /// Reported with the post-access-stage row count.
    PostAccess,
}
55
56///
57/// QueryTraceEvent
58///
59
60#[derive(Clone, Copy, Debug, Eq, PartialEq)]
61pub enum QueryTraceEvent {
62    Start {
63        fingerprint: PlanFingerprint,
64        executor: TraceExecutorKind,
65        access: Option<TraceAccess>,
66    },
67    Phase {
68        fingerprint: PlanFingerprint,
69        executor: TraceExecutorKind,
70        access: Option<TraceAccess>,
71        phase: TracePhase,
72        rows: u64,
73    },
74    Finish {
75        fingerprint: PlanFingerprint,
76        executor: TraceExecutorKind,
77        access: Option<TraceAccess>,
78        rows: u64,
79    },
80    Error {
81        fingerprint: PlanFingerprint,
82        executor: TraceExecutorKind,
83        access: Option<TraceAccess>,
84        class: ErrorClass,
85        origin: ErrorOrigin,
86    },
87}
88
89///
90/// TraceScope
91///
92
93pub struct TraceScope {
94    sink: &'static dyn QueryTraceSink,
95    fingerprint: PlanFingerprint,
96    executor: TraceExecutorKind,
97    access: Option<TraceAccess>,
98}
99
100impl TraceScope {
101    fn new(
102        sink: &'static dyn QueryTraceSink,
103        fingerprint: PlanFingerprint,
104        executor: TraceExecutorKind,
105        access: Option<TraceAccess>,
106    ) -> Self {
107        sink.on_event(QueryTraceEvent::Start {
108            fingerprint,
109            executor,
110            access,
111        });
112        Self {
113            sink,
114            fingerprint,
115            executor,
116            access,
117        }
118    }
119
120    pub(crate) fn finish(self, rows: u64) {
121        self.sink.on_event(QueryTraceEvent::Finish {
122            fingerprint: self.fingerprint,
123            executor: self.executor,
124            access: self.access,
125            rows,
126        });
127    }
128
129    pub(crate) fn phase(&self, phase: TracePhase, rows: u64) {
130        self.sink.on_event(QueryTraceEvent::Phase {
131            fingerprint: self.fingerprint,
132            executor: self.executor,
133            access: self.access,
134            phase,
135            rows,
136        });
137    }
138
139    pub(crate) fn error(self, err: &InternalError) {
140        self.sink.on_event(QueryTraceEvent::Error {
141            fingerprint: self.fingerprint,
142            executor: self.executor,
143            access: self.access,
144            class: err.class,
145            origin: err.origin,
146        });
147    }
148}
149
150pub fn start_plan_trace<E: EntityKind>(
151    sink: Option<&'static dyn QueryTraceSink>,
152    executor: TraceExecutorKind,
153    plan: &ExecutablePlan<E>,
154) -> Option<TraceScope> {
155    let sink = sink?;
156    let access = Some(trace_access_from_plan(plan.access()));
157    let fingerprint = plan.fingerprint();
158    Some(TraceScope::new(sink, fingerprint, executor, access))
159}
160
161pub fn start_exec_trace(
162    sink: Option<&'static dyn QueryTraceSink>,
163    executor: TraceExecutorKind,
164    entity_path: &'static str,
165    access: Option<TraceAccess>,
166    detail: Option<&'static str>,
167) -> Option<TraceScope> {
168    let sink = sink?;
169    let fingerprint = exec_fingerprint(executor, entity_path, detail);
170    Some(TraceScope::new(sink, fingerprint, executor, access))
171}
172
/// Widen a `usize` row count to `u64`, clamping to `u64::MAX` if it does not fit.
#[must_use]
pub fn saturating_rows(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(rows) => rows,
        Err(_) => u64::MAX,
    }
}
178
179/// Emit canonical access/post-access trace phases with saturated row counts.
180pub fn emit_access_post_access_phases(
181    trace: Option<&TraceScope>,
182    access_rows: usize,
183    post_access_rows: usize,
184) {
185    let Some(trace) = trace else {
186        return;
187    };
188
189    trace.phase(TracePhase::Access, saturating_rows(access_rows));
190    trace.phase(TracePhase::PostAccess, saturating_rows(post_access_rows));
191}
192
193/// Finish or error a trace scope from an executor result.
194pub fn finish_trace_from_result<T>(
195    trace: Option<TraceScope>,
196    result: &Result<T, InternalError>,
197    ok_rows: impl Fn(&T) -> usize,
198) {
199    let Some(trace) = trace else {
200        return;
201    };
202
203    match result {
204        Ok(ok) => trace.finish(saturating_rows(ok_rows(ok))),
205        Err(err) => trace.error(err),
206    }
207}
208
209fn trace_access_from_plan<K>(plan: &AccessPlan<K>) -> TraceAccess {
210    match plan {
211        AccessPlan::Path(path) => trace_access_from_path(path),
212        AccessPlan::Union(children) => {
213            // NOTE: Diagnostics are best-effort; overflow saturates to preserve determinism.
214            TraceAccess::Union {
215                branches: u32::try_from(children.len()).unwrap_or(u32::MAX),
216            }
217        }
218        AccessPlan::Intersection(children) => {
219            // NOTE: Diagnostics are best-effort; overflow saturates to preserve determinism.
220            TraceAccess::Intersection {
221                branches: u32::try_from(children.len()).unwrap_or(u32::MAX),
222            }
223        }
224    }
225}
226
227fn trace_access_from_path<K>(path: &AccessPath<K>) -> TraceAccess {
228    match path {
229        AccessPath::ByKey(_) => TraceAccess::ByKey,
230        AccessPath::ByKeys(keys) => {
231            // NOTE: Diagnostics are best-effort; overflow saturates to preserve determinism.
232            TraceAccess::ByKeys {
233                count: u32::try_from(keys.len()).unwrap_or(u32::MAX),
234            }
235        }
236        AccessPath::KeyRange { .. } => TraceAccess::KeyRange,
237        AccessPath::IndexPrefix { index, values } => {
238            // NOTE: Diagnostics are best-effort; overflow saturates to preserve determinism.
239            TraceAccess::IndexPrefix {
240                name: index.name,
241                prefix_len: u32::try_from(values.len()).unwrap_or(u32::MAX),
242            }
243        }
244        AccessPath::FullScan => TraceAccess::FullScan,
245    }
246}
247
248fn exec_fingerprint(
249    executor: TraceExecutorKind,
250    entity_path: &'static str,
251    detail: Option<&'static str>,
252) -> PlanFingerprint {
253    let mut hasher = Sha256::new();
254    hasher.update(b"execfp:v1");
255    hasher.update([executor_tag(executor)]);
256    write_str(&mut hasher, entity_path);
257    match detail {
258        Some(detail) => {
259            hasher.update([1u8]);
260            write_str(&mut hasher, detail);
261        }
262        None => {
263            hasher.update([0u8]);
264        }
265    }
266    let digest = hasher.finalize();
267    let mut out = [0u8; 32];
268    out.copy_from_slice(&digest);
269    PlanFingerprint::from_bytes(out)
270}
271
272const fn executor_tag(executor: TraceExecutorKind) -> u8 {
273    match executor {
274        TraceExecutorKind::Load => 0x01,
275        TraceExecutorKind::Save => 0x02,
276        TraceExecutorKind::Delete => 0x03,
277    }
278}
279
280fn write_str(hasher: &mut Sha256, value: &str) {
281    // NOTE: Diagnostics-only fingerprinting saturates on overflow to avoid panics.
282    let len = u32::try_from(value.len()).unwrap_or(u32::MAX);
283    hasher.update(len.to_be_bytes());
284    hasher.update(value.as_bytes());
285}