use std::sync::Arc;
use bytes::Bytes;
use derive_more::Display;
use parking_lot::RwLock;
use spacetimedb_commitlog::{payload::txdata, Varchar};
use spacetimedb_lib::{Address, Identity};
use spacetimedb_primitives::TableId;
use spacetimedb_sats::bsatn;
use crate::util::slow::SlowQueryConfig;
use crate::{db::db_metrics::DB_METRICS, host::Timestamp};
/// Selects which counter of a [`BufferMetric`] to increment.
pub enum MetricType {
    /// Number of index seeks performed.
    IndexSeeks,
    /// Number of keys scanned.
    KeysScanned,
    /// Number of rows fetched.
    RowsFetched,
    /// Number of rows inserted.
    RowsInserted,
    /// Number of rows deleted.
    RowsDeleted,
}
/// Per-table buffer of datastore counters, accumulated in memory and
/// flushed to the Prometheus counters in `DB_METRICS` when the owning
/// [`ExecutionContext`] is dropped.
#[derive(Default, Clone)]
struct BufferMetric {
    /// The table these counters belong to.
    pub table_id: TableId,
    pub index_seeks: u64,
    pub keys_scanned: u64,
    pub rows_fetched: u64,
    pub rows_inserted: u64,
    pub rows_deleted: u64,
    /// Table name captured when the buffer is created (see `Metrics::inc_by`),
    /// so flushing can build metric labels without another name lookup.
    pub cache_table_name: String,
}
// Consolidated the two separate `impl BufferMetric` blocks into one.
impl BufferMetric {
    /// Creates a zeroed metric buffer for `table_id`, caching `table_name`
    /// so that flushing does not need to resolve the name again.
    pub fn new(table_id: TableId, table_name: String) -> Self {
        Self {
            table_id,
            cache_table_name: table_name,
            ..Default::default()
        }
    }

    /// Adds `val` to the counter selected by `ty`.
    pub fn inc_by(&mut self, ty: MetricType, val: u64) {
        match ty {
            MetricType::IndexSeeks => self.index_seeks += val,
            MetricType::KeysScanned => self.keys_scanned += val,
            MetricType::RowsFetched => self.rows_fetched += val,
            MetricType::RowsInserted => self.rows_inserted += val,
            MetricType::RowsDeleted => self.rows_deleted += val,
        }
    }
}
/// In-memory collection of per-table metric buffers, flushed to the global
/// Prometheus counters when the owning [`ExecutionContext`] is dropped.
#[derive(Default, Clone)]
pub struct Metrics(Vec<BufferMetric>);
impl Metrics {
    /// Adds `val` to the `ty` counter for `table_id`, lazily creating the
    /// per-table buffer on first use.
    ///
    /// `get_table_name` is only invoked when a new buffer must be created,
    /// so callers can defer a potentially expensive name lookup.
    pub fn inc_by<F: FnOnce() -> String>(&mut self, table_id: TableId, ty: MetricType, val: u64, get_table_name: F) {
        if let Some(metric) = self.0.iter_mut().find(|x| x.table_id == table_id) {
            metric.inc_by(ty, val);
        } else {
            let mut metric = BufferMetric::new(table_id, get_table_name());
            metric.inc_by(ty, val);
            self.0.push(metric);
        }
    }

    /// Returns `true` if a buffer for `table_id` has already been created.
    pub fn table_exists(&self, table_id: TableId) -> bool {
        self.0.iter().any(|x| x.table_id == table_id)
    }

