1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
use std::sync::Arc;

use bytes::Bytes;
use derive_more::Display;
use parking_lot::RwLock;
use spacetimedb_commitlog::{payload::txdata, Varchar};
use spacetimedb_lib::{Address, Identity};
use spacetimedb_primitives::TableId;
use spacetimedb_sats::bsatn;

use crate::util::slow::SlowQueryConfig;
use crate::{db::db_metrics::DB_METRICS, host::Timestamp};

/// Identifies which per-table counter of a [`BufferMetric`] to bump.
pub enum MetricType {
    /// Number of index seeks performed against a table.
    IndexSeeks,
    /// Number of keys scanned while probing indexes.
    KeysScanned,
    /// Number of rows read out of a table.
    RowsFetched,
    /// Number of rows inserted into a table.
    RowsInserted,
    /// Number of rows deleted from a table.
    RowsDeleted,
}

/// In-memory accumulator of per-table counters for one transaction.
///
/// Counters are buffered here and reported to the global `DB_METRICS`
/// in one batch (see `Metrics::flush`) instead of incrementing Prometheus
/// counters on every row operation.
#[derive(Default, Clone)]
struct BufferMetric {
    // The table these counters belong to.
    pub table_id: TableId,
    pub index_seeks: u64,
    pub keys_scanned: u64,
    pub rows_fetched: u64,
    pub rows_inserted: u64,
    pub rows_deleted: u64,
    // Table name captured once at creation so flushing doesn't need a
    // catalog lookup (see `Metrics::inc_by`'s `get_table_name` closure).
    pub cache_table_name: String,
}

impl BufferMetric {
    /// Adds `val` to the counter selected by `ty`.
    pub fn inc_by(&mut self, ty: MetricType, val: u64) {
        // Resolve the target counter first, then perform a single add.
        let counter = match ty {
            MetricType::IndexSeeks => &mut self.index_seeks,
            MetricType::KeysScanned => &mut self.keys_scanned,
            MetricType::RowsFetched => &mut self.rows_fetched,
            MetricType::RowsInserted => &mut self.rows_inserted,
            MetricType::RowsDeleted => &mut self.rows_deleted,
        };
        *counter += val;
    }
}

impl BufferMetric {
    /// Creates a buffer for `table_id` with every counter zeroed.
    pub fn new(table_id: TableId, table_name: String) -> Self {
        Self {
            table_id,
            index_seeks: 0,
            keys_scanned: 0,
            rows_fetched: 0,
            rows_inserted: 0,
            rows_deleted: 0,
            cache_table_name: table_name,
        }
    }
}

/// Buffered per-table metrics for a single transaction.
///
/// Stored as a flat `Vec` searched linearly by `TableId`; presumably a
/// transaction touches few tables, so a scan beats a map — TODO confirm
/// against real workloads if table counts grow.
#[derive(Default, Clone)]
pub struct Metrics(Vec<BufferMetric>);

impl Metrics {
    /// Adds `val` to the `ty` counter buffered for `table_id`.
    ///
    /// `get_table_name` is only invoked the first time `table_id` is seen,
    /// so callers can defer the (potentially costly) name lookup.
    pub fn inc_by<F: FnOnce() -> String>(&mut self, table_id: TableId, ty: MetricType, val: u64, get_table_name: F) {
        match self.0.iter_mut().find(|m| m.table_id == table_id) {
            Some(metric) => metric.inc_by(ty, val),
            None => {
                let mut metric = BufferMetric::new(table_id, get_table_name());
                metric.inc_by(ty, val);
                self.0.push(metric);
            }
        }
    }

    /// Returns whether any metrics have been buffered for `table_id`.
    pub fn table_exists(&self, table_id: TableId) -> bool {
        self.0.iter().any(|m| m.table_id == table_id)
    }

