Skip to main content

shifty_engine/
profile.rs

1//! Opt-in execution telemetry (doc §269). Collected via a thread-local so no
2//! validation API signatures change. Enable with `enable()`, consume with
3//! `take()`.
4//!
5//! Shape-cache counters are accumulated locally by each evaluator and published
6//! once when it is dropped, avoiding a thread-local operation per lookup.
7
8use std::cell::RefCell;
9use std::time::Instant;
10
11/// Per-query performance record.
12#[derive(Debug, Clone)]
13pub struct QueryRecord {
14    /// Stable fingerprint derived from the canonical query text (first 64 chars).
15    pub fingerprint: String,
16    /// Whether the native executor handled this query (always Fallback in stage 1).
17    pub executor: ExecutorKind,
18    /// How many times this query was invoked (one per focus node in stage 1).
19    pub invocations: u64,
20    /// Total wall-clock execution time across all invocations, in microseconds.
21    pub total_exec_us: u64,
22}
23
/// Wall-clock timing for one shape or inference rule, aggregated over all of
/// its evaluations. There is exactly one record per distinct label.
#[derive(Clone, Debug)]
pub struct ShapeRecord {
    /// Identifies what was evaluated: the shape IRI for named shapes, an `@N`
    /// arena slot for blank-node shapes, or `rule[N]` for inference rules.
    pub label: String,
    /// Evaluation count: one per focus node while validating, one per rule
    /// firing while inferring.
    pub invocations: u64,
    /// Sum of wall-clock time over every evaluation, in microseconds.
    pub total_us: u64,
}
37
/// Shape-cache telemetry summed over every evaluator in one profiling session.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ShapeCacheRecord {
    /// How many per-snapshot evaluators reported cache activity.
    pub evaluators: u64,
    /// Lookups answered from a memo entry that already existed.
    pub hits: u64,
    /// Lookups that found no memo entry; recursion back-edges count here too.
    pub misses: u64,
    /// Completed results that were stored in the memo.
    pub insertions: u64,
    /// Lookups that ran into the same `(ShapeId, Term)` already on the active
    /// stack.
    pub recursion_back_edges: u64,
    /// Completed results kept out of the memo because they depended on a
    /// back-edge.
    pub non_cacheable_results: u64,
    /// The biggest final entry count reported by a single evaluator.
    pub peak_entries: usize,
    /// Approximate peak number of bytes retained by a single evaluator.
    ///
    /// Counts hash-table bucket storage plus owned RDF-term string payloads;
    /// allocator metadata is not included.
    pub estimated_peak_bytes: usize,
}
61
/// Cache counters reported by a single evaluator; folded into
/// [`ShapeCacheRecord`] when that evaluator is dropped.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct ShapeCacheSample {
    /// Lookups answered from an existing memo entry.
    pub hits: u64,
    /// Lookups that found no memo entry (recursion back-edges included).
    pub misses: u64,
    /// Completed results stored in the memo.
    pub insertions: u64,
    /// Lookups that met a `(ShapeId, Term)` pair already on the active stack.
    pub recursion_back_edges: u64,
    /// Completed results kept out of the memo because they depended on a
    /// back-edge.
    pub non_cacheable_results: u64,
    /// Memo entry count at reporting time; merged with `max` across evaluators.
    pub entries: usize,
    /// Approximate bytes retained by the memo; merged with `max` across
    /// evaluators.
    pub estimated_bytes: usize,
}
73
/// Which engine executed a SPARQL query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ExecutorKind {
    /// Spareval fallback; carries the capability-analysis reason if known.
    Fallback {
        /// Why the native executor was not used, when capability analysis
        /// supplied a reason.
        reason: Option<String>,
    },
    /// The native executor.
    Native,
}
82
83#[derive(Debug, Default)]
84pub struct ProfileCollector {
85    records: Vec<QueryRecord>,
86    shape_records: Vec<ShapeRecord>,
87    shape_cache: ShapeCacheRecord,
88}
89
90impl ProfileCollector {
91    pub fn new() -> Self {
92        ProfileCollector::default()
93    }
94
95    pub fn record_invocation(&mut self, fingerprint: &str, exec_us: u64, executor: ExecutorKind) {
96        if let Some(r) = self
97            .records
98            .iter_mut()
99            .find(|r| r.fingerprint == fingerprint)
100        {
101            r.invocations += 1;
102            r.total_exec_us += exec_us;
103        } else {
104            self.records.push(QueryRecord {
105                fingerprint: fingerprint.to_string(),
106                executor,
107                invocations: 1,
108                total_exec_us: exec_us,
109            });
110        }
111    }
112
113    pub fn record_shape_invocation(&mut self, label: &str, exec_us: u64) {
114        if let Some(r) = self.shape_records.iter_mut().find(|r| r.label == label) {
115            r.invocations += 1;
116            r.total_us += exec_us;
117        } else {
118            self.shape_records.push(ShapeRecord {
119                label: label.to_string(),
120                invocations: 1,
121                total_us: exec_us,
122            });
123        }
124    }
125
126    pub(crate) fn record_shape_cache(&mut self, sample: ShapeCacheSample) {
127        self.shape_cache.evaluators += 1;
128        self.shape_cache.hits += sample.hits;
129        self.shape_cache.misses += sample.misses;
130        self.shape_cache.insertions += sample.insertions;
131        self.shape_cache.recursion_back_edges += sample.recursion_back_edges;
132        self.shape_cache.non_cacheable_results += sample.non_cacheable_results;
133        self.shape_cache.peak_entries = self.shape_cache.peak_entries.max(sample.entries);
134        self.shape_cache.estimated_peak_bytes = self
135            .shape_cache
136            .estimated_peak_bytes
137            .max(sample.estimated_bytes);
138    }
139
140    pub fn records(&self) -> &[QueryRecord] {
141        &self.records
142    }
143
144    pub fn shape_records(&self) -> &[ShapeRecord] {
145        &self.shape_records
146    }
147
148    pub fn shape_cache(&self) -> &ShapeCacheRecord {
149        &self.shape_cache
150    }
151
152    pub fn print_summary(&self) {
153        if !self.shape_records.is_empty() {
154            println!(
155                "profile: {} distinct shape(s)/rule(s)",
156                self.shape_records.len()
157            );
158            let mut sorted = self.shape_records.to_vec();
159            sorted.sort_by_key(|b| std::cmp::Reverse(b.total_us));
160            for r in &sorted {
161                let avg_us = r.total_us.checked_div(r.invocations).unwrap_or(0);
162                println!(
163                    "  {}: {} call(s), {}µs total, {}µs avg",
164                    r.label, r.invocations, r.total_us, avg_us,
165                );
166            }
167        }
168        if self.shape_cache.evaluators > 0 {
169            let lookups = self.shape_cache.hits + self.shape_cache.misses;
170            let hit_rate = if lookups == 0 {
171                0.0
172            } else {
173                self.shape_cache.hits as f64 * 100.0 / lookups as f64
174            };
175            println!(
176                "profile: shape cache: {} evaluator(s), {} hit(s), {} miss(es), \
177                 {hit_rate:.1}% hit rate",
178                self.shape_cache.evaluators, self.shape_cache.hits, self.shape_cache.misses,
179            );
180            println!(
181                "  {} insertion(s), {} recursion back-edge(s), {} non-cacheable result(s)",
182                self.shape_cache.insertions,
183                self.shape_cache.recursion_back_edges,
184                self.shape_cache.non_cacheable_results,
185            );
186            println!(
187                "  peak: {} entries, ~{} bytes",
188                self.shape_cache.peak_entries, self.shape_cache.estimated_peak_bytes,
189            );
190        }
191        if self.records.is_empty() {
192            if self.shape_records.is_empty() && self.shape_cache.evaluators == 0 {
193                println!("profile: no data collected");
194            }
195            return;
196        }
197        println!(
198            "profile: {} distinct SPARQL query/queries",
199            self.records.len()
200        );
201        let mut sorted = self.records.to_vec();
202        sorted.sort_by_key(|b| std::cmp::Reverse(b.total_exec_us));
203        for r in &sorted {
204            let exec_str = match &r.executor {
205                ExecutorKind::Fallback { reason: None } => "fallback".to_string(),
206                ExecutorKind::Fallback { reason: Some(s) } => format!("fallback({s})"),
207                ExecutorKind::Native => "native".to_string(),
208            };
209            let avg_us = r.total_exec_us.checked_div(r.invocations).unwrap_or(0);
210            println!(
211                "  [{exec_str}] {}: {} call(s), {}µs total, {}µs avg",
212                r.fingerprint, r.invocations, r.total_exec_us, avg_us,
213            );
214        }
215    }
216}
217
thread_local! {
    // Per-thread collector slot. `None` means profiling is disabled on this
    // thread; `enable()` installs a fresh collector and `take()` removes it.
    static PROFILER: RefCell<Option<ProfileCollector>> = const { RefCell::new(None) };
}
221
222/// Enable profiling for the current thread. Resets any previous collector.
223pub fn enable() {
224    PROFILER.with(|p| *p.borrow_mut() = Some(ProfileCollector::new()));
225}
226
227/// Disable profiling and return the collected data, if any.
228pub fn take() -> Option<ProfileCollector> {
229    PROFILER.with(|p| p.borrow_mut().take())
230}
231
232/// Whether telemetry is enabled for the current thread.
233pub(crate) fn is_enabled() -> bool {
234    PROFILER.with(|p| p.borrow().is_some())
235}
236
237/// Record one query invocation. No-op when profiling is disabled.
238pub fn record(fingerprint: &str, exec_us: u64, executor: ExecutorKind) {
239    PROFILER.with(|p| {
240        if let Some(col) = p.borrow_mut().as_mut() {
241            col.record_invocation(fingerprint, exec_us, executor);
242        }
243    });
244}
245
246/// Record one shape/rule evaluation. No-op when profiling is disabled.
247pub fn record_shape(label: &str, exec_us: u64) {
248    PROFILER.with(|p| {
249        if let Some(col) = p.borrow_mut().as_mut() {
250            col.record_shape_invocation(label, exec_us);
251        }
252    });
253}
254
255/// Merge one evaluator's shape-cache telemetry. No-op when profiling is
256/// disabled.
257pub(crate) fn record_shape_cache(sample: ShapeCacheSample) {
258    PROFILER.with(|p| {
259        if let Some(col) = p.borrow_mut().as_mut() {
260            col.record_shape_cache(sample);
261        }
262    });
263}
264
265/// Helper: measure `f` and record the result under `fingerprint`. Returns the
266/// value produced by `f`.
267pub fn timed<T>(fingerprint: &str, f: impl FnOnce() -> T) -> T {
268    let start = Instant::now();
269    let result = f();
270    let us = start.elapsed().as_micros() as u64;
271    record(fingerprint, us, ExecutorKind::Fallback { reason: None });
272    result
273}
274
/// Derive a short, human-readable fingerprint from a canonical query string.
///
/// Runs of whitespace, including newlines, are collapsed to a single space,
/// and leading and trailing whitespace is dropped. This happens *before* the
/// text is cut to its first 60 characters, so indentation and line breaks do
/// not use up the preview budget. The result is a preview, not a hash:
/// distinct queries that share the same normalized prefix produce the same
/// fingerprint and are aggregated together by the profiler.
pub fn fingerprint(query: &str) -> String {
    const MAX_CHARS: usize = 60;
    let mut preview: String = query
        .split_whitespace()
        // Prefix every word with a single separator, then drop the first one.
        // Lazy, so only the first `MAX_CHARS` characters are ever produced.
        .flat_map(|word| std::iter::once(' ').chain(word.chars()))
        .skip(1)
        .take(MAX_CHARS)
        .collect();
    // The cut can land just after a separator; never end on a space.
    let kept = preview.trim_end().len();
    preview.truncate(kept);
    preview
}
282
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn aggregates_shape_cache_samples() {
        let mut collector = ProfileCollector::new();
        collector.record_shape_cache(ShapeCacheSample {
            hits: 3,
            misses: 5,
            insertions: 4,
            recursion_back_edges: 1,
            non_cacheable_results: 2,
            entries: 4,
            estimated_bytes: 400,
        });
        collector.record_shape_cache(ShapeCacheSample {
            hits: 7,
            misses: 2,
            insertions: 2,
            recursion_back_edges: 0,
            non_cacheable_results: 0,
            entries: 2,
            estimated_bytes: 250,
        });

        assert_eq!(
            collector.shape_cache(),
            &ShapeCacheRecord {
                evaluators: 2,
                hits: 10,
                misses: 7,
                insertions: 6,
                recursion_back_edges: 1,
                non_cacheable_results: 2,
                peak_entries: 4,
                estimated_peak_bytes: 400,
            }
        );
    }

    #[test]
    fn aggregates_query_invocations_by_fingerprint() {
        let mut collector = ProfileCollector::new();
        collector.record_invocation("q1", 10, ExecutorKind::Native);
        collector.record_invocation("q2", 5, ExecutorKind::Fallback { reason: None });
        collector.record_invocation(
            "q1",
            30,
            ExecutorKind::Fallback {
                reason: Some("x".to_string()),
            },
        );

        let records = collector.records();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].fingerprint, "q1");
        assert_eq!(records[0].invocations, 2);
        assert_eq!(records[0].total_exec_us, 40);
        // The executor reported on the first invocation is the one kept.
        assert_eq!(records[0].executor, ExecutorKind::Native);
        assert_eq!(records[1].fingerprint, "q2");
        assert_eq!(records[1].invocations, 1);
        assert_eq!(records[1].total_exec_us, 5);
    }

    #[test]
    fn aggregates_shape_invocations_by_label() {
        let mut collector = ProfileCollector::new();
        collector.record_shape_invocation("@0", 4);
        collector.record_shape_invocation("rule[1]", 6);
        collector.record_shape_invocation("@0", 8);

        let shapes = collector.shape_records();
        assert_eq!(shapes.len(), 2);
        assert_eq!(
            (shapes[0].label.as_str(), shapes[0].invocations, shapes[0].total_us),
            ("@0", 2, 12)
        );
        assert_eq!(
            (shapes[1].label.as_str(), shapes[1].invocations, shapes[1].total_us),
            ("rule[1]", 1, 6)
        );
    }

    #[test]
    fn thread_local_collector_lifecycle() {
        // Start from a clean slot regardless of which thread runs this test.
        let _ = take();
        assert!(!is_enabled());
        record("ignored", 1, ExecutorKind::Native);
        record_shape("ignored", 1);
        assert!(take().is_none());

        enable();
        assert!(is_enabled());
        record("q", 7, ExecutorKind::Native);
        record_shape("s", 3);
        record_shape_cache(ShapeCacheSample {
            hits: 1,
            ..ShapeCacheSample::default()
        });
        let collector = take().expect("profiling was enabled");
        assert!(!is_enabled());
        assert_eq!(collector.records().len(), 1);
        assert_eq!(collector.shape_records().len(), 1);
        assert_eq!(collector.shape_cache().evaluators, 1);
        assert_eq!(collector.shape_cache().hits, 1);
    }

    #[test]
    fn timed_returns_value_and_records_invocation() {
        enable();
        assert_eq!(timed("q", || 42), 42);
        let collector = take().expect("profiling was enabled");
        assert_eq!(collector.records().len(), 1);
        assert_eq!(collector.records()[0].fingerprint, "q");
        assert_eq!(
            collector.records()[0].executor,
            ExecutorKind::Fallback { reason: None }
        );
    }

    #[test]
    fn fingerprint_collapses_whitespace_and_caps_length() {
        assert_eq!(fingerprint("  SELECT   ?x\n WHERE { }  "), "SELECT ?x WHERE { }");
        assert_eq!(fingerprint(""), "");
        assert!(fingerprint(&"x".repeat(200)).chars().count() <= 60);
    }
}