grafeo_core/execution/
profile.rs1use std::sync::Arc;
9
10use parking_lot::Mutex;
11
12use super::operators::{Operator, OperatorResult};
13
14#[derive(Debug, Clone, Default)]
16pub struct ProfileStats {
17 pub rows_out: u64,
19 pub time_ns: u64,
21 pub calls: u64,
23}
24
25pub type SharedProfileStats = Arc<Mutex<ProfileStats>>;
28
29pub struct ProfiledOperator {
35 inner: Box<dyn Operator>,
36 stats: SharedProfileStats,
37}
38
39impl ProfiledOperator {
40 pub fn new(inner: Box<dyn Operator>, stats: SharedProfileStats) -> Self {
42 Self { inner, stats }
43 }
44}
45
46impl Operator for ProfiledOperator {
47 fn next(&mut self) -> OperatorResult {
48 {
49 let mut s = self.stats.lock();
50 s.calls += 1;
51 }
52
53 #[cfg(not(target_arch = "wasm32"))]
54 let start = std::time::Instant::now();
55
56 let result = self.inner.next();
57
58 #[cfg(not(target_arch = "wasm32"))]
59 {
60 let elapsed = start.elapsed().as_nanos() as u64;
61 self.stats.lock().time_ns += elapsed;
62 }
63
64 if let Ok(Some(ref chunk)) = result {
65 self.stats.lock().rows_out += chunk.row_count() as u64;
66 }
67
68 result
69 }
70
71 fn reset(&mut self) {
72 self.inner.reset();
73 }
74
75 fn name(&self) -> &'static str {
76 self.inner.name()
77 }
78}
79
80const _: () = {
84 const fn assert_send_sync<T: Send + Sync>() {}
85 #[allow(dead_code)]
87 fn check() {
88 assert_send_sync::<ProfiledOperator>();
89 }
90};
91
92#[cfg(test)]
93mod tests {
94 use super::*;
95 use crate::execution::chunk::DataChunk;
96 use crate::execution::vector::ValueVector;
97 use grafeo_common::types::LogicalType;
98
99 struct MockOperator {
101 chunks_remaining: usize,
102 rows_per_chunk: usize,
103 }
104
105 impl MockOperator {
106 fn new(chunks: usize, rows_per_chunk: usize) -> Self {
107 Self {
108 chunks_remaining: chunks,
109 rows_per_chunk,
110 }
111 }
112 }
113
114 impl Operator for MockOperator {
115 fn next(&mut self) -> OperatorResult {
116 if self.chunks_remaining == 0 {
117 return Ok(None);
118 }
119 self.chunks_remaining -= 1;
120 let mut col = ValueVector::with_capacity(LogicalType::Int64, self.rows_per_chunk);
121 for i in 0..self.rows_per_chunk {
122 col.push(grafeo_common::types::Value::Int64(i as i64));
123 }
124 let chunk = DataChunk::new(vec![col]);
125 Ok(Some(chunk))
126 }
127
128 fn reset(&mut self) {}
129
130 fn name(&self) -> &'static str {
131 "MockOperator"
132 }
133 }
134
135 #[test]
136 fn profile_stats_default_is_zero() {
137 let stats = ProfileStats::default();
138 assert_eq!(stats.rows_out, 0);
139 assert_eq!(stats.time_ns, 0);
140 assert_eq!(stats.calls, 0);
141 }
142
143 #[test]
144 fn profiled_operator_counts_rows_and_calls() {
145 let mock = MockOperator::new(3, 10);
146 let stats = Arc::new(Mutex::new(ProfileStats::default()));
147 let mut profiled = ProfiledOperator::new(Box::new(mock), Arc::clone(&stats));
148
149 while profiled.next().unwrap().is_some() {}
151
152 let s = stats.lock();
153 assert_eq!(s.rows_out, 30); assert_eq!(s.calls, 4); }
156
157 #[cfg(not(target_arch = "wasm32"))]
158 #[test]
159 fn profiled_operator_measures_time() {
160 let mock = MockOperator::new(1, 5);
161 let stats = Arc::new(Mutex::new(ProfileStats::default()));
162 let mut profiled = ProfiledOperator::new(Box::new(mock), Arc::clone(&stats));
163
164 let _ = profiled.next();
165 assert!(stats.lock().time_ns > 0);
166 }
167
168 #[test]
169 fn profiled_operator_delegates_name() {
170 let mock = MockOperator::new(0, 0);
171 let stats = Arc::new(Mutex::new(ProfileStats::default()));
172 let profiled = ProfiledOperator::new(Box::new(mock), Arc::clone(&stats));
173 assert_eq!(profiled.name(), "MockOperator");
174 }
175}