use std::{future::Future, pin::Pin, sync::Arc};
use reifydb_core::{
actors::server::Operation,
event::{
EventBus,
metric::{Request, RequestExecutedEvent},
},
};
use reifydb_metric::accumulator::StatementStatsAccumulator;
use reifydb_runtime::context::clock::Clock;
use reifydb_sub_server::{
execute::ExecuteError,
interceptor::{RequestContext, RequestInterceptor, ResponseContext},
};
use reifydb_value::value::datetime::DateTime;
pub struct RequestMetricsInterceptor {
event_bus: EventBus,
accumulator: Arc<StatementStatsAccumulator>,
clock: Clock,
}
impl RequestMetricsInterceptor {
pub fn new(event_bus: EventBus, accumulator: Arc<StatementStatsAccumulator>, clock: Clock) -> Self {
Self {
event_bus,
accumulator,
clock,
}
}
}
impl RequestInterceptor for RequestMetricsInterceptor {
fn pre_execute<'a>(
&'a self,
_ctx: &'a mut RequestContext,
) -> Pin<Box<dyn Future<Output = Result<(), ExecuteError>> + Send + 'a>> {
Box::pin(async { Ok(()) })
}
fn post_execute<'a>(&'a self, ctx: &'a ResponseContext) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
let event_bus = self.event_bus.clone();
let accumulator = self.accumulator.clone();
let clock = self.clock.clone();
Box::pin(async move {
let success = ctx.result.is_ok();
let request_record = match ctx.operation {
Operation::Query => {
for stmt in &ctx.metrics.statements {
accumulator.record(
stmt.fingerprint,
&stmt.normalized_rql,
stmt.execute_duration_us,
stmt.compile_duration_us,
stmt.rows_affected,
success,
);
}
Request::Query {
fingerprint: ctx.metrics.fingerprint,
statements: ctx.metrics.statements.clone(),
}
}
Operation::Command => Request::Command {
fingerprint: ctx.metrics.fingerprint,
statements: ctx.metrics.statements.clone(),
},
Operation::Admin => Request::Admin {
fingerprint: ctx.metrics.fingerprint,
statements: ctx.metrics.statements.clone(),
},
Operation::Subscribe => return,
};
let timestamp = DateTime::from_timestamp_millis(clock.now_millis()).unwrap();
event_bus.emit(RequestExecutedEvent::new(
request_record,
ctx.metrics.total,
ctx.metrics.compute,
success,
timestamp,
));
})
}
}