Skip to main content

grafeo_core/execution/
profile.rs

1//! Query profiling infrastructure.
2//!
3//! Provides [`ProfiledOperator`], a wrapper that collects runtime statistics
4//! (row counts, timing, call counts) around any pull-based [`Operator`].
5//! Used by the `PROFILE` statement to annotate each operator with actual
6//! execution metrics.
7
8use std::sync::Arc;
9
10use parking_lot::Mutex;
11
12use super::operators::{Operator, OperatorResult};
13
/// Runtime statistics gathered for a single operator of a profiled query.
///
/// Every counter starts at zero (via the derived [`Default`]) and only ever
/// grows while the query executes. The type is plain data, so snapshots can
/// be cloned and compared with `==`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProfileStats {
    /// Total number of rows this operator emitted as output.
    pub rows_out: u64,
    /// Wall-clock time spent in this operator, in nanoseconds, including the
    /// time spent in its children. Timing is not collected when compiling for
    /// `wasm32`, so the value stays at zero there.
    pub time_ns: u64,
    /// Number of times `next()` was called on this operator.
    pub calls: u64,
}
24
/// Shared, mutex-guarded handle to one operator's [`ProfileStats`].
///
/// A `ProfiledOperator` holds one clone of the handle and updates it during
/// execution; whoever keeps another clone can lock it afterwards to read the
/// collected numbers for formatting.
pub type SharedProfileStats = Arc<Mutex<ProfileStats>>;
28
/// Wraps a pull-based [`Operator`] to collect runtime statistics.
///
/// Every call to [`next()`](Operator::next) is forwarded to the wrapped
/// operator; the call is counted, timed, and the rows of any returned chunk
/// are added up. The numbers are written into a [`SharedProfileStats`] handle
/// so they can be read once execution has finished.
pub struct ProfiledOperator {
    /// The wrapped operator that does the actual work.
    inner: Box<dyn Operator>,
    /// Destination for the statistics collected around `inner`.
    stats: SharedProfileStats,
}
38
39impl ProfiledOperator {
40    /// Creates a new profiled wrapper around the given operator.
41    pub fn new(inner: Box<dyn Operator>, stats: SharedProfileStats) -> Self {
42        Self { inner, stats }
43    }
44}
45
46impl Operator for ProfiledOperator {
47    fn next(&mut self) -> OperatorResult {
48        {
49            let mut s = self.stats.lock();
50            s.calls += 1;
51        }
52
53        #[cfg(not(target_arch = "wasm32"))]
54        let start = std::time::Instant::now();
55
56        let result = self.inner.next();
57
58        #[cfg(not(target_arch = "wasm32"))]
59        {
60            let elapsed = start.elapsed().as_nanos() as u64;
61            self.stats.lock().time_ns += elapsed;
62        }
63
64        if let Ok(Some(ref chunk)) = result {
65            self.stats.lock().rows_out += chunk.row_count() as u64;
66        }
67
68        result
69    }
70
71    fn reset(&mut self) {
72        self.inner.reset();
73    }
74
75    fn name(&self) -> &'static str {
76        self.inner.name()
77    }
78}
79
80// ProfiledOperator is Send + Sync because:
81// - inner: Box<dyn Operator> is Send + Sync (trait bound)
82// - stats: Arc<parking_lot::Mutex<ProfileStats>> is Send + Sync
83const _: () = {
84    const fn assert_send_sync<T: Send + Sync>() {}
85    // Called at compile time to verify the bounds hold.
86    #[allow(dead_code)]
87    fn check() {
88        assert_send_sync::<ProfiledOperator>();
89    }
90};
91
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunk;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::{LogicalType, Value};

    /// Test double that emits a fixed number of single-column chunks, each
    /// holding `rows_per_chunk` `Int64` rows, and then reports exhaustion
    /// with `Ok(None)`.
    struct MockOperator {
        chunks_remaining: usize,
        rows_per_chunk: usize,
    }

    impl MockOperator {
        fn new(chunks: usize, rows_per_chunk: usize) -> Self {
            MockOperator {
                chunks_remaining: chunks,
                rows_per_chunk,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks_remaining.checked_sub(1) {
                // Nothing left to emit.
                None => Ok(None),
                Some(left) => {
                    self.chunks_remaining = left;
                    let mut column =
                        ValueVector::with_capacity(LogicalType::Int64, self.rows_per_chunk);
                    (0..self.rows_per_chunk).for_each(|row| {
                        column.push(Value::Int64(row as i64));
                    });
                    Ok(Some(DataChunk::new(vec![column])))
                }
            }
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "MockOperator"
        }
    }

    /// Builds a profiled `MockOperator` together with the stats handle it
    /// writes into.
    fn profiled_mock(
        chunks: usize,
        rows_per_chunk: usize,
    ) -> (ProfiledOperator, SharedProfileStats) {
        let stats = Arc::new(Mutex::new(ProfileStats::default()));
        let operator = ProfiledOperator::new(
            Box::new(MockOperator::new(chunks, rows_per_chunk)),
            Arc::clone(&stats),
        );
        (operator, stats)
    }

    #[test]
    fn profile_stats_default_is_zero() {
        let ProfileStats {
            rows_out,
            time_ns,
            calls,
        } = ProfileStats::default();
        assert_eq!(rows_out, 0);
        assert_eq!(time_ns, 0);
        assert_eq!(calls, 0);
    }

    #[test]
    fn profiled_operator_counts_rows_and_calls() {
        let (mut operator, stats) = profiled_mock(3, 10);

        // Pull until exhaustion: three data chunks plus the terminating `None`.
        loop {
            if operator.next().unwrap().is_none() {
                break;
            }
        }

        let recorded = stats.lock();
        assert_eq!(recorded.rows_out, 30); // 3 chunks x 10 rows
        assert_eq!(recorded.calls, 4); // 3 data + 1 None
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn profiled_operator_measures_time() {
        let (mut operator, stats) = profiled_mock(1, 5);

        let _ = operator.next();
        assert_ne!(stats.lock().time_ns, 0);
    }

    #[test]
    fn profiled_operator_delegates_name() {
        let (operator, _stats) = profiled_mock(0, 0);
        assert_eq!(operator.name(), "MockOperator");
    }
}