mod info;
mod record_profiling;
mod stage_metrics;
pub use info::StageProfilingInfo;
pub use record_profiling::{RecordProfilingMetrics, StageEntry};
pub use stage_metrics::StageMetrics;
extern crate alloc;
use alloc::{boxed::Box, sync::Arc};
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::Ordering;
use portable_atomic::AtomicU64;
use crate::buffer::BufferReader;
use crate::DbError;
pub(crate) type Clock = Arc<dyn Fn() -> u64 + Send + Sync>;
pub(crate) fn make_clock<R: aimdb_executor::TimeOps>(rt: Arc<R>) -> Clock {
let epoch = rt.now();
Arc::new(move || {
let now = rt.now();
match rt.duration_since(now, epoch.clone()) {
Some(d) => rt.duration_as_nanos(d),
None => 0,
}
})
}
const NO_PREV: u64 = u64::MAX;
pub(crate) struct ProducerProfilingState {
metrics: Arc<StageMetrics>,
clock: Clock,
last_produce_ns: AtomicU64,
}
impl ProducerProfilingState {
pub(crate) fn new(metrics: Arc<StageMetrics>, clock: Clock) -> Self {
Self {
metrics,
clock,
last_produce_ns: AtomicU64::new(NO_PREV),
}
}
pub(crate) fn record_produce(&self) {
let now = (self.clock)();
let prev = self.last_produce_ns.swap(now, Ordering::Relaxed);
if prev != NO_PREV {
self.metrics.record(now.saturating_sub(prev));
}
}
}
pub(crate) struct ProfilingBufferReader<T: Clone + Send> {
inner: Box<dyn BufferReader<T> + Send>,
metrics: Arc<StageMetrics>,
clock: Clock,
last_yield_ns: Option<u64>,
}
impl<T: Clone + Send> ProfilingBufferReader<T> {
pub(crate) fn new(
inner: Box<dyn BufferReader<T> + Send>,
metrics: Arc<StageMetrics>,
clock: Clock,
) -> Self {
Self {
inner,
metrics,
clock,
last_yield_ns: None,
}
}
fn on_yield(&mut self, started_ns: u64) {
if let Some(prev) = self.last_yield_ns {
self.metrics.record(started_ns.saturating_sub(prev));
}
self.last_yield_ns = Some((self.clock)());
}
}
impl<T: Clone + Send> BufferReader<T> for ProfilingBufferReader<T> {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>> {
Box::pin(async move {
let started_ns = (self.clock)();
let result = self.inner.recv().await;
if result.is_ok() {
self.on_yield(started_ns);
}
result
})
}
fn try_recv(&mut self) -> Result<T, DbError> {
let started_ns = (self.clock)();
let result = self.inner.try_recv();
if result.is_ok() {
self.on_yield(started_ns);
}
result
}
}