dynoxide/storage_backend/mod.rs
1//! Storage backend abstraction.
2//!
3//! Defines the [`StorageBackend`] trait that decouples Dynoxide's data layer
4//! from a specific SQLite binding. The native [`rusqlite`]-backed
5//! [`Storage`](crate::storage::Storage) implements the trait, and the
6//! `wasm-sqlite` build adds [`wasm_backend::WasmBridgeBackend`], which runs the
7//! same SQL against a JS SQLite database over a wasm-bindgen bridge. Both
8//! backends issue identical SQL because they share the builders in
9//! [`sql_builders`].
10//!
11//! The native build consumes the trait monomorphically through `Storage`; the
12//! wasm build consumes it through `WasmBridgeBackend`. The escape hatches
13//! `Storage::conn()` and `Storage::conn_mut()` are not exposed by the trait
14//! and remain native-only.
15//!
16//! # No `Send + Sync` super-trait
17//!
18//! [`Storage`](crate::storage::Storage) carries a `RefCell<HashMap<...>>` for
19//! its metadata cache, so `Storage: !Sync`. A `Send + Sync` super-trait would
20//! refuse the impl on `Storage`. With no dynamic dispatch site in scope,
21//! auto-trait propagation across `.await` is decided per-callsite anyway, so
22//! adding `Send` to the super-trait would not earn any compile-time
23//! guarantee on the futures returned by trait methods.
24
25pub mod clock;
26pub mod error;
27// Internal: the shared SQL contract between the rusqlite and wasm backends.
28// `pub` only because both backend modules consume it across the cfg split; it
29// is not a stable API, hence `#[doc(hidden)]`.
30#[doc(hidden)]
31pub mod sql_builders;
32// The native rusqlite-backed `Storage` exists whenever either SQLite backend
33// feature is on (the crate refuses to build with neither), and the handlers
34// now consume `StorageBackend` through it, so the impl must track the same
35// condition rather than `native-sqlite` alone.
36#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
37pub mod rusqlite_impl;
38#[cfg(feature = "wasm-sqlite")]
39pub mod wasm_backend;
40
41use crate::storage::{
42 CreateTableMetadata, DatabaseInfo, QueryParams, ScanParams, StreamRecord, TableMetadata,
43 TableStats,
44};
45use crate::types::Tag;
46
47pub use clock::{Clock, ManualClock, SystemClock};
48pub use error::BackendError;
49#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
50pub use error::from_rusqlite;
51#[doc(hidden)]
52pub use sql_builders::SqlParam;
53#[cfg(feature = "wasm-sqlite")]
54pub use wasm_backend::WasmBridgeBackend;
55
56/// One base-table row for a bulk insert via [`StorageBackend::put_base_items`].
57///
58/// Unlike [`StorageBackend::put_item_with_hash`], which preserves any existing
59/// `cached_at` value, the bulk path writes `cached_at` verbatim: this mirrors
60/// the import flow, which sets the timestamp explicitly (or clears it) for
61/// every row it loads.
62#[derive(Debug, Clone)]
63pub struct BaseItemRow {
64 /// Partition key string.
65 pub pk: String,
66 /// Sort key string (empty for tables without a sort key).
67 pub sk: String,
68 /// Serialised item JSON.
69 pub item_json: String,
70 /// Item size in bytes.
71 pub item_size: usize,
72 /// Cache timestamp written verbatim; `None` clears the column.
73 pub cached_at: Option<f64>,
74 /// Hash prefix used for parallel-scan ordering.
75 pub hash_prefix: String,
76}
77
78/// One GSI-table row for a bulk insert via [`StorageBackend::insert_gsi_items`].
79///
80/// The fields mirror the argument order of the single-row
81/// [`StorageBackend::insert_gsi_item`].
82#[derive(Debug, Clone)]
83pub struct GsiItemRow {
84 /// GSI partition key string.
85 pub gsi_pk: String,
86 /// GSI sort key string (empty when the index has no sort key).
87 pub gsi_sk: String,
88 /// Base-table partition key string.
89 pub table_pk: String,
90 /// Base-table sort key string.
91 pub table_sk: String,
92 /// Projected item JSON.
93 pub item_json: String,
94}
95
96/// One index-table write operation, backend-neutral.
97///
98/// The per-write and per-delete GSI/LSI fan-out builds an ordered list of these
99/// and hands it to [`StorageBackend::apply_index_writes`] in a single call. The
100/// default impl replays each op through the matching per-item method, identical
101/// to the per-op loop it replaces; the wasm backend overrides it to collapse the
102/// list into one bridge crossing. Each variant's fields mirror the argument
103/// order of the per-item method it stands in for.
104#[derive(Debug, Clone)]
105pub enum IndexWriteOp {
106 /// Remove this base key's entry from a GSI table.
107 DeleteGsi {
108 table_name: String,
109 index_name: String,
110 table_pk: String,
111 table_sk: String,
112 },
113 /// Insert (or replace) this item's projected entry into a GSI table.
114 InsertGsi {
115 table_name: String,
116 index_name: String,
117 gsi_pk: String,
118 gsi_sk: String,
119 table_pk: String,
120 table_sk: String,
121 item_json: String,
122 },
123 /// Remove this base key's entry from an LSI table.
124 DeleteLsi {
125 table_name: String,
126 index_name: String,
127 base_pk: String,
128 base_sk: String,
129 },
130 /// Insert (or replace) this item's projected entry into an LSI table.
131 InsertLsi {
132 table_name: String,
133 index_name: String,
134 pk: String,
135 sk: String,
136 base_pk: String,
137 base_sk: String,
138 item_json: String,
139 },
140}
141
142/// Backend-neutral storage interface.
143///
144/// Method signatures mirror [`Storage`](crate::storage::Storage)'s public
145/// surface 1:1, with three mechanical transformations:
146///
147/// 1. `Result<T, DynoxideError>` becomes `Result<T, BackendError>`.
148/// 2. `fn` becomes `async fn`.
149/// 3. Filesystem-typed and rusqlite-typed methods are excluded; they remain
150/// on the native [`Storage`](crate::storage::Storage) only.
151///
152/// The trait is not consumed dynamically today. The native
153/// [`Storage`](crate::storage::Storage) and the wasm
154/// [`WasmBridgeBackend`](wasm_backend::WasmBridgeBackend) each implement it
155/// monomorphically.
156///
157/// The `#[allow(async_fn_in_trait)]` reflects the monomorphic-only consumption
158/// model. The lint can be revisited if and when `dyn StorageBackend` becomes
159/// a real callsite.
160#[allow(async_fn_in_trait)]
161pub trait StorageBackend {
162 // -----------------------------------------------------------------------
163 // Capabilities
164 // -----------------------------------------------------------------------
165
166 /// Wall-clock access for the stream and TTL paths.
167 ///
168 /// Sync because reading the clock is not I/O. The native backend returns
169 /// its injected [`Clock`]; the wasm SQLite backend supplies its own.
170 fn clock(&self) -> &dyn Clock;
171
172 // -----------------------------------------------------------------------
173 // Table metadata
174 // -----------------------------------------------------------------------
175
176 async fn insert_table_metadata(&self, m: &CreateTableMetadata<'_>) -> Result<(), BackendError>;
177
178 async fn get_table_metadata(
179 &self,
180 table_name: &str,
181 ) -> Result<Option<TableMetadata>, BackendError>;
182
183 async fn delete_table_metadata(&self, table_name: &str) -> Result<bool, BackendError>;
184
185 async fn update_table_metadata(
186 &self,
187 table_name: &str,
188 attribute_definitions: &str,
189 gsi_definitions: Option<&str>,
190 ) -> Result<(), BackendError>;
191
192 async fn update_provisioned_throughput(
193 &self,
194 table_name: &str,
195 provisioned_throughput: &str,
196 ) -> Result<(), BackendError>;
197
198 async fn clear_provisioned_throughput(&self, table_name: &str) -> Result<(), BackendError>;
199
200 async fn update_billing_mode(
201 &self,
202 table_name: &str,
203 billing_mode: &str,
204 ) -> Result<(), BackendError>;
205
206 async fn update_table_class(
207 &self,
208 table_name: &str,
209 table_class: &str,
210 ) -> Result<(), BackendError>;
211
212 async fn update_on_demand_throughput(
213 &self,
214 table_name: &str,
215 on_demand_throughput: &str,
216 ) -> Result<(), BackendError>;
217
218 async fn get_tags(&self, table_name: &str) -> Result<Vec<Tag>, BackendError>;
219
220 async fn set_tags(&self, table_name: &str, new_tags: &[Tag]) -> Result<(), BackendError>;
221
222 async fn update_deletion_protection(
223 &self,
224 table_name: &str,
225 enabled: bool,
226 ) -> Result<(), BackendError>;
227
228 async fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<(), BackendError>;
229
230 async fn list_table_names(&self) -> Result<Vec<String>, BackendError>;
231
232 async fn table_exists(&self, table_name: &str) -> Result<bool, BackendError>;
233
234 // -----------------------------------------------------------------------
235 // Dynamic data tables (DDL)
236 // -----------------------------------------------------------------------
237
238 async fn create_data_table(&self, table_name: &str) -> Result<(), BackendError>;
239
240 async fn drop_data_table(&self, table_name: &str) -> Result<(), BackendError>;
241
242 async fn create_gsi_table(
243 &self,
244 table_name: &str,
245 index_name: &str,
246 ) -> Result<(), BackendError>;
247
248 async fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<(), BackendError>;
249
250 async fn create_lsi_table(
251 &self,
252 table_name: &str,
253 index_name: &str,
254 ) -> Result<(), BackendError>;
255
256 async fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<(), BackendError>;
257
258 // -----------------------------------------------------------------------
259 // GSI item operations
260 // -----------------------------------------------------------------------
261
262 #[allow(clippy::too_many_arguments)]
263 async fn insert_gsi_item(
264 &self,
265 table_name: &str,
266 index_name: &str,
267 gsi_pk: &str,
268 gsi_sk: &str,
269 table_pk: &str,
270 table_sk: &str,
271 item_json: &str,
272 ) -> Result<(), BackendError>;
273
274 /// Bulk-insert many rows into one GSI table.
275 ///
276 /// Batch-shaped so a backend can amortise per-row round-trips (the native
277 /// backend reuses a single cached prepared statement). Used by the GSI
278 /// backfill path; the per-row [`insert_gsi_item`](Self::insert_gsi_item)
279 /// covers single writes during normal fan-out.
280 async fn insert_gsi_items(
281 &self,
282 table_name: &str,
283 index_name: &str,
284 rows: &[GsiItemRow],
285 ) -> Result<(), BackendError>;
286
287 async fn delete_gsi_item(
288 &self,
289 table_name: &str,
290 index_name: &str,
291 table_pk: &str,
292 table_sk: &str,
293 ) -> Result<(), BackendError>;
294
295 async fn query_gsi_items(
296 &self,
297 table_name: &str,
298 index_name: &str,
299 gsi_pk: &str,
300 params: &QueryParams<'_>,
301 ) -> Result<Vec<(String, String, String)>, BackendError>;
302
303 async fn scan_gsi_items(
304 &self,
305 table_name: &str,
306 index_name: &str,
307 params: &ScanParams<'_>,
308 ) -> Result<Vec<(String, String, String)>, BackendError>;
309
310 // -----------------------------------------------------------------------
311 // LSI item operations
312 // -----------------------------------------------------------------------
313
314 #[allow(clippy::too_many_arguments)]
315 async fn insert_lsi_item(
316 &self,
317 table_name: &str,
318 index_name: &str,
319 pk: &str,
320 sk: &str,
321 base_pk: &str,
322 base_sk: &str,
323 item_json: &str,
324 ) -> Result<(), BackendError>;
325
326 async fn delete_lsi_item(
327 &self,
328 table_name: &str,
329 index_name: &str,
330 base_pk: &str,
331 base_sk: &str,
332 ) -> Result<(), BackendError>;
333
334 async fn query_lsi_items(
335 &self,
336 table_name: &str,
337 index_name: &str,
338 pk: &str,
339 params: &QueryParams<'_>,
340 ) -> Result<Vec<(String, String, String)>, BackendError>;
341
342 async fn scan_lsi_items(
343 &self,
344 table_name: &str,
345 index_name: &str,
346 params: &ScanParams<'_>,
347 ) -> Result<Vec<(String, String, String)>, BackendError>;
348
349 // -----------------------------------------------------------------------
350 // Index write fan-out
351 // -----------------------------------------------------------------------
352
353 /// Apply an ordered batch of GSI/LSI write operations.
354 ///
355 /// The GSI/LSI maintenance helpers build the list and call this once per
356 /// fan-out instead of invoking the per-item methods one at a time. The
357 /// default impl replays each op through the matching per-item method in
358 /// order, so a backend that does not override it behaves exactly as the
359 /// per-op loop did. The wasm backend overrides this to issue the whole list
360 /// in a single bridge crossing.
361 ///
362 /// Owns no transaction: the caller's open transaction supplies atomicity, so
363 /// a mid-batch failure is rolled back by that caller. An empty list does no
364 /// work.
365 async fn apply_index_writes(&self, ops: &[IndexWriteOp]) -> Result<(), BackendError> {
366 for op in ops {
367 match op {
368 IndexWriteOp::DeleteGsi {
369 table_name,
370 index_name,
371 table_pk,
372 table_sk,
373 } => {
374 self.delete_gsi_item(table_name, index_name, table_pk, table_sk)
375 .await?;
376 }
377 IndexWriteOp::InsertGsi {
378 table_name,
379 index_name,
380 gsi_pk,
381 gsi_sk,
382 table_pk,
383 table_sk,
384 item_json,
385 } => {
386 self.insert_gsi_item(
387 table_name, index_name, gsi_pk, gsi_sk, table_pk, table_sk, item_json,
388 )
389 .await?;
390 }
391 IndexWriteOp::DeleteLsi {
392 table_name,
393 index_name,
394 base_pk,
395 base_sk,
396 } => {
397 self.delete_lsi_item(table_name, index_name, base_pk, base_sk)
398 .await?;
399 }
400 IndexWriteOp::InsertLsi {
401 table_name,
402 index_name,
403 pk,
404 sk,
405 base_pk,
406 base_sk,
407 item_json,
408 } => {
409 self.insert_lsi_item(
410 table_name, index_name, pk, sk, base_pk, base_sk, item_json,
411 )
412 .await?;
413 }
414 }
415 }
416 Ok(())
417 }
418
419 // -----------------------------------------------------------------------
420 // Transactions
421 // -----------------------------------------------------------------------
422
423 async fn begin_transaction(&self) -> Result<(), BackendError>;
424 async fn commit(&self) -> Result<(), BackendError>;
425 async fn rollback(&self) -> Result<(), BackendError>;
426
427 // -----------------------------------------------------------------------
428 // Bulk-loading PRAGMAs
429 // -----------------------------------------------------------------------
430
431 async fn enable_bulk_loading(&self) -> Result<(), BackendError>;
432 async fn disable_bulk_loading(&self) -> Result<(), BackendError>;
433
434 // -----------------------------------------------------------------------
435 // Item CRUD
436 // -----------------------------------------------------------------------
437
438 async fn put_item(
439 &self,
440 table_name: &str,
441 pk: &str,
442 sk: &str,
443 item_json: &str,
444 item_size: usize,
445 ) -> Result<Option<String>, BackendError>;
446
447 #[allow(clippy::too_many_arguments)]
448 async fn put_item_with_hash(
449 &self,
450 table_name: &str,
451 pk: &str,
452 sk: &str,
453 item_json: &str,
454 item_size: usize,
455 hash_prefix: &str,
456 ) -> Result<Option<String>, BackendError>;
457
458 /// Bulk-insert many base-table rows in one call (`INSERT OR REPLACE`).
459 ///
460 /// Batch-shaped so a backend can amortise per-row round-trips (the native
461 /// backend reuses a single cached prepared statement). Used by the import
462 /// path. Writes `cached_at` verbatim from each [`BaseItemRow`]; see the
463 /// note there for how this differs from
464 /// [`put_item_with_hash`](Self::put_item_with_hash).
465 async fn put_base_items(
466 &self,
467 table_name: &str,
468 rows: &[BaseItemRow],
469 ) -> Result<(), BackendError>;
470
471 async fn get_item(
472 &self,
473 table_name: &str,
474 pk: &str,
475 sk: &str,
476 ) -> Result<Option<String>, BackendError>;
477
478 async fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64, BackendError>;
479
480 async fn get_lsi_partition_size(
481 &self,
482 table_name: &str,
483 index_name: &str,
484 pk: &str,
485 ) -> Result<i64, BackendError>;
486
487 async fn delete_item(
488 &self,
489 table_name: &str,
490 pk: &str,
491 sk: &str,
492 ) -> Result<Option<String>, BackendError>;
493
494 async fn query_items(
495 &self,
496 table_name: &str,
497 pk: &str,
498 params: &QueryParams<'_>,
499 ) -> Result<Vec<(String, String, String)>, BackendError>;
500
501 async fn scan_items(
502 &self,
503 table_name: &str,
504 params: &ScanParams<'_>,
505 ) -> Result<Vec<(String, String, String)>, BackendError>;
506
507 async fn count_items(&self, table_name: &str) -> Result<i64, BackendError>;
508
509 // -----------------------------------------------------------------------
510 // Introspection
511 // -----------------------------------------------------------------------
512
513 async fn db_size_bytes(&self) -> Result<u64, BackendError>;
514 async fn table_count(&self) -> Result<usize, BackendError>;
515 async fn table_stats(&self) -> Result<Vec<TableStats>, BackendError>;
516 async fn database_info(&self) -> Result<DatabaseInfo, BackendError>;
517 async fn vacuum(&self) -> Result<(), BackendError>;
518
519 // -----------------------------------------------------------------------
520 // Streams
521 // -----------------------------------------------------------------------
522
523 async fn enable_stream(
524 &self,
525 table_name: &str,
526 view_type: &str,
527 label: &str,
528 ) -> Result<(), BackendError>;
529
530 async fn disable_stream(&self, table_name: &str) -> Result<(), BackendError>;
531
532 #[allow(clippy::too_many_arguments)]
533 async fn insert_stream_record(
534 &self,
535 table_name: &str,
536 event_name: &str,
537 keys_json: &str,
538 new_image: Option<&str>,
539 old_image: Option<&str>,
540 sequence_number: &str,
541 shard_id: &str,
542 created_at: i64,
543 ) -> Result<(), BackendError>;
544
545 #[allow(clippy::too_many_arguments)]
546 async fn insert_stream_record_with_identity(
547 &self,
548 table_name: &str,
549 event_name: &str,
550 keys_json: &str,
551 new_image: Option<&str>,
552 old_image: Option<&str>,
553 sequence_number: &str,
554 shard_id: &str,
555 created_at: i64,
556 user_identity: Option<&str>,
557 ) -> Result<(), BackendError>;
558
559 async fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64, BackendError>;
560
561 async fn get_stream_records(
562 &self,
563 table_name: &str,
564 shard_id: &str,
565 after_sequence: i64,
566 limit: usize,
567 ) -> Result<Vec<StreamRecord>, BackendError>;
568
569 async fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>, BackendError>;
570
571 // -----------------------------------------------------------------------
572 // TTL operations
573 // -----------------------------------------------------------------------
574
575 async fn update_ttl_config(
576 &self,
577 table_name: &str,
578 attribute_name: Option<&str>,
579 enabled: bool,
580 ) -> Result<(), BackendError>;
581
582 async fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>, BackendError>;
583
584 async fn get_shard_sequence_range(
585 &self,
586 table_name: &str,
587 shard_id: &str,
588 ) -> Result<(Option<String>, Option<String>), BackendError>;
589
590 // -----------------------------------------------------------------------
591 // Cache tracking
592 // -----------------------------------------------------------------------
593
594 async fn touch_cached_at(
595 &self,
596 table_name: &str,
597 pk: &str,
598 sk: &str,
599 timestamp: f64,
600 ) -> Result<(), BackendError>;
601
602 async fn get_lru_items(
603 &self,
604 table_name: &str,
605 limit: usize,
606 ) -> Result<Vec<(String, String, i64)>, BackendError>;
607}