    /// Reports all buffered counters to the global `DB_METRICS` and clears
    /// the buffer.
    ///
    /// Draining makes `flush` idempotent. `Metrics` is shared through an
    /// `Arc<RwLock<_>>` between cloned `ExecutionContext`s, each of which
    /// flushes on drop; without clearing, the same counts would be reported
    /// once per clone, inflating the Prometheus counters.
    #[allow(dead_code)]
    fn flush(&mut self, workload: &WorkloadType, database: &Address, reducer: &str) {
        macro_rules! flush_metric {
            ($db_metric:expr, $metric:expr, $metric_field:ident) => {
                // Skip zero counters: no point touching the label set.
                if $metric.$metric_field > 0 {
                    $db_metric
                        .with_label_values(
                            workload,
                            database,
                            reducer,
                            &$metric.table_id.0,
                            &$metric.cache_table_name,
                        )
                        .inc_by($metric.$metric_field);
                }
            };
        }

        // `drain` empties the buffer as we report, so a second flush is a no-op.
        for metric in self.0.drain(..) {
            flush_metric!(DB_METRICS.rdb_num_index_seeks, metric, index_seeks);
            flush_metric!(DB_METRICS.rdb_num_keys_scanned, metric, keys_scanned);
            flush_metric!(DB_METRICS.rdb_num_rows_fetched, metric, rows_fetched);
            flush_metric!(DB_METRICS.rdb_num_rows_inserted, metric, rows_inserted);
            flush_metric!(DB_METRICS.rdb_num_rows_deleted, metric, rows_deleted);
        }
    }
}

/// Represents the context under which a database runtime method is executed.
/// In particular it provides details about the currently executing txn to runtime operations.
/// More generally it acts as a container for information that database operations may require to function correctly.
///
/// NOTE(review): `Clone` shares `metrics` via the `Arc`, so clones accumulate
/// into — and on drop, flush — the same buffer; confirm this is intended.
#[derive(Default, Clone)]
pub struct ExecutionContext {
    /// The database on which a transaction is being executed.
    database: Address,
    /// The reducer from which the current transaction originated.
    /// `None` for sql/subscription/internal workloads.
    reducer: Option<ReducerContext>,
    /// The type of workload that is being executed.
    workload: WorkloadType,
    /// The Metrics to be reported for this transaction.
    /// Flushed to the global `DB_METRICS` when this context is dropped.
    pub metrics: Arc<RwLock<Metrics>>,
    /// Configuration threshold for detecting slow queries.
    pub slow_query_config: SlowQueryConfig,
}

/// If an [`ExecutionContext`] is a reducer context, describes the reducer.
///
/// Note that this information is written to persistent storage
/// (see the `From<&ReducerContext> for txdata::Inputs` impl below).
#[derive(Clone)]
pub struct ReducerContext {
    /// The name of the reducer.
    pub name: String,
    /// The [`Identity`] of the caller.
    pub caller_identity: Identity,
    /// The [`Address`] of the caller.
    pub caller_address: Address,
    /// The timestamp of the reducer invocation.
    pub timestamp: Timestamp,
    /// The BSATN-encoded arguments given to the reducer.
    ///
    /// Note that [`Bytes`] is a refcounted value, but the memory it points to
    /// can be large-ish. The reference should be freed as soon as possible.
    pub arg_bsatn: Bytes,
}

impl From<&ReducerContext> for txdata::Inputs {
    /// Serializes a [`ReducerContext`] into commitlog inputs: the reducer
    /// name plus the BSATN concatenation
    /// `caller_identity ++ caller_address ++ timestamp ++ args`.
    fn from(ctx: &ReducerContext) -> Self {
        let reducer_name = Arc::new(Varchar::from_str_truncate(&ctx.name));

        // Capacity hint: 32 bytes caller identity + 16 bytes caller address
        // + 8 bytes timestamp, then the raw argument bytes.
        let cap = 32 + 16 + 8 + ctx.arg_bsatn.len();
        let mut reducer_args = Vec::with_capacity(cap);
        bsatn::to_writer(&mut reducer_args, &ctx.caller_identity).unwrap();
        bsatn::to_writer(&mut reducer_args, &ctx.caller_address).unwrap();
        bsatn::to_writer(&mut reducer_args, &ctx.timestamp).unwrap();
        reducer_args.extend_from_slice(&ctx.arg_bsatn);

        txdata::Inputs {
            reducer_name,
            reducer_args: reducer_args.into(),
        }
    }
}

