Skip to main content

dynoxide/storage_backend/
mod.rs

//! Storage backend abstraction.
//!
//! Defines the [`StorageBackend`] trait that decouples Dynoxide's data layer
//! from a specific SQLite binding. The native [`rusqlite`]-backed
//! [`Storage`](crate::storage::Storage) implements the trait, and the
//! `wasm-sqlite` build adds [`wasm_backend::WasmBridgeBackend`], which runs the
//! same SQL against a JS SQLite database over a wasm-bindgen bridge. Both
//! backends issue identical SQL because they share the builders in
//! [`sql_builders`].
//!
//! The native build consumes the trait monomorphically through `Storage`; the
//! wasm build consumes it through `WasmBridgeBackend`. The escape hatches
//! `Storage::conn()` and `Storage::conn_mut()` are not exposed by the trait
//! and remain native-only.
//!
//! # No `Send + Sync` super-trait
//!
//! [`Storage`](crate::storage::Storage) carries a `RefCell<HashMap<...>>` for
//! its metadata cache, so `Storage: !Sync`. A `Send + Sync` super-trait could
//! therefore not be implemented for `Storage`. With no dynamic dispatch site
//! in scope, auto-trait propagation across `.await` is decided per-callsite
//! anyway, so adding `Send` to the super-trait would not buy any compile-time
//! guarantee on the futures returned by trait methods.
24
25pub mod clock;
26pub mod error;
27// Internal: the shared SQL contract between the rusqlite and wasm backends.
28// `pub` only because both backend modules consume it across the cfg split; it
29// is not a stable API, hence `#[doc(hidden)]`.
30#[doc(hidden)]
31pub mod sql_builders;
32// The native rusqlite-backed `Storage` exists whenever either SQLite backend
33// feature is on (the crate refuses to build with neither), and the handlers
34// now consume `StorageBackend` through it, so the impl must track the same
35// condition rather than `native-sqlite` alone.
36#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
37pub mod rusqlite_impl;
38#[cfg(feature = "wasm-sqlite")]
39pub mod wasm_backend;
40
41use crate::storage::{
42    CreateTableMetadata, DatabaseInfo, QueryParams, ScanParams, StreamRecord, TableMetadata,
43    TableStats,
44};
45use crate::types::Tag;
46
47pub use clock::{Clock, ManualClock, SystemClock};
48pub use error::BackendError;
49#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
50pub use error::from_rusqlite;
51#[doc(hidden)]
52pub use sql_builders::SqlParam;
53#[cfg(feature = "wasm-sqlite")]
54pub use wasm_backend::WasmBridgeBackend;
55
/// One base-table row for a bulk insert via [`StorageBackend::put_base_items`].
///
/// Unlike [`StorageBackend::put_item_with_hash`], which preserves any existing
/// `cached_at` value, the bulk path writes `cached_at` verbatim. This mirrors
/// the import flow, which sets the timestamp explicitly (or clears it) for
/// every row it loads.
///
/// `PartialEq` is derived so rows can be compared directly (for example with
/// `assert_eq!`). `Eq` is deliberately not derived because `cached_at` is an
/// `f64`.
#[derive(Debug, Clone, PartialEq)]
pub struct BaseItemRow {
    /// Partition key string.
    pub pk: String,
    /// Sort key string (empty for tables without a sort key).
    pub sk: String,
    /// Serialised item JSON.
    pub item_json: String,
    /// Item size in bytes.
    pub item_size: usize,
    /// Cache timestamp written verbatim; `None` clears the column.
    pub cached_at: Option<f64>,
    /// Hash prefix used for parallel-scan ordering.
    pub hash_prefix: String,
}
77
/// One GSI-table row for a bulk insert via [`StorageBackend::insert_gsi_items`].
///
/// The fields are declared in the same order as the arguments of the
/// single-row [`StorageBackend::insert_gsi_item`].
///
/// `PartialEq` and `Eq` are derived so rows can be compared directly (for
/// example with `assert_eq!`); every field is a plain `String`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GsiItemRow {
    /// GSI partition key string.
    pub gsi_pk: String,
    /// GSI sort key string (empty when the index has no sort key).
    pub gsi_sk: String,
    /// Base-table partition key string.
    pub table_pk: String,
    /// Base-table sort key string.
    pub table_sk: String,
    /// Projected item JSON.
    pub item_json: String,
}
95
/// One index-table write operation, backend-neutral.
///
/// The per-write and per-delete GSI/LSI fan-out builds an ordered list of these
/// and hands it to [`StorageBackend::apply_index_writes`] in a single call. The
/// default impl replays each op through the matching per-item method, identical
/// to the per-op loop it replaces; the wasm backend overrides it to collapse the
/// list into one bridge crossing. Each variant's fields mirror the argument
/// order of the per-item method it stands in for.
///
/// `PartialEq` and `Eq` are derived so an expected op list can be compared
/// against the one a fan-out helper produced (for example with `assert_eq!`);
/// every field is a plain `String`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexWriteOp {
    /// Remove this base key's entry from a GSI table.
    DeleteGsi {
        table_name: String,
        index_name: String,
        table_pk: String,
        table_sk: String,
    },
    /// Insert (or replace) this item's projected entry into a GSI table.
    InsertGsi {
        table_name: String,
        index_name: String,
        gsi_pk: String,
        gsi_sk: String,
        table_pk: String,
        table_sk: String,
        item_json: String,
    },
    /// Remove this base key's entry from an LSI table.
    DeleteLsi {
        table_name: String,
        index_name: String,
        base_pk: String,
        base_sk: String,
    },
    /// Insert (or replace) this item's projected entry into an LSI table.
    InsertLsi {
        table_name: String,
        index_name: String,
        pk: String,
        sk: String,
        base_pk: String,
        base_sk: String,
        item_json: String,
    },
}
141
142/// Backend-neutral storage interface.
143///
144/// Method signatures mirror [`Storage`](crate::storage::Storage)'s public
145/// surface 1:1, with three mechanical transformations:
146///
147/// 1. `Result<T, DynoxideError>` becomes `Result<T, BackendError>`.
148/// 2. `fn` becomes `async fn`.
149/// 3. Filesystem-typed and rusqlite-typed methods are excluded; they remain
150///    on the native [`Storage`](crate::storage::Storage) only.
151///
152/// The trait is not consumed dynamically today. The native
153/// [`Storage`](crate::storage::Storage) and the wasm
154/// [`WasmBridgeBackend`](wasm_backend::WasmBridgeBackend) each implement it
155/// monomorphically.
156///
157/// The `#[allow(async_fn_in_trait)]` reflects the monomorphic-only consumption
158/// model. The lint can be revisited if and when `dyn StorageBackend` becomes
159/// a real callsite.
160#[allow(async_fn_in_trait)]
161pub trait StorageBackend {
162    // -----------------------------------------------------------------------
163    // Capabilities
164    // -----------------------------------------------------------------------
165
166    /// Wall-clock access for the stream and TTL paths.
167    ///
168    /// Sync because reading the clock is not I/O. The native backend returns
169    /// its injected [`Clock`]; the wasm SQLite backend supplies its own.
170    fn clock(&self) -> &dyn Clock;
171
172    // -----------------------------------------------------------------------
173    // Table metadata
174    // -----------------------------------------------------------------------
175
176    async fn insert_table_metadata(&self, m: &CreateTableMetadata<'_>) -> Result<(), BackendError>;
177
178    async fn get_table_metadata(
179        &self,
180        table_name: &str,
181    ) -> Result<Option<TableMetadata>, BackendError>;
182
183    async fn delete_table_metadata(&self, table_name: &str) -> Result<bool, BackendError>;
184
185    async fn update_table_metadata(
186        &self,
187        table_name: &str,
188        attribute_definitions: &str,
189        gsi_definitions: Option<&str>,
190    ) -> Result<(), BackendError>;
191
192    async fn update_provisioned_throughput(
193        &self,
194        table_name: &str,
195        provisioned_throughput: &str,
196    ) -> Result<(), BackendError>;
197
198    async fn clear_provisioned_throughput(&self, table_name: &str) -> Result<(), BackendError>;
199
200    async fn update_billing_mode(
201        &self,
202        table_name: &str,
203        billing_mode: &str,
204    ) -> Result<(), BackendError>;
205
206    async fn update_table_class(
207        &self,
208        table_name: &str,
209        table_class: &str,
210    ) -> Result<(), BackendError>;
211
212    async fn update_on_demand_throughput(
213        &self,
214        table_name: &str,
215        on_demand_throughput: &str,
216    ) -> Result<(), BackendError>;
217
218    async fn get_tags(&self, table_name: &str) -> Result<Vec<Tag>, BackendError>;
219
220    async fn set_tags(&self, table_name: &str, new_tags: &[Tag]) -> Result<(), BackendError>;
221
222    async fn update_deletion_protection(
223        &self,
224        table_name: &str,
225        enabled: bool,
226    ) -> Result<(), BackendError>;
227
228    async fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<(), BackendError>;
229
230    async fn list_table_names(&self) -> Result<Vec<String>, BackendError>;
231
232    async fn table_exists(&self, table_name: &str) -> Result<bool, BackendError>;
233
234    // -----------------------------------------------------------------------
235    // Dynamic data tables (DDL)
236    // -----------------------------------------------------------------------
237
238    async fn create_data_table(&self, table_name: &str) -> Result<(), BackendError>;
239
240    async fn drop_data_table(&self, table_name: &str) -> Result<(), BackendError>;
241
242    async fn create_gsi_table(
243        &self,
244        table_name: &str,
245        index_name: &str,
246    ) -> Result<(), BackendError>;
247
248    async fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<(), BackendError>;
249
250    async fn create_lsi_table(
251        &self,
252        table_name: &str,
253        index_name: &str,
254    ) -> Result<(), BackendError>;
255
256    async fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<(), BackendError>;
257
258    // -----------------------------------------------------------------------
259    // GSI item operations
260    // -----------------------------------------------------------------------
261
262    #[allow(clippy::too_many_arguments)]
263    async fn insert_gsi_item(
264        &self,
265        table_name: &str,
266        index_name: &str,
267        gsi_pk: &str,
268        gsi_sk: &str,
269        table_pk: &str,
270        table_sk: &str,
271        item_json: &str,
272    ) -> Result<(), BackendError>;
273
274    /// Bulk-insert many rows into one GSI table.
275    ///
276    /// Batch-shaped so a backend can amortise per-row round-trips (the native
277    /// backend reuses a single cached prepared statement). Used by the GSI
278    /// backfill path; the per-row [`insert_gsi_item`](Self::insert_gsi_item)
279    /// covers single writes during normal fan-out.
280    async fn insert_gsi_items(
281        &self,
282        table_name: &str,
283        index_name: &str,
284        rows: &[GsiItemRow],
285    ) -> Result<(), BackendError>;
286
287    async fn delete_gsi_item(
288        &self,
289        table_name: &str,
290        index_name: &str,
291        table_pk: &str,
292        table_sk: &str,
293    ) -> Result<(), BackendError>;
294
295    async fn query_gsi_items(
296        &self,
297        table_name: &str,
298        index_name: &str,
299        gsi_pk: &str,
300        params: &QueryParams<'_>,
301    ) -> Result<Vec<(String, String, String)>, BackendError>;
302
303    async fn scan_gsi_items(
304        &self,
305        table_name: &str,
306        index_name: &str,
307        params: &ScanParams<'_>,
308    ) -> Result<Vec<(String, String, String)>, BackendError>;
309
310    // -----------------------------------------------------------------------
311    // LSI item operations
312    // -----------------------------------------------------------------------
313
314    #[allow(clippy::too_many_arguments)]
315    async fn insert_lsi_item(
316        &self,
317        table_name: &str,
318        index_name: &str,
319        pk: &str,
320        sk: &str,
321        base_pk: &str,
322        base_sk: &str,
323        item_json: &str,
324    ) -> Result<(), BackendError>;
325
326    async fn delete_lsi_item(
327        &self,
328        table_name: &str,
329        index_name: &str,
330        base_pk: &str,
331        base_sk: &str,
332    ) -> Result<(), BackendError>;
333
334    async fn query_lsi_items(
335        &self,
336        table_name: &str,
337        index_name: &str,
338        pk: &str,
339        params: &QueryParams<'_>,
340    ) -> Result<Vec<(String, String, String)>, BackendError>;
341
342    async fn scan_lsi_items(
343        &self,
344        table_name: &str,
345        index_name: &str,
346        params: &ScanParams<'_>,
347    ) -> Result<Vec<(String, String, String)>, BackendError>;
348
349    // -----------------------------------------------------------------------
350    // Index write fan-out
351    // -----------------------------------------------------------------------
352
353    /// Apply an ordered batch of GSI/LSI write operations.
354    ///
355    /// The GSI/LSI maintenance helpers build the list and call this once per
356    /// fan-out instead of invoking the per-item methods one at a time. The
357    /// default impl replays each op through the matching per-item method in
358    /// order, so a backend that does not override it behaves exactly as the
359    /// per-op loop did. The wasm backend overrides this to issue the whole list
360    /// in a single bridge crossing.
361    ///
362    /// Owns no transaction: the caller's open transaction supplies atomicity, so
363    /// a mid-batch failure is rolled back by that caller. An empty list does no
364    /// work.
365    async fn apply_index_writes(&self, ops: &[IndexWriteOp]) -> Result<(), BackendError> {
366        for op in ops {
367            match op {
368                IndexWriteOp::DeleteGsi {
369                    table_name,
370                    index_name,
371                    table_pk,
372                    table_sk,
373                } => {
374                    self.delete_gsi_item(table_name, index_name, table_pk, table_sk)
375                        .await?;
376                }
377                IndexWriteOp::InsertGsi {
378                    table_name,
379                    index_name,
380                    gsi_pk,
381                    gsi_sk,
382                    table_pk,
383                    table_sk,
384                    item_json,
385                } => {
386                    self.insert_gsi_item(
387                        table_name, index_name, gsi_pk, gsi_sk, table_pk, table_sk, item_json,
388                    )
389                    .await?;
390                }
391                IndexWriteOp::DeleteLsi {
392                    table_name,
393                    index_name,
394                    base_pk,
395                    base_sk,
396                } => {
397                    self.delete_lsi_item(table_name, index_name, base_pk, base_sk)
398                        .await?;
399                }
400                IndexWriteOp::InsertLsi {
401                    table_name,
402                    index_name,
403                    pk,
404                    sk,
405                    base_pk,
406                    base_sk,
407                    item_json,
408                } => {
409                    self.insert_lsi_item(
410                        table_name, index_name, pk, sk, base_pk, base_sk, item_json,
411                    )
412                    .await?;
413                }
414            }
415        }
416        Ok(())
417    }
418
419    // -----------------------------------------------------------------------
420    // Transactions
421    // -----------------------------------------------------------------------
422
423    async fn begin_transaction(&self) -> Result<(), BackendError>;
424    async fn commit(&self) -> Result<(), BackendError>;
425    async fn rollback(&self) -> Result<(), BackendError>;
426
427    // -----------------------------------------------------------------------
428    // Bulk-loading PRAGMAs
429    // -----------------------------------------------------------------------
430
431    async fn enable_bulk_loading(&self) -> Result<(), BackendError>;
432    async fn disable_bulk_loading(&self) -> Result<(), BackendError>;
433
434    // -----------------------------------------------------------------------
435    // Item CRUD
436    // -----------------------------------------------------------------------
437
438    async fn put_item(
439        &self,
440        table_name: &str,
441        pk: &str,
442        sk: &str,
443        item_json: &str,
444        item_size: usize,
445    ) -> Result<Option<String>, BackendError>;
446
447    #[allow(clippy::too_many_arguments)]
448    async fn put_item_with_hash(
449        &self,
450        table_name: &str,
451        pk: &str,
452        sk: &str,
453        item_json: &str,
454        item_size: usize,
455        hash_prefix: &str,
456    ) -> Result<Option<String>, BackendError>;
457
458    /// Bulk-insert many base-table rows in one call (`INSERT OR REPLACE`).
459    ///
460    /// Batch-shaped so a backend can amortise per-row round-trips (the native
461    /// backend reuses a single cached prepared statement). Used by the import
462    /// path. Writes `cached_at` verbatim from each [`BaseItemRow`]; see the
463    /// note there for how this differs from
464    /// [`put_item_with_hash`](Self::put_item_with_hash).
465    async fn put_base_items(
466        &self,
467        table_name: &str,
468        rows: &[BaseItemRow],
469    ) -> Result<(), BackendError>;
470
471    async fn get_item(
472        &self,
473        table_name: &str,
474        pk: &str,
475        sk: &str,
476    ) -> Result<Option<String>, BackendError>;
477
478    async fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64, BackendError>;
479
480    async fn get_lsi_partition_size(
481        &self,
482        table_name: &str,
483        index_name: &str,
484        pk: &str,
485    ) -> Result<i64, BackendError>;
486
487    async fn delete_item(
488        &self,
489        table_name: &str,
490        pk: &str,
491        sk: &str,
492    ) -> Result<Option<String>, BackendError>;
493
494    async fn query_items(
495        &self,
496        table_name: &str,
497        pk: &str,
498        params: &QueryParams<'_>,
499    ) -> Result<Vec<(String, String, String)>, BackendError>;
500
501    async fn scan_items(
502        &self,
503        table_name: &str,
504        params: &ScanParams<'_>,
505    ) -> Result<Vec<(String, String, String)>, BackendError>;
506
507    async fn count_items(&self, table_name: &str) -> Result<i64, BackendError>;
508
509    // -----------------------------------------------------------------------
510    // Introspection
511    // -----------------------------------------------------------------------
512
513    async fn db_size_bytes(&self) -> Result<u64, BackendError>;
514    async fn table_count(&self) -> Result<usize, BackendError>;
515    async fn table_stats(&self) -> Result<Vec<TableStats>, BackendError>;
516    async fn database_info(&self) -> Result<DatabaseInfo, BackendError>;
517    async fn vacuum(&self) -> Result<(), BackendError>;
518
519    // -----------------------------------------------------------------------
520    // Streams
521    // -----------------------------------------------------------------------
522
523    async fn enable_stream(
524        &self,
525        table_name: &str,
526        view_type: &str,
527        label: &str,
528    ) -> Result<(), BackendError>;
529
530    async fn disable_stream(&self, table_name: &str) -> Result<(), BackendError>;
531
532    #[allow(clippy::too_many_arguments)]
533    async fn insert_stream_record(
534        &self,
535        table_name: &str,
536        event_name: &str,
537        keys_json: &str,
538        new_image: Option<&str>,
539        old_image: Option<&str>,
540        sequence_number: &str,
541        shard_id: &str,
542        created_at: i64,
543    ) -> Result<(), BackendError>;
544
545    #[allow(clippy::too_many_arguments)]
546    async fn insert_stream_record_with_identity(
547        &self,
548        table_name: &str,
549        event_name: &str,
550        keys_json: &str,
551        new_image: Option<&str>,
552        old_image: Option<&str>,
553        sequence_number: &str,
554        shard_id: &str,
555        created_at: i64,
556        user_identity: Option<&str>,
557    ) -> Result<(), BackendError>;
558
559    async fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64, BackendError>;
560
561    async fn get_stream_records(
562        &self,
563        table_name: &str,
564        shard_id: &str,
565        after_sequence: i64,
566        limit: usize,
567    ) -> Result<Vec<StreamRecord>, BackendError>;
568
569    async fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>, BackendError>;
570
571    // -----------------------------------------------------------------------
572    // TTL operations
573    // -----------------------------------------------------------------------
574
575    async fn update_ttl_config(
576        &self,
577        table_name: &str,
578        attribute_name: Option<&str>,
579        enabled: bool,
580    ) -> Result<(), BackendError>;
581
582    async fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>, BackendError>;
583
584    async fn get_shard_sequence_range(
585        &self,
586        table_name: &str,
587        shard_id: &str,
588    ) -> Result<(Option<String>, Option<String>), BackendError>;
589
590    // -----------------------------------------------------------------------
591    // Cache tracking
592    // -----------------------------------------------------------------------
593
594    async fn touch_cached_at(
595        &self,
596        table_name: &str,
597        pk: &str,
598        sk: &str,
599        timestamp: f64,
600    ) -> Result<(), BackendError>;
601
602    async fn get_lru_items(
603        &self,
604        table_name: &str,
605        limit: usize,
606    ) -> Result<Vec<(String, String, i64)>, BackendError>;
607}