    /// Flushes all buffered counters into the global `DB_METRICS` Prometheus
    /// counters, labeled by workload, database, reducer and table.
    ///
    /// NOTE(review): this does not reset the buffered counts, and the buffer
    /// is shared via `Arc` across `ExecutionContext` clones — if more than one
    /// clone is dropped, the same counts would be flushed repeatedly; confirm
    /// contexts are never cloned-and-dropped more than once.
    // Removed a stale `#[allow(dead_code)]`: `flush` is invoked from
    // `Drop for ExecutionContext` below.
    fn flush(&mut self, workload: &WorkloadType, database: &Address, reducer: &str) {
        macro_rules! flush_metric {
            ($db_metric:expr, $metric:expr, $metric_field:ident) => {
                // Skip zero counters so we don't instantiate an all-zero
                // time series for this label combination.
                if $metric.$metric_field > 0 {
                    $db_metric
                        .with_label_values(
                            workload,
                            database,
                            reducer,
                            &$metric.table_id.0,
                            &$metric.cache_table_name,
                        )
                        .inc_by($metric.$metric_field);
                }
            };
        }
        for metric in &self.0 {
            flush_metric!(DB_METRICS.rdb_num_index_seeks, metric, index_seeks);
            flush_metric!(DB_METRICS.rdb_num_keys_scanned, metric, keys_scanned);
            flush_metric!(DB_METRICS.rdb_num_rows_fetched, metric, rows_fetched);
            flush_metric!(DB_METRICS.rdb_num_rows_inserted, metric, rows_inserted);
            flush_metric!(DB_METRICS.rdb_num_rows_deleted, metric, rows_deleted);
        }
    }
}
/// The context in which a database operation executes: which database,
/// what kind of workload, and (for reducer workloads) the details of the
/// reducer call. Buffers per-table metrics that are flushed on drop.
#[derive(Default, Clone)]
pub struct ExecutionContext {
    /// The database the operation runs against.
    database: Address,
    /// Present only for reducer workloads; `None` otherwise.
    reducer: Option<ReducerContext>,
    /// The kind of operation this context represents; defaults to `Internal`.
    workload: WorkloadType,
    /// Shared buffer of per-table counters, flushed to `DB_METRICS` in `Drop`.
    pub metrics: Arc<RwLock<Metrics>>,
    /// Configuration for slow-query handling — presumably thresholds;
    /// see `SlowQueryConfig` for details.
    pub slow_query_config: SlowQueryConfig,
}
/// Describes a specific reducer invocation: which reducer, who called it,
/// when, and with what (already BSATN-encoded) arguments.
#[derive(Clone)]
pub struct ReducerContext {
    /// Name of the reducer being invoked.
    pub name: String,
    /// Identity of the caller.
    pub caller_identity: Identity,
    /// Address of the caller.
    pub caller_address: Address,
    /// Time at which the reducer was invoked.
    pub timestamp: Timestamp,
    /// The reducer arguments, pre-encoded as BSATN.
    pub arg_bsatn: Bytes,
}
impl From<&ReducerContext> for txdata::Inputs {
    /// Encodes a [`ReducerContext`] as commitlog inputs: the (possibly
    /// truncated) reducer name plus one byte string holding the BSATN
    /// encodings of caller identity, caller address and timestamp, followed
    /// by the pre-encoded reducer arguments.
    fn from(ctx: &ReducerContext) -> Self {
        let reducer_name = Arc::new(Varchar::from_str_truncate(&ctx.name));
        // Capacity: 32 + 16 + 8 bytes presumably cover the BSATN encodings of
        // identity, address and timestamp respectively (TODO confirm sizes);
        // the args are appended verbatim.
        let mut reducer_args = Vec::with_capacity(32 + 16 + 8 + ctx.arg_bsatn.len());
        bsatn::to_writer(&mut reducer_args, &ctx.caller_identity).unwrap();
        bsatn::to_writer(&mut reducer_args, &ctx.caller_address).unwrap();
        bsatn::to_writer(&mut reducer_args, &ctx.timestamp).unwrap();
        reducer_args.extend_from_slice(&ctx.arg_bsatn);
        Self {
            reducer_name,
            reducer_args: reducer_args.into(),
        }
    }
}
/// Classifies the kind of operation an [`ExecutionContext`] represents;
/// used as a label when flushing metrics.
#[derive(Clone, Copy, Display, Hash, PartialEq, Eq, strum::AsRefStr)]
pub enum WorkloadType {
    /// Executing a reducer call.
    Reducer,
    /// Executing a one-off SQL query.
    Sql,
    /// Evaluating a new subscription.
    Subscribe,
    /// Incrementally updating subscriptions.
    Update,
    /// System-internal work; the default.
    Internal,
}
impl Default for WorkloadType {
fn default() -> Self {
Self::Internal
}
}
impl ExecutionContext {
fn new(
database: Address,
reducer: Option<ReducerContext>,
workload: WorkloadType,
slow_query_config: SlowQueryConfig,
) -> Self {
Self {
database,
reducer,
workload,
metrics: <_>::default(),
slow_query_config,
}
}
pub fn reducer(database: Address, ctx: ReducerContext) -> Self {
Self::new(database, Some(ctx), WorkloadType::Reducer, Default::default())
}
pub fn sql(database: Address, slow_query_config: SlowQueryConfig) -> Self {
Self::new(database, None, WorkloadType::Sql, slow_query_config)
}
pub fn subscribe(database: Address, slow_query_config: SlowQueryConfig) -> Self {
Self::new(database, None, WorkloadType::Subscribe, slow_query_config)
}
pub fn incremental_update(database: Address, slow_query_config: SlowQueryConfig) -> Self {
Self::new(database, None, WorkloadType::Update, slow_query_config)
}
pub fn internal(database: Address) -> Self {
Self::new(database, None, WorkloadType::Internal, Default::default())
}
#[inline]
pub fn database(&self) -> Address {
self.database
}
#[inline]
pub fn reducer_name(&self) -> &str {
self.reducer.as_ref().map(|ctx| ctx.name.as_str()).unwrap_or_default()
}
#[inline]
pub fn reducer_context(&self) -> Option<&ReducerContext> {
self.reducer.as_ref()
}
#[inline]
pub fn workload(&self) -> WorkloadType {
self.workload
}
}
impl Drop for ExecutionContext {
    /// Flushes the buffered per-table metrics to `DB_METRICS` when the
    /// context goes out of scope.
    fn drop(&mut self) {
        // Copy the (Copy) label values out first; `flush` takes them by
        // reference while `self.metrics` is locked for writing.
        let (workload, database) = (self.workload, self.database);
        let reducer = self.reducer_name();
        self.metrics.write().flush(&workload, &database, reducer);
    }
}