use std::sync::Arc;
use parking_lot::Mutex;
use super::operators::{Operator, OperatorResult};
/// Counters accumulated for a single profiled operator.
///
/// All fields start at zero (see [`Default`]) and only ever grow while the
/// wrapped operator is being pulled. `PartialEq`/`Eq` are derived so that two
/// snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProfileStats {
    /// Sum of `row_count()` over every chunk successfully returned by `next()`.
    pub rows_out: u64,
    /// Wall-clock nanoseconds spent inside the wrapped operator's `next()`.
    /// Stays at zero on `wasm32`, where the timing code is compiled out.
    pub time_ns: u64,
    /// Number of `next()` invocations, including ones that returned no chunk.
    pub calls: u64,
}
/// Reference-counted, mutex-guarded [`ProfileStats`].
///
/// One clone is stored inside a [`ProfiledOperator`]; any other clone can be
/// locked to read the counters that operator has accumulated.
pub type SharedProfileStats = Arc<Mutex<ProfileStats>>;
/// An [`Operator`] decorator that forwards every call to a wrapped operator
/// and records call counts, elapsed time and produced rows in a shared
/// [`ProfileStats`].
pub struct ProfiledOperator {
/// The operator being measured; all trait calls are delegated to it.
inner: Box<dyn Operator>,
/// Shared counters updated on every `next()` call.
stats: SharedProfileStats,
}
impl ProfiledOperator {
    /// Wraps `inner` so that every `next()` call is recorded into `stats`.
    ///
    /// The caller normally keeps its own clone of `stats` to read the
    /// counters later. The wrapper only has an effect when it is pulled
    /// instead of `inner`, hence `#[must_use]`.
    #[must_use]
    pub fn new(inner: Box<dyn Operator>, stats: SharedProfileStats) -> Self {
        Self { inner, stats }
    }
}
impl Operator for ProfiledOperator {
    /// Pulls the next result from the wrapped operator and records the call.
    ///
    /// Every invocation increments `calls`. When the wrapped operator yields
    /// `Ok(Some(chunk))`, the chunk's `row_count()` is added to `rows_out`.
    /// On targets other than `wasm32`, the wall-clock time spent inside the
    /// wrapped `next()` is added to `time_ns`.
    ///
    /// The result of the wrapped operator is returned unchanged.
    fn next(&mut self) -> OperatorResult {
        #[cfg(not(target_arch = "wasm32"))]
        let start = std::time::Instant::now();

        let result = self.inner.next();

        // Whole seconds plus the sub-second remainder, saturating at
        // `u64::MAX` instead of silently truncating a `u128` nanosecond count.
        #[cfg(not(target_arch = "wasm32"))]
        let elapsed_ns = {
            let elapsed = start.elapsed();
            elapsed
                .as_secs()
                .saturating_mul(1_000_000_000)
                .saturating_add(u64::from(elapsed.subsec_nanos()))
        };

        // One critical section per call: readers of the shared stats never
        // observe a call that has been counted without its time and rows.
        // The lock is taken only after the wrapped operator has returned, so
        // it is never held while the wrapped operator runs.
        {
            let mut stats = self.stats.lock();
            stats.calls += 1;
            #[cfg(not(target_arch = "wasm32"))]
            {
                stats.time_ns = stats.time_ns.saturating_add(elapsed_ns);
            }
            if let Ok(Some(chunk)) = &result {
                stats.rows_out += chunk.row_count() as u64;
            }
        }

        result
    }

    /// Resets the wrapped operator. Accumulated statistics are left untouched.
    fn reset(&mut self) {
        self.inner.reset();
    }

    /// Reports the wrapped operator's name rather than a name of its own.
    fn name(&self) -> &'static str {
        self.inner.name()
    }

    /// Converts the boxed wrapper itself (not the wrapped operator) into `Any`.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
// Compile-time check: the build fails if `ProfiledOperator` ever stops being
// `Send + Sync`. Nothing in this block runs at runtime.
const _: () = {
    const fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    // Never called; it exists only so the bound above is type-checked.
    #[allow(dead_code)]
    fn profiled_operator_is_send_sync() {
        require_send_sync::<ProfiledOperator>();
    }
};
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunk;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Test double that emits a fixed number of single-column `Int64` chunks
    /// and then `Ok(None)`. `reset` restores the full chunk budget so that
    /// delegation of `reset` can be observed through the wrapper.
    struct MockOperator {
        total_chunks: usize,
        chunks_remaining: usize,
        rows_per_chunk: usize,
    }

    impl MockOperator {
        fn new(chunks: usize, rows_per_chunk: usize) -> Self {
            Self {
                total_chunks: chunks,
                chunks_remaining: chunks,
                rows_per_chunk,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.chunks_remaining == 0 {
                return Ok(None);
            }
            self.chunks_remaining -= 1;

            let mut col = ValueVector::with_capacity(LogicalType::Int64, self.rows_per_chunk);
            // Counting in `i64` directly avoids a `usize as i64` cast.
            for i in (0_i64..).take(self.rows_per_chunk) {
                col.push(grafeo_common::types::Value::Int64(i));
            }
            Ok(Some(DataChunk::new(vec![col])))
        }

        fn reset(&mut self) {
            self.chunks_remaining = self.total_chunks;
        }

        fn name(&self) -> &'static str {
            "MockOperator"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a profiled mock together with a handle to its statistics.
    fn profiled_mock(
        chunks: usize,
        rows_per_chunk: usize,
    ) -> (ProfiledOperator, SharedProfileStats) {
        let stats = Arc::new(Mutex::new(ProfileStats::default()));
        let mock = MockOperator::new(chunks, rows_per_chunk);
        let profiled = ProfiledOperator::new(Box::new(mock), Arc::clone(&stats));
        (profiled, stats)
    }

    #[test]
    fn profile_stats_default_is_zero() {
        let stats = ProfileStats::default();
        assert_eq!(stats.rows_out, 0);
        assert_eq!(stats.time_ns, 0);
        assert_eq!(stats.calls, 0);
    }

    #[test]
    fn profiled_operator_counts_rows_and_calls() {
        let (mut profiled, stats) = profiled_mock(3, 10);
        while profiled.next().unwrap().is_some() {}

        let s = stats.lock();
        // 3 chunks of 10 rows each.
        assert_eq!(s.rows_out, 30);
        // 3 calls that produced data plus the final call that returned `None`.
        assert_eq!(s.calls, 4);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn profiled_operator_measures_time() {
        let (mut profiled, stats) = profiled_mock(1, 5);
        let _ = profiled.next();
        assert!(stats.lock().time_ns > 0);
    }

    #[test]
    fn profiled_operator_delegates_name() {
        let (profiled, _stats) = profiled_mock(0, 0);
        assert_eq!(profiled.name(), "MockOperator");
    }

    #[test]
    fn profiled_operator_delegates_reset_and_keeps_stats() {
        let (mut profiled, stats) = profiled_mock(2, 5);
        while profiled.next().unwrap().is_some() {}

        // After a reset the mock yields its chunks again; the counters keep
        // accumulating instead of starting over.
        profiled.reset();
        while profiled.next().unwrap().is_some() {}

        let s = stats.lock();
        assert_eq!(s.rows_out, 20);
        assert_eq!(s.calls, 6);
    }
}