1use super::scheduler::{get_schedule_from_row, ScheduleError, Scheduler};
2use crate::database_logger::{BacktraceProvider, LogLevel, Record};
3use crate::db::datastore::locking_tx_datastore::MutTxId;
4use crate::db::relational_db::{MutTx, RelationalDB};
5use crate::error::{DBError, DatastoreError, IndexError, NodesError};
6use crate::replica_context::ReplicaContext;
7use core::mem;
8use parking_lot::{Mutex, MutexGuard};
9use smallvec::SmallVec;
10use spacetimedb_lib::Timestamp;
11use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
12use spacetimedb_sats::{
13 bsatn::{self, ToBsatn},
14 buffer::{CountWriter, TeeWriter},
15 AlgebraicValue, ProductValue,
16};
17use spacetimedb_table::indexes::RowPointer;
18use spacetimedb_table::table::RowRef;
19use std::ops::DerefMut;
20use std::sync::Arc;
21
/// Bundles the state that the datastore-facing methods of this type operate on:
/// the replica context, the scheduler, the transaction slot and a start timestamp.
#[derive(Clone)]
pub struct InstanceEnv {
    /// Context of the replica. Methods on this type read its `relational_db`,
    /// `logger` and `database_identity`.
    pub replica_ctx: Arc<ReplicaContext>,
    /// Scheduler that `schedule_row` forwards scheduled rows to.
    pub scheduler: Scheduler,
    /// Slot holding the mutable transaction, when one has been set.
    pub tx: TxSlot,
    /// Initialised to `Timestamp::now()` by `new` and overwritten by `start_reducer`;
    /// passed to the scheduler whenever a row is scheduled.
    pub start_time: Timestamp,
}
30
/// A lockable slot holding at most one [`MutTxId`].
///
/// The slot lives behind an `Arc`, so clones of a `TxSlot` refer to the same slot.
#[derive(Clone, Default)]
pub struct TxSlot {
    /// `None` while no transaction is installed.
    inner: Arc<Mutex<Option<MutTxId>>>,
}
35
36const MAX_CHUNKS_IN_POOL: usize = 32;
47
48const MAX_CHUNK_SIZE_IN_BYTES: usize = spacetimedb_primitives::ROW_ITER_CHUNK_SIZE * 4;
59
60#[derive(Default)]
65pub struct ChunkPool {
66 free_chunks: Vec<Vec<u8>>,
67}
68
69impl ChunkPool {
70 fn take(&mut self) -> Vec<u8> {
75 self.free_chunks.pop().unwrap_or_default()
76 }
77
78 pub fn put(&mut self, mut chunk: Vec<u8>) {
87 if chunk.capacity() > MAX_CHUNK_SIZE_IN_BYTES {
88 return;
89 }
90 if self.free_chunks.len() > MAX_CHUNKS_IN_POOL {
91 return;
92 }
93 chunk.clear();
94 self.free_chunks.push(chunk);
95 }
96}
97
98struct ChunkedWriter {
103 chunks: Vec<Vec<u8>>,
105 curr: Vec<u8>,
107}
108
109impl ChunkedWriter {
110 fn flush(&mut self, pool: &mut ChunkPool) {
113 if self.curr.len() > spacetimedb_primitives::ROW_ITER_CHUNK_SIZE {
114 let curr = mem::replace(&mut self.curr, pool.take());
115 self.chunks.push(curr);
116 }
117 }
118
119 fn new(pool: &mut ChunkPool) -> Self {
121 Self {
122 chunks: Vec::new(),
123 curr: pool.take(),
124 }
125 }
126
127 fn into_chunks(mut self) -> Vec<Vec<u8>> {
129 if !self.curr.is_empty() {
130 self.chunks.push(self.curr);
131 }
132 self.chunks
133 }
134
135 pub fn collect_iter(
136 pool: &mut ChunkPool,
137 iter: impl Iterator<Item = impl ToBsatn>,
138 rows_scanned: &mut usize,
139 bytes_scanned: &mut usize,
140 ) -> Vec<Vec<u8>> {
141 let mut chunked_writer = Self::new(pool);
142 for item in iter {
145 item.to_bsatn_extend(&mut chunked_writer.curr).unwrap();
147 chunked_writer.flush(pool);
149 *rows_scanned += 1;
151 }
152
153 let chunks = chunked_writer.into_chunks();
154
155 *bytes_scanned += chunks.iter().map(|chunk| chunk.len()).sum::<usize>();
157
158 chunks
159 }
160}
161
162impl InstanceEnv {
164 pub fn new(replica_ctx: Arc<ReplicaContext>, scheduler: Scheduler) -> Self {
165 Self {
166 replica_ctx,
167 scheduler,
168 tx: TxSlot::default(),
169 start_time: Timestamp::now(),
170 }
171 }
172
173 pub fn start_reducer(&mut self, ts: Timestamp) {
175 self.start_time = ts;
176 }
177
178 fn get_tx(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
179 self.tx.get()
180 }
181
182 #[tracing::instrument(level = "trace", skip_all)]
183 pub fn console_log(&self, level: LogLevel, record: &Record, bt: &dyn BacktraceProvider) {
184 self.replica_ctx.logger.write(level, record, bt);
185 log::trace!(
186 "MOD({}): {}",
187 self.replica_ctx.database_identity.to_abbreviated_hex(),
188 record.message
189 );
190 }
191
192 fn project_cols_bsatn(buffer: &mut [u8], cols: ColList, row_ref: RowRef<'_>) -> usize {
197 let counter = CountWriter::default();
200 let mut writer = TeeWriter::new(counter, buffer);
201 for col in cols.iter() {
202 let val = row_ref
204 .read_col::<AlgebraicValue>(col)
205 .expect("reading col as AV never panics");
206 bsatn::to_writer(&mut writer, &val).unwrap();
207 }
208 writer.w1.finish()
209 }
210
211 pub fn insert(&self, table_id: TableId, buffer: &mut [u8]) -> Result<usize, NodesError> {
212 let stdb = &*self.replica_ctx.relational_db;
213 let tx = &mut *self.get_tx()?;
214
215 let (row_len, row_ptr, insert_flags) = stdb
216 .insert(tx, table_id, buffer)
217 .map(|(gen_cols, row_ref, insert_flags)| {
218 let row_len = Self::project_cols_bsatn(buffer, gen_cols, row_ref);
219 (row_len, row_ref.pointer(), insert_flags)
220 })
221 .inspect_err(
222 #[cold]
223 #[inline(never)]
224 |e| match e {
225 DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => {}
226 _ => {
227 let res = stdb.table_name_from_id_mut(tx, table_id);
228 if let Ok(Some(table_name)) = res {
229 log::debug!("insert(table: {table_name}, table_id: {table_id}): {e}")
230 } else {
231 log::debug!("insert(table_id: {table_id}): {e}")
232 }
233 }
234 },
235 )?;
236
237 if insert_flags.is_scheduler_table {
238 self.schedule_row(stdb, tx, table_id, row_ptr)?;
239 }
240
241 tx.metrics.bytes_written += buffer.len();
244
245 Ok(row_len)
246 }
247
248 #[cold]
249 #[inline(never)]
250 fn schedule_row(
251 &self,
252 stdb: &RelationalDB,
253 tx: &mut MutTx,
254 table_id: TableId,
255 row_ptr: RowPointer,
256 ) -> Result<(), NodesError> {
257 let (id_column, at_column) = stdb
258 .table_scheduled_id_and_at(tx, table_id)?
259 .expect("schedule_row should only be called when we know its a scheduler table");
260
261 let row_ref = tx.get(table_id, row_ptr).map_err(DBError::from)?.unwrap();
262 let (schedule_id, schedule_at) = get_schedule_from_row(&row_ref, id_column, at_column)
263 .map_err(|e| NodesError::ScheduleError(ScheduleError::DecodingError(e)))?;
266 self.scheduler
267 .schedule(
268 table_id,
269 schedule_id,
270 schedule_at,
271 id_column,
272 at_column,
273 self.start_time,
274 )
275 .map_err(NodesError::ScheduleError)?;
276
277 Ok(())
278 }
279
280 pub fn update(&self, table_id: TableId, index_id: IndexId, buffer: &mut [u8]) -> Result<usize, NodesError> {
281 let stdb = &*self.replica_ctx.relational_db;
282 let tx = &mut *self.get_tx()?;
283
284 let (row_len, row_ptr, update_flags) = stdb
285 .update(tx, table_id, index_id, buffer)
286 .map(|(gen_cols, row_ref, update_flags)| {
287 let row_len = Self::project_cols_bsatn(buffer, gen_cols, row_ref);
288 (row_len, row_ref.pointer(), update_flags)
289 })
290 .inspect_err(
291 #[cold]
292 #[inline(never)]
293 |e| match e {
294 DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) => {}
295 _ => {
296 let res = stdb.table_name_from_id_mut(tx, table_id);
297 if let Ok(Some(table_name)) = res {
298 log::debug!("update(table: {table_name}, table_id: {table_id}, index_id: {index_id}): {e}")
299 } else {
300 log::debug!("update(table_id: {table_id}, index_id: {index_id}): {e}")
301 }
302 }
303 },
304 )?;
305
306 if update_flags.is_scheduler_table {
307 self.schedule_row(stdb, tx, table_id, row_ptr)?;
308 }
309 tx.metrics.bytes_written += buffer.len();
310 tx.metrics.rows_updated += 1;
311
312 Ok(row_len)
313 }
314
315 #[tracing::instrument(level = "trace", skip_all)]
316 pub fn datastore_delete_by_index_scan_range_bsatn(
317 &self,
318 index_id: IndexId,
319 prefix: &[u8],
320 prefix_elems: ColId,
321 rstart: &[u8],
322 rend: &[u8],
323 ) -> Result<u32, NodesError> {
324 let stdb = &*self.replica_ctx.relational_db;
325 let tx = &mut *self.tx.get()?;
326
327 let (table_id, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
329 let rows_to_delete = iter.map(|row_ref| row_ref.pointer()).collect::<SmallVec<[_; 1]>>();
331
332 tx.metrics.index_seeks += 1;
338 tx.metrics.rows_scanned += rows_to_delete.len();
339
340 Ok(stdb.delete(tx, table_id, rows_to_delete))
342 }
343
344 #[tracing::instrument(level = "trace", skip(self, relation))]
353 pub fn datastore_delete_all_by_eq_bsatn(&self, table_id: TableId, relation: &[u8]) -> Result<u32, NodesError> {
354 let stdb = &*self.replica_ctx.relational_db;
355 let tx = &mut *self.get_tx()?;
356
357 tx.metrics.bytes_scanned += relation.len();
359
360 let row_ty = stdb.row_schema_for_table(tx, table_id)?;
362 let relation = ProductValue::decode_smallvec(&row_ty, &mut &*relation).map_err(NodesError::DecodeRow)?;
365
366 tx.metrics.rows_scanned += relation.len();
370
371 Ok(stdb.delete_by_rel(tx, table_id, relation))
373 }
374
375 #[tracing::instrument(level = "trace", skip_all)]
380 pub fn table_id_from_name(&self, table_name: &str) -> Result<TableId, NodesError> {
381 let stdb = &*self.replica_ctx.relational_db;
382 let tx = &mut *self.get_tx()?;
383
384 stdb.table_id_from_name_mut(tx, table_name)?
386 .ok_or(NodesError::TableNotFound)
387 }
388
389 #[tracing::instrument(level = "trace", skip_all)]
394 pub fn index_id_from_name(&self, index_name: &str) -> Result<IndexId, NodesError> {
395 let stdb = &*self.replica_ctx.relational_db;
396 let tx = &mut *self.get_tx()?;
397
398 stdb.index_id_from_name_mut(tx, index_name)?
400 .ok_or(NodesError::IndexNotFound)
401 }
402
403 #[tracing::instrument(level = "trace", skip_all)]
408 pub fn datastore_table_row_count(&self, table_id: TableId) -> Result<u64, NodesError> {
409 let stdb = &*self.replica_ctx.relational_db;
410 let tx = &mut *self.get_tx()?;
411
412 stdb.table_row_count_mut(tx, table_id).ok_or(NodesError::TableNotFound)
414 }
415
416 #[tracing::instrument(level = "trace", skip_all)]
417 pub fn datastore_table_scan_bsatn_chunks(
418 &self,
419 pool: &mut ChunkPool,
420 table_id: TableId,
421 ) -> Result<Vec<Vec<u8>>, NodesError> {
422 let stdb = &*self.replica_ctx.relational_db;
423 let tx = &mut *self.tx.get()?;
424
425 let mut rows_scanned = 0;
427 let mut bytes_scanned = 0;
428
429 let chunks = ChunkedWriter::collect_iter(
431 pool,
432 stdb.iter_mut(tx, table_id)?,
433 &mut rows_scanned,
434 &mut bytes_scanned,
435 );
436
437 tx.metrics.rows_scanned += rows_scanned;
438 tx.metrics.bytes_scanned += bytes_scanned;
439
440 Ok(chunks)
441 }
442
443 #[tracing::instrument(level = "trace", skip_all)]
444 pub fn datastore_index_scan_range_bsatn_chunks(
445 &self,
446 pool: &mut ChunkPool,
447 index_id: IndexId,
448 prefix: &[u8],
449 prefix_elems: ColId,
450 rstart: &[u8],
451 rend: &[u8],
452 ) -> Result<Vec<Vec<u8>>, NodesError> {
453 let stdb = &*self.replica_ctx.relational_db;
454 let tx = &mut *self.tx.get()?;
455
456 let mut rows_scanned = 0;
458 let mut bytes_scanned = 0;
459
460 let (_, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
462
463 let chunks = ChunkedWriter::collect_iter(pool, iter, &mut rows_scanned, &mut bytes_scanned);
465
466 tx.metrics.index_seeks += 1;
467 tx.metrics.rows_scanned += rows_scanned;
468 tx.metrics.bytes_scanned += bytes_scanned;
469
470 Ok(chunks)
471 }
472}
473
474impl TxSlot {
475 pub fn set<T>(&mut self, tx: MutTxId, f: impl FnOnce() -> T) -> (MutTxId, T) {
476 let prev = self.inner.lock().replace(tx);
477 assert!(prev.is_none(), "reentrant TxSlot::set");
478 let remove_tx = || self.inner.lock().take();
479
480 let res = {
481 scopeguard::defer_on_unwind! { remove_tx(); }
482 f()
483 };
484
485 let tx = remove_tx().expect("tx was removed during transaction");
486 (tx, res)
487 }
488
489 pub fn get(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
490 MutexGuard::try_map(self.inner.lock(), |map| map.as_mut()).map_err(|_| GetTxError)
491 }
492}
493
494#[derive(Debug)]
495pub struct GetTxError;
496impl From<GetTxError> for NodesError {
497 fn from(_: GetTxError) -> Self {
498 NodesError::NotInTransaction
499 }
500}
501
/// Tests checking the transaction metrics recorded by [`InstanceEnv`]'s datastore methods.
#[cfg(test)]
mod test {
    use super::*;

    use std::{ops::Bound, sync::Arc};

    use crate::{
        database_logger::DatabaseLogger,
        db::relational_db::{
            tests_utils::{begin_mut_tx, with_auto_commit, with_read_only, TestDB},
            RelationalDB,
        },
        host::Scheduler,
        messages::control_db::{Database, HostType},
        replica_context::ReplicaContext,
        subscription::module_subscription_actor::ModuleSubscriptions,
    };
    use anyhow::{anyhow, Result};
    use spacetimedb_lib::db::auth::StAccess;
    use spacetimedb_lib::{bsatn::to_vec, AlgebraicType, AlgebraicValue, Hash, Identity, ProductValue};
    use spacetimedb_paths::{server::ModuleLogsDir, FromPathUnchecked};
    use spacetimedb_primitives::{IndexId, TableId};
    use spacetimedb_sats::product;
    use tempfile::TempDir;

    /// Opens a [`DatabaseLogger`] inside a fresh temporary directory.
    fn temp_logger() -> Result<DatabaseLogger> {
        let temp = TempDir::new()?;
        let path = ModuleLogsDir::from_path_unchecked(temp.into_path());
        let path = path.today();
        Ok(DatabaseLogger::open(path))
    }

    /// Builds a [`ReplicaContext`] around `relational_db` with zeroed identities.
    ///
    /// The runtime created for the subscriptions is returned alongside it.
    fn replica_ctx(relational_db: Arc<RelationalDB>) -> Result<(ReplicaContext, tokio::runtime::Runtime)> {
        let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db.clone());
        Ok((
            ReplicaContext {
                database: Database {
                    id: 0,
                    database_identity: Identity::ZERO,
                    owner_identity: Identity::ZERO,
                    host_type: HostType::Wasm,
                    initial_program: Hash::ZERO,
                },
                replica_id: 0,
                logger: Arc::new(temp_logger()?),
                subscriptions: subs,
                relational_db,
            },
            runtime,
        ))
    }

    /// Builds an [`InstanceEnv`] over `db`, returned together with its runtime.
    fn instance_env(db: Arc<RelationalDB>) -> Result<(InstanceEnv, tokio::runtime::Runtime)> {
        let (scheduler, _) = Scheduler::open(db.clone());
        let (replica_context, runtime) = replica_ctx(db)?;
        Ok((InstanceEnv::new(Arc::new(replica_context), scheduler), runtime))
    }

    /// Creates an in-memory [`RelationalDB`].
    fn relational_db() -> Result<Arc<RelationalDB>> {
        let TestDB { db, .. } = TestDB::in_memory()?;
        Ok(Arc::new(db))
    }

    /// Builds the row `(i, s)` where `s` is the decimal form of `i` repeated `i` times.
    fn product_row(i: usize) -> ProductValue {
        let str = i.to_string();
        let str = str.repeat(i);
        let id = i as u64;
        product!(id, str)
    }

    /// BSATN-encodes [`product_row`]`(i)`.
    fn bsatn_row(i: usize) -> Result<Vec<u8>> {
        Ok(to_vec(&product_row(i))?)
    }

    /// Sums the BSATN-encoded sizes of `product_row(i)` for every `i` in `rows`.
    ///
    /// An encoding failure is propagated rather than skipped, so an expected
    /// byte count is never silently computed from a subset of the rows.
    fn encoded_len(rows: impl IntoIterator<Item = usize>) -> Result<usize> {
        rows.into_iter()
            .map(|i| bsatn_row(i).map(|bytes| bytes.len()))
            .sum()
    }

    /// Finds the index of `table_id` whose only column is column `0`.
    fn index_on_first_col(db: &RelationalDB, table_id: TableId) -> Result<IndexId> {
        with_read_only(db, |tx| {
            db.schema_for_table(tx, table_id)?
                .indexes
                .iter()
                .find(|schema| {
                    schema
                        .index_algorithm
                        .columns()
                        .as_singleton()
                        .is_some_and(|col_id| col_id.idx() == 0)
                })
                .map(|schema| schema.index_id)
                .ok_or_else(|| anyhow!("Index not found for ColId `{}`", 0))
        })
    }

    /// Inserts `product_row(1)` through `product_row(5)` into `table_id` and commits.
    fn insert_rows_1_to_5(db: &RelationalDB, table_id: TableId) -> Result<()> {
        with_auto_commit(db, |tx| -> Result<_> {
            for i in 1..=5 {
                db.insert(tx, table_id, &bsatn_row(i)?)?;
            }
            Ok(())
        })
    }

    /// Creates table `t(id: U64, str: String)` with an index on `id`,
    /// seeded with rows 1 through 5.
    fn create_table_with_index(db: &RelationalDB) -> Result<(TableId, IndexId)> {
        let table_id = db.create_table_for_test(
            "t",
            &[("id", AlgebraicType::U64), ("str", AlgebraicType::String)],
            &[0.into()],
        )?;
        let index_id = index_on_first_col(db, table_id)?;
        insert_rows_1_to_5(db, table_id)?;
        Ok((table_id, index_id))
    }

    /// Like [`create_table_with_index`], but the table is created through
    /// `create_table_for_test_with_the_works`, which additionally receives
    /// column `0` as its second column list and `StAccess::Public`.
    fn create_table_with_unique_index(db: &RelationalDB) -> Result<(TableId, IndexId)> {
        let table_id = db.create_table_for_test_with_the_works(
            "t",
            &[("id", AlgebraicType::U64), ("str", AlgebraicType::String)],
            &[0.into()],
            &[0.into()],
            StAccess::Public,
        )?;
        let index_id = index_on_first_col(db, table_id)?;
        insert_rows_1_to_5(db, table_id)?;
        Ok((table_id, index_id))
    }

    /// A full table scan counts every row and every encoded byte, and no index seek.
    #[test]
    fn table_scan_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (table_id, _) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let f = || env.datastore_table_scan_bsatn_chunks(&mut ChunkPool::default(), table_id);
        let tx = begin_mut_tx(&db);
        let (tx, scan_result) = tx_slot.set(tx, f);

        scan_result?;

        let bytes_scanned = encoded_len(1..=5)?;

        assert_eq!(0, tx.metrics.index_seeks);
        assert_eq!(5, tx.metrics.rows_scanned);
        assert_eq!(bytes_scanned, tx.metrics.bytes_scanned);
        assert_eq!(0, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }

    /// Two point scans through the index count two seeks and the two matched rows.
    #[test]
    fn index_scan_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (_, index_id) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let f = || -> Result<_> {
            let index_key_3 = to_vec(&Bound::Included(AlgebraicValue::U64(3)))?;
            let index_key_5 = to_vec(&Bound::Included(AlgebraicValue::U64(5)))?;
            env.datastore_index_scan_range_bsatn_chunks(
                &mut ChunkPool::default(),
                index_id,
                &[],
                0.into(),
                &index_key_3,
                &index_key_3,
            )?;
            env.datastore_index_scan_range_bsatn_chunks(
                &mut ChunkPool::default(),
                index_id,
                &[],
                0.into(),
                &index_key_5,
                &index_key_5,
            )?;
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, scan_result) = tx_slot.set(tx, f);

        scan_result?;

        let bytes_scanned = encoded_len([3, 5])?;

        assert_eq!(2, tx.metrics.index_seeks);
        assert_eq!(2, tx.metrics.rows_scanned);
        assert_eq!(bytes_scanned, tx.metrics.bytes_scanned);
        assert_eq!(0, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }

    /// Inserts count the bytes of each inserted row and touch no scan metric.
    #[test]
    fn insert_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (table_id, _) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let f = || -> Result<_> {
            for i in 6..=9 {
                let mut buffer = bsatn_row(i)?;
                env.insert(table_id, &mut buffer)?;
            }
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, insert_result) = tx_slot.set(tx, f);

        insert_result?;

        let bytes_written = encoded_len(6..=9)?;

        assert_eq!(0, tx.metrics.index_seeks);
        assert_eq!(0, tx.metrics.rows_scanned);
        assert_eq!(0, tx.metrics.bytes_scanned);
        assert_eq!(bytes_written, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }

    /// An update counts the bytes of the replacement row as written.
    #[test]
    fn update_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (table_id, index_id) = create_table_with_unique_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let row_id: u64 = 1;
        let row_val: String = "string".to_string();
        let mut new_row_bytes = to_vec(&product!(row_id, row_val))?;
        let new_row_len = new_row_bytes.len();
        let f = || -> Result<_> {
            env.update(table_id, index_id, new_row_bytes.as_mut_slice())?;
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, res) = tx_slot.set(tx, f);

        res?;

        assert_eq!(new_row_len, tx.metrics.bytes_written);
        Ok(())
    }

    /// Deleting through an index counts one seek and one scanned row, but no bytes.
    #[test]
    fn delete_by_index_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (_, index_id) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let f = || -> Result<_> {
            let index_key = to_vec(&Bound::Included(AlgebraicValue::U64(3)))?;
            env.datastore_delete_by_index_scan_range_bsatn(index_id, &[], 0.into(), &index_key, &index_key)?;
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, delete_result) = tx_slot.set(tx, f);

        delete_result?;

        assert_eq!(1, tx.metrics.index_seeks);
        assert_eq!(1, tx.metrics.rows_scanned);
        assert_eq!(0, tx.metrics.bytes_scanned);
        assert_eq!(0, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }

    /// Deleting by value counts the encoded relation's bytes and its rows as scanned.
    #[test]
    fn delete_by_value_metrics() -> Result<()> {
        let db = relational_db()?;
        let (env, _runtime) = instance_env(db.clone())?;

        let (table_id, _) = create_table_with_index(&db)?;

        let mut tx_slot = env.tx.clone();

        let bsatn_rows = to_vec(&(3..=5).map(product_row).collect::<Vec<_>>())?;

        let f = || -> Result<_> {
            env.datastore_delete_all_by_eq_bsatn(table_id, &bsatn_rows)?;
            Ok(())
        };
        let tx = begin_mut_tx(&db);
        let (tx, delete_result) = tx_slot.set(tx, f);

        delete_result?;

        let bytes_scanned = bsatn_rows.len();

        assert_eq!(0, tx.metrics.index_seeks);
        assert_eq!(3, tx.metrics.rows_scanned);
        assert_eq!(bytes_scanned, tx.metrics.bytes_scanned);
        assert_eq!(0, tx.metrics.bytes_written);
        assert_eq!(0, tx.metrics.bytes_sent_to_clients);
        Ok(())
    }
}