Skip to main content

grafeo_core/execution/
profile.rs

1//! Query profiling infrastructure.
2//!
3//! Provides [`ProfiledOperator`], a wrapper that collects runtime statistics
4//! (row counts, timing, call counts) around any pull-based [`Operator`].
5//! Used by the `PROFILE` statement to annotate each operator with actual
6//! execution metrics.
7
8use std::sync::Arc;
9
10use parking_lot::Mutex;
11
12use super::operators::{Operator, OperatorResult};
13
14/// Runtime statistics for a single operator in a profiled query.
15#[derive(Debug, Clone, Default)]
16pub struct ProfileStats {
17    /// Total rows produced as output.
18    pub rows_out: u64,
19    /// Wall-clock time spent in this operator (nanoseconds), including children.
20    pub time_ns: u64,
21    /// Number of times `next()` was called on this operator.
22    pub calls: u64,
23}
24
25/// Shared handle to profile stats, written by `ProfiledOperator` during
26/// execution and read afterwards for formatting.
27pub type SharedProfileStats = Arc<Mutex<ProfileStats>>;
28
29/// Wraps a pull-based [`Operator`] to collect runtime statistics.
30///
31/// Each call to [`next()`](Operator::next) is timed and the output rows
32/// are counted. Statistics are written into a [`SharedProfileStats`] handle
33/// so they can be collected after execution completes.
34pub struct ProfiledOperator {
35    inner: Box<dyn Operator>,
36    stats: SharedProfileStats,
37}
38
39impl ProfiledOperator {
40    /// Creates a new profiled wrapper around the given operator.
41    pub fn new(inner: Box<dyn Operator>, stats: SharedProfileStats) -> Self {
42        Self { inner, stats }
43    }
44}
45
46impl Operator for ProfiledOperator {
47    fn next(&mut self) -> OperatorResult {
48        {
49            let mut s = self.stats.lock();
50            s.calls += 1;
51        }
52
53        #[cfg(not(target_arch = "wasm32"))]
54        let start = std::time::Instant::now();
55
56        let result = self.inner.next();
57
58        #[cfg(not(target_arch = "wasm32"))]
59        {
60            // reason: per-call elapsed nanos fits u64 for any practical duration
61            #[allow(clippy::cast_possible_truncation)]
62            let elapsed = start.elapsed().as_nanos() as u64;
63            self.stats.lock().time_ns += elapsed;
64        }
65
66        if let Ok(Some(ref chunk)) = result {
67            self.stats.lock().rows_out += chunk.row_count() as u64;
68        }
69
70        result
71    }
72
73    fn reset(&mut self) {
74        self.inner.reset();
75    }
76
77    fn name(&self) -> &'static str {
78        self.inner.name()
79    }
80
81    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
82        self
83    }
84}
85
86// ProfiledOperator is Send + Sync because:
87// - inner: Box<dyn Operator> is Send + Sync (trait bound)
88// - stats: Arc<parking_lot::Mutex<ProfileStats>> is Send + Sync
89const _: () = {
90    const fn assert_send_sync<T: Send + Sync>() {}
91    // Called at compile time to verify the bounds hold.
92    #[allow(dead_code)]
93    fn check() {
94        assert_send_sync::<ProfiledOperator>();
95    }
96};
97
98#[cfg(test)]
99mod tests {
100    use super::*;
101    use crate::execution::chunk::DataChunk;
102    use crate::execution::vector::ValueVector;
103    use grafeo_common::types::LogicalType;
104
105    /// A mock operator that yields a fixed number of chunks, each with `rows_per_chunk` rows.
106    struct MockOperator {
107        chunks_remaining: usize,
108        rows_per_chunk: usize,
109    }
110
111    impl MockOperator {
112        fn new(chunks: usize, rows_per_chunk: usize) -> Self {
113            Self {
114                chunks_remaining: chunks,
115                rows_per_chunk,
116            }
117        }
118    }
119
120    impl Operator for MockOperator {
121        fn next(&mut self) -> OperatorResult {
122            if self.chunks_remaining == 0 {
123                return Ok(None);
124            }
125            self.chunks_remaining -= 1;
126            let mut col = ValueVector::with_capacity(LogicalType::Int64, self.rows_per_chunk);
127            // reason: test chunk rows are small, fit i64
128            #[allow(clippy::cast_possible_wrap)]
129            for i in 0..self.rows_per_chunk {
130                col.push(grafeo_common::types::Value::Int64(i as i64));
131            }
132            let chunk = DataChunk::new(vec![col]);
133            Ok(Some(chunk))
134        }
135
136        fn reset(&mut self) {}
137
138        fn name(&self) -> &'static str {
139            "MockOperator"
140        }
141
142        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
143            self
144        }
145    }
146
147    #[test]
148    fn profile_stats_default_is_zero() {
149        let stats = ProfileStats::default();
150        assert_eq!(stats.rows_out, 0);
151        assert_eq!(stats.time_ns, 0);
152        assert_eq!(stats.calls, 0);
153    }
154
155    #[test]
156    fn profiled_operator_counts_rows_and_calls() {
157        let mock = MockOperator::new(3, 10);
158        let stats = Arc::new(Mutex::new(ProfileStats::default()));
159        let mut profiled = ProfiledOperator::new(Box::new(mock), Arc::clone(&stats));
160
161        // Drain operator (3 chunks + 1 None = 4 calls)
162        while profiled.next().unwrap().is_some() {}
163
164        let s = stats.lock();
165        assert_eq!(s.rows_out, 30); // 3 chunks x 10 rows
166        assert_eq!(s.calls, 4); // 3 data + 1 None
167    }
168
169    #[cfg(not(target_arch = "wasm32"))]
170    #[test]
171    fn profiled_operator_measures_time() {
172        let mock = MockOperator::new(1, 5);
173        let stats = Arc::new(Mutex::new(ProfileStats::default()));
174        let mut profiled = ProfiledOperator::new(Box::new(mock), Arc::clone(&stats));
175
176        let _ = profiled.next();
177        assert!(stats.lock().time_ns > 0);
178    }
179
180    #[test]
181    fn profiled_operator_delegates_name() {
182        let mock = MockOperator::new(0, 0);
183        let stats = Arc::new(Mutex::new(ProfileStats::default()));
184        let profiled = ProfiledOperator::new(Box::new(mock), Arc::clone(&stats));
185        assert_eq!(profiled.name(), "MockOperator");
186    }
187}