1mod filter_analyzer;
7mod index_reader;
8
9use std::cmp::Ordering;
10use std::collections::HashSet;
11
12use wasm_dbms_api::prelude::{
13 CandidColumnDef, ColumnDef, DataTypeKind, Database, DbmsError, DbmsResult, DeleteBehavior,
14 Filter, ForeignFetcher, ForeignKeyDef, InsertRecord, OrderDirection, Query, QueryError,
15 TableColumns, TableError, TableRecord, TableSchema, TransactionError, TransactionId,
16 UpdateRecord, Value, ValuesSource,
17};
18use wasm_dbms_memory::RecordAddress;
19use wasm_dbms_memory::prelude::{
20 AccessControl, AccessControlList, MemoryAccess, MemoryProvider, NextRecord, TableRegistry,
21};
22
23use self::filter_analyzer::{IndexPlan, analyze_filter};
24use self::index_reader::{IndexReader, IndexSearchResult};
25use crate::context::DbmsContext;
26use crate::schema::DatabaseSchema;
27use crate::transaction::journal::{Journal, JournaledWriter};
28use crate::transaction::{DatabaseOverlay, Transaction, TransactionOp};
29
30const DEFAULT_SELECT_CAPACITY: usize = 128;
32
33pub struct WasmDbmsDatabase<'ctx, M, A = AccessControlList>
39where
40 M: MemoryProvider,
41 A: AccessControl,
42{
43 ctx: &'ctx DbmsContext<M, A>,
45 schema: Box<dyn DatabaseSchema<M, A> + 'ctx>,
47 transaction: Option<TransactionId>,
49}
50
51impl<'ctx, M, A> WasmDbmsDatabase<'ctx, M, A>
52where
53 M: MemoryProvider,
54 A: AccessControl,
55{
56 pub fn oneshot(ctx: &'ctx DbmsContext<M, A>, schema: impl DatabaseSchema<M, A> + 'ctx) -> Self {
58 Self {
59 ctx,
60 schema: Box::new(schema),
61 transaction: None,
62 }
63 }
64
65 pub fn from_transaction(
67 ctx: &'ctx DbmsContext<M, A>,
68 schema: impl DatabaseSchema<M, A> + 'ctx,
69 transaction_id: TransactionId,
70 ) -> Self {
71 Self {
72 ctx,
73 schema: Box::new(schema),
74 transaction: Some(transaction_id),
75 }
76 }
77
78 fn with_transaction_mut<F, R>(&self, f: F) -> DbmsResult<R>
80 where
81 F: FnOnce(&mut Transaction) -> DbmsResult<R>,
82 {
83 let txid = self.transaction.as_ref().ok_or(DbmsError::Transaction(
84 TransactionError::NoActiveTransaction,
85 ))?;
86
87 let mut ts = self.ctx.transaction_session.borrow_mut();
88 let tx = ts.get_transaction_mut(txid)?;
89 f(tx)
90 }
91
92 fn with_transaction<F, R>(&self, f: F) -> DbmsResult<R>
94 where
95 F: FnOnce(&Transaction) -> DbmsResult<R>,
96 {
97 let txid = self.transaction.as_ref().ok_or(DbmsError::Transaction(
98 TransactionError::NoActiveTransaction,
99 ))?;
100
101 let ts = self.ctx.transaction_session.borrow();
102 let tx = ts.get_transaction(txid)?;
103 f(tx)
104 }
105
106 fn atomic<F, R>(&self, f: F) -> DbmsResult<R>
120 where
121 F: FnOnce(&WasmDbmsDatabase<'ctx, M, A>) -> DbmsResult<R>,
122 {
123 let nested = self.ctx.journal.borrow().is_some();
124 if !nested {
125 *self.ctx.journal.borrow_mut() = Some(Journal::new());
126 }
127 match f(self) {
128 Ok(res) => {
129 if !nested && let Some(journal) = self.ctx.journal.borrow_mut().take() {
130 journal.commit();
131 }
132 Ok(res)
133 }
134 Err(err) => {
135 if !nested && let Some(journal) = self.ctx.journal.borrow_mut().take() {
136 journal
137 .rollback(&mut self.ctx.mm.borrow_mut())
138 .expect("critical: failed to rollback journal");
139 }
140 Err(err)
141 }
142 }
143 }
144
145 fn has_foreign_key_references<T>(
149 &self,
150 record_values: &[(ColumnDef, Value)],
151 ) -> DbmsResult<bool>
152 where
153 T: TableSchema,
154 {
155 let pk = Self::extract_pk::<T>(record_values)?;
156
157 for (table, columns) in self.schema.referenced_tables(T::table_name()) {
158 for column in columns.iter() {
159 let filter = Filter::eq(column, pk.clone());
160 let query = Query::builder().field(column).filter(Some(filter)).build();
161 let rows = self.schema.select(self, table, query)?;
162 if !rows.is_empty() {
163 return Ok(true);
164 }
165 }
166 }
167 Ok(false)
168 }
169
170 fn delete_foreign_keys_cascade<T>(
172 &self,
173 record_values: &[(ColumnDef, Value)],
174 ) -> DbmsResult<u64>
175 where
176 T: TableSchema,
177 {
178 let pk = Self::extract_pk::<T>(record_values)?;
179
180 let mut count = 0;
181 for (table, columns) in self.schema.referenced_tables(T::table_name()) {
182 for column in columns.iter() {
183 let filter = Filter::eq(column, pk.clone());
184 let res = self
185 .schema
186 .delete(self, table, DeleteBehavior::Cascade, Some(filter))?;
187 count += res;
188 }
189 }
190 Ok(count)
191 }
192
193 fn extract_pk<T>(record_values: &[(ColumnDef, Value)]) -> DbmsResult<Value>
195 where
196 T: TableSchema,
197 {
198 record_values
199 .iter()
200 .find(|(col_def, _)| col_def.primary_key)
201 .ok_or(DbmsError::Query(QueryError::UnknownColumn(
202 T::primary_key().to_string(),
203 )))
204 .map(|(_, v)| v.clone())
205 }
206
207 fn overlay(&self) -> DbmsResult<DatabaseOverlay> {
209 self.with_transaction(|tx| Ok(tx.overlay().clone()))
210 }
211
212 fn record_matches_filter(
214 &self,
215 record_values: &[(ColumnDef, Value)],
216 filter: &Filter,
217 ) -> DbmsResult<bool> {
218 filter.matches(record_values).map_err(DbmsError::from)
219 }
220
221 fn apply_column_selection<T>(&self, results: &mut [TableColumns], query: &Query)
223 where
224 T: TableSchema,
225 {
226 if query.all_selected() {
227 return;
228 }
229 let selected_columns = query.columns::<T>();
230 results
231 .iter_mut()
232 .flat_map(|record| record.iter_mut())
233 .filter(|(source, _)| *source == ValuesSource::This)
234 .for_each(|(_, cols)| {
235 cols.retain(|(col_def, _)| selected_columns.contains(&col_def.name.to_string()));
236 });
237 }
238
239 fn batch_load_eager_relations<T>(
241 &self,
242 results: &mut [TableColumns],
243 query: &Query,
244 ) -> DbmsResult<()>
245 where
246 T: TableSchema,
247 {
248 if query.eager_relations.is_empty() {
249 return Ok(());
250 }
251
252 let fetcher = T::foreign_fetcher();
253
254 for relation in &query.eager_relations {
255 let fk_columns = Self::collect_fk_values::<T>(results, relation)?;
256
257 for (local_column, pk_values) in &fk_columns {
258 let batch_map = fetcher.fetch_batch(self, relation, pk_values)?;
259
260 Self::verify_fk_batch(&batch_map, pk_values, relation)?;
261 Self::attach_foreign_data(results, &batch_map, relation, local_column);
262 }
263 }
264
265 Ok(())
266 }
267
268 fn collect_fk_values<T>(
270 results: &[TableColumns],
271 relation: &str,
272 ) -> DbmsResult<Vec<(&'static str, Vec<Value>)>>
273 where
274 T: TableSchema,
275 {
276 let mut fk_columns: Vec<(&'static str, HashSet<Value>)> = vec![];
277
278 for record_columns in results {
279 let Some(cols) = Self::this_columns(record_columns) else {
280 continue;
281 };
282
283 let mut found_fk = false;
284 for (col_def, value) in cols {
285 let Some(fk) = &col_def.foreign_key else {
286 continue;
287 };
288 if *fk.foreign_table != *relation {
289 continue;
290 }
291
292 found_fk = true;
293 match fk_columns.iter_mut().find(|(lc, _)| *lc == fk.local_column) {
294 Some((_, values)) => {
295 values.insert(value.clone());
296 }
297 None => {
298 let mut set = HashSet::new();
299 set.insert(value.clone());
300 fk_columns.push((fk.local_column, set));
301 }
302 }
303 }
304
305 if !found_fk {
306 return Err(DbmsError::Query(QueryError::InvalidQuery(format!(
307 "Cannot load relation '{relation}' for table '{}': no foreign key found",
308 T::table_name()
309 ))));
310 }
311 }
312
313 Ok(fk_columns
314 .into_iter()
315 .map(|(col, set)| (col, set.into_iter().collect()))
316 .collect())
317 }
318
319 fn verify_fk_batch(
321 batch_map: &std::collections::HashMap<Value, Vec<(ColumnDef, Value)>>,
322 pk_values: &[Value],
323 relation: &str,
324 ) -> DbmsResult<()> {
325 if let Some(missing) = pk_values.iter().find(|v| !batch_map.contains_key(v)) {
326 return Err(DbmsError::Query(QueryError::BrokenForeignKeyReference {
327 table: relation.to_string(),
328 key: missing.clone(),
329 }));
330 }
331 Ok(())
332 }
333
334 fn attach_foreign_data(
336 results: &mut [TableColumns],
337 batch_map: &std::collections::HashMap<Value, Vec<(ColumnDef, Value)>>,
338 relation: &str,
339 local_column: &str,
340 ) {
341 for record_columns in results.iter_mut() {
342 let fk_value = Self::this_columns(record_columns).and_then(|cols| {
343 cols.iter().find_map(|(col_def, value)| {
344 let fk = col_def.foreign_key.as_ref()?;
345 (fk.foreign_table == relation && fk.local_column == local_column)
346 .then(|| value.clone())
347 })
348 });
349
350 let Some(fk_val) = fk_value else { continue };
351 let Some(foreign_values) = batch_map.get(&fk_val) else {
352 continue;
353 };
354
355 record_columns.push((
356 ValuesSource::Foreign {
357 table: relation.to_string(),
358 column: local_column.to_string(),
359 },
360 foreign_values.clone(),
361 ));
362 }
363 }
364
365 fn this_columns(
367 record: &[(ValuesSource, Vec<(ColumnDef, Value)>)],
368 ) -> Option<&Vec<(ColumnDef, Value)>> {
369 record
370 .iter()
371 .find(|(src, _)| *src == ValuesSource::This)
372 .map(|(_, cols)| cols)
373 }
374
375 #[expect(
377 clippy::type_complexity,
378 reason = "complex return type is necessary for returning both PK and full row data"
379 )]
380 fn existing_rows_for_filter<T>(
381 &self,
382 filter: Option<Filter>,
383 ) -> DbmsResult<Vec<(Value, Vec<(ColumnDef, Value)>)>>
384 where
385 T: TableSchema,
386 {
387 let pk = T::primary_key();
388 let query = Query::builder().filter(filter).build();
389 let records = self.select::<T>(query)?;
390 let rows = records
391 .into_iter()
392 .map(|record| {
393 let values = record.to_values();
394 let pk_value = values
395 .iter()
396 .find(|(col_def, _)| col_def.name == pk)
397 .expect("primary key not found")
398 .1
399 .clone();
400 (pk_value, values)
401 })
402 .collect();
403
404 Ok(rows)
405 }
406
407 fn load_table_registry<T>(&self) -> DbmsResult<TableRegistry>
409 where
410 T: TableSchema,
411 {
412 let sr = self.ctx.schema_registry.borrow();
413 let registry_pages = sr
414 .table_registry_page::<T>()
415 .ok_or(DbmsError::Table(TableError::TableNotFound))?;
416
417 let mut mm = self.ctx.mm.borrow_mut();
418 TableRegistry::load(registry_pages, &mut *mm).map_err(DbmsError::from)
419 }
420
421 fn sort_query_results(
423 &self,
424 results: &mut [TableColumns],
425 column: &str,
426 direction: OrderDirection,
427 ) {
428 results.sort_by(|a, b| {
429 fn get_value<'a>(
430 values: &'a [(ValuesSource, Vec<(ColumnDef, Value)>)],
431 column: &str,
432 ) -> Option<&'a Value> {
433 values
434 .iter()
435 .find(|(source, _)| *source == ValuesSource::This)
436 .and_then(|(_, cols)| {
437 cols.iter()
438 .find(|(col_def, _)| col_def.name == column)
439 .map(|(_, value)| value)
440 })
441 }
442
443 let a_value = get_value(a, column);
444 let b_value = get_value(b, column);
445
446 sort_values_with_direction(a_value, b_value, direction)
447 });
448 }
449
450 fn execute_index_plan<MA>(
451 &self,
452 reader: &IndexReader<'_>,
453 plan: &IndexPlan,
454 mm: &mut MA,
455 ) -> DbmsResult<IndexSearchResult>
456 where
457 MA: MemoryAccess,
458 {
459 let columns = [plan.column()];
460 match plan {
461 IndexPlan::Eq { value, .. } => {
462 let key = [value.clone()];
463 reader
464 .search_eq(&columns, &key, mm)
465 .map_err(DbmsError::from)
466 }
467 IndexPlan::Range { start, end, .. } => {
468 let start_key = start.as_ref().map(|value| vec![value.clone()]);
469 let end_key = end.as_ref().map(|value| vec![value.clone()]);
470 reader
471 .search_range(&columns, start_key.as_deref(), end_key.as_deref(), mm)
472 .map_err(DbmsError::from)
473 }
474 IndexPlan::In { values, .. } => {
475 let keys: Vec<Vec<Value>> =
476 values.iter().cloned().map(|value| vec![value]).collect();
477 reader
478 .search_in(&columns, &keys, mm)
479 .map_err(DbmsError::from)
480 }
481 }
482 }
483
484 #[expect(
485 clippy::type_complexity,
486 reason = "complex return type is necessary for returning addresses and overlay PKs"
487 )]
488 fn try_index_select<T>(
489 &self,
490 query: &Query,
491 table_registry: &TableRegistry,
492 table_overlay: &DatabaseOverlay,
493 ) -> DbmsResult<Option<Vec<Vec<(ColumnDef, Value)>>>>
494 where
495 T: TableSchema,
496 {
497 let Some(filter) = &query.filter else {
498 return Ok(None);
499 };
500
501 let Some(analyzed) = analyze_filter(filter, T::indexes()) else {
502 return Ok(None);
503 };
504
505 let mut mm = self.ctx.mm.borrow_mut();
506 let reader = IndexReader::new(
507 table_registry.index_ledger(),
508 table_overlay.index_overlay(T::table_name()),
509 );
510 let search_result = self.execute_index_plan(&reader, &analyzed.plan, &mut *mm)?;
511
512 let mut indexed_rows = Vec::new();
513 let pk_name = T::primary_key();
514
515 for address in &search_result.addresses {
516 let record: T = table_registry
517 .read_at(*address, &mut *mm)
518 .map_err(DbmsError::from)?;
519 let values = record.to_values();
520 let Some(pk) = values
521 .iter()
522 .find(|(column, _)| column.name == pk_name)
523 .map(|(_, value)| value)
524 else {
525 continue;
526 };
527
528 if search_result.removed_pks.contains(pk) || search_result.overlay_pks.contains(pk) {
529 continue;
530 }
531
532 if let Some(remaining_filter) = &analyzed.remaining_filter
533 && !self.record_matches_filter(&values, remaining_filter)?
534 {
535 continue;
536 }
537
538 indexed_rows.push(values);
539 }
540
541 if let Some(overlay) = table_overlay.table_overlay(T::table_name()) {
542 let mut pending_overlay_pks = search_result.overlay_pks.clone();
543
544 for row in overlay.iter_inserted() {
545 let Some(pk) = row
546 .iter()
547 .find(|(column, _)| column.name == pk_name)
548 .map(|(_, value)| value)
549 else {
550 continue;
551 };
552
553 if !pending_overlay_pks.remove(pk) {
554 continue;
555 }
556 if let Some(remaining_filter) = &analyzed.remaining_filter
557 && !self.record_matches_filter(&row, remaining_filter)?
558 {
559 continue;
560 }
561
562 indexed_rows.push(row);
563 }
564
565 if !pending_overlay_pks.is_empty() {
566 let pk_reader = IndexReader::new(table_registry.index_ledger(), None);
567 let pk_columns = [T::primary_key()];
568
569 for pk in pending_overlay_pks {
570 let pk_key = [pk];
571 let pk_lookup = pk_reader.search_eq(&pk_columns, &pk_key, &mut *mm)?;
572 for address in pk_lookup.addresses {
573 let record: T = table_registry
574 .read_at(address, &mut *mm)
575 .map_err(DbmsError::from)?;
576 let values = record.to_values();
577 let Some(patched_values) = overlay.patch_row(values) else {
578 continue;
579 };
580
581 if let Some(remaining_filter) = &analyzed.remaining_filter
582 && !self.record_matches_filter(&patched_values, remaining_filter)?
583 {
584 continue;
585 }
586
587 indexed_rows.push(patched_values);
588 }
589 }
590 }
591 }
592
593 Ok(Some(indexed_rows))
594 }
595
596 #[doc(hidden)]
598 pub fn select_columns<T>(&self, query: Query) -> DbmsResult<Vec<TableColumns>>
599 where
600 T: TableSchema,
601 {
602 let table_registry = self.load_table_registry::<T>()?;
603 let mut table_overlay = if self.transaction.is_some() {
604 self.overlay()?
605 } else {
606 DatabaseOverlay::default()
607 };
608
609 let mut results = Vec::with_capacity(query.limit.unwrap_or(DEFAULT_SELECT_CAPACITY));
610 let mut count = 0;
611
612 if let Some(indexed_rows) =
613 self.try_index_select::<T>(&query, &table_registry, &table_overlay)?
614 {
615 for values in indexed_rows {
616 count += 1;
617 if query.offset.is_some_and(|offset| count <= offset) {
618 continue;
619 }
620 results.push(vec![(ValuesSource::This, values)]);
621 if query.limit.is_some_and(|limit| results.len() >= limit) {
622 break;
623 }
624 }
625 } else {
626 let mut mm = self.ctx.mm.borrow_mut();
627 let table_reader = table_registry.read::<T, _>(&mut *mm);
628 let mut table_reader = table_overlay.reader(table_reader);
629
630 while let Some(values) = table_reader.try_next()? {
631 if let Some(filter) = &query.filter
632 && !self.record_matches_filter(&values, filter)?
633 {
634 continue;
635 }
636 count += 1;
637 if query.offset.is_some_and(|offset| count <= offset) {
638 continue;
639 }
640 results.push(vec![(ValuesSource::This, values)]);
641 if query.limit.is_some_and(|limit| results.len() >= limit) {
642 break;
643 }
644 }
645 }
646
647 self.batch_load_eager_relations::<T>(&mut results, &query)?;
648 self.apply_column_selection::<T>(&mut results, &query);
649
650 for (column, direction) in query.order_by.into_iter().rev() {
651 self.sort_query_results(&mut results, &column, direction);
652 }
653
654 Ok(results)
655 }
656
657 #[doc(hidden)]
659 pub fn select_join(
660 &self,
661 table: &str,
662 query: Query,
663 ) -> DbmsResult<Vec<Vec<(CandidColumnDef, Value)>>> {
664 self.schema.select_join(self, table, query)
665 }
666
667 fn update_pk_referencing_updated_table<T>(
669 &self,
670 old_pk: Value,
671 new_pk: Value,
672 data_type: DataTypeKind,
673 pk_name: &'static str,
674 ) -> DbmsResult<u64>
675 where
676 T: TableSchema,
677 {
678 let mut count = 0;
679 for (ref_table, ref_col) in self
680 .schema
681 .referenced_tables(T::table_name())
682 .into_iter()
683 .flat_map(|(ref_table, ref_cols)| {
684 ref_cols
685 .into_iter()
686 .map(move |ref_col| (ref_table, ref_col))
687 })
688 {
689 let ref_patch_value = (
690 ColumnDef {
691 name: ref_col,
692 data_type,
693 auto_increment: false,
694 nullable: false,
695 primary_key: false,
696 unique: false,
697 foreign_key: Some(ForeignKeyDef {
698 foreign_table: T::table_name(),
699 foreign_column: pk_name,
700 local_column: ref_col,
701 }),
702 },
703 new_pk.clone(),
704 );
705 let filter = Filter::eq(ref_col, old_pk.clone());
706
707 count += self
708 .schema
709 .update(self, ref_table, &[ref_patch_value], Some(filter))?;
710 }
711
712 Ok(count)
713 }
714
715 fn sanitize_values<T>(
717 &self,
718 values: Vec<(ColumnDef, Value)>,
719 ) -> DbmsResult<Vec<(ColumnDef, Value)>>
720 where
721 T: TableSchema,
722 {
723 let mut sanitized_values = Vec::with_capacity(values.len());
724 for (col_def, value) in values.into_iter() {
725 let value = match T::sanitizer(col_def.name) {
726 Some(sanitizer) => sanitizer.sanitize(value)?,
727 None => value,
728 };
729 sanitized_values.push((col_def, value));
730 }
731 Ok(sanitized_values)
732 }
733
734 #[allow(clippy::type_complexity)]
736 fn collect_matching_records<T>(
737 &self,
738 table_registry: &TableRegistry,
739 filter: &Option<Filter>,
740 ) -> DbmsResult<Vec<(NextRecord<T>, Vec<(ColumnDef, Value)>)>>
741 where
742 T: TableSchema,
743 {
744 let mut mm = self.ctx.mm.borrow_mut();
745
746 if let Some(filter) = filter
751 && let Some(analyzed) = analyze_filter(filter, T::indexes())
752 {
753 let reader = IndexReader::new(table_registry.index_ledger(), None);
754 let search_result = self.execute_index_plan(&reader, &analyzed.plan, &mut *mm)?;
755
756 let mut records = Vec::new();
757 for address in search_result.addresses {
758 let record: T = table_registry
759 .read_at(address, &mut *mm)
760 .map_err(DbmsError::from)?;
761 let record_values = record.clone().to_values();
762 if let Some(remaining_filter) = &analyzed.remaining_filter
763 && !self.record_matches_filter(&record_values, remaining_filter)?
764 {
765 continue;
766 }
767 records.push((
768 NextRecord {
769 record,
770 page: address.page,
771 offset: address.offset,
772 },
773 record_values,
774 ));
775 }
776
777 return Ok(records);
778 }
779
780 let mut table_reader = table_registry.read::<T, _>(&mut *mm);
781 let mut records = vec![];
782 while let Some(values) = table_reader.try_next()? {
783 let record_values = values.record.clone().to_values();
784 if let Some(filter) = filter
785 && !self.record_matches_filter(&record_values, filter)?
786 {
787 continue;
788 }
789 records.push((values, record_values));
790 }
791 Ok(records)
792 }
793
794 fn insert_index<T>(
796 &self,
797 table_registry: &mut TableRegistry,
798 record_address: RecordAddress,
799 values: &[(ColumnDef, Value)],
800 mm: &mut impl wasm_dbms_memory::MemoryAccess,
801 ) -> DbmsResult<()>
802 where
803 T: TableSchema,
804 {
805 let index_ledger = table_registry.index_ledger_mut();
806 for columns in T::indexes().iter().map(|index| index.columns()) {
807 let key = index_key(columns, values);
808 index_ledger.insert(columns, key, record_address, mm)?;
809 }
810
811 Ok(())
812 }
813
814 fn delete_index<T>(
816 &self,
817 table_registry: &mut TableRegistry,
818 record_address: RecordAddress,
819 values: &[(ColumnDef, Value)],
820 mm: &mut impl wasm_dbms_memory::MemoryAccess,
821 ) -> DbmsResult<()>
822 where
823 T: TableSchema,
824 {
825 let index_ledger = table_registry.index_ledger_mut();
826 for columns in T::indexes().iter().map(|index| index.columns()) {
827 let key = index_key(columns, values);
828 index_ledger.delete(columns, &key, record_address, mm)?;
829 }
830 Ok(())
831 }
832
833 fn update_index<T>(
838 &self,
839 table_registry: &mut TableRegistry,
840 old_record_address: RecordAddress,
841 new_record_address: RecordAddress,
842 old_values: &[(ColumnDef, Value)],
843 new_values: &[(ColumnDef, Value)],
844 mm: &mut impl wasm_dbms_memory::MemoryAccess,
845 ) -> DbmsResult<()>
846 where
847 T: TableSchema,
848 {
849 let index_ledger = table_registry.index_ledger_mut();
850 for columns in T::indexes().iter().map(|index| index.columns()) {
851 let old_key = index_key(columns, old_values);
852 let new_key = index_key(columns, new_values);
853 if old_key == new_key {
854 index_ledger.update(
855 columns,
856 &new_key,
857 old_record_address,
858 new_record_address,
859 mm,
860 )?;
861 } else {
862 index_ledger.delete(columns, &old_key, old_record_address, mm)?;
863 index_ledger.insert(columns, new_key, new_record_address, mm)?;
864 }
865 }
866 Ok(())
867 }
868
869 fn fill_auto_increment_values<T>(
871 &self,
872 table_registry: &mut TableRegistry,
873 mut values: Vec<(ColumnDef, Value)>,
874 ) -> DbmsResult<Vec<(ColumnDef, Value)>>
875 where
876 T: TableSchema,
877 {
878 let mut mm = self.ctx.mm.borrow_mut();
879 for auto_increment_column in T::columns().iter().filter(|col| col.auto_increment) {
881 if values
882 .iter()
883 .any(|(col_def, _)| col_def.name == auto_increment_column.name)
884 {
885 continue;
886 }
887 let next_value = table_registry
888 .next_autoincrement(auto_increment_column.name, &mut *mm)?
889 .ok_or(DbmsError::Table(TableError::SchemaMismatch))?;
890 values.push((*auto_increment_column, next_value));
891 }
892
893 Ok(values)
894 }
895}
896
897pub fn sort_values_with_direction(
899 a: Option<&Value>,
900 b: Option<&Value>,
901 direction: OrderDirection,
902) -> Ordering {
903 match (a, b) {
904 (Some(a_val), Some(b_val)) => match direction {
905 OrderDirection::Ascending => a_val.cmp(b_val),
906 OrderDirection::Descending => b_val.cmp(a_val),
907 },
908 (Some(_), None) => std::cmp::Ordering::Greater,
909 (None, Some(_)) => std::cmp::Ordering::Less,
910 (None, None) => std::cmp::Ordering::Equal,
911 }
912}
913
914fn values_to_schema_entity<T>(values: Vec<(ColumnDef, Value)>) -> DbmsResult<T>
916where
917 T: TableSchema,
918{
919 let record = T::Insert::from_values(&values)?.into_record();
920 Ok(record)
921}
922
923fn index_key(columns: &[&str], values: &[(ColumnDef, Value)]) -> Vec<Value> {
927 columns
928 .iter()
929 .map(|col| {
930 values
931 .iter()
932 .find(|(cd, _)| cd.name == *col)
933 .map(|(_, v)| v.clone())
934 .unwrap_or(Value::Null)
935 })
936 .collect()
937}
938
939impl<M, A> Database for WasmDbmsDatabase<'_, M, A>
940where
941 M: MemoryProvider,
942 A: AccessControl,
943{
944 fn select<T>(&self, query: Query) -> DbmsResult<Vec<T::Record>>
945 where
946 T: TableSchema,
947 {
948 if !query.joins.is_empty() {
949 return Err(DbmsError::Query(QueryError::JoinInsideTypedSelect));
950 }
951 let results = self.select_columns::<T>(query)?;
952 Ok(results.into_iter().map(T::Record::from_values).collect())
953 }
954
955 fn select_raw(&self, table: &str, query: Query) -> DbmsResult<Vec<Vec<(ColumnDef, Value)>>> {
956 self.schema.select(self, table, query)
957 }
958
959 fn insert<T>(&self, record: T::Insert) -> DbmsResult<()>
960 where
961 T: TableSchema,
962 T::Insert: InsertRecord<Schema = T>,
963 {
964 let mut table_registry = self.load_table_registry::<T>()?;
965 let record_values = record.clone().into_values();
966 let record_values =
967 self.fill_auto_increment_values::<T>(&mut table_registry, record_values)?;
968 let sanitized_values = self.sanitize_values::<T>(record_values)?;
969 self.schema
970 .validate_insert(self, T::table_name(), &sanitized_values)?;
971 if self.transaction.is_some() {
972 self.with_transaction_mut(|tx| tx.insert::<T>(sanitized_values))?;
973 } else {
974 self.atomic(|db| {
975 let record = T::Insert::from_values(&sanitized_values)?;
976 let mut mm = db.ctx.mm.borrow_mut();
977 let mut journal_ref = db.ctx.journal.borrow_mut();
979 let journal = journal_ref
980 .as_mut()
981 .expect("journal must be active inside atomic");
982 let mut writer = JournaledWriter::new(&mut *mm, journal);
983 let record_address = table_registry
985 .insert(record.into_record(), &mut writer)
986 .map_err(DbmsError::from)?;
987 self.insert_index::<T>(
988 &mut table_registry,
989 record_address,
990 &sanitized_values,
991 &mut writer,
992 )?;
993 Ok(())
994 })?;
995 }
996
997 Ok(())
998 }
999
1000 fn update<T>(&self, patch: T::Update) -> DbmsResult<u64>
1001 where
1002 T: TableSchema,
1003 T::Update: UpdateRecord<Schema = T>,
1004 {
1005 let filter = patch.where_clause().clone();
1006 if self.transaction.is_some() {
1007 let rows = self.existing_rows_for_filter::<T>(filter.clone())?;
1008 let count = rows.len() as u64;
1009 self.with_transaction_mut(|tx| tx.update::<T>(patch, filter, rows))?;
1010
1011 return Ok(count);
1012 }
1013
1014 let patch = patch.update_values();
1015
1016 let pk_in_patch = patch.iter().find_map(|(col_def, value)| {
1017 if col_def.primary_key {
1018 Some((col_def, value))
1019 } else {
1020 None
1021 }
1022 });
1023
1024 self.atomic(|db| {
1025 let mut count = 0;
1026
1027 let mut table_registry = db.load_table_registry::<T>()?;
1028 let records = db.collect_matching_records::<T>(&table_registry, &filter)?;
1029
1030 for (record, record_values) in records {
1031 let current_pk_value = record_values
1032 .iter()
1033 .find(|(col_def, _)| col_def.primary_key)
1034 .expect("primary key not found")
1035 .1
1036 .clone();
1037
1038 let previous_record = values_to_schema_entity::<T>(record_values.clone())?;
1039 let old_values_for_index = record_values.clone();
1040 let mut record_values = record_values;
1041
1042 for (patch_col_def, patch_value) in &patch {
1043 if let Some((_, record_value)) = record_values
1044 .iter_mut()
1045 .find(|(record_col_def, _)| record_col_def.name == patch_col_def.name)
1046 {
1047 *record_value = patch_value.clone();
1048 }
1049 }
1050 let record_values = db.sanitize_values::<T>(record_values)?;
1051 db.schema.validate_update(
1052 db,
1053 T::table_name(),
1054 &record_values,
1055 current_pk_value.clone(),
1056 )?;
1057 let updated_record = values_to_schema_entity::<T>(record_values.clone())?;
1058 {
1059 let mut mm = db.ctx.mm.borrow_mut();
1060 let mut journal_ref = db.ctx.journal.borrow_mut();
1062 let journal = journal_ref
1063 .as_mut()
1064 .expect("journal must be active inside atomic");
1065 let mut writer = JournaledWriter::new(&mut *mm, journal);
1066 let old_address = RecordAddress::new(record.page, record.offset);
1068 let new_address = table_registry
1069 .update(updated_record, previous_record, old_address, &mut writer)
1070 .map_err(DbmsError::from)?;
1071 self.update_index::<T>(
1073 &mut table_registry,
1074 old_address,
1075 new_address,
1076 &old_values_for_index,
1077 &record_values,
1078 &mut writer,
1079 )?;
1080 }
1081 count += 1;
1082
1083 if let Some((pk_column, new_pk_value)) = pk_in_patch {
1084 count += db.update_pk_referencing_updated_table::<T>(
1085 current_pk_value,
1086 new_pk_value.clone(),
1087 pk_column.data_type,
1088 pk_column.name,
1089 )?;
1090 }
1091 }
1092
1093 Ok(count)
1094 })
1095 }
1096
1097 fn delete<T>(&self, behaviour: DeleteBehavior, filter: Option<Filter>) -> DbmsResult<u64>
1098 where
1099 T: TableSchema,
1100 {
1101 if self.transaction.is_some() {
1102 let rows = self.existing_rows_for_filter::<T>(filter.clone())?;
1103 let count = rows.len() as u64;
1104
1105 self.with_transaction_mut(|tx| tx.delete::<T>(behaviour, filter, rows))?;
1106
1107 return Ok(count);
1108 }
1109
1110 self.atomic(|db| {
1111 let mut table_registry = db.load_table_registry::<T>()?;
1112 let records = db.collect_matching_records::<T>(&table_registry, &filter)?;
1113 let mut count = records.len() as u64;
1114 for (record, record_values) in records {
1115 match behaviour {
1116 DeleteBehavior::Cascade => {
1117 count += db.delete_foreign_keys_cascade::<T>(&record_values)?;
1118 }
1119 DeleteBehavior::Restrict => {
1120 if db.has_foreign_key_references::<T>(&record_values)? {
1121 return Err(DbmsError::Query(
1122 QueryError::ForeignKeyConstraintViolation {
1123 referencing_table: T::table_name().to_string(),
1124 field: T::primary_key().to_string(),
1125 },
1126 ));
1127 }
1128 }
1129 }
1130 let mut mm = db.ctx.mm.borrow_mut();
1131 let mut journal_ref = db.ctx.journal.borrow_mut();
1132 let journal = journal_ref
1133 .as_mut()
1134 .expect("journal must be active inside atomic");
1135 let mut writer = JournaledWriter::new(&mut *mm, journal);
1137 let address = RecordAddress::new(record.page, record.offset);
1138 table_registry
1139 .delete(record.record, address, &mut writer)
1140 .map_err(DbmsError::from)?;
1141 self.delete_index::<T>(&mut table_registry, address, &record_values, &mut writer)?;
1142 }
1143
1144 Ok(count)
1145 })
1146 }
1147
1148 fn commit(&mut self) -> DbmsResult<()> {
1149 let Some(txid) = self.transaction.take() else {
1150 return Err(DbmsError::Transaction(
1151 TransactionError::NoActiveTransaction,
1152 ));
1153 };
1154 let transaction = {
1155 let mut ts = self.ctx.transaction_session.borrow_mut();
1156 ts.take_transaction(&txid)?
1157 };
1158
1159 *self.ctx.journal.borrow_mut() = Some(Journal::new());
1160
1161 for op in transaction.operations {
1162 let result = match op {
1163 TransactionOp::Insert { table, values } => self
1164 .schema
1165 .validate_insert(self, table, &values)
1166 .and_then(|()| self.schema.insert(self, table, &values)),
1167 TransactionOp::Delete {
1168 table,
1169 behaviour,
1170 filter,
1171 } => self
1172 .schema
1173 .delete(self, table, behaviour, filter)
1174 .map(|_| ()),
1175 TransactionOp::Update {
1176 table,
1177 patch,
1178 filter,
1179 } => self.schema.update(self, table, &patch, filter).map(|_| ()),
1180 };
1181
1182 if let Err(err) = result {
1183 if let Some(journal) = self.ctx.journal.borrow_mut().take() {
1184 journal
1185 .rollback(&mut self.ctx.mm.borrow_mut())
1186 .expect("critical: failed to rollback journal");
1187 }
1188 return Err(err);
1189 }
1190 }
1191
1192 if let Some(journal) = self.ctx.journal.borrow_mut().take() {
1193 journal.commit();
1194 }
1195 Ok(())
1196 }
1197
1198 fn rollback(&mut self) -> DbmsResult<()> {
1199 let Some(txid) = self.transaction.take() else {
1200 return Err(DbmsError::Transaction(
1201 TransactionError::NoActiveTransaction,
1202 ));
1203 };
1204
1205 let mut ts = self.ctx.transaction_session.borrow_mut();
1206 ts.close_transaction(&txid);
1207 Ok(())
1208 }
1209}
1210
1211#[cfg(test)]
1212mod tests;