/// Classifies a transaction according to its workload.
/// A transaction can be executing a reducer.
/// It can be used to satisfy a one-off sql query or subscription.
/// It can also be an internal operation that is not associated with a reducer or sql request.
#[derive(Clone, Copy, Display, Hash, PartialEq, Eq, strum::AsRefStr)]
pub enum WorkloadType {
    /// A reducer invocation.
    Reducer,
    /// A one-off sql query.
    Sql,
    /// An initial subscribe call.
    Subscribe,
    /// An incremental subscription update.
    Update,
    /// An internal database operation (also the `Default`).
    Internal,
}

impl Default for WorkloadType {
    fn default() -> Self {
        Self::Internal
    }
}

impl ExecutionContext {
    /// Returns an [ExecutionContext] with the provided parameters and empty metrics.
    fn new(
        database: Address,
        reducer: Option<ReducerContext>,
        workload: WorkloadType,
        slow_query_config: SlowQueryConfig,
    ) -> Self {
        Self {
            database,
            reducer,
            workload,
            metrics: <_>::default(),
            slow_query_config,
        }
    }

    /// Returns an [ExecutionContext] for a reducer transaction.
    pub fn reducer(database: Address, ctx: ReducerContext) -> Self {
        Self::new(database, Some(ctx), WorkloadType::Reducer, Default::default())
    }

    /// Returns an [ExecutionContext] for a one-off sql query.
    pub fn sql(database: Address, slow_query_config: SlowQueryConfig) -> Self {
        Self::new(database, None, WorkloadType::Sql, slow_query_config)
    }

    /// Returns an [ExecutionContext] for an initial subscribe call.
    pub fn subscribe(database: Address, slow_query_config: SlowQueryConfig) -> Self {
        Self::new(database, None, WorkloadType::Subscribe, slow_query_config)
    }

    /// Returns an [ExecutionContext] for a subscription update.
    pub fn incremental_update(database: Address, slow_query_config: SlowQueryConfig) -> Self {
        Self::new(database, None, WorkloadType::Update, slow_query_config)
    }

    /// Returns an [ExecutionContext] for an internal database operation.
    pub fn internal(database: Address) -> Self {
        Self::new(database, None, WorkloadType::Internal, Default::default())
    }

    /// Returns the address of the database on which we are operating.
    #[inline]
    pub fn database(&self) -> Address {
        self.database
    }

    /// If this is a reducer context, returns the name of the reducer.
    #[inline]
    pub fn reducer_name(&self) -> &str {
        self.reducer.as_ref().map(|ctx| ctx.name.as_str()).unwrap_or_default()
    }

    /// If this is a reducer context, returns the full reducer metadata.
    #[inline]
    pub fn reducer_context(&self) -> Option<&ReducerContext> {
        self.reducer.as_ref()
    }

    /// Returns the type of workload that is being executed.
    #[inline]
    pub fn workload(&self) -> WorkloadType {
        self.workload
    }
}

impl Drop for ExecutionContext {
    /// Reports this transaction's buffered metrics when the context goes away.
    fn drop(&mut self) {
        // Copy the plain fields out first so the write-lock guard below
        // doesn't have to coexist with borrows taken inside `flush`'s args.
        let (workload, database) = (self.workload, self.database);
        let reducer = self.reducer_name();
        self.metrics.write().flush(&workload, &database, reducer);
    }
}