Skip to main content

wasm_dbms/
database.rs

1// Rust guideline compliant 2026-03-01
2// X-WHERE-CLAUSE, M-CANONICAL-DOCS, M-PANIC-ON-BUG
3
4//! Core DBMS database struct providing CRUD and transaction operations.
5
6mod filter_analyzer;
7mod index_reader;
8
9use std::cmp::Ordering;
10use std::collections::HashSet;
11
12use wasm_dbms_api::prelude::{
13    CandidColumnDef, ColumnDef, DataTypeKind, Database, DbmsError, DbmsResult, DeleteBehavior,
14    Filter, ForeignFetcher, ForeignKeyDef, InsertRecord, OrderDirection, Query, QueryError,
15    TableColumns, TableError, TableRecord, TableSchema, TransactionError, TransactionId,
16    UpdateRecord, Value, ValuesSource,
17};
18use wasm_dbms_memory::RecordAddress;
19use wasm_dbms_memory::prelude::{
20    AccessControl, AccessControlList, MemoryAccess, MemoryProvider, NextRecord, TableRegistry,
21};
22
23use self::filter_analyzer::{IndexPlan, analyze_filter};
24use self::index_reader::{IndexReader, IndexSearchResult};
25use crate::context::DbmsContext;
26use crate::schema::DatabaseSchema;
27use crate::transaction::journal::{Journal, JournaledWriter};
28use crate::transaction::{DatabaseOverlay, Transaction, TransactionOp};
29
30/// Default capacity for SELECT queries.
31const DEFAULT_SELECT_CAPACITY: usize = 128;
32
33/// The main DBMS database struct, generic over `MemoryProvider` and
34/// `AccessControl`.
35///
36/// This struct borrows from a [`DbmsContext`] and provides all CRUD
37/// operations, transaction management, and query execution.
38pub struct WasmDbmsDatabase<'ctx, M, A = AccessControlList>
39where
40    M: MemoryProvider,
41    A: AccessControl,
42{
43    /// Reference to the DBMS context owning all state.
44    ctx: &'ctx DbmsContext<M, A>,
45    /// Schema for dynamic dispatch of table operations.
46    schema: Box<dyn DatabaseSchema<M, A> + 'ctx>,
47    /// Active transaction ID, if any.
48    transaction: Option<TransactionId>,
49}
50
51impl<'ctx, M, A> WasmDbmsDatabase<'ctx, M, A>
52where
53    M: MemoryProvider,
54    A: AccessControl,
55{
56    /// Creates a one-shot (non-transactional) database instance.
57    pub fn oneshot(ctx: &'ctx DbmsContext<M, A>, schema: impl DatabaseSchema<M, A> + 'ctx) -> Self {
58        Self {
59            ctx,
60            schema: Box::new(schema),
61            transaction: None,
62        }
63    }
64
65    /// Creates a transactional database instance.
66    pub fn from_transaction(
67        ctx: &'ctx DbmsContext<M, A>,
68        schema: impl DatabaseSchema<M, A> + 'ctx,
69        transaction_id: TransactionId,
70    ) -> Self {
71        Self {
72            ctx,
73            schema: Box::new(schema),
74            transaction: Some(transaction_id),
75        }
76    }
77
78    /// Executes a closure with a mutable reference to the current transaction.
79    fn with_transaction_mut<F, R>(&self, f: F) -> DbmsResult<R>
80    where
81        F: FnOnce(&mut Transaction) -> DbmsResult<R>,
82    {
83        let txid = self.transaction.as_ref().ok_or(DbmsError::Transaction(
84            TransactionError::NoActiveTransaction,
85        ))?;
86
87        let mut ts = self.ctx.transaction_session.borrow_mut();
88        let tx = ts.get_transaction_mut(txid)?;
89        f(tx)
90    }
91
92    /// Executes a closure with a reference to the current transaction.
93    fn with_transaction<F, R>(&self, f: F) -> DbmsResult<R>
94    where
95        F: FnOnce(&Transaction) -> DbmsResult<R>,
96    {
97        let txid = self.transaction.as_ref().ok_or(DbmsError::Transaction(
98            TransactionError::NoActiveTransaction,
99        ))?;
100
101        let ts = self.ctx.transaction_session.borrow();
102        let tx = ts.get_transaction(txid)?;
103        f(tx)
104    }
105
106    /// Executes a closure atomically using a write-ahead journal.
107    ///
108    /// All writes performed inside `f` are recorded. On success the journal
109    /// is committed (entries discarded). On error the journal is rolled back,
110    /// restoring every modified byte to its pre-call state.
111    ///
112    /// When a journal is already active (e.g., inside [`Database::commit`]),
113    /// this method delegates to the outer journal and does not manage its own.
114    ///
115    /// # Panics
116    ///
117    /// Panics if the rollback itself fails, because a failed rollback leaves
118    /// memory in an irrecoverably corrupt state (M-PANIC-ON-BUG).
119    fn atomic<F, R>(&self, f: F) -> DbmsResult<R>
120    where
121        F: FnOnce(&WasmDbmsDatabase<'ctx, M, A>) -> DbmsResult<R>,
122    {
123        let nested = self.ctx.journal.borrow().is_some();
124        if !nested {
125            *self.ctx.journal.borrow_mut() = Some(Journal::new());
126        }
127        match f(self) {
128            Ok(res) => {
129                if !nested && let Some(journal) = self.ctx.journal.borrow_mut().take() {
130                    journal.commit();
131                }
132                Ok(res)
133            }
134            Err(err) => {
135                if !nested && let Some(journal) = self.ctx.journal.borrow_mut().take() {
136                    journal
137                        .rollback(&mut self.ctx.mm.borrow_mut())
138                        .expect("critical: failed to rollback journal");
139                }
140                Err(err)
141            }
142        }
143    }
144
145    /// Checks whether any foreign key references exist for the given record.
146    ///
147    /// Returns `true` if at least one referencing row exists in any table.
148    fn has_foreign_key_references<T>(
149        &self,
150        record_values: &[(ColumnDef, Value)],
151    ) -> DbmsResult<bool>
152    where
153        T: TableSchema,
154    {
155        let pk = Self::extract_pk::<T>(record_values)?;
156
157        for (table, columns) in self.schema.referenced_tables(T::table_name()) {
158            for column in columns.iter() {
159                let filter = Filter::eq(column, pk.clone());
160                let query = Query::builder().field(column).filter(Some(filter)).build();
161                let rows = self.schema.select(self, table, query)?;
162                if !rows.is_empty() {
163                    return Ok(true);
164                }
165            }
166        }
167        Ok(false)
168    }
169
170    /// Deletes foreign key related records recursively for cascade deletes.
171    fn delete_foreign_keys_cascade<T>(
172        &self,
173        record_values: &[(ColumnDef, Value)],
174    ) -> DbmsResult<u64>
175    where
176        T: TableSchema,
177    {
178        let pk = Self::extract_pk::<T>(record_values)?;
179
180        let mut count = 0;
181        for (table, columns) in self.schema.referenced_tables(T::table_name()) {
182            for column in columns.iter() {
183                let filter = Filter::eq(column, pk.clone());
184                let res = self
185                    .schema
186                    .delete(self, table, DeleteBehavior::Cascade, Some(filter))?;
187                count += res;
188            }
189        }
190        Ok(count)
191    }
192
193    /// Extracts the primary key value from a record's column-value pairs.
194    fn extract_pk<T>(record_values: &[(ColumnDef, Value)]) -> DbmsResult<Value>
195    where
196        T: TableSchema,
197    {
198        record_values
199            .iter()
200            .find(|(col_def, _)| col_def.primary_key)
201            .ok_or(DbmsError::Query(QueryError::UnknownColumn(
202                T::primary_key().to_string(),
203            )))
204            .map(|(_, v)| v.clone())
205    }
206
207    /// Retrieves the current overlay from the active transaction.
208    fn overlay(&self) -> DbmsResult<DatabaseOverlay> {
209        self.with_transaction(|tx| Ok(tx.overlay().clone()))
210    }
211
212    /// Returns whether the record matches the provided filter.
213    fn record_matches_filter(
214        &self,
215        record_values: &[(ColumnDef, Value)],
216        filter: &Filter,
217    ) -> DbmsResult<bool> {
218        filter.matches(record_values).map_err(DbmsError::from)
219    }
220
221    /// Filters record columns down to only the selected fields.
222    fn apply_column_selection<T>(&self, results: &mut [TableColumns], query: &Query)
223    where
224        T: TableSchema,
225    {
226        if query.all_selected() {
227            return;
228        }
229        let selected_columns = query.columns::<T>();
230        results
231            .iter_mut()
232            .flat_map(|record| record.iter_mut())
233            .filter(|(source, _)| *source == ValuesSource::This)
234            .for_each(|(_, cols)| {
235                cols.retain(|(col_def, _)| selected_columns.contains(&col_def.name.to_string()));
236            });
237    }
238
239    /// Batch-fetches eager relations for collected results.
240    fn batch_load_eager_relations<T>(
241        &self,
242        results: &mut [TableColumns],
243        query: &Query,
244    ) -> DbmsResult<()>
245    where
246        T: TableSchema,
247    {
248        if query.eager_relations.is_empty() {
249            return Ok(());
250        }
251
252        let fetcher = T::foreign_fetcher();
253
254        for relation in &query.eager_relations {
255            let fk_columns = Self::collect_fk_values::<T>(results, relation)?;
256
257            for (local_column, pk_values) in &fk_columns {
258                let batch_map = fetcher.fetch_batch(self, relation, pk_values)?;
259
260                Self::verify_fk_batch(&batch_map, pk_values, relation)?;
261                Self::attach_foreign_data(results, &batch_map, relation, local_column);
262            }
263        }
264
265        Ok(())
266    }
267
268    /// Collects distinct FK values across all records for a given relation.
269    fn collect_fk_values<T>(
270        results: &[TableColumns],
271        relation: &str,
272    ) -> DbmsResult<Vec<(&'static str, Vec<Value>)>>
273    where
274        T: TableSchema,
275    {
276        let mut fk_columns: Vec<(&'static str, HashSet<Value>)> = vec![];
277
278        for record_columns in results {
279            let Some(cols) = Self::this_columns(record_columns) else {
280                continue;
281            };
282
283            let mut found_fk = false;
284            for (col_def, value) in cols {
285                let Some(fk) = &col_def.foreign_key else {
286                    continue;
287                };
288                if *fk.foreign_table != *relation {
289                    continue;
290                }
291
292                found_fk = true;
293                match fk_columns.iter_mut().find(|(lc, _)| *lc == fk.local_column) {
294                    Some((_, values)) => {
295                        values.insert(value.clone());
296                    }
297                    None => {
298                        let mut set = HashSet::new();
299                        set.insert(value.clone());
300                        fk_columns.push((fk.local_column, set));
301                    }
302                }
303            }
304
305            if !found_fk {
306                return Err(DbmsError::Query(QueryError::InvalidQuery(format!(
307                    "Cannot load relation '{relation}' for table '{}': no foreign key found",
308                    T::table_name()
309                ))));
310            }
311        }
312
313        Ok(fk_columns
314            .into_iter()
315            .map(|(col, set)| (col, set.into_iter().collect()))
316            .collect())
317    }
318
319    /// Verifies all FK values were found in the batch result.
320    fn verify_fk_batch(
321        batch_map: &std::collections::HashMap<Value, Vec<(ColumnDef, Value)>>,
322        pk_values: &[Value],
323        relation: &str,
324    ) -> DbmsResult<()> {
325        if let Some(missing) = pk_values.iter().find(|v| !batch_map.contains_key(v)) {
326            return Err(DbmsError::Query(QueryError::BrokenForeignKeyReference {
327                table: relation.to_string(),
328                key: missing.clone(),
329            }));
330        }
331        Ok(())
332    }
333
334    /// Attaches batch-fetched foreign data to each record.
335    fn attach_foreign_data(
336        results: &mut [TableColumns],
337        batch_map: &std::collections::HashMap<Value, Vec<(ColumnDef, Value)>>,
338        relation: &str,
339        local_column: &str,
340    ) {
341        for record_columns in results.iter_mut() {
342            let fk_value = Self::this_columns(record_columns).and_then(|cols| {
343                cols.iter().find_map(|(col_def, value)| {
344                    let fk = col_def.foreign_key.as_ref()?;
345                    (fk.foreign_table == relation && fk.local_column == local_column)
346                        .then(|| value.clone())
347                })
348            });
349
350            let Some(fk_val) = fk_value else { continue };
351            let Some(foreign_values) = batch_map.get(&fk_val) else {
352                continue;
353            };
354
355            record_columns.push((
356                ValuesSource::Foreign {
357                    table: relation.to_string(),
358                    column: local_column.to_string(),
359                },
360                foreign_values.clone(),
361            ));
362        }
363    }
364
365    /// Extracts the `ValuesSource::This` columns from a record.
366    fn this_columns(
367        record: &[(ValuesSource, Vec<(ColumnDef, Value)>)],
368    ) -> Option<&Vec<(ColumnDef, Value)>> {
369        record
370            .iter()
371            .find(|(src, _)| *src == ValuesSource::This)
372            .map(|(_, cols)| cols)
373    }
374
375    /// Retrieves existing rows matching a filter, returning `(primary_key, full_row)` pairs.
376    #[expect(
377        clippy::type_complexity,
378        reason = "complex return type is necessary for returning both PK and full row data"
379    )]
380    fn existing_rows_for_filter<T>(
381        &self,
382        filter: Option<Filter>,
383    ) -> DbmsResult<Vec<(Value, Vec<(ColumnDef, Value)>)>>
384    where
385        T: TableSchema,
386    {
387        let pk = T::primary_key();
388        let query = Query::builder().filter(filter).build();
389        let records = self.select::<T>(query)?;
390        let rows = records
391            .into_iter()
392            .map(|record| {
393                let values = record.to_values();
394                let pk_value = values
395                    .iter()
396                    .find(|(col_def, _)| col_def.name == pk)
397                    .expect("primary key not found")
398                    .1
399                    .clone();
400                (pk_value, values)
401            })
402            .collect();
403
404        Ok(rows)
405    }
406
407    /// Loads the table registry for a given table schema.
408    fn load_table_registry<T>(&self) -> DbmsResult<TableRegistry>
409    where
410        T: TableSchema,
411    {
412        let sr = self.ctx.schema_registry.borrow();
413        let registry_pages = sr
414            .table_registry_page::<T>()
415            .ok_or(DbmsError::Table(TableError::TableNotFound))?;
416
417        let mut mm = self.ctx.mm.borrow_mut();
418        TableRegistry::load(registry_pages, &mut *mm).map_err(DbmsError::from)
419    }
420
421    /// Sorts query results by a column.
422    fn sort_query_results(
423        &self,
424        results: &mut [TableColumns],
425        column: &str,
426        direction: OrderDirection,
427    ) {
428        results.sort_by(|a, b| {
429            fn get_value<'a>(
430                values: &'a [(ValuesSource, Vec<(ColumnDef, Value)>)],
431                column: &str,
432            ) -> Option<&'a Value> {
433                values
434                    .iter()
435                    .find(|(source, _)| *source == ValuesSource::This)
436                    .and_then(|(_, cols)| {
437                        cols.iter()
438                            .find(|(col_def, _)| col_def.name == column)
439                            .map(|(_, value)| value)
440                    })
441            }
442
443            let a_value = get_value(a, column);
444            let b_value = get_value(b, column);
445
446            sort_values_with_direction(a_value, b_value, direction)
447        });
448    }
449
450    fn execute_index_plan<MA>(
451        &self,
452        reader: &IndexReader<'_>,
453        plan: &IndexPlan,
454        mm: &mut MA,
455    ) -> DbmsResult<IndexSearchResult>
456    where
457        MA: MemoryAccess,
458    {
459        let columns = [plan.column()];
460        match plan {
461            IndexPlan::Eq { value, .. } => {
462                let key = [value.clone()];
463                reader
464                    .search_eq(&columns, &key, mm)
465                    .map_err(DbmsError::from)
466            }
467            IndexPlan::Range { start, end, .. } => {
468                let start_key = start.as_ref().map(|value| vec![value.clone()]);
469                let end_key = end.as_ref().map(|value| vec![value.clone()]);
470                reader
471                    .search_range(&columns, start_key.as_deref(), end_key.as_deref(), mm)
472                    .map_err(DbmsError::from)
473            }
474            IndexPlan::In { values, .. } => {
475                let keys: Vec<Vec<Value>> =
476                    values.iter().cloned().map(|value| vec![value]).collect();
477                reader
478                    .search_in(&columns, &keys, mm)
479                    .map_err(DbmsError::from)
480            }
481        }
482    }
483
484    #[expect(
485        clippy::type_complexity,
486        reason = "complex return type is necessary for returning addresses and overlay PKs"
487    )]
488    fn try_index_select<T>(
489        &self,
490        query: &Query,
491        table_registry: &TableRegistry,
492        table_overlay: &DatabaseOverlay,
493    ) -> DbmsResult<Option<Vec<Vec<(ColumnDef, Value)>>>>
494    where
495        T: TableSchema,
496    {
497        let Some(filter) = &query.filter else {
498            return Ok(None);
499        };
500
501        let Some(analyzed) = analyze_filter(filter, T::indexes()) else {
502            return Ok(None);
503        };
504
505        let mut mm = self.ctx.mm.borrow_mut();
506        let reader = IndexReader::new(
507            table_registry.index_ledger(),
508            table_overlay.index_overlay(T::table_name()),
509        );
510        let search_result = self.execute_index_plan(&reader, &analyzed.plan, &mut *mm)?;
511
512        let mut indexed_rows = Vec::new();
513        let pk_name = T::primary_key();
514
515        for address in &search_result.addresses {
516            let record: T = table_registry
517                .read_at(*address, &mut *mm)
518                .map_err(DbmsError::from)?;
519            let values = record.to_values();
520            let Some(pk) = values
521                .iter()
522                .find(|(column, _)| column.name == pk_name)
523                .map(|(_, value)| value)
524            else {
525                continue;
526            };
527
528            if search_result.removed_pks.contains(pk) || search_result.overlay_pks.contains(pk) {
529                continue;
530            }
531
532            if let Some(remaining_filter) = &analyzed.remaining_filter
533                && !self.record_matches_filter(&values, remaining_filter)?
534            {
535                continue;
536            }
537
538            indexed_rows.push(values);
539        }
540
541        if let Some(overlay) = table_overlay.table_overlay(T::table_name()) {
542            let mut pending_overlay_pks = search_result.overlay_pks.clone();
543
544            for row in overlay.iter_inserted() {
545                let Some(pk) = row
546                    .iter()
547                    .find(|(column, _)| column.name == pk_name)
548                    .map(|(_, value)| value)
549                else {
550                    continue;
551                };
552
553                if !pending_overlay_pks.remove(pk) {
554                    continue;
555                }
556                if let Some(remaining_filter) = &analyzed.remaining_filter
557                    && !self.record_matches_filter(&row, remaining_filter)?
558                {
559                    continue;
560                }
561
562                indexed_rows.push(row);
563            }
564
565            if !pending_overlay_pks.is_empty() {
566                let pk_reader = IndexReader::new(table_registry.index_ledger(), None);
567                let pk_columns = [T::primary_key()];
568
569                for pk in pending_overlay_pks {
570                    let pk_key = [pk];
571                    let pk_lookup = pk_reader.search_eq(&pk_columns, &pk_key, &mut *mm)?;
572                    for address in pk_lookup.addresses {
573                        let record: T = table_registry
574                            .read_at(address, &mut *mm)
575                            .map_err(DbmsError::from)?;
576                        let values = record.to_values();
577                        let Some(patched_values) = overlay.patch_row(values) else {
578                            continue;
579                        };
580
581                        if let Some(remaining_filter) = &analyzed.remaining_filter
582                            && !self.record_matches_filter(&patched_values, remaining_filter)?
583                        {
584                            continue;
585                        }
586
587                        indexed_rows.push(patched_values);
588                    }
589                }
590            }
591        }
592
593        Ok(Some(indexed_rows))
594    }
595
596    /// Core select logic returning intermediate `TableColumns`.
597    #[doc(hidden)]
598    pub fn select_columns<T>(&self, query: Query) -> DbmsResult<Vec<TableColumns>>
599    where
600        T: TableSchema,
601    {
602        let table_registry = self.load_table_registry::<T>()?;
603        let mut table_overlay = if self.transaction.is_some() {
604            self.overlay()?
605        } else {
606            DatabaseOverlay::default()
607        };
608
609        let mut results = Vec::with_capacity(query.limit.unwrap_or(DEFAULT_SELECT_CAPACITY));
610        let mut count = 0;
611
612        if let Some(indexed_rows) =
613            self.try_index_select::<T>(&query, &table_registry, &table_overlay)?
614        {
615            for values in indexed_rows {
616                count += 1;
617                if query.offset.is_some_and(|offset| count <= offset) {
618                    continue;
619                }
620                results.push(vec![(ValuesSource::This, values)]);
621                if query.limit.is_some_and(|limit| results.len() >= limit) {
622                    break;
623                }
624            }
625        } else {
626            let mut mm = self.ctx.mm.borrow_mut();
627            let table_reader = table_registry.read::<T, _>(&mut *mm);
628            let mut table_reader = table_overlay.reader(table_reader);
629
630            while let Some(values) = table_reader.try_next()? {
631                if let Some(filter) = &query.filter
632                    && !self.record_matches_filter(&values, filter)?
633                {
634                    continue;
635                }
636                count += 1;
637                if query.offset.is_some_and(|offset| count <= offset) {
638                    continue;
639                }
640                results.push(vec![(ValuesSource::This, values)]);
641                if query.limit.is_some_and(|limit| results.len() >= limit) {
642                    break;
643                }
644            }
645        }
646
647        self.batch_load_eager_relations::<T>(&mut results, &query)?;
648        self.apply_column_selection::<T>(&mut results, &query);
649
650        for (column, direction) in query.order_by.into_iter().rev() {
651            self.sort_query_results(&mut results, &column, direction);
652        }
653
654        Ok(results)
655    }
656
657    /// Executes a join query.
658    #[doc(hidden)]
659    pub fn select_join(
660        &self,
661        table: &str,
662        query: Query,
663    ) -> DbmsResult<Vec<Vec<(CandidColumnDef, Value)>>> {
664        self.schema.select_join(self, table, query)
665    }
666
667    /// Updates primary key references in tables referencing the updated table.
668    fn update_pk_referencing_updated_table<T>(
669        &self,
670        old_pk: Value,
671        new_pk: Value,
672        data_type: DataTypeKind,
673        pk_name: &'static str,
674    ) -> DbmsResult<u64>
675    where
676        T: TableSchema,
677    {
678        let mut count = 0;
679        for (ref_table, ref_col) in self
680            .schema
681            .referenced_tables(T::table_name())
682            .into_iter()
683            .flat_map(|(ref_table, ref_cols)| {
684                ref_cols
685                    .into_iter()
686                    .map(move |ref_col| (ref_table, ref_col))
687            })
688        {
689            let ref_patch_value = (
690                ColumnDef {
691                    name: ref_col,
692                    data_type,
693                    auto_increment: false,
694                    nullable: false,
695                    primary_key: false,
696                    unique: false,
697                    foreign_key: Some(ForeignKeyDef {
698                        foreign_table: T::table_name(),
699                        foreign_column: pk_name,
700                        local_column: ref_col,
701                    }),
702                },
703                new_pk.clone(),
704            );
705            let filter = Filter::eq(ref_col, old_pk.clone());
706
707            count += self
708                .schema
709                .update(self, ref_table, &[ref_patch_value], Some(filter))?;
710        }
711
712        Ok(count)
713    }
714
715    /// Sanitizes values using the table schema's sanitizers.
716    fn sanitize_values<T>(
717        &self,
718        values: Vec<(ColumnDef, Value)>,
719    ) -> DbmsResult<Vec<(ColumnDef, Value)>>
720    where
721        T: TableSchema,
722    {
723        let mut sanitized_values = Vec::with_capacity(values.len());
724        for (col_def, value) in values.into_iter() {
725            let value = match T::sanitizer(col_def.name) {
726                Some(sanitizer) => sanitizer.sanitize(value)?,
727                None => value,
728            };
729            sanitized_values.push((col_def, value));
730        }
731        Ok(sanitized_values)
732    }
733
734    /// Collects all records matching a filter from the table registry.
735    #[allow(clippy::type_complexity)]
736    fn collect_matching_records<T>(
737        &self,
738        table_registry: &TableRegistry,
739        filter: &Option<Filter>,
740    ) -> DbmsResult<Vec<(NextRecord<T>, Vec<(ColumnDef, Value)>)>>
741    where
742        T: TableSchema,
743    {
744        let mut mm = self.ctx.mm.borrow_mut();
745
746        // `collect_matching_records` is only used by the non-transactional update/delete paths.
747        // Transactional mutations first resolve rows via `existing_rows_for_filter`, which reads
748        // through `select()` and therefore includes the overlay. Using `overlay = None` here is
749        // intentional because the atomic write path is operating on committed storage only.
750        if let Some(filter) = filter
751            && let Some(analyzed) = analyze_filter(filter, T::indexes())
752        {
753            let reader = IndexReader::new(table_registry.index_ledger(), None);
754            let search_result = self.execute_index_plan(&reader, &analyzed.plan, &mut *mm)?;
755
756            let mut records = Vec::new();
757            for address in search_result.addresses {
758                let record: T = table_registry
759                    .read_at(address, &mut *mm)
760                    .map_err(DbmsError::from)?;
761                let record_values = record.clone().to_values();
762                if let Some(remaining_filter) = &analyzed.remaining_filter
763                    && !self.record_matches_filter(&record_values, remaining_filter)?
764                {
765                    continue;
766                }
767                records.push((
768                    NextRecord {
769                        record,
770                        page: address.page,
771                        offset: address.offset,
772                    },
773                    record_values,
774                ));
775            }
776
777            return Ok(records);
778        }
779
780        let mut table_reader = table_registry.read::<T, _>(&mut *mm);
781        let mut records = vec![];
782        while let Some(values) = table_reader.try_next()? {
783            let record_values = values.record.clone().to_values();
784            if let Some(filter) = filter
785                && !self.record_matches_filter(&record_values, filter)?
786            {
787                continue;
788            }
789            records.push((values, record_values));
790        }
791        Ok(records)
792    }
793
794    /// For each indexed column for the table, inserts the index for the given record address.
795    fn insert_index<T>(
796        &self,
797        table_registry: &mut TableRegistry,
798        record_address: RecordAddress,
799        values: &[(ColumnDef, Value)],
800        mm: &mut impl wasm_dbms_memory::MemoryAccess,
801    ) -> DbmsResult<()>
802    where
803        T: TableSchema,
804    {
805        let index_ledger = table_registry.index_ledger_mut();
806        for columns in T::indexes().iter().map(|index| index.columns()) {
807            let key = index_key(columns, values);
808            index_ledger.insert(columns, key, record_address, mm)?;
809        }
810
811        Ok(())
812    }
813
814    /// For each indexed column for the table, deletes the index for the given record address.
815    fn delete_index<T>(
816        &self,
817        table_registry: &mut TableRegistry,
818        record_address: RecordAddress,
819        values: &[(ColumnDef, Value)],
820        mm: &mut impl wasm_dbms_memory::MemoryAccess,
821    ) -> DbmsResult<()>
822    where
823        T: TableSchema,
824    {
825        let index_ledger = table_registry.index_ledger_mut();
826        for columns in T::indexes().iter().map(|index| index.columns()) {
827            let key = index_key(columns, values);
828            index_ledger.delete(columns, &key, record_address, mm)?;
829        }
830        Ok(())
831    }
832
833    /// For each indexed column for the table, updates the index for the given record address.
834    ///
835    /// When an indexed column's value changed, the old key is deleted and the new key is inserted.
836    /// When only the record address moved (same key), the pointer is updated in place.
837    fn update_index<T>(
838        &self,
839        table_registry: &mut TableRegistry,
840        old_record_address: RecordAddress,
841        new_record_address: RecordAddress,
842        old_values: &[(ColumnDef, Value)],
843        new_values: &[(ColumnDef, Value)],
844        mm: &mut impl wasm_dbms_memory::MemoryAccess,
845    ) -> DbmsResult<()>
846    where
847        T: TableSchema,
848    {
849        let index_ledger = table_registry.index_ledger_mut();
850        for columns in T::indexes().iter().map(|index| index.columns()) {
851            let old_key = index_key(columns, old_values);
852            let new_key = index_key(columns, new_values);
853            if old_key == new_key {
854                index_ledger.update(
855                    columns,
856                    &new_key,
857                    old_record_address,
858                    new_record_address,
859                    mm,
860                )?;
861            } else {
862                index_ledger.delete(columns, &old_key, old_record_address, mm)?;
863                index_ledger.insert(columns, new_key, new_record_address, mm)?;
864            }
865        }
866        Ok(())
867    }
868
869    /// Fills in auto-increment values for columns that are missing from the input.
870    fn fill_auto_increment_values<T>(
871        &self,
872        table_registry: &mut TableRegistry,
873        mut values: Vec<(ColumnDef, Value)>,
874    ) -> DbmsResult<Vec<(ColumnDef, Value)>>
875    where
876        T: TableSchema,
877    {
878        let mut mm = self.ctx.mm.borrow_mut();
879        // iter over auto-increment columns, for each of them check if the value is provided, if not get the next auto-increment value.
880        for auto_increment_column in T::columns().iter().filter(|col| col.auto_increment) {
881            if values
882                .iter()
883                .any(|(col_def, _)| col_def.name == auto_increment_column.name)
884            {
885                continue;
886            }
887            let next_value = table_registry
888                .next_autoincrement(auto_increment_column.name, &mut *mm)?
889                .ok_or(DbmsError::Table(TableError::SchemaMismatch))?;
890            values.push((*auto_increment_column, next_value));
891        }
892
893        Ok(values)
894    }
895}
896
897/// Provides ordering for two optional values by direction.
898pub fn sort_values_with_direction(
899    a: Option<&Value>,
900    b: Option<&Value>,
901    direction: OrderDirection,
902) -> Ordering {
903    match (a, b) {
904        (Some(a_val), Some(b_val)) => match direction {
905            OrderDirection::Ascending => a_val.cmp(b_val),
906            OrderDirection::Descending => b_val.cmp(a_val),
907        },
908        (Some(_), None) => std::cmp::Ordering::Greater,
909        (None, Some(_)) => std::cmp::Ordering::Less,
910        (None, None) => std::cmp::Ordering::Equal,
911    }
912}
913
914/// Converts column-value pairs to a schema entity.
915fn values_to_schema_entity<T>(values: Vec<(ColumnDef, Value)>) -> DbmsResult<T>
916where
917    T: TableSchema,
918{
919    let record = T::Insert::from_values(&values)?.into_record();
920    Ok(record)
921}
922
923/// Builds the index key for the given columns by extracting values from the record.
924///
925/// Columns not found in `values` default to [`Value::Null`].
926fn index_key(columns: &[&str], values: &[(ColumnDef, Value)]) -> Vec<Value> {
927    columns
928        .iter()
929        .map(|col| {
930            values
931                .iter()
932                .find(|(cd, _)| cd.name == *col)
933                .map(|(_, v)| v.clone())
934                .unwrap_or(Value::Null)
935        })
936        .collect()
937}
938
939impl<M, A> Database for WasmDbmsDatabase<'_, M, A>
940where
941    M: MemoryProvider,
942    A: AccessControl,
943{
944    fn select<T>(&self, query: Query) -> DbmsResult<Vec<T::Record>>
945    where
946        T: TableSchema,
947    {
948        if !query.joins.is_empty() {
949            return Err(DbmsError::Query(QueryError::JoinInsideTypedSelect));
950        }
951        let results = self.select_columns::<T>(query)?;
952        Ok(results.into_iter().map(T::Record::from_values).collect())
953    }
954
955    fn select_raw(&self, table: &str, query: Query) -> DbmsResult<Vec<Vec<(ColumnDef, Value)>>> {
956        self.schema.select(self, table, query)
957    }
958
959    fn insert<T>(&self, record: T::Insert) -> DbmsResult<()>
960    where
961        T: TableSchema,
962        T::Insert: InsertRecord<Schema = T>,
963    {
964        let mut table_registry = self.load_table_registry::<T>()?;
965        let record_values = record.clone().into_values();
966        let record_values =
967            self.fill_auto_increment_values::<T>(&mut table_registry, record_values)?;
968        let sanitized_values = self.sanitize_values::<T>(record_values)?;
969        self.schema
970            .validate_insert(self, T::table_name(), &sanitized_values)?;
971        if self.transaction.is_some() {
972            self.with_transaction_mut(|tx| tx.insert::<T>(sanitized_values))?;
973        } else {
974            self.atomic(|db| {
975                let record = T::Insert::from_values(&sanitized_values)?;
976                let mut mm = db.ctx.mm.borrow_mut();
977                // update journal with the insert operation before mutating memory
978                let mut journal_ref = db.ctx.journal.borrow_mut();
979                let journal = journal_ref
980                    .as_mut()
981                    .expect("journal must be active inside atomic");
982                let mut writer = JournaledWriter::new(&mut *mm, journal);
983                // insert the record in the table registry, and eventually update the indexes
984                let record_address = table_registry
985                    .insert(record.into_record(), &mut writer)
986                    .map_err(DbmsError::from)?;
987                self.insert_index::<T>(
988                    &mut table_registry,
989                    record_address,
990                    &sanitized_values,
991                    &mut writer,
992                )?;
993                Ok(())
994            })?;
995        }
996
997        Ok(())
998    }
999
1000    fn update<T>(&self, patch: T::Update) -> DbmsResult<u64>
1001    where
1002        T: TableSchema,
1003        T::Update: UpdateRecord<Schema = T>,
1004    {
1005        let filter = patch.where_clause().clone();
1006        if self.transaction.is_some() {
1007            let rows = self.existing_rows_for_filter::<T>(filter.clone())?;
1008            let count = rows.len() as u64;
1009            self.with_transaction_mut(|tx| tx.update::<T>(patch, filter, rows))?;
1010
1011            return Ok(count);
1012        }
1013
1014        let patch = patch.update_values();
1015
1016        let pk_in_patch = patch.iter().find_map(|(col_def, value)| {
1017            if col_def.primary_key {
1018                Some((col_def, value))
1019            } else {
1020                None
1021            }
1022        });
1023
1024        self.atomic(|db| {
1025            let mut count = 0;
1026
1027            let mut table_registry = db.load_table_registry::<T>()?;
1028            let records = db.collect_matching_records::<T>(&table_registry, &filter)?;
1029
1030            for (record, record_values) in records {
1031                let current_pk_value = record_values
1032                    .iter()
1033                    .find(|(col_def, _)| col_def.primary_key)
1034                    .expect("primary key not found")
1035                    .1
1036                    .clone();
1037
1038                let previous_record = values_to_schema_entity::<T>(record_values.clone())?;
1039                let old_values_for_index = record_values.clone();
1040                let mut record_values = record_values;
1041
1042                for (patch_col_def, patch_value) in &patch {
1043                    if let Some((_, record_value)) = record_values
1044                        .iter_mut()
1045                        .find(|(record_col_def, _)| record_col_def.name == patch_col_def.name)
1046                    {
1047                        *record_value = patch_value.clone();
1048                    }
1049                }
1050                let record_values = db.sanitize_values::<T>(record_values)?;
1051                db.schema.validate_update(
1052                    db,
1053                    T::table_name(),
1054                    &record_values,
1055                    current_pk_value.clone(),
1056                )?;
1057                let updated_record = values_to_schema_entity::<T>(record_values.clone())?;
1058                {
1059                    let mut mm = db.ctx.mm.borrow_mut();
1060                    // update journal with the update operation before mutating memory
1061                    let mut journal_ref = db.ctx.journal.borrow_mut();
1062                    let journal = journal_ref
1063                        .as_mut()
1064                        .expect("journal must be active inside atomic");
1065                    let mut writer = JournaledWriter::new(&mut *mm, journal);
1066                    // update table registry
1067                    let old_address = RecordAddress::new(record.page, record.offset);
1068                    let new_address = table_registry
1069                        .update(updated_record, previous_record, old_address, &mut writer)
1070                        .map_err(DbmsError::from)?;
1071                    // update indexes if needed
1072                    self.update_index::<T>(
1073                        &mut table_registry,
1074                        old_address,
1075                        new_address,
1076                        &old_values_for_index,
1077                        &record_values,
1078                        &mut writer,
1079                    )?;
1080                }
1081                count += 1;
1082
1083                if let Some((pk_column, new_pk_value)) = pk_in_patch {
1084                    count += db.update_pk_referencing_updated_table::<T>(
1085                        current_pk_value,
1086                        new_pk_value.clone(),
1087                        pk_column.data_type,
1088                        pk_column.name,
1089                    )?;
1090                }
1091            }
1092
1093            Ok(count)
1094        })
1095    }
1096
1097    fn delete<T>(&self, behaviour: DeleteBehavior, filter: Option<Filter>) -> DbmsResult<u64>
1098    where
1099        T: TableSchema,
1100    {
1101        if self.transaction.is_some() {
1102            let rows = self.existing_rows_for_filter::<T>(filter.clone())?;
1103            let count = rows.len() as u64;
1104
1105            self.with_transaction_mut(|tx| tx.delete::<T>(behaviour, filter, rows))?;
1106
1107            return Ok(count);
1108        }
1109
1110        self.atomic(|db| {
1111            let mut table_registry = db.load_table_registry::<T>()?;
1112            let records = db.collect_matching_records::<T>(&table_registry, &filter)?;
1113            let mut count = records.len() as u64;
1114            for (record, record_values) in records {
1115                match behaviour {
1116                    DeleteBehavior::Cascade => {
1117                        count += db.delete_foreign_keys_cascade::<T>(&record_values)?;
1118                    }
1119                    DeleteBehavior::Restrict => {
1120                        if db.has_foreign_key_references::<T>(&record_values)? {
1121                            return Err(DbmsError::Query(
1122                                QueryError::ForeignKeyConstraintViolation {
1123                                    referencing_table: T::table_name().to_string(),
1124                                    field: T::primary_key().to_string(),
1125                                },
1126                            ));
1127                        }
1128                    }
1129                }
1130                let mut mm = db.ctx.mm.borrow_mut();
1131                let mut journal_ref = db.ctx.journal.borrow_mut();
1132                let journal = journal_ref
1133                    .as_mut()
1134                    .expect("journal must be active inside atomic");
1135                // write table and index deletions to the journal before mutating memory
1136                let mut writer = JournaledWriter::new(&mut *mm, journal);
1137                let address = RecordAddress::new(record.page, record.offset);
1138                table_registry
1139                    .delete(record.record, address, &mut writer)
1140                    .map_err(DbmsError::from)?;
1141                self.delete_index::<T>(&mut table_registry, address, &record_values, &mut writer)?;
1142            }
1143
1144            Ok(count)
1145        })
1146    }
1147
1148    fn commit(&mut self) -> DbmsResult<()> {
1149        let Some(txid) = self.transaction.take() else {
1150            return Err(DbmsError::Transaction(
1151                TransactionError::NoActiveTransaction,
1152            ));
1153        };
1154        let transaction = {
1155            let mut ts = self.ctx.transaction_session.borrow_mut();
1156            ts.take_transaction(&txid)?
1157        };
1158
1159        *self.ctx.journal.borrow_mut() = Some(Journal::new());
1160
1161        for op in transaction.operations {
1162            let result = match op {
1163                TransactionOp::Insert { table, values } => self
1164                    .schema
1165                    .validate_insert(self, table, &values)
1166                    .and_then(|()| self.schema.insert(self, table, &values)),
1167                TransactionOp::Delete {
1168                    table,
1169                    behaviour,
1170                    filter,
1171                } => self
1172                    .schema
1173                    .delete(self, table, behaviour, filter)
1174                    .map(|_| ()),
1175                TransactionOp::Update {
1176                    table,
1177                    patch,
1178                    filter,
1179                } => self.schema.update(self, table, &patch, filter).map(|_| ()),
1180            };
1181
1182            if let Err(err) = result {
1183                if let Some(journal) = self.ctx.journal.borrow_mut().take() {
1184                    journal
1185                        .rollback(&mut self.ctx.mm.borrow_mut())
1186                        .expect("critical: failed to rollback journal");
1187                }
1188                return Err(err);
1189            }
1190        }
1191
1192        if let Some(journal) = self.ctx.journal.borrow_mut().take() {
1193            journal.commit();
1194        }
1195        Ok(())
1196    }
1197
1198    fn rollback(&mut self) -> DbmsResult<()> {
1199        let Some(txid) = self.transaction.take() else {
1200            return Err(DbmsError::Transaction(
1201                TransactionError::NoActiveTransaction,
1202            ));
1203        };
1204
1205        let mut ts = self.ctx.transaction_session.borrow_mut();
1206        ts.close_transaction(&txid);
1207        Ok(())
1208    }
1209}
1210
1211#[cfg(test)]
1212mod tests;