Skip to main content

dynoxide/storage_backend/
mod.rs

1//! Storage backend abstraction.
2//!
3//! Defines the [`StorageBackend`] trait that decouples Dynoxide's data layer
4//! from a specific SQLite binding. The native [`rusqlite`]-backed
5//! [`Storage`](crate::storage::Storage) implements the trait, and the
6//! `wasm-sqlite` build adds [`wasm_backend::WasmBridgeBackend`], which runs the
7//! same SQL against a JS wa-sqlite database over a wasm-bindgen bridge. Both
8//! backends issue identical SQL because they share the builders in
9//! [`sql_builders`].
10//!
11//! The native build consumes the trait monomorphically through `Storage`; the
12//! wasm build consumes it through `WasmBridgeBackend`. The escape hatches
13//! `Storage::conn()` and `Storage::conn_mut()` are not exposed by the trait
14//! and remain native-only.
15//!
16//! # No `Send + Sync` super-trait
17//!
18//! [`Storage`](crate::storage::Storage) carries a `RefCell<HashMap<...>>` for
19//! its metadata cache, so `Storage: !Sync`. A `Send + Sync` super-trait would
20//! refuse the impl on `Storage`. With no dynamic dispatch site in scope,
21//! auto-trait propagation across `.await` is decided per-callsite anyway, so
22//! adding `Send` to the super-trait would not earn any compile-time
23//! guarantee on the futures returned by trait methods.
24
25pub mod clock;
26pub mod error;
27// Internal: the shared SQL contract between the rusqlite and wasm backends.
28// `pub` only because both backend modules consume it across the cfg split; it
29// is not a stable API, hence `#[doc(hidden)]`.
30#[doc(hidden)]
31pub mod sql_builders;
32// The native rusqlite-backed `Storage` exists whenever either SQLite backend
33// feature is on (the crate refuses to build with neither), and the handlers
34// now consume `StorageBackend` through it, so the impl must track the same
35// condition rather than `native-sqlite` alone.
36#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
37pub mod rusqlite_impl;
38#[cfg(feature = "wasm-sqlite")]
39pub mod wasm_backend;
40
41use crate::storage::{
42    CreateTableMetadata, DatabaseInfo, QueryParams, ScanParams, StreamRecord, TableMetadata,
43    TableStats,
44};
45use crate::types::Tag;
46
47pub use clock::{Clock, ManualClock, SystemClock};
48pub use error::BackendError;
49#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
50pub use error::from_rusqlite;
51#[doc(hidden)]
52pub use sql_builders::SqlParam;
53#[cfg(feature = "wasm-sqlite")]
54pub use wasm_backend::WasmBridgeBackend;
55
56/// One base-table row for a bulk insert via [`StorageBackend::put_base_items`].
57///
58/// Unlike [`StorageBackend::put_item_with_hash`], which preserves any existing
59/// `cached_at` value, the bulk path writes `cached_at` verbatim: this mirrors
60/// the import flow, which sets the timestamp explicitly (or clears it) for
61/// every row it loads.
62#[derive(Debug, Clone)]
63pub struct BaseItemRow {
64    /// Partition key string.
65    pub pk: String,
66    /// Sort key string (empty for tables without a sort key).
67    pub sk: String,
68    /// Serialised item JSON.
69    pub item_json: String,
70    /// Item size in bytes.
71    pub item_size: usize,
72    /// Cache timestamp written verbatim; `None` clears the column.
73    pub cached_at: Option<f64>,
74    /// Hash prefix used for parallel-scan ordering.
75    pub hash_prefix: String,
76}
77
78/// One GSI-table row for a bulk insert via [`StorageBackend::insert_gsi_items`].
79///
80/// The fields mirror the argument order of the single-row
81/// [`StorageBackend::insert_gsi_item`].
82#[derive(Debug, Clone)]
83pub struct GsiItemRow {
84    /// GSI partition key string.
85    pub gsi_pk: String,
86    /// GSI sort key string (empty when the index has no sort key).
87    pub gsi_sk: String,
88    /// Base-table partition key string.
89    pub table_pk: String,
90    /// Base-table sort key string.
91    pub table_sk: String,
92    /// Projected item JSON.
93    pub item_json: String,
94}
95
96/// Backend-neutral storage interface.
97///
98/// Method signatures mirror [`Storage`](crate::storage::Storage)'s public
99/// surface 1:1, with three mechanical transformations:
100///
101/// 1. `Result<T, DynoxideError>` becomes `Result<T, BackendError>`.
102/// 2. `fn` becomes `async fn`.
103/// 3. Filesystem-typed and rusqlite-typed methods are excluded; they remain
104///    on the native [`Storage`](crate::storage::Storage) only.
105///
106/// The trait is not consumed dynamically today. The native
107/// [`Storage`](crate::storage::Storage) and the wasm
108/// [`WasmBridgeBackend`](wasm_backend::WasmBridgeBackend) each implement it
109/// monomorphically.
110///
111/// The `#[allow(async_fn_in_trait)]` reflects the monomorphic-only consumption
112/// model. The lint can be revisited if and when `dyn StorageBackend` becomes
113/// a real callsite.
114#[allow(async_fn_in_trait)]
115pub trait StorageBackend {
116    // -----------------------------------------------------------------------
117    // Capabilities
118    // -----------------------------------------------------------------------
119
120    /// Wall-clock access for the stream and TTL paths.
121    ///
122    /// Sync because reading the clock is not I/O. The native backend returns
123    /// its injected [`Clock`]; a real wa-sqlite backend supplies its own.
124    fn clock(&self) -> &dyn Clock;
125
126    // -----------------------------------------------------------------------
127    // Table metadata
128    // -----------------------------------------------------------------------
129
130    async fn insert_table_metadata(&self, m: &CreateTableMetadata<'_>) -> Result<(), BackendError>;
131
132    async fn get_table_metadata(
133        &self,
134        table_name: &str,
135    ) -> Result<Option<TableMetadata>, BackendError>;
136
137    async fn delete_table_metadata(&self, table_name: &str) -> Result<bool, BackendError>;
138
139    async fn update_table_metadata(
140        &self,
141        table_name: &str,
142        attribute_definitions: &str,
143        gsi_definitions: Option<&str>,
144    ) -> Result<(), BackendError>;
145
146    async fn update_provisioned_throughput(
147        &self,
148        table_name: &str,
149        provisioned_throughput: &str,
150    ) -> Result<(), BackendError>;
151
152    async fn clear_provisioned_throughput(&self, table_name: &str) -> Result<(), BackendError>;
153
154    async fn update_billing_mode(
155        &self,
156        table_name: &str,
157        billing_mode: &str,
158    ) -> Result<(), BackendError>;
159
160    async fn update_table_class(
161        &self,
162        table_name: &str,
163        table_class: &str,
164    ) -> Result<(), BackendError>;
165
166    async fn update_on_demand_throughput(
167        &self,
168        table_name: &str,
169        on_demand_throughput: &str,
170    ) -> Result<(), BackendError>;
171
172    async fn get_tags(&self, table_name: &str) -> Result<Vec<Tag>, BackendError>;
173
174    async fn set_tags(&self, table_name: &str, new_tags: &[Tag]) -> Result<(), BackendError>;
175
176    async fn update_deletion_protection(
177        &self,
178        table_name: &str,
179        enabled: bool,
180    ) -> Result<(), BackendError>;
181
182    async fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<(), BackendError>;
183
184    async fn list_table_names(&self) -> Result<Vec<String>, BackendError>;
185
186    async fn table_exists(&self, table_name: &str) -> Result<bool, BackendError>;
187
188    // -----------------------------------------------------------------------
189    // Dynamic data tables (DDL)
190    // -----------------------------------------------------------------------
191
192    async fn create_data_table(&self, table_name: &str) -> Result<(), BackendError>;
193
194    async fn drop_data_table(&self, table_name: &str) -> Result<(), BackendError>;
195
196    async fn create_gsi_table(
197        &self,
198        table_name: &str,
199        index_name: &str,
200    ) -> Result<(), BackendError>;
201
202    async fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<(), BackendError>;
203
204    async fn create_lsi_table(
205        &self,
206        table_name: &str,
207        index_name: &str,
208    ) -> Result<(), BackendError>;
209
210    async fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<(), BackendError>;
211
212    // -----------------------------------------------------------------------
213    // GSI item operations
214    // -----------------------------------------------------------------------
215
216    #[allow(clippy::too_many_arguments)]
217    async fn insert_gsi_item(
218        &self,
219        table_name: &str,
220        index_name: &str,
221        gsi_pk: &str,
222        gsi_sk: &str,
223        table_pk: &str,
224        table_sk: &str,
225        item_json: &str,
226    ) -> Result<(), BackendError>;
227
228    /// Bulk-insert many rows into one GSI table.
229    ///
230    /// Batch-shaped so a backend can amortise per-row round-trips (the native
231    /// backend reuses a single cached prepared statement). Used by the GSI
232    /// backfill path; the per-row [`insert_gsi_item`](Self::insert_gsi_item)
233    /// covers single writes during normal fan-out.
234    async fn insert_gsi_items(
235        &self,
236        table_name: &str,
237        index_name: &str,
238        rows: &[GsiItemRow],
239    ) -> Result<(), BackendError>;
240
241    async fn delete_gsi_item(
242        &self,
243        table_name: &str,
244        index_name: &str,
245        table_pk: &str,
246        table_sk: &str,
247    ) -> Result<(), BackendError>;
248
249    async fn query_gsi_items(
250        &self,
251        table_name: &str,
252        index_name: &str,
253        gsi_pk: &str,
254        params: &QueryParams<'_>,
255    ) -> Result<Vec<(String, String, String)>, BackendError>;
256
257    async fn scan_gsi_items(
258        &self,
259        table_name: &str,
260        index_name: &str,
261        params: &ScanParams<'_>,
262    ) -> Result<Vec<(String, String, String)>, BackendError>;
263
264    // -----------------------------------------------------------------------
265    // LSI item operations
266    // -----------------------------------------------------------------------
267
268    #[allow(clippy::too_many_arguments)]
269    async fn insert_lsi_item(
270        &self,
271        table_name: &str,
272        index_name: &str,
273        pk: &str,
274        sk: &str,
275        base_pk: &str,
276        base_sk: &str,
277        item_json: &str,
278    ) -> Result<(), BackendError>;
279
280    async fn delete_lsi_item(
281        &self,
282        table_name: &str,
283        index_name: &str,
284        base_pk: &str,
285        base_sk: &str,
286    ) -> Result<(), BackendError>;
287
288    async fn query_lsi_items(
289        &self,
290        table_name: &str,
291        index_name: &str,
292        pk: &str,
293        params: &QueryParams<'_>,
294    ) -> Result<Vec<(String, String, String)>, BackendError>;
295
296    async fn scan_lsi_items(
297        &self,
298        table_name: &str,
299        index_name: &str,
300        params: &ScanParams<'_>,
301    ) -> Result<Vec<(String, String, String)>, BackendError>;
302
303    // -----------------------------------------------------------------------
304    // Transactions
305    // -----------------------------------------------------------------------
306
307    async fn begin_transaction(&self) -> Result<(), BackendError>;
308    async fn commit(&self) -> Result<(), BackendError>;
309    async fn rollback(&self) -> Result<(), BackendError>;
310
311    // -----------------------------------------------------------------------
312    // Bulk-loading PRAGMAs
313    // -----------------------------------------------------------------------
314
315    async fn enable_bulk_loading(&self) -> Result<(), BackendError>;
316    async fn disable_bulk_loading(&self) -> Result<(), BackendError>;
317
318    // -----------------------------------------------------------------------
319    // Item CRUD
320    // -----------------------------------------------------------------------
321
322    async fn put_item(
323        &self,
324        table_name: &str,
325        pk: &str,
326        sk: &str,
327        item_json: &str,
328        item_size: usize,
329    ) -> Result<Option<String>, BackendError>;
330
331    #[allow(clippy::too_many_arguments)]
332    async fn put_item_with_hash(
333        &self,
334        table_name: &str,
335        pk: &str,
336        sk: &str,
337        item_json: &str,
338        item_size: usize,
339        hash_prefix: &str,
340    ) -> Result<Option<String>, BackendError>;
341
342    /// Bulk-insert many base-table rows in one call (`INSERT OR REPLACE`).
343    ///
344    /// Batch-shaped so a backend can amortise per-row round-trips (the native
345    /// backend reuses a single cached prepared statement). Used by the import
346    /// path. Writes `cached_at` verbatim from each [`BaseItemRow`]; see the
347    /// note there for how this differs from
348    /// [`put_item_with_hash`](Self::put_item_with_hash).
349    async fn put_base_items(
350        &self,
351        table_name: &str,
352        rows: &[BaseItemRow],
353    ) -> Result<(), BackendError>;
354
355    async fn get_item(
356        &self,
357        table_name: &str,
358        pk: &str,
359        sk: &str,
360    ) -> Result<Option<String>, BackendError>;
361
362    async fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64, BackendError>;
363
364    async fn get_lsi_partition_size(
365        &self,
366        table_name: &str,
367        index_name: &str,
368        pk: &str,
369    ) -> Result<i64, BackendError>;
370
371    async fn delete_item(
372        &self,
373        table_name: &str,
374        pk: &str,
375        sk: &str,
376    ) -> Result<Option<String>, BackendError>;
377
378    async fn query_items(
379        &self,
380        table_name: &str,
381        pk: &str,
382        params: &QueryParams<'_>,
383    ) -> Result<Vec<(String, String, String)>, BackendError>;
384
385    async fn scan_items(
386        &self,
387        table_name: &str,
388        params: &ScanParams<'_>,
389    ) -> Result<Vec<(String, String, String)>, BackendError>;
390
391    async fn count_items(&self, table_name: &str) -> Result<i64, BackendError>;
392
393    // -----------------------------------------------------------------------
394    // Introspection
395    // -----------------------------------------------------------------------
396
397    async fn db_size_bytes(&self) -> Result<u64, BackendError>;
398    async fn table_count(&self) -> Result<usize, BackendError>;
399    async fn table_stats(&self) -> Result<Vec<TableStats>, BackendError>;
400    async fn database_info(&self) -> Result<DatabaseInfo, BackendError>;
401    async fn vacuum(&self) -> Result<(), BackendError>;
402
403    // -----------------------------------------------------------------------
404    // Streams
405    // -----------------------------------------------------------------------
406
407    async fn enable_stream(
408        &self,
409        table_name: &str,
410        view_type: &str,
411        label: &str,
412    ) -> Result<(), BackendError>;
413
414    async fn disable_stream(&self, table_name: &str) -> Result<(), BackendError>;
415
416    #[allow(clippy::too_many_arguments)]
417    async fn insert_stream_record(
418        &self,
419        table_name: &str,
420        event_name: &str,
421        keys_json: &str,
422        new_image: Option<&str>,
423        old_image: Option<&str>,
424        sequence_number: &str,
425        shard_id: &str,
426        created_at: i64,
427    ) -> Result<(), BackendError>;
428
429    #[allow(clippy::too_many_arguments)]
430    async fn insert_stream_record_with_identity(
431        &self,
432        table_name: &str,
433        event_name: &str,
434        keys_json: &str,
435        new_image: Option<&str>,
436        old_image: Option<&str>,
437        sequence_number: &str,
438        shard_id: &str,
439        created_at: i64,
440        user_identity: Option<&str>,
441    ) -> Result<(), BackendError>;
442
443    async fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64, BackendError>;
444
445    async fn get_stream_records(
446        &self,
447        table_name: &str,
448        shard_id: &str,
449        after_sequence: i64,
450        limit: usize,
451    ) -> Result<Vec<StreamRecord>, BackendError>;
452
453    async fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>, BackendError>;
454
455    // -----------------------------------------------------------------------
456    // TTL operations
457    // -----------------------------------------------------------------------
458
459    async fn update_ttl_config(
460        &self,
461        table_name: &str,
462        attribute_name: Option<&str>,
463        enabled: bool,
464    ) -> Result<(), BackendError>;
465
466    async fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>, BackendError>;
467
468    async fn get_shard_sequence_range(
469        &self,
470        table_name: &str,
471        shard_id: &str,
472    ) -> Result<(Option<String>, Option<String>), BackendError>;
473
474    // -----------------------------------------------------------------------
475    // Cache tracking
476    // -----------------------------------------------------------------------
477
478    async fn touch_cached_at(
479        &self,
480        table_name: &str,
481        pk: &str,
482        sk: &str,
483        timestamp: f64,
484    ) -> Result<(), BackendError>;
485
486    async fn get_lru_items(
487        &self,
488        table_name: &str,
489        limit: usize,
490    ) -> Result<Vec<(String, String, i64)>, BackendError>;
491}