Skip to main content

dynoxide/storage_backend/
rusqlite_impl.rs

1//! Native [`StorageBackend`] implementation backed by the rusqlite-typed
2//! [`Storage`].
3//!
4//! Each impl method is a thin async wrapper over the existing sync method on
5//! `Storage`: the body invokes the sync method synchronously and returns a
6//! ready future. No `spawn_blocking` is used, so today's behaviour of running
7//! rusqlite calls on the active executor thread is preserved exactly.
8//!
9//! # Load-bearing invariant: native futures never suspend
10//!
11//! Every method here resolves on the first poll (it runs synchronous work and
12//! returns `Ready`). The synchronous `Database` facade depends on this: it
13//! drives these futures with `block_on`, which is only safe to call from inside
14//! the tokio-based HTTP and MCP servers because an always-ready future never
15//! parks the worker thread. Do not introduce a real `.await` on external async
16//! I/O (a socket, a timer, a task spawn, a channel) into any method here
17//! without first moving the facade off `block_on`; otherwise the server worker
18//! threads will stall.
19
20use crate::errors::DynoxideError;
21use crate::storage::{
22    CreateTableMetadata, DatabaseInfo, QueryParams, ScanParams, Storage, StreamRecord,
23    TableMetadata, TableStats,
24};
25use crate::storage_backend::{BackendError, BaseItemRow, Clock, GsiItemRow, StorageBackend, error};
26use crate::types::Tag;
27
28/// Convert a [`DynoxideError`] into a [`BackendError`].
29///
30/// Storage's sync surface returns `Result<T, DynoxideError>`; the trait surface
31/// returns `Result<T, BackendError>`. The conversion preserves rusqlite error
32/// codes via [`error::from_rusqlite`]. A client-facing `ValidationException`
33/// (a backend method such as `set_tags` enforces the tag-count limit) is
34/// preserved as [`BackendError::Validation`] so the reverse conversion can
35/// restore its 400 envelope; any other variant falls through to
36/// [`BackendError::Other`] carrying the original `Display` output.
37fn dyno_to_backend(err: DynoxideError) -> BackendError {
38    match err {
39        DynoxideError::SqliteError(e) => error::from_rusqlite(e),
40        DynoxideError::ValidationException(msg) => BackendError::Validation(msg),
41        other => BackendError::Other(other.to_string()),
42    }
43}
44
45impl StorageBackend for Storage {
46    fn clock(&self) -> &dyn Clock {
47        Storage::clock(self)
48    }
49
50    async fn insert_table_metadata(&self, m: &CreateTableMetadata<'_>) -> Result<(), BackendError> {
51        Storage::insert_table_metadata(self, m).map_err(dyno_to_backend)
52    }
53
54    async fn get_table_metadata(
55        &self,
56        table_name: &str,
57    ) -> Result<Option<TableMetadata>, BackendError> {
58        Storage::get_table_metadata(self, table_name).map_err(dyno_to_backend)
59    }
60
61    async fn delete_table_metadata(&self, table_name: &str) -> Result<bool, BackendError> {
62        Storage::delete_table_metadata(self, table_name).map_err(dyno_to_backend)
63    }
64
65    async fn update_table_metadata(
66        &self,
67        table_name: &str,
68        attribute_definitions: &str,
69        gsi_definitions: Option<&str>,
70    ) -> Result<(), BackendError> {
71        Storage::update_table_metadata(self, table_name, attribute_definitions, gsi_definitions)
72            .map_err(dyno_to_backend)
73    }
74
75    async fn update_provisioned_throughput(
76        &self,
77        table_name: &str,
78        provisioned_throughput: &str,
79    ) -> Result<(), BackendError> {
80        Storage::update_provisioned_throughput(self, table_name, provisioned_throughput)
81            .map_err(dyno_to_backend)
82    }
83
84    async fn clear_provisioned_throughput(&self, table_name: &str) -> Result<(), BackendError> {
85        Storage::clear_provisioned_throughput(self, table_name).map_err(dyno_to_backend)
86    }
87
88    async fn update_billing_mode(
89        &self,
90        table_name: &str,
91        billing_mode: &str,
92    ) -> Result<(), BackendError> {
93        Storage::update_billing_mode(self, table_name, billing_mode).map_err(dyno_to_backend)
94    }
95
96    async fn update_table_class(
97        &self,
98        table_name: &str,
99        table_class: &str,
100    ) -> Result<(), BackendError> {
101        Storage::update_table_class(self, table_name, table_class).map_err(dyno_to_backend)
102    }
103
104    async fn update_on_demand_throughput(
105        &self,
106        table_name: &str,
107        on_demand_throughput: &str,
108    ) -> Result<(), BackendError> {
109        Storage::update_on_demand_throughput(self, table_name, on_demand_throughput)
110            .map_err(dyno_to_backend)
111    }
112
113    async fn get_tags(&self, table_name: &str) -> Result<Vec<Tag>, BackendError> {
114        Storage::get_tags(self, table_name).map_err(dyno_to_backend)
115    }
116
117    async fn set_tags(&self, table_name: &str, new_tags: &[Tag]) -> Result<(), BackendError> {
118        Storage::set_tags(self, table_name, new_tags).map_err(dyno_to_backend)
119    }
120
121    async fn update_deletion_protection(
122        &self,
123        table_name: &str,
124        enabled: bool,
125    ) -> Result<(), BackendError> {
126        Storage::update_deletion_protection(self, table_name, enabled).map_err(dyno_to_backend)
127    }
128
129    async fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<(), BackendError> {
130        Storage::remove_tags(self, table_name, keys).map_err(dyno_to_backend)
131    }
132
133    async fn list_table_names(&self) -> Result<Vec<String>, BackendError> {
134        Storage::list_table_names(self).map_err(dyno_to_backend)
135    }
136
137    async fn table_exists(&self, table_name: &str) -> Result<bool, BackendError> {
138        Storage::table_exists(self, table_name).map_err(dyno_to_backend)
139    }
140
141    async fn create_data_table(&self, table_name: &str) -> Result<(), BackendError> {
142        Storage::create_data_table(self, table_name).map_err(dyno_to_backend)
143    }
144
145    async fn drop_data_table(&self, table_name: &str) -> Result<(), BackendError> {
146        Storage::drop_data_table(self, table_name).map_err(dyno_to_backend)
147    }
148
149    async fn create_gsi_table(
150        &self,
151        table_name: &str,
152        index_name: &str,
153    ) -> Result<(), BackendError> {
154        Storage::create_gsi_table(self, table_name, index_name).map_err(dyno_to_backend)
155    }
156
157    async fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<(), BackendError> {
158        Storage::drop_gsi_table(self, table_name, index_name).map_err(dyno_to_backend)
159    }
160
161    async fn create_lsi_table(
162        &self,
163        table_name: &str,
164        index_name: &str,
165    ) -> Result<(), BackendError> {
166        Storage::create_lsi_table(self, table_name, index_name).map_err(dyno_to_backend)
167    }
168
169    async fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<(), BackendError> {
170        Storage::drop_lsi_table(self, table_name, index_name).map_err(dyno_to_backend)
171    }
172
173    async fn insert_gsi_item(
174        &self,
175        table_name: &str,
176        index_name: &str,
177        gsi_pk: &str,
178        gsi_sk: &str,
179        table_pk: &str,
180        table_sk: &str,
181        item_json: &str,
182    ) -> Result<(), BackendError> {
183        Storage::insert_gsi_item(
184            self, table_name, index_name, gsi_pk, gsi_sk, table_pk, table_sk, item_json,
185        )
186        .map_err(dyno_to_backend)
187    }
188
189    async fn insert_gsi_items(
190        &self,
191        table_name: &str,
192        index_name: &str,
193        rows: &[GsiItemRow],
194    ) -> Result<(), BackendError> {
195        Storage::insert_gsi_items(self, table_name, index_name, rows).map_err(dyno_to_backend)
196    }
197
198    async fn delete_gsi_item(
199        &self,
200        table_name: &str,
201        index_name: &str,
202        table_pk: &str,
203        table_sk: &str,
204    ) -> Result<(), BackendError> {
205        Storage::delete_gsi_item(self, table_name, index_name, table_pk, table_sk)
206            .map_err(dyno_to_backend)
207    }
208
209    async fn query_gsi_items(
210        &self,
211        table_name: &str,
212        index_name: &str,
213        gsi_pk: &str,
214        params: &QueryParams<'_>,
215    ) -> Result<Vec<(String, String, String)>, BackendError> {
216        Storage::query_gsi_items(self, table_name, index_name, gsi_pk, params)
217            .map_err(dyno_to_backend)
218    }
219
220    async fn scan_gsi_items(
221        &self,
222        table_name: &str,
223        index_name: &str,
224        params: &ScanParams<'_>,
225    ) -> Result<Vec<(String, String, String)>, BackendError> {
226        Storage::scan_gsi_items(self, table_name, index_name, params).map_err(dyno_to_backend)
227    }
228
229    async fn insert_lsi_item(
230        &self,
231        table_name: &str,
232        index_name: &str,
233        pk: &str,
234        sk: &str,
235        base_pk: &str,
236        base_sk: &str,
237        item_json: &str,
238    ) -> Result<(), BackendError> {
239        Storage::insert_lsi_item(
240            self, table_name, index_name, pk, sk, base_pk, base_sk, item_json,
241        )
242        .map_err(dyno_to_backend)
243    }
244
245    async fn delete_lsi_item(
246        &self,
247        table_name: &str,
248        index_name: &str,
249        base_pk: &str,
250        base_sk: &str,
251    ) -> Result<(), BackendError> {
252        Storage::delete_lsi_item(self, table_name, index_name, base_pk, base_sk)
253            .map_err(dyno_to_backend)
254    }
255
256    async fn query_lsi_items(
257        &self,
258        table_name: &str,
259        index_name: &str,
260        pk: &str,
261        params: &QueryParams<'_>,
262    ) -> Result<Vec<(String, String, String)>, BackendError> {
263        Storage::query_lsi_items(self, table_name, index_name, pk, params).map_err(dyno_to_backend)
264    }
265
266    async fn scan_lsi_items(
267        &self,
268        table_name: &str,
269        index_name: &str,
270        params: &ScanParams<'_>,
271    ) -> Result<Vec<(String, String, String)>, BackendError> {
272        Storage::scan_lsi_items(self, table_name, index_name, params).map_err(dyno_to_backend)
273    }
274
275    async fn begin_transaction(&self) -> Result<(), BackendError> {
276        Storage::begin_transaction(self).map_err(dyno_to_backend)
277    }
278
279    async fn commit(&self) -> Result<(), BackendError> {
280        Storage::commit(self).map_err(dyno_to_backend)
281    }
282
283    async fn rollback(&self) -> Result<(), BackendError> {
284        Storage::rollback(self).map_err(dyno_to_backend)
285    }
286
287    async fn enable_bulk_loading(&self) -> Result<(), BackendError> {
288        Storage::enable_bulk_loading(self).map_err(dyno_to_backend)
289    }
290
291    async fn disable_bulk_loading(&self) -> Result<(), BackendError> {
292        Storage::disable_bulk_loading(self).map_err(dyno_to_backend)
293    }
294
295    async fn put_item(
296        &self,
297        table_name: &str,
298        pk: &str,
299        sk: &str,
300        item_json: &str,
301        item_size: usize,
302    ) -> Result<Option<String>, BackendError> {
303        Storage::put_item(self, table_name, pk, sk, item_json, item_size).map_err(dyno_to_backend)
304    }
305
306    async fn put_item_with_hash(
307        &self,
308        table_name: &str,
309        pk: &str,
310        sk: &str,
311        item_json: &str,
312        item_size: usize,
313        hash_prefix: &str,
314    ) -> Result<Option<String>, BackendError> {
315        Storage::put_item_with_hash(self, table_name, pk, sk, item_json, item_size, hash_prefix)
316            .map_err(dyno_to_backend)
317    }
318
319    async fn put_base_items(
320        &self,
321        table_name: &str,
322        rows: &[BaseItemRow],
323    ) -> Result<(), BackendError> {
324        Storage::put_base_items(self, table_name, rows).map_err(dyno_to_backend)
325    }
326
327    async fn get_item(
328        &self,
329        table_name: &str,
330        pk: &str,
331        sk: &str,
332    ) -> Result<Option<String>, BackendError> {
333        Storage::get_item(self, table_name, pk, sk).map_err(dyno_to_backend)
334    }
335
336    async fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64, BackendError> {
337        Storage::get_partition_size(self, table_name, pk).map_err(dyno_to_backend)
338    }
339
340    async fn get_lsi_partition_size(
341        &self,
342        table_name: &str,
343        index_name: &str,
344        pk: &str,
345    ) -> Result<i64, BackendError> {
346        Storage::get_lsi_partition_size(self, table_name, index_name, pk).map_err(dyno_to_backend)
347    }
348
349    async fn delete_item(
350        &self,
351        table_name: &str,
352        pk: &str,
353        sk: &str,
354    ) -> Result<Option<String>, BackendError> {
355        Storage::delete_item(self, table_name, pk, sk).map_err(dyno_to_backend)
356    }
357
358    async fn query_items(
359        &self,
360        table_name: &str,
361        pk: &str,
362        params: &QueryParams<'_>,
363    ) -> Result<Vec<(String, String, String)>, BackendError> {
364        Storage::query_items(self, table_name, pk, params).map_err(dyno_to_backend)
365    }
366
367    async fn scan_items(
368        &self,
369        table_name: &str,
370        params: &ScanParams<'_>,
371    ) -> Result<Vec<(String, String, String)>, BackendError> {
372        Storage::scan_items(self, table_name, params).map_err(dyno_to_backend)
373    }
374
375    async fn count_items(&self, table_name: &str) -> Result<i64, BackendError> {
376        Storage::count_items(self, table_name).map_err(dyno_to_backend)
377    }
378
379    async fn db_size_bytes(&self) -> Result<u64, BackendError> {
380        Storage::db_size_bytes(self).map_err(dyno_to_backend)
381    }
382
383    async fn table_count(&self) -> Result<usize, BackendError> {
384        Storage::table_count(self).map_err(dyno_to_backend)
385    }
386
387    async fn table_stats(&self) -> Result<Vec<TableStats>, BackendError> {
388        Storage::table_stats(self).map_err(dyno_to_backend)
389    }
390
391    async fn database_info(&self) -> Result<DatabaseInfo, BackendError> {
392        Storage::database_info(self).map_err(dyno_to_backend)
393    }
394
395    async fn vacuum(&self) -> Result<(), BackendError> {
396        Storage::vacuum(self).map_err(dyno_to_backend)
397    }
398
399    async fn enable_stream(
400        &self,
401        table_name: &str,
402        view_type: &str,
403        label: &str,
404    ) -> Result<(), BackendError> {
405        Storage::enable_stream(self, table_name, view_type, label).map_err(dyno_to_backend)
406    }
407
408    async fn disable_stream(&self, table_name: &str) -> Result<(), BackendError> {
409        Storage::disable_stream(self, table_name).map_err(dyno_to_backend)
410    }
411
412    async fn insert_stream_record(
413        &self,
414        table_name: &str,
415        event_name: &str,
416        keys_json: &str,
417        new_image: Option<&str>,
418        old_image: Option<&str>,
419        sequence_number: &str,
420        shard_id: &str,
421        created_at: i64,
422    ) -> Result<(), BackendError> {
423        Storage::insert_stream_record(
424            self,
425            table_name,
426            event_name,
427            keys_json,
428            new_image,
429            old_image,
430            sequence_number,
431            shard_id,
432            created_at,
433        )
434        .map_err(dyno_to_backend)
435    }
436
437    async fn insert_stream_record_with_identity(
438        &self,
439        table_name: &str,
440        event_name: &str,
441        keys_json: &str,
442        new_image: Option<&str>,
443        old_image: Option<&str>,
444        sequence_number: &str,
445        shard_id: &str,
446        created_at: i64,
447        user_identity: Option<&str>,
448    ) -> Result<(), BackendError> {
449        Storage::insert_stream_record_with_identity(
450            self,
451            table_name,
452            event_name,
453            keys_json,
454            new_image,
455            old_image,
456            sequence_number,
457            shard_id,
458            created_at,
459            user_identity,
460        )
461        .map_err(dyno_to_backend)
462    }
463
464    async fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64, BackendError> {
465        Storage::next_stream_sequence_number(self, table_name).map_err(dyno_to_backend)
466    }
467
468    async fn get_stream_records(
469        &self,
470        table_name: &str,
471        shard_id: &str,
472        after_sequence: i64,
473        limit: usize,
474    ) -> Result<Vec<StreamRecord>, BackendError> {
475        Storage::get_stream_records(self, table_name, shard_id, after_sequence, limit)
476            .map_err(dyno_to_backend)
477    }
478
479    async fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>, BackendError> {
480        Storage::list_stream_enabled_tables(self).map_err(dyno_to_backend)
481    }
482
483    async fn update_ttl_config(
484        &self,
485        table_name: &str,
486        attribute_name: Option<&str>,
487        enabled: bool,
488    ) -> Result<(), BackendError> {
489        Storage::update_ttl_config(self, table_name, attribute_name, enabled)
490            .map_err(dyno_to_backend)
491    }
492
493    async fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>, BackendError> {
494        Storage::list_ttl_enabled_tables(self).map_err(dyno_to_backend)
495    }
496
497    async fn get_shard_sequence_range(
498        &self,
499        table_name: &str,
500        shard_id: &str,
501    ) -> Result<(Option<String>, Option<String>), BackendError> {
502        Storage::get_shard_sequence_range(self, table_name, shard_id).map_err(dyno_to_backend)
503    }
504
505    async fn touch_cached_at(
506        &self,
507        table_name: &str,
508        pk: &str,
509        sk: &str,
510        timestamp: f64,
511    ) -> Result<(), BackendError> {
512        Storage::touch_cached_at(self, table_name, pk, sk, timestamp).map_err(dyno_to_backend)
513    }
514
515    async fn get_lru_items(
516        &self,
517        table_name: &str,
518        limit: usize,
519    ) -> Result<Vec<(String, String, i64)>, BackendError> {
520        Storage::get_lru_items(self, table_name, limit).map_err(dyno_to_backend)
521    }
522}