grafeo_core/execution/
profile.rs1use std::sync::Arc;
9
10use parking_lot::Mutex;
11
12use super::operators::{Operator, OperatorResult};
13
/// Counters accumulated for one operator while it is being profiled.
///
/// Every field starts at zero (see the derived [`Default`]). The type also
/// derives `PartialEq`/`Eq` so that two snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProfileStats {
    /// Total number of rows emitted, i.e. the sum of `row_count()` over every
    /// chunk the profiled operator returned.
    pub rows_out: u64,
    /// Accumulated time spent inside the wrapped operator's `next()`, in
    /// nanoseconds. Not recorded when compiled for `wasm32`.
    pub time_ns: u64,
    /// Number of `next()` invocations, whatever their outcome.
    pub calls: u64,
}
24
/// Reference-counted, mutex-protected handle to a [`ProfileStats`].
///
/// A [`ProfiledOperator`] keeps one clone of the handle and writes into it;
/// any other clone can be locked to read the accumulated counters.
pub type SharedProfileStats = Arc<Mutex<ProfileStats>>;
28
29pub struct ProfiledOperator {
35 inner: Box<dyn Operator>,
36 stats: SharedProfileStats,
37}
38
39impl ProfiledOperator {
40 pub fn new(inner: Box<dyn Operator>, stats: SharedProfileStats) -> Self {
42 Self { inner, stats }
43 }
44}
45
46impl Operator for ProfiledOperator {
47 fn next(&mut self) -> OperatorResult {
48 {
49 let mut s = self.stats.lock();
50 s.calls += 1;
51 }
52
53 #[cfg(not(target_arch = "wasm32"))]
54 let start = std::time::Instant::now();
55
56 let result = self.inner.next();
57
58 #[cfg(not(target_arch = "wasm32"))]
59 {
60 #[allow(clippy::cast_possible_truncation)]
62 let elapsed = start.elapsed().as_nanos() as u64;
63 self.stats.lock().time_ns += elapsed;
64 }
65
66 if let Ok(Some(ref chunk)) = result {
67 self.stats.lock().rows_out += chunk.row_count() as u64;
68 }
69
70 result
71 }
72
73 fn reset(&mut self) {
74 self.inner.reset();
75 }
76
77 fn name(&self) -> &'static str {
78 self.inner.name()
79 }
80
81 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
82 self
83 }
84}
85
86const _: () = {
90 const fn assert_send_sync<T: Send + Sync>() {}
91 #[allow(dead_code)]
93 fn check() {
94 assert_send_sync::<ProfiledOperator>();
95 }
96};
97
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunk;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::{LogicalType, Value};
    use std::time::Duration;

    /// Test double that emits a fixed number of single-column `Int64` chunks
    /// and then reports end-of-stream.
    struct MockOperator {
        /// Chunks still to be produced before `next()` returns `Ok(None)`.
        chunks_remaining: usize,
        /// Number of rows placed in every produced chunk.
        rows_per_chunk: usize,
        /// Artificial latency added to every chunk-producing call.
        delay: Duration,
    }

    impl MockOperator {
        /// Creates a mock that yields `chunks` chunks of `rows_per_chunk` rows
        /// with no artificial latency.
        fn new(chunks: usize, rows_per_chunk: usize) -> Self {
            Self {
                chunks_remaining: chunks,
                rows_per_chunk,
                delay: Duration::ZERO,
            }
        }

        /// Makes every chunk-producing call sleep for `delay` first, so that
        /// timing assertions do not depend on the clock's resolution.
        #[cfg(not(target_arch = "wasm32"))]
        fn with_delay(mut self, delay: Duration) -> Self {
            self.delay = delay;
            self
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.chunks_remaining == 0 {
                return Ok(None);
            }
            self.chunks_remaining -= 1;

            if !self.delay.is_zero() {
                std::thread::sleep(self.delay);
            }

            let mut col = ValueVector::with_capacity(LogicalType::Int64, self.rows_per_chunk);
            for i in 0..self.rows_per_chunk {
                let value = i64::try_from(i).expect("mock row index fits in i64");
                col.push(Value::Int64(value));
            }
            Ok(Some(DataChunk::new(vec![col])))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "MockOperator"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Wraps `mock` in a `ProfiledOperator` and returns it together with a
    /// second handle to the statistics it writes to.
    fn wrap(mock: MockOperator) -> (ProfiledOperator, SharedProfileStats) {
        let stats = Arc::new(Mutex::new(ProfileStats::default()));
        let profiled = ProfiledOperator::new(Box::new(mock), Arc::clone(&stats));
        (profiled, stats)
    }

    #[test]
    fn profile_stats_default_is_zero() {
        let stats = ProfileStats::default();
        assert_eq!(stats.rows_out, 0);
        assert_eq!(stats.time_ns, 0);
        assert_eq!(stats.calls, 0);
    }

    #[test]
    fn profiled_operator_counts_rows_and_calls() {
        let (mut profiled, stats) = wrap(MockOperator::new(3, 10));

        // Drain the operator until it reports end-of-stream.
        while profiled.next().unwrap().is_some() {}

        let s = stats.lock();
        // 3 chunks x 10 rows.
        assert_eq!(s.rows_out, 30);
        // 3 chunk-producing calls plus the final call that returned `None`.
        assert_eq!(s.calls, 4);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn profiled_operator_measures_time() {
        // The mock sleeps so the measured interval is far above any realistic
        // clock granularity; a trivial call could otherwise measure 0 ns.
        let mock = MockOperator::new(1, 5).with_delay(Duration::from_millis(1));
        let (mut profiled, stats) = wrap(mock);

        let chunk = profiled.next().unwrap();
        assert!(chunk.is_some());
        assert!(stats.lock().time_ns > 0);
    }

    #[test]
    fn profiled_operator_delegates_name() {
        let (profiled, _stats) = wrap(MockOperator::new(0, 0));
        assert_eq!(profiled.name(), "MockOperator");
    }
}