spacetimedb/host/
instance_env.rs

use super::scheduler::{get_schedule_from_row, ScheduleError, Scheduler};
use crate::database_logger::{BacktraceProvider, LogLevel, Record};
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::db::relational_db::{MutTx, RelationalDB};
use crate::error::{DBError, DatastoreError, IndexError, NodesError};
use crate::replica_context::ReplicaContext;
use core::mem;
use parking_lot::{Mutex, MutexGuard};
use smallvec::SmallVec;
use spacetimedb_lib::Timestamp;
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
use spacetimedb_sats::{
    bsatn::{self, ToBsatn},
    buffer::{CountWriter, TeeWriter},
    AlgebraicValue, ProductValue,
};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::table::RowRef;
use std::ops::DerefMut;
use std::sync::Arc;

#[derive(Clone)]
pub struct InstanceEnv {
    pub replica_ctx: Arc<ReplicaContext>,
    pub scheduler: Scheduler,
    pub tx: TxSlot,
    /// The timestamp at which the current reducer began running.
    pub start_time: Timestamp,
}

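/// A shared slot holding the transaction of the currently running reducer, if any.
///
/// Cloning a `TxSlot` produces another handle to the same underlying slot.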
#[derive(Clone, Default)]
pub struct TxSlot {
    inner: Arc<Mutex<Option<MutTxId>>>,
}

/// The maximum number of chunks stored in a single [`ChunkPool`].
///
/// When returning a chunk to the pool via [`ChunkPool::put`],
/// if the pool contains more than [`MAX_CHUNKS_IN_POOL`] chunks,
/// the returned chunk will be freed rather than added to the pool.
///
/// This, together with [`MAX_CHUNK_SIZE_IN_BYTES`],
/// prevents the heap usage of a [`ChunkPool`] from growing without bound.
///
/// This number was chosen completely arbitrarily by pgoldman 2025-04-10.
const MAX_CHUNKS_IN_POOL: usize = 32;

/// The maximum size of chunks which can be saved in a [`ChunkPool`].
///
/// When returning a chunk to the pool via [`ChunkPool::put`],
/// if the returned chunk is larger than [`MAX_CHUNK_SIZE_IN_BYTES`],
/// the returned chunk will be freed rather than added to the pool.
///
/// This, together with [`MAX_CHUNKS_IN_POOL`],
/// prevents the heap usage of a [`ChunkPool`] from growing without bound.
///
/// We switch to a new chunk once we pass `ROW_ITER_CHUNK_SIZE`, so this allows a buffer of 4x.
const MAX_CHUNK_SIZE_IN_BYTES: usize = spacetimedb_primitives::ROW_ITER_CHUNK_SIZE * 4;

/// A pool of available unused chunks.
///
/// The number of chunks stored in a `ChunkPool` is limited by [`MAX_CHUNKS_IN_POOL`],
/// and the size of each individual saved chunk is limited by [`MAX_CHUNK_SIZE_IN_BYTES`].
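///
/// A minimal usage sketch (illustrative only; `take` is private to this module,
/// so this will not compile as an external doctest):
///
/// ```ignore
/// let mut pool = ChunkPool::default();
/// // Take a chunk; it starts empty and allocates its backing memory on first use.
/// let mut chunk = pool.take();
/// chunk.extend_from_slice(b"some row bytes");
/// // Return the chunk: it is cleared and retained for reuse,
/// // or freed if the pool is full or the chunk is oversized.
/// pool.put(chunk);
/// ```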
#[derive(Default)]
pub struct ChunkPool {
    free_chunks: Vec<Vec<u8>>,
}

impl ChunkPool {
    /// Takes an unused chunk from this pool
    /// or creates a new chunk if none are available.
    /// New chunks are not allocated up front;
    /// their backing memory is allocated on first use.
    fn take(&mut self) -> Vec<u8> {
        self.free_chunks.pop().unwrap_or_default()
    }

    /// Returns a chunk to the pool, or frees it, as appropriate.
    ///
    /// `chunk` will be freed if either:
    ///
    /// - `self` already contains more than [`MAX_CHUNKS_IN_POOL`] chunks, or
    /// - `chunk.capacity()` is greater than [`MAX_CHUNK_SIZE_IN_BYTES`].
    ///
    /// These limits place an upper bound on the memory usage of a single [`ChunkPool`].
    pub fn put(&mut self, mut chunk: Vec<u8>) {
        if chunk.capacity() > MAX_CHUNK_SIZE_IN_BYTES {
            return;
        }
        if self.free_chunks.len() > MAX_CHUNKS_IN_POOL {
            return;
        }
        chunk.clear();
        self.free_chunks.push(chunk);
    }
}

/// Construct a new `ChunkedWriter` using [`Self::new`].
/// Do not impl `Default` for this struct or construct it manually;
/// it is important that all allocated chunks are taken from the [`ChunkPool`],
/// rather than directly from the global allocator.
struct ChunkedWriter {
    /// Chunks collected thus far.
    chunks: Vec<Vec<u8>>,
    /// Current in-progress chunk that will be added to `chunks`.
    curr: Vec<u8>,
}

impl ChunkedWriter {
    /// Flushes the data collected in the current chunk
    /// if it's larger than our chunking threshold.
    fn flush(&mut self, pool: &mut ChunkPool) {
        if self.curr.len() > spacetimedb_primitives::ROW_ITER_CHUNK_SIZE {
            let curr = mem::replace(&mut self.curr, pool.take());
            self.chunks.push(curr);
        }
    }

    /// Creates a new `ChunkedWriter` with an empty chunk taken from the pool.
    fn new(pool: &mut ChunkPool) -> Self {
        Self {
            chunks: Vec::new(),
            curr: pool.take(),
        }
    }

    /// Finalises the writer and returns all the chunks.
    fn into_chunks(mut self) -> Vec<Vec<u8>> {
        if !self.curr.is_empty() {
            self.chunks.push(self.curr);
        }
        self.chunks
    }

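    /// Serializes each item of `iter` into chunks taken from `pool`,
    /// splitting into a new chunk only at item boundaries.
    ///
    /// Increments `rows_scanned` once per item serialized,
    /// and adds the total BSATN length of all chunks to `bytes_scanned`.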
    pub fn collect_iter(
        pool: &mut ChunkPool,
        iter: impl Iterator<Item = impl ToBsatn>,
        rows_scanned: &mut usize,
        bytes_scanned: &mut usize,
    ) -> Vec<Vec<u8>> {
        let mut chunked_writer = Self::new(pool);
        // Consume the iterator, serializing each `item`,
        // while allowing a chunk to be created at boundaries.
        for item in iter {
            // Write the item directly to the BSATN `chunked_writer` buffer.
            item.to_bsatn_extend(&mut chunked_writer.curr).unwrap();
            // Flush at item boundaries.
            chunked_writer.flush(pool);
            // Update rows scanned.
            *rows_scanned += 1;
        }

        let chunks = chunked_writer.into_chunks();

        // Update (BSATN) bytes scanned.
        *bytes_scanned += chunks.iter().map(|chunk| chunk.len()).sum::<usize>();

        chunks
    }
}

// Generic 'instance environment' delegated to from various host types.
impl InstanceEnv {
    pub fn new(replica_ctx: Arc<ReplicaContext>, scheduler: Scheduler) -> Self {
        Self {
            replica_ctx,
            scheduler,
            tx: TxSlot::default(),
            start_time: Timestamp::now(),
        }
    }

    /// Signals to this `InstanceEnv` that a reducer call is beginning.
    pub fn start_reducer(&mut self, ts: Timestamp) {
        self.start_time = ts;
    }

    fn get_tx(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
        self.tx.get()
    }

    #[tracing::instrument(level = "trace", skip_all)]
    pub fn console_log(&self, level: LogLevel, record: &Record, bt: &dyn BacktraceProvider) {
        self.replica_ctx.logger.write(level, record, bt);
        log::trace!(
            "MOD({}): {}",
            self.replica_ctx.database_identity.to_abbreviated_hex(),
            record.message
        );
    }

    /// Projects `cols` in `row_ref`, encoded in BSATN, to `buffer`,
    /// and returns the full length of the BSATN.
    ///
    /// Assumes that the full encoding of `cols` will fit in `buffer`.
    fn project_cols_bsatn(buffer: &mut [u8], cols: ColList, row_ref: RowRef<'_>) -> usize {
        // We get back a col-list with the columns with generated values.
        // Write those back to `buffer`, counting the encoded length as we go.
        let counter = CountWriter::default();
        let mut writer = TeeWriter::new(counter, buffer);
        for col in cols.iter() {
            // Read the column value to an `AlgebraicValue` and then serialize.
            let val = row_ref
                .read_col::<AlgebraicValue>(col)
                .expect("reading col as AV never panics");
            bsatn::to_writer(&mut writer, &val).unwrap();
        }
        // Extract the total number of bytes written from the `CountWriter` half.
        writer.w1.finish()
    }

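    /// Inserts a row, encoded in BSATN as `buffer`, into the table identified by `table_id`.
    ///
    /// The values of any generated columns (e.g., auto-inc)
    /// are written back into `buffer`,
    /// and the full length of their BSATN encoding is returned.
    ///
    /// If the table is a scheduler table, the inserted row is also scheduled.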
    pub fn insert(&self, table_id: TableId, buffer: &mut [u8]) -> Result<usize, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.get_tx()?;

        let (row_len, row_ptr, insert_flags) = stdb
            .insert(tx, table_id, buffer)
            .map(|(gen_cols, row_ref, insert_flags)| {
                let row_len = Self::project_cols_bsatn(buffer, gen_cols, row_ref);
                (row_len, row_ref.pointer(), insert_flags)
            })
            .inspect_err(
                #[cold]
                #[inline(never)]
                |e| match e {
                    DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => {}
                    _ => {
                        let res = stdb.table_name_from_id_mut(tx, table_id);
                        if let Ok(Some(table_name)) = res {
                            log::debug!("insert(table: {table_name}, table_id: {table_id}): {e}")
                        } else {
                            log::debug!("insert(table_id: {table_id}): {e}")
                        }
                    }
                },
            )?;

        if insert_flags.is_scheduler_table {
            self.schedule_row(stdb, tx, table_id, row_ptr)?;
        }

        // Note, we update the metric for bytes written after the insert.
        // This is to capture auto-inc columns.
        tx.metrics.bytes_written += buffer.len();

        Ok(row_len)
    }

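    /// Schedules the row at `row_ptr` in the scheduler table `table_id`
    /// to run at the time encoded in its schedule-at column.
    ///
    /// Must only be called when `table_id` is known to be a scheduler table.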
    #[cold]
    #[inline(never)]
    fn schedule_row(
        &self,
        stdb: &RelationalDB,
        tx: &mut MutTx,
        table_id: TableId,
        row_ptr: RowPointer,
    ) -> Result<(), NodesError> {
        let (id_column, at_column) = stdb
            .table_scheduled_id_and_at(tx, table_id)?
            .expect("schedule_row should only be called when we know it's a scheduler table");

        let row_ref = tx.get(table_id, row_ptr).map_err(DBError::from)?.unwrap();
        let (schedule_id, schedule_at) = get_schedule_from_row(&row_ref, id_column, at_column)
            // NOTE(centril): Should never happen,
            // as we successfully inserted and thus the row is verified against the table schema.
            .map_err(|e| NodesError::ScheduleError(ScheduleError::DecodingError(e)))?;
        self.scheduler
            .schedule(
                table_id,
                schedule_id,
                schedule_at,
                id_column,
                at_column,
                self.start_time,
            )
            .map_err(NodesError::ScheduleError)?;

        Ok(())
    }

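    /// Updates the row in table `table_id` matched via the index identified by `index_id`,
    /// replacing it with the row encoded in BSATN as `buffer`.
    ///
    /// As with [`Self::insert`], generated column values are written back into `buffer`
    /// and the full length of their BSATN encoding is returned.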
    pub fn update(&self, table_id: TableId, index_id: IndexId, buffer: &mut [u8]) -> Result<usize, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.get_tx()?;

        let (row_len, row_ptr, update_flags) = stdb
            .update(tx, table_id, index_id, buffer)
            .map(|(gen_cols, row_ref, update_flags)| {
                let row_len = Self::project_cols_bsatn(buffer, gen_cols, row_ref);
                (row_len, row_ref.pointer(), update_flags)
            })
            .inspect_err(
                #[cold]
                #[inline(never)]
                |e| match e {
                    DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => {}
                    _ => {
                        let res = stdb.table_name_from_id_mut(tx, table_id);
                        if let Ok(Some(table_name)) = res {
                            log::debug!("update(table: {table_name}, table_id: {table_id}, index_id: {index_id}): {e}")
                        } else {
                            log::debug!("update(table_id: {table_id}, index_id: {index_id}): {e}")
                        }
                    }
                },
            )?;

        if update_flags.is_scheduler_table {
            self.schedule_row(stdb, tx, table_id, row_ptr)?;
        }
        // As in `insert`, update the metrics after the call to capture auto-inc columns.
        tx.metrics.bytes_written += buffer.len();
        tx.metrics.rows_updated += 1;

        Ok(row_len)
    }

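    /// Deletes all rows found by an index range scan,
    /// interpreting `prefix`, `prefix_elems`, `rstart`, and `rend`
    /// as in [`Self::datastore_index_scan_range_bsatn_chunks`].
    ///
    /// Returns the number of rows deleted.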
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn datastore_delete_by_index_scan_range_bsatn(
        &self,
        index_id: IndexId,
        prefix: &[u8],
        prefix_elems: ColId,
        rstart: &[u8],
        rend: &[u8],
    ) -> Result<u32, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.tx.get()?;

        // Find all rows in the table to delete.
        let (table_id, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
        // Re. `SmallVec`: `delete_by_field` only cares about 1 element, so optimize for that.
        let rows_to_delete = iter.map(|row_ref| row_ref.pointer()).collect::<SmallVec<[_; 1]>>();

        // Note, we're deleting rows based on the result of a btree scan.
        // Hence we must update our `index_seeks` and `rows_scanned` metrics.
        //
        // Note that we're not updating `bytes_scanned` at all,
        // because we never dereference any of the returned `RowPointer`s.
        tx.metrics.index_seeks += 1;
        tx.metrics.rows_scanned += rows_to_delete.len();

        // Delete them and count how many we deleted.
        Ok(stdb.delete(tx, table_id, rows_to_delete))
    }

    /// Deletes all rows in the table identified by `table_id`
    /// where the rows match one in `relation`,
    /// which is a BSATN encoding of `Vec<ProductValue>`.
    ///
    /// Returns an error if
    /// - we're not in a transaction,
    /// - the table doesn't exist, or
    /// - a row couldn't be decoded to the table schema type.
    #[tracing::instrument(level = "trace", skip(self, relation))]
    pub fn datastore_delete_all_by_eq_bsatn(&self, table_id: TableId, relation: &[u8]) -> Result<u32, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.get_tx()?;

        // Track the number of bytes coming from the caller.
        tx.metrics.bytes_scanned += relation.len();

        // Find the row schema, using it to decode a vector of product values.
        let row_ty = stdb.row_schema_for_table(tx, table_id)?;
        // `TableType::delete` cares about a single element,
        // so in that case we can avoid the allocation by using `smallvec`.
        let relation = ProductValue::decode_smallvec(&row_ty, &mut &*relation).map_err(NodesError::DecodeRow)?;

        // Note, we track the number of rows coming from the caller,
        // regardless of whether or not we actually delete them,
        // since we have to derive row ids for each one of them.
        tx.metrics.rows_scanned += relation.len();

        // Delete them and return how many we deleted.
        Ok(stdb.delete_by_rel(tx, table_id, relation))
    }

    /// Returns the `table_id` associated with the given `table_name`.
    ///
    /// Errors with `GetTxError` if not in a transaction
    /// and `TableNotFound` if the table does not exist.
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn table_id_from_name(&self, table_name: &str) -> Result<TableId, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.get_tx()?;

        // Query the table id from the name.
        stdb.table_id_from_name_mut(tx, table_name)?
            .ok_or(NodesError::TableNotFound)
    }

    /// Returns the `index_id` associated with the given `index_name`.
    ///
    /// Errors with `GetTxError` if not in a transaction
    /// and `IndexNotFound` if the index does not exist.
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn index_id_from_name(&self, index_name: &str) -> Result<IndexId, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.get_tx()?;

        // Query the index id from the name.
        stdb.index_id_from_name_mut(tx, index_name)?
            .ok_or(NodesError::IndexNotFound)
    }

    /// Returns the number of rows in the table identified by `table_id`.
    ///
    /// Errors with `GetTxError` if not in a transaction
    /// and `TableNotFound` if the table does not exist.
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn datastore_table_row_count(&self, table_id: TableId) -> Result<u64, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.get_tx()?;

        // Query the row count for the table id.
        stdb.table_row_count_mut(tx, table_id).ok_or(NodesError::TableNotFound)
    }

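    /// Scans the entire table identified by `table_id`
    /// and returns its rows as a list of BSATN-encoded chunks.
    ///
    /// Chunks are drawn from `pool`, to which they should eventually be returned.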
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn datastore_table_scan_bsatn_chunks(
        &self,
        pool: &mut ChunkPool,
        table_id: TableId,
    ) -> Result<Vec<Vec<u8>>, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.tx.get()?;

        // Track the number of rows and the number of bytes scanned by the iterator.
        let mut rows_scanned = 0;
        let mut bytes_scanned = 0;

        // Scan the table and serialize the rows to BSATN.
        let chunks = ChunkedWriter::collect_iter(
            pool,
            stdb.iter_mut(tx, table_id)?,
            &mut rows_scanned,
            &mut bytes_scanned,
        );

        tx.metrics.rows_scanned += rows_scanned;
        tx.metrics.bytes_scanned += bytes_scanned;

        Ok(chunks)
    }

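    /// Scans the index identified by `index_id` over the range
    /// given by `prefix`, `prefix_elems`, `rstart`, and `rend`,
    /// returning the matching rows as a list of BSATN-encoded chunks.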
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn datastore_index_scan_range_bsatn_chunks(
        &self,
        pool: &mut ChunkPool,
        index_id: IndexId,
        prefix: &[u8],
        prefix_elems: ColId,
        rstart: &[u8],
        rend: &[u8],
    ) -> Result<Vec<Vec<u8>>, NodesError> {
        let stdb = &*self.replica_ctx.relational_db;
        let tx = &mut *self.tx.get()?;

        // Track rows and bytes scanned by the iterator.
        let mut rows_scanned = 0;
        let mut bytes_scanned = 0;

        // Open the index iterator.
        let (_, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;

        // Scan the index and serialize the rows to BSATN.
        let chunks = ChunkedWriter::collect_iter(pool, iter, &mut rows_scanned, &mut bytes_scanned);

        tx.metrics.index_seeks += 1;
        tx.metrics.rows_scanned += rows_scanned;
        tx.metrics.bytes_scanned += bytes_scanned;

        Ok(chunks)
    }
}

impl TxSlot {
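    /// Installs `tx` into this slot, runs `f`, then removes and returns the transaction
    /// together with `f`'s result.
    ///
    /// Panics if the slot is already occupied,
    /// or if `f` removed the transaction from the slot.
    ///
    /// A sketch of the calling pattern, mirroring the tests below
    /// (`begin_mut_tx` is a test helper):
    ///
    /// ```ignore
    /// let mut tx_slot = env.tx.clone();
    /// let tx = begin_mut_tx(&db);
    /// // While `f` runs, syscalls on `env` can access `tx` through the slot.
    /// let (tx, result) = tx_slot.set(tx, f);
    /// ```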
    pub fn set<T>(&mut self, tx: MutTxId, f: impl FnOnce() -> T) -> (MutTxId, T) {
        let prev = self.inner.lock().replace(tx);
        assert!(prev.is_none(), "reentrant TxSlot::set");
        let remove_tx = || self.inner.lock().take();

        let res = {
            // On unwind, remove the transaction so the slot isn't left occupied.
            scopeguard::defer_on_unwind! { remove_tx(); }
            f()
        };

        let tx = remove_tx().expect("tx was removed during transaction");
        (tx, res)
    }

    /// Returns a guard dereferencing to the transaction in the slot,
    /// or errors with [`GetTxError`] if the slot is empty.
    pub fn get(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
        MutexGuard::try_map(self.inner.lock(), |map| map.as_mut()).map_err(|_| GetTxError)
    }
}

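/// Error returned when the [`TxSlot`] holds no active transaction.
///
/// Converts into [`NodesError::NotInTransaction`].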
#[derive(Debug)]
pub struct GetTxError;
impl From<GetTxError> for NodesError {
    fn from(_: GetTxError) -> Self {
        NodesError::NotInTransaction
    }
}

#[cfg(test)]
mod test {
    use super::*;

    use std::{ops::Bound, sync::Arc};

    use crate::{
        database_logger::DatabaseLogger,
        db::relational_db::{
            tests_utils::{begin_mut_tx, with_auto_commit, with_read_only, TestDB},
            RelationalDB,
        },
        host::Scheduler,
        messages::control_db::{Database, HostType},
        replica_context::ReplicaContext,
        subscription::module_subscription_actor::ModuleSubscriptions,
    };
    use anyhow::{anyhow, Result};
    use spacetimedb_lib::db::auth::StAccess;
    use spacetimedb_lib::{bsatn::to_vec, AlgebraicType, AlgebraicValue, Hash, Identity, ProductValue};
    use spacetimedb_paths::{server::ModuleLogsDir, FromPathUnchecked};
    use spacetimedb_primitives::{IndexId, TableId};
    use spacetimedb_sats::product;
    use tempfile::TempDir;

    /// An `InstanceEnv` requires a `DatabaseLogger`.
    fn temp_logger() -> Result<DatabaseLogger> {
        let temp = TempDir::new()?;
        let path = ModuleLogsDir::from_path_unchecked(temp.into_path());
        let path = path.today();
        Ok(DatabaseLogger::open(path))
    }

    /// An `InstanceEnv` requires a `ReplicaContext`.
    /// For our purposes this is just a wrapper for `RelationalDB`.
    fn replica_ctx(relational_db: Arc<RelationalDB>) -> Result<(ReplicaContext, tokio::runtime::Runtime)> {
        let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db.clone());
        Ok((
            ReplicaContext {
                database: Database {
                    id: 0,
                    database_identity: Identity::ZERO,
                    owner_identity: Identity::ZERO,
                    host_type: HostType::Wasm,
                    initial_program: Hash::ZERO,
                },
                replica_id: 0,
                logger: Arc::new(temp_logger()?),
                subscriptions: subs,
                relational_db,
            },
            runtime,
        ))
    }

    /// An `InstanceEnv` used for testing the database syscalls.
    fn instance_env(db: Arc<RelationalDB>) -> Result<(InstanceEnv, tokio::runtime::Runtime)> {
        let (scheduler, _) = Scheduler::open(db.clone());
        let (replica_context, runtime) = replica_ctx(db)?;
        Ok((
            InstanceEnv {
                replica_ctx: Arc::new(replica_context),
                scheduler,
                tx: TxSlot::default(),
                start_time: Timestamp::now(),
            },
            runtime,
        ))
    }

    /// An in-memory `RelationalDB` for testing.
    /// It does not persist data to disk.
    fn relational_db() -> Result<Arc<RelationalDB>> {
        let TestDB { db, .. } = TestDB::in_memory()?;
        Ok(Arc::new(db))
    }

    /// Generate a `ProductValue` for use in [create_table_with_index].
    fn product_row(i: usize) -> ProductValue {
        let str = i.to_string();
        let str = str.repeat(i);
        let id = i as u64;
        product!(id, str)
    }

    /// Generate a BSATN-encoded row for use in [create_table_with_index].
    fn bsatn_row(i: usize) -> Result<Vec<u8>> {
        Ok(to_vec(&product_row(i))?)
    }

    /// Instantiate the following table:
    ///
    /// ```text
    /// id | str
    /// -- | ---
    /// 1  | "1"
    /// 2  | "22"
    /// 3  | "333"
    /// 4  | "4444"
    /// 5  | "55555"
    /// ```
    ///
    /// with an index on `id`.
    fn create_table_with_index(db: &RelationalDB) -> Result<(TableId, IndexId)> {
        let table_id = db.create_table_for_test(
            "t",
            &[("id", AlgebraicType::U64), ("str", AlgebraicType::String)],
            &[0.into()],
        )?;
        let index_id = with_read_only(db, |tx| {
            db.schema_for_table(tx, table_id)?
                .indexes
                .iter()
                .find(|schema| {
                    schema
                        .index_algorithm
                        .columns()
                        .as_singleton()
                        .is_some_and(|col_id| col_id.idx() == 0)
                })
                .map(|schema| schema.index_id)
                .ok_or_else(|| anyhow!("Index not found for ColId `{}`", 0))
        })?;
        with_auto_commit(db, |tx| -> Result<_> {
            for i in 1..=5 {
                db.insert(tx, table_id, &bsatn_row(i)?)?;
            }
            Ok(())
        })?;
        Ok((table_id, index_id))
    }

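    /// Like [create_table_with_index], but the index on `id` is also a unique constraint.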
    fn create_table_with_unique_index(db: &RelationalDB) -> Result<(TableId, IndexId)> {
        let table_id = db.create_table_for_test_with_the_works(
            "t",
            &[("id", AlgebraicType::U64), ("str", AlgebraicType::String)],
            &[0.into()],
            &[0.into()],
            StAccess::Public,
        )?;
        let index_id = with_read_only(db, |tx| {
            db.schema_for_table(tx, table_id)?
                .indexes
                .iter()
                .find(|schema| {
                    schema
                        .index_algorithm
                        .columns()
                        .as_singleton()
                        .is_some_and(|col_id| col_id.idx() == 0)
                })
                .map(|schema| schema.index_id)
                .ok_or_else(|| anyhow!("Index not found for ColId `{}`", 0))
        })?;
        with_auto_commit(db, |tx| -> Result<_> {
            for i in 1..=5 {
                db.insert(tx, table_id, &bsatn_row(i)?)?;
            }
            Ok(())
        })?;
        Ok((table_id, index_id))
    }

    #[test]
    fn table_scan_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (table_id, _) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let f = || env.datastore_table_scan_bsatn_chunks(&mut ChunkPool::default(), table_id);
        let tx = begin_mut_tx(&db);
        let (tx, scan_result) = tx_slot.set(tx, f);

        scan_result?;

        let bytes_scanned = (1..=5)
            .map(bsatn_row)
            .filter_map(|bsatn_result| bsatn_result.ok())
            .map(|bsatn| bsatn.len())
            .sum::<usize>();

        // The only non-zero metrics should be rows and bytes scanned.
        // The table has 5 rows, so we should have 5 rows scanned.
        // We should also have scanned the same number of bytes that we inserted.
        assert_eq!(0, tx.metrics.index_seeks);
        assert_eq!(5, tx.metrics.rows_scanned);
        assert_eq!(bytes_scanned, tx.metrics.bytes_scanned);
        assert_eq!(0, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }

    #[test]
    fn index_scan_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (_, index_id) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        // Perform two index scans
        let f = || -> Result<_> {
            let index_key_3 = to_vec(&Bound::Included(AlgebraicValue::U64(3)))?;
            let index_key_5 = to_vec(&Bound::Included(AlgebraicValue::U64(5)))?;
            env.datastore_index_scan_range_bsatn_chunks(
                &mut ChunkPool::default(),
                index_id,
                &[],
                0.into(),
                &index_key_3,
                &index_key_3,
            )?;
            env.datastore_index_scan_range_bsatn_chunks(
                &mut ChunkPool::default(),
                index_id,
                &[],
                0.into(),
                &index_key_5,
                &index_key_5,
            )?;
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, scan_result) = tx_slot.set(tx, f);

        scan_result?;

        let bytes_scanned = [3, 5]
            .into_iter()
            .map(bsatn_row)
            .filter_map(|bsatn_result| bsatn_result.ok())
            .map(|bsatn| bsatn.len())
            .sum::<usize>();

        // We performed two index scans to fetch rows 3 and 5
        assert_eq!(2, tx.metrics.index_seeks);
        assert_eq!(2, tx.metrics.rows_scanned);
        assert_eq!(bytes_scanned, tx.metrics.bytes_scanned);
        assert_eq!(0, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }

    #[test]
    fn insert_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (table_id, _) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        // Insert 4 new rows into `t`
        let f = || -> Result<_> {
            for i in 6..=9 {
                let mut buffer = bsatn_row(i)?;
                env.insert(table_id, &mut buffer)?;
            }
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, insert_result) = tx_slot.set(tx, f);

        insert_result?;

        let bytes_written = (6..=9)
            .map(bsatn_row)
            .filter_map(|bsatn_result| bsatn_result.ok())
            .map(|bsatn| bsatn.len())
            .sum::<usize>();

        // The only metric affected by inserts is bytes written
        assert_eq!(0, tx.metrics.index_seeks);
        assert_eq!(0, tx.metrics.rows_scanned);
        assert_eq!(0, tx.metrics.bytes_scanned);
        assert_eq!(bytes_written, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }

    #[test]
    fn update_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (table_id, index_id) = create_table_with_unique_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let row_id: u64 = 1;
        let row_val: String = "string".to_string();
        let mut new_row_bytes = to_vec(&product!(row_id, row_val))?;
        let new_row_len = new_row_bytes.len();
        // Update a single row via the unique index
        let f = || -> Result<_> {
            env.update(table_id, index_id, new_row_bytes.as_mut_slice())?;
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, res) = tx_slot.set(tx, f);

        res?;

        assert_eq!(new_row_len, tx.metrics.bytes_written);
        Ok(())
    }

    #[test]
    fn delete_by_index_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (_, index_id) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        // Delete a single row via the index
        let f = || -> Result<_> {
            let index_key = to_vec(&Bound::Included(AlgebraicValue::U64(3)))?;
            env.datastore_delete_by_index_scan_range_bsatn(index_id, &[], 0.into(), &index_key, &index_key)?;
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, delete_result) = tx_slot.set(tx, f);

        delete_result?;

        assert_eq!(1, tx.metrics.index_seeks);
        assert_eq!(1, tx.metrics.rows_scanned);
        assert_eq!(0, tx.metrics.bytes_scanned);
        assert_eq!(0, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }

    #[test]
    fn delete_by_value_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (table_id, _) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let bsatn_rows = to_vec(&(3..=5).map(product_row).collect::<Vec<_>>())?;

        // Delete 3 rows by value
        let f = || -> Result<_> {
            env.datastore_delete_all_by_eq_bsatn(table_id, &bsatn_rows)?;
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, delete_result) = tx_slot.set(tx, f);

        delete_result?;

        let bytes_scanned = bsatn_rows.len();

        assert_eq!(0, tx.metrics.index_seeks);
        assert_eq!(3, tx.metrics.rows_scanned);
        assert_eq!(bytes_scanned, tx.metrics.bytes_scanned);
        assert_eq!(0, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }
}