// gestalt/indexeddb_provider.rs

1use std::cmp::Ordering;
2use std::collections::BTreeMap;
3
4use hyper_util::rt::TokioIo;
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::Request;
8use tonic::codegen::async_trait;
9use tonic::metadata::MetadataValue;
10use tonic::service::Interceptor;
11use tonic::service::interceptor::InterceptedService;
12use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
13use tower::service_fn;
14
15use crate::env::{ENV_HOST_SERVICE_SOCKET, ENV_HOST_SERVICE_TOKEN, HOST_SERVICE_BINDING_HEADER};
16use crate::generated::v1::{self as pb, indexed_db_client::IndexedDbClient};
17
/// A tonic `Channel` wrapped by `RelayTokenInterceptor`.
type IndexedDbTransport = InterceptedService<Channel, RelayTokenInterceptor>;

/// Metadata header name for the host-service relay token.
// NOTE(review): presumably attached by `RelayTokenInterceptor`; its use is outside this view.
const INDEXEDDB_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
// NOTE(review): presumably the capacity of the channel backing a cursor stream — confirm at the use site.
const CURSOR_CHANNEL_BUFFER: usize = 1;
// NOTE(review): presumably the capacity of the channel backing a transaction stream — confirm at the use site.
const TRANSACTION_CHANNEL_BUFFER: usize = 1;
23
/// Errors returned by the IndexedDB transport client.
///
/// `Transport` and `Status` are marked `#[from]`, so `tonic::transport::Error`
/// and `tonic::Status` values convert into this type through `?`.
#[derive(Debug, thiserror::Error)]
pub enum IndexedDBError {
    /// The requested record, object store, index, or cursor entry was missing.
    #[error("not found")]
    NotFound,
    /// A create operation conflicted with an existing value.
    #[error("already exists")]
    AlreadyExists,
    /// A cursor was opened in key-only mode and a value was requested.
    #[error("cursor is keys-only; value not available")]
    KeysOnly,
    /// A provider-side helper received invalid input; the payload is the message.
    #[error("{0}")]
    InvalidArgument(String),
    /// An explicit transaction failed or was already closed; the payload is the message.
    #[error("{0}")]
    Transaction(String),
    /// The host-service transport could not be created.
    #[error("{0}")]
    Transport(#[from] tonic::transport::Error),
    /// The host-service RPC returned a gRPC status.
    #[error("{0}")]
    Status(#[from] tonic::Status),
    /// Required environment or target configuration was invalid; the payload is the message.
    #[error("{0}")]
    Env(String),
}
52
/// JSON-like value stored in an object store row: a map from field name to
/// JSON value, kept in field-name order by the underlying `BTreeMap`.
pub type Record = BTreeMap<String, serde_json::Value>;
55
/// Constrains a query or cursor by lower and upper bounds.
///
/// A bound of `None` leaves that side of the range unbounded.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyRange {
    /// Lower bound, inclusive unless `lower_open` is true.
    pub lower: Option<serde_json::Value>,
    /// Upper bound, inclusive unless `upper_open` is true.
    pub upper: Option<serde_json::Value>,
    /// Whether the lower bound is exclusive.
    pub lower_open: bool,
    /// Whether the upper bound is exclusive.
    pub upper_open: bool,
}
68
/// Describes one secondary index on an object store.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Record path used as the index key, one element per path component.
    // NOTE(review): whether multiple elements mean a nested path or a compound key is not visible here — confirm.
    pub key_path: Vec<String>,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
}
79
/// Describes the indexes attached to an object store.
#[derive(Debug, Clone, PartialEq)]
pub struct ObjectStoreSchema {
    /// Secondary indexes to create with the object store; may be empty.
    pub indexes: Vec<IndexSchema>,
}
86
/// Controls cursor traversal order.
///
/// The `*Unique` variants only change iteration for index cursors, where
/// several rows can share one index key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorDirection {
    /// Iterate in ascending key order.
    Next,
    /// Iterate in ascending key order while collapsing duplicate index keys.
    NextUnique,
    /// Iterate in descending key order.
    Prev,
    /// Iterate in descending key order while collapsing duplicate index keys.
    PrevUnique,
}
99
100impl CursorDirection {
101    fn to_proto(self) -> i32 {
102        match self {
103            Self::Next => pb::CursorDirection::CursorNext as i32,
104            Self::NextUnique => pb::CursorDirection::CursorNextUnique as i32,
105            Self::Prev => pb::CursorDirection::CursorPrev as i32,
106            Self::PrevUnique => pb::CursorDirection::CursorPrevUnique as i32,
107        }
108    }
109}
110
/// Controls whether an explicit transaction may mutate scoped stores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionMode {
    /// Transaction may only read from scoped object stores.
    Readonly,
    /// Transaction may read and write scoped object stores.
    Readwrite,
}
119
120impl TransactionMode {
121    fn to_proto(self) -> i32 {
122        match self {
123            Self::Readonly => pb::TransactionMode::TransactionReadonly as i32,
124            Self::Readwrite => pb::TransactionMode::TransactionReadwrite as i32,
125        }
126    }
127}
128
/// Provider durability hint for explicit transactions.
///
/// `Default::default()` yields [`TransactionDurabilityHint::Default`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransactionDurabilityHint {
    /// Let the host choose its default durability behavior.
    #[default]
    Default,
    /// Prefer stricter durability.
    Strict,
    /// Prefer relaxed durability.
    Relaxed,
}
140
141impl TransactionDurabilityHint {
142    fn to_proto(self) -> i32 {
143        match self {
144            Self::Default => pb::TransactionDurabilityHint::TransactionDurabilityDefault as i32,
145            Self::Strict => pb::TransactionDurabilityHint::TransactionDurabilityStrict as i32,
146            Self::Relaxed => pb::TransactionDurabilityHint::TransactionDurabilityRelaxed as i32,
147        }
148    }
149}
150
/// Options for an explicit transaction.
///
/// The derived `Default` uses the default durability hint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TransactionOptions {
    /// Durability hint for explicit transactions.
    pub durability_hint: TransactionDurabilityHint,
}
157
/// Fakeable client contract for IndexedDB-compatible storage.
///
/// Implementations choose the concrete object-store and transaction handle
/// types through the associated types.
#[async_trait]
pub trait IndexedDBApi: Send {
    /// The store handle this scope yields.
    type ObjectStore: ObjectStoreApi;
    /// The transaction handle this database yields.
    type Transaction: TransactionApi;

    /// Creates the object store `name` with `schema` and returns a typed handle for it.
    async fn create_object_store(
        &mut self,
        name: &str,
        schema: ObjectStoreSchema,
    ) -> Result<Self::ObjectStore, IndexedDBError>;

    /// Deletes the object store `name`.
    async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError>;

    /// Returns a typed handle for the object store `name`.
    ///
    /// This is synchronous and infallible by signature.
    fn object_store(&self, name: &str) -> Self::ObjectStore;

    /// Opens an explicit transaction over the fixed object-store scope `stores`.
    async fn transaction(
        &self,
        stores: &[&str],
        mode: TransactionMode,
        options: TransactionOptions,
    ) -> Result<Self::Transaction, IndexedDBError>;
}
187
/// Fakeable IndexedDB object-store contract.
#[async_trait]
pub trait ObjectStoreApi: Send {
    /// The index handle this store yields.
    type Index: IndexApi;
    /// The cursor handle this scope yields.
    type Cursor: CursorApi;

    /// Loads one record by primary key `id`.
    async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError>;

    /// Resolves the primary key for `id`.
    async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError>;

    /// Inserts a new row and fails if the key already exists.
    async fn add(&mut self, record: Record) -> Result<(), IndexedDBError>;

    /// Upserts a row by primary key.
    async fn put(&mut self, record: Record) -> Result<(), IndexedDBError>;

    /// Deletes one row by primary key `id`.
    async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError>;

    /// Deletes every row in the object store.
    async fn clear(&mut self) -> Result<(), IndexedDBError>;

    /// Loads every row that matches `range`.
    async fn get_all(&mut self, range: Option<KeyRange>) -> Result<Vec<Record>, IndexedDBError>;

    /// Loads every primary key that matches `range`.
    async fn get_all_keys(
        &mut self,
        range: Option<KeyRange>,
    ) -> Result<Vec<String>, IndexedDBError>;

    /// Counts rows that match `range`.
    async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError>;

    /// Deletes rows that match `range` and returns the delete count.
    async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError>;

    /// Returns a typed handle for the secondary index `name`.
    fn index(&self, name: &str) -> Self::Index;

    /// Opens a full-value cursor over the object store.
    async fn open_cursor(
        &mut self,
        range: Option<KeyRange>,
        direction: CursorDirection,
    ) -> Result<Self::Cursor, IndexedDBError>;

    /// Opens a key-only cursor over the object store.
    async fn open_key_cursor(
        &mut self,
        range: Option<KeyRange>,
        direction: CursorDirection,
    ) -> Result<Self::Cursor, IndexedDBError>;
}
246
/// Fakeable IndexedDB secondary-index contract.
///
/// Every query takes `values`, the index values to match.
#[async_trait]
pub trait IndexApi: Send {
    /// The cursor handle this scope yields.
    type Cursor: CursorApi;

    /// Loads the first row that matches `values`.
    async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError>;

    /// Resolves the primary key for the first row that matches `values`.
    async fn get_key(&mut self, values: &[serde_json::Value]) -> Result<String, IndexedDBError>;

    /// Loads every row that matches `values` and `range`.
    async fn get_all(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<Vec<Record>, IndexedDBError>;

    /// Loads every primary key that matches `values` and `range`.
    async fn get_all_keys(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<Vec<String>, IndexedDBError>;

    /// Counts rows that match `values` and `range`.
    async fn count(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<i64, IndexedDBError>;

    /// Deletes rows that match `values` and returns the delete count.
    async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError>;

    /// Deletes rows that match `values` and `range` and returns the delete count.
    async fn delete_range(
        &mut self,
        values: &[serde_json::Value],
        range: KeyRange,
    ) -> Result<i64, IndexedDBError>;

    /// Opens a full-value cursor over the secondary index.
    async fn open_cursor(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
        direction: CursorDirection,
    ) -> Result<Self::Cursor, IndexedDBError>;

    /// Opens a key-only cursor over the secondary index.
    async fn open_key_cursor(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
        direction: CursorDirection,
    ) -> Result<Self::Cursor, IndexedDBError>;
}
306
/// Fakeable explicit IndexedDB transaction contract.
#[async_trait]
pub trait TransactionApi: Send {
    /// The transaction-scoped store handle.
    ///
    /// The handle borrows the transaction mutably for `'a`, so only one store
    /// handle can be live at a time.
    type ObjectStore<'a>: TransactionObjectStoreApi + 'a
    where
        Self: 'a;

    /// Returns a transaction-scoped handle for the object store `name`.
    fn object_store<'a>(&'a mut self, name: &str) -> Self::ObjectStore<'a>;

    /// Commits the transaction.
    async fn commit(&mut self) -> Result<(), IndexedDBError>;

    /// Aborts the transaction, passing `reason` along.
    async fn abort(&mut self, reason: &str) -> Result<(), IndexedDBError>;
}
324
/// Fakeable transaction-scoped object-store contract.
///
/// Unlike `ObjectStoreApi`, this trait declares no cursor methods.
#[async_trait]
pub trait TransactionObjectStoreApi: Send {
    /// The transaction-scoped index handle.
    type Index<'a>: TransactionIndexApi + 'a
    where
        Self: 'a;

    /// Loads one record by primary key `id` inside the transaction.
    async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError>;

    /// Resolves the primary key for `id` inside the transaction.
    async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError>;

    /// Inserts a new row inside the transaction.
    async fn add(&mut self, record: Record) -> Result<(), IndexedDBError>;

    /// Upserts a row inside the transaction.
    async fn put(&mut self, record: Record) -> Result<(), IndexedDBError>;

    /// Deletes one row by primary key `id` inside the transaction.
    async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError>;

    /// Deletes every row in the object store inside the transaction.
    async fn clear(&mut self) -> Result<(), IndexedDBError>;

    /// Loads every row that matches `range` inside the transaction.
    async fn get_all(&mut self, range: Option<KeyRange>) -> Result<Vec<Record>, IndexedDBError>;

    /// Loads every primary key that matches `range` inside the transaction.
    async fn get_all_keys(
        &mut self,
        range: Option<KeyRange>,
    ) -> Result<Vec<String>, IndexedDBError>;

    /// Counts rows that match `range` inside the transaction.
    async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError>;

    /// Deletes rows that match `range` inside the transaction and returns the delete count.
    async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError>;

    /// Returns a transaction-scoped handle for the secondary index `name`.
    fn index<'a>(&'a mut self, name: &str) -> Self::Index<'a>;
}
369
/// Fakeable transaction-scoped secondary-index contract.
///
/// Unlike `IndexApi`, this trait declares no cursor methods.
#[async_trait]
pub trait TransactionIndexApi: Send {
    /// Loads the first row that matches `values` inside the transaction.
    async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError>;

    /// Resolves the primary key for the first matching row inside the transaction.
    async fn get_key(&mut self, values: &[serde_json::Value]) -> Result<String, IndexedDBError>;

    /// Loads every row that matches `values` and `range` inside the transaction.
    async fn get_all(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<Vec<Record>, IndexedDBError>;

    /// Loads every primary key that matches `values` and `range` inside the transaction.
    async fn get_all_keys(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<Vec<String>, IndexedDBError>;

    /// Counts rows that match `values` and `range` inside the transaction.
    async fn count(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<i64, IndexedDBError>;

    /// Deletes rows that match `values` inside the transaction and returns the delete count.
    async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError>;

    /// Deletes rows that match `values` and `range` inside the transaction and
    /// returns the delete count.
    async fn delete_range(
        &mut self,
        values: &[serde_json::Value],
        range: KeyRange,
    ) -> Result<i64, IndexedDBError>;
}
410
/// Fakeable IndexedDB cursor contract.
///
/// The movement methods return a `bool`.
// NOTE(review): presumably `true` means the cursor is positioned on a row and
// `false` means it is exhausted — confirm against an implementation.
#[async_trait]
pub trait CursorApi: Send {
    /// Returns the current cursor key.
    fn key(&self) -> Option<serde_json::Value>;

    /// Returns the current row's primary key.
    fn primary_key(&self) -> &str;

    /// Returns the current row value.
    fn value(&self) -> Result<Record, IndexedDBError>;

    /// Advances the cursor by one row.
    async fn continue_next(&mut self) -> Result<bool, IndexedDBError>;

    /// Advances the cursor to `key`, or exhausts it if `key` does not exist.
    async fn continue_to_key(&mut self, key: serde_json::Value) -> Result<bool, IndexedDBError>;

    /// Skips `count` rows ahead.
    async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError>;

    /// Deletes the current row and keeps the cursor open.
    async fn delete(&mut self) -> Result<(), IndexedDBError>;

    /// Replaces the current row with `value` and keeps the cursor open.
    async fn update(&mut self, value: Record) -> Result<(), IndexedDBError>;

    /// Closes the cursor stream and releases its transport resources.
    ///
    /// Consumes the cursor; the `Self: Sized` bound keeps the rest of the
    /// trait usable where `Self` is unsized.
    async fn close(self) -> Result<(), IndexedDBError>
    where
        Self: Sized;
}
443
/// Native open-cursor request used by provider-side cursor helpers.
///
/// `Default` yields a forward (`CursorDirection::Next`), full-value,
/// object-store cursor request with no range.
#[derive(Debug, Clone)]
pub struct IndexedDBOpenCursorRequest {
    /// Object store to open.
    pub store: String,
    /// Optional key range to apply.
    pub range: Option<KeyRange>,
    /// Cursor traversal direction.
    pub direction: CursorDirection,
    /// Whether returned cursor entries omit records.
    pub keys_only: bool,
    /// Secondary index name. Empty means object-store cursor.
    pub index: String,
    /// Index values supplied by an index query.
    pub values: Vec<serde_json::Value>,
}
460
461impl Default for IndexedDBOpenCursorRequest {
462    fn default() -> Self {
463        Self {
464            store: String::new(),
465            range: None,
466            direction: CursorDirection::Next,
467            keys_only: false,
468            index: String::new(),
469            values: Vec::new(),
470        }
471    }
472}
473
/// One provider-side cursor row.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexedDBCursorSnapshotEntry {
    /// Object-store key, or secondary-index key for index cursors.
    pub key: serde_json::Value,
    /// Canonical primary key for the object-store row.
    pub primary_key: String,
    /// Native primary-key value used as a stable tie-breaker for duplicate index keys.
    ///
    /// Snapshot sorting compares this value only when two `key`s are equal.
    pub primary_key_value: serde_json::Value,
    /// Row value returned by full-value cursors.
    pub record: Record,
}
486
/// Provider-side IndexedDB cursor snapshot.
///
/// The snapshot sorts rows, applies IndexedDB range bounds, and implements
/// movement semantics for native Rust providers without exposing wire message
/// types.
///
/// All fields are public; the movement methods assume `entries` is already in
/// traversal order and that `pos` is `-1` or an index into `entries`.
#[derive(Debug, Clone)]
pub struct IndexedDBCursorSnapshot {
    /// Whether entry keys contain secondary-index values.
    pub index_cursor: bool,
    /// Whether returned cursor entries should omit records.
    pub keys_only: bool,
    /// Whether entries are ordered from greatest to least key.
    pub reverse: bool,
    /// Whether duplicate index keys are collapsed while iterating.
    pub unique: bool,
    /// Sorted and range-filtered entries used by cursor movement.
    pub entries: Vec<IndexedDBCursorSnapshotEntry>,
    /// Current cursor position, or -1 when unpositioned.
    pub pos: isize,
}
507
508impl IndexedDBCursorSnapshot {
509    /// Creates an empty provider-side cursor snapshot from a native request.
510    pub fn new(req: &IndexedDBOpenCursorRequest) -> Self {
511        Self {
512            index_cursor: !req.index.is_empty(),
513            keys_only: req.keys_only,
514            reverse: matches!(
515                req.direction,
516                CursorDirection::Prev | CursorDirection::PrevUnique
517            ),
518            unique: matches!(
519                req.direction,
520                CursorDirection::NextUnique | CursorDirection::PrevUnique
521            ),
522            entries: Vec::new(),
523            pos: -1,
524        }
525    }
526
527    /// Sorts entries, applies the supplied key range, and stores the snapshot.
528    pub fn load(
529        &mut self,
530        mut entries: Vec<IndexedDBCursorSnapshotEntry>,
531        range: Option<&KeyRange>,
532    ) -> Result<(), IndexedDBError> {
533        entries.sort_by(|left, right| {
534            let mut cmp = compare_indexeddb_values(&left.key, &right.key);
535            if cmp == Ordering::Equal {
536                cmp = compare_indexeddb_values(&left.primary_key_value, &right.primary_key_value);
537            }
538            if self.reverse { cmp.reverse() } else { cmp }
539        });
540        self.entries = self.apply_range(entries, range)?;
541        self.pos = -1;
542        Ok(())
543    }
544
545    /// Returns entries that satisfy the supplied key range without mutating state.
546    pub fn apply_range(
547        &self,
548        entries: Vec<IndexedDBCursorSnapshotEntry>,
549        range: Option<&KeyRange>,
550    ) -> Result<Vec<IndexedDBCursorSnapshotEntry>, IndexedDBError> {
551        let Some(range) = range else {
552            return Ok(entries);
553        };
554        let (lower, upper) = indexeddb_range_bounds(Some(range), self.index_cursor);
555        let mut filtered = Vec::with_capacity(entries.len());
556        for entry in entries {
557            let key = normalize_indexeddb_bound(&entry.key, self.index_cursor);
558            if let Some(lower) = &lower {
559                let cmp = compare_indexeddb_values(&key, lower);
560                if range.lower_open && cmp != Ordering::Greater {
561                    continue;
562                }
563                if !range.lower_open && cmp == Ordering::Less {
564                    continue;
565                }
566            }
567            if let Some(upper) = &upper {
568                let cmp = compare_indexeddb_values(&key, upper);
569                if range.upper_open && cmp != Ordering::Less {
570                    continue;
571                }
572                if !range.upper_open && cmp == Ordering::Greater {
573                    continue;
574                }
575            }
576            filtered.push(entry);
577        }
578        Ok(filtered)
579    }
580
581    /// Advances to the next entry, or returns `None` when exhausted.
582    #[allow(clippy::should_implement_trait)]
583    pub fn next(&mut self) -> Result<Option<&IndexedDBCursorSnapshotEntry>, IndexedDBError> {
584        if self.unique
585            && self.index_cursor
586            && self.pos >= 0
587            && (self.pos as usize) < self.entries.len()
588        {
589            let previous = self.entries[self.pos as usize].key.clone();
590            self.pos += 1;
591            while (self.pos as usize) < self.entries.len() {
592                if compare_indexeddb_values(&self.entries[self.pos as usize].key, &previous)
593                    != Ordering::Equal
594                {
595                    return Ok(Some(self.current()?));
596                }
597                self.pos += 1;
598            }
599            return Ok(None);
600        }
601
602        self.pos += 1;
603        if (self.pos as usize) >= self.entries.len() {
604            return Ok(None);
605        }
606        Ok(Some(self.current()?))
607    }
608
609    /// Advances to `target` or the next entry past it for this direction.
610    pub fn continue_to_key(
611        &mut self,
612        target: &serde_json::Value,
613    ) -> Result<Option<&IndexedDBCursorSnapshotEntry>, IndexedDBError> {
614        let previous = if self.unique
615            && self.index_cursor
616            && self.pos >= 0
617            && (self.pos as usize) < self.entries.len()
618        {
619            Some(self.entries[self.pos as usize].key.clone())
620        } else {
621            None
622        };
623        self.pos += 1;
624        while (self.pos as usize) < self.entries.len() {
625            let current = &self.entries[self.pos as usize].key;
626            if let Some(previous) = &previous {
627                if self.unique
628                    && self.index_cursor
629                    && compare_indexeddb_values(current, previous) == Ordering::Equal
630                {
631                    self.pos += 1;
632                    continue;
633                }
634            }
635            let cmp = compare_indexeddb_values(current, target);
636            if self.reverse {
637                if cmp != Ordering::Greater {
638                    return Ok(Some(self.current()?));
639                }
640            } else if cmp != Ordering::Less {
641                return Ok(Some(self.current()?));
642            }
643            self.pos += 1;
644        }
645        Ok(None)
646    }
647
648    /// Skips `count` entries and returns the new current entry.
649    pub fn advance(
650        &mut self,
651        count: i32,
652    ) -> Result<Option<&IndexedDBCursorSnapshotEntry>, IndexedDBError> {
653        if count <= 0 {
654            return Err(IndexedDBError::InvalidArgument(
655                "advance count must be positive".to_string(),
656            ));
657        }
658        for i in 0..count {
659            if self.next()?.is_none() {
660                return Ok(None);
661            }
662            if i == count - 1 {
663                return Ok(Some(self.current()?));
664            }
665        }
666        Ok(None)
667    }
668
669    /// Returns the currently positioned entry.
670    pub fn current(&self) -> Result<&IndexedDBCursorSnapshotEntry, IndexedDBError> {
671        if self.pos < 0 || (self.pos as usize) >= self.entries.len() {
672            return Err(IndexedDBError::NotFound);
673        }
674        Ok(&self.entries[self.pos as usize])
675    }
676}
677
/// Creates an empty provider-side cursor snapshot from a native request.
///
/// Free-function form of [`IndexedDBCursorSnapshot::new`]; it forwards `req` unchanged.
pub fn new_indexeddb_cursor_snapshot(req: &IndexedDBOpenCursorRequest) -> IndexedDBCursorSnapshot {
    IndexedDBCursorSnapshot::new(req)
}
682
683/// Normalizes object-store or index cursor range bounds.
684///
685/// Scalar index bounds are compared as one-part composite keys so providers can
686/// share the same comparison path for scalar and compound indexes.
687pub fn indexeddb_range_bounds(
688    range: Option<&KeyRange>,
689    index_cursor: bool,
690) -> (Option<serde_json::Value>, Option<serde_json::Value>) {
691    let Some(range) = range else {
692        return (None, None);
693    };
694    let lower = range
695        .lower
696        .as_ref()
697        .map(|value| normalize_indexeddb_bound(value, index_cursor));
698    let upper = range
699        .upper
700        .as_ref()
701        .map(|value| normalize_indexeddb_bound(value, index_cursor));
702    (lower, upper)
703}
704
705/// Compares native IndexedDB key values.
706pub fn compare_indexeddb_values(left: &serde_json::Value, right: &serde_json::Value) -> Ordering {
707    match (left, right) {
708        (serde_json::Value::Array(left), serde_json::Value::Array(right)) => {
709            for (i, left_value) in left.iter().enumerate() {
710                let Some(right_value) = right.get(i) else {
711                    return Ordering::Greater;
712                };
713                let cmp = compare_indexeddb_values(left_value, right_value);
714                if cmp != Ordering::Equal {
715                    return cmp;
716                }
717            }
718            left.len().cmp(&right.len())
719        }
720        (serde_json::Value::String(left), serde_json::Value::String(right)) => left.cmp(right),
721        (serde_json::Value::Bool(left), serde_json::Value::Bool(right)) => left.cmp(right),
722        (serde_json::Value::Number(left), serde_json::Value::Number(right)) => {
723            compare_json_numbers(left, right)
724        }
725        _ => left.to_string().cmp(&right.to_string()),
726    }
727}
728
729fn normalize_indexeddb_bound(value: &serde_json::Value, index_cursor: bool) -> serde_json::Value {
730    if !index_cursor {
731        return value.clone();
732    }
733    if matches!(value, serde_json::Value::Array(_)) {
734        return value.clone();
735    }
736    serde_json::Value::Array(vec![value.clone()])
737}
738
739fn compare_json_numbers(left: &serde_json::Number, right: &serde_json::Number) -> Ordering {
740    if let (Some(left), Some(right)) = (left.as_i64(), right.as_i64()) {
741        return left.cmp(&right);
742    }
743    if let (Some(left), Some(right)) = (left.as_u64(), right.as_u64()) {
744        return left.cmp(&right);
745    }
746    if let (Some(left), Some(right)) = (left.as_i64(), right.as_u64()) {
747        if left < 0 {
748            return Ordering::Less;
749        }
750        return (left as u64).cmp(&right);
751    }
752    if let (Some(left), Some(right)) = (left.as_u64(), right.as_i64()) {
753        if right < 0 {
754            return Ordering::Greater;
755        }
756        return left.cmp(&(right as u64));
757    }
758    match (left.as_f64(), right.as_f64()) {
759        (Some(left), Some(right)) => left.partial_cmp(&right).unwrap_or(Ordering::Equal),
760        _ => left.to_string().cmp(&right.to_string()),
761    }
762}
763
764#[cfg(test)]
765mod tests {
766    use super::*;
767    use serde_json::json;
768
769    fn entry(
770        key: serde_json::Value,
771        primary_key: &str,
772        primary_key_value: serde_json::Value,
773    ) -> IndexedDBCursorSnapshotEntry {
774        IndexedDBCursorSnapshotEntry {
775            key,
776            primary_key: primary_key.to_string(),
777            primary_key_value,
778            record: Record::new(),
779        }
780    }
781
    /// A `NextUnique` index cursor sorts its entries, keeps those inside the
    /// inclusive range, and yields one entry per distinct index key.
    #[test]
    fn cursor_snapshot_sorts_ranges_and_skips_duplicate_unique_index_keys() {
        let mut snapshot = new_indexeddb_cursor_snapshot(&IndexedDBOpenCursorRequest {
            direction: CursorDirection::NextUnique,
            index: "by_status".to_string(),
            ..Default::default()
        });

        // Entries are supplied out of order; two of them share the key ["todo"].
        snapshot
            .load(
                vec![
                    entry(json!(["todo"]), "issue-2", json!("issue-2")),
                    entry(json!(["done"]), "issue-3", json!("issue-3")),
                    entry(json!(["todo"]), "issue-1", json!("issue-1")),
                ],
                Some(&KeyRange {
                    lower: Some(json!(["done"])),
                    upper: Some(json!(["todo"])),
                    lower_open: false,
                    upper_open: false,
                }),
            )
            .expect("load");

        // ["done"] sorts before ["todo"].
        assert_eq!(
            snapshot.next().expect("first").unwrap().primary_key,
            "issue-3"
        );
        // Among the ["todo"] duplicates the primary-key tie-breaker puts issue-1 first.
        assert_eq!(
            snapshot.next().expect("second").unwrap().primary_key,
            "issue-1"
        );
        // issue-2 repeats the ["todo"] key, so it is skipped and the cursor is exhausted.
        assert!(snapshot.next().expect("exhausted").is_none());
    }
816
    /// `advance(1)` moves exactly one entry forward from the current position.
    #[test]
    fn cursor_snapshot_advance_moves_exactly_count_entries_from_current_position() {
        let mut snapshot = new_indexeddb_cursor_snapshot(&IndexedDBOpenCursorRequest::default());
        snapshot
            .load(
                vec![
                    entry(json!("a"), "a", json!("a")),
                    entry(json!("b"), "b", json!("b")),
                    entry(json!("c"), "c", json!("c")),
                ],
                None,
            )
            .expect("load");

        // Position the cursor on the first entry before advancing.
        assert_eq!(snapshot.next().expect("first").unwrap().primary_key, "a");
        assert_eq!(
            snapshot.advance(1).expect("second").unwrap().primary_key,
            "b"
        );
        assert_eq!(
            snapshot.advance(1).expect("third").unwrap().primary_key,
            "c"
        );
    }
841
    /// Range filtering on an index cursor works when entry keys and bounds are
    /// scalars rather than arrays, and the stored key is left unchanged.
    #[test]
    fn cursor_snapshot_index_range_accepts_scalar_entry_keys() {
        let mut snapshot = new_indexeddb_cursor_snapshot(&IndexedDBOpenCursorRequest {
            index: "by_status".to_string(),
            ..Default::default()
        });
        snapshot
            .load(
                vec![
                    entry(json!("done"), "issue-2", json!("issue-2")),
                    entry(json!("active"), "issue-1", json!("issue-1")),
                ],
                Some(&KeyRange {
                    lower: Some(json!("active")),
                    upper: Some(json!("active")),
                    lower_open: false,
                    upper_open: false,
                }),
            )
            .expect("load");

        // Only the "active" entry lies inside the inclusive ["active", "active"] range.
        let first = snapshot.next().expect("first").unwrap();
        assert_eq!(first.primary_key, "issue-1");
        // Normalization is applied for comparison only; the entry keeps its scalar key.
        assert_eq!(first.key, json!("active"));
        assert!(snapshot.next().expect("exhausted").is_none());
    }
868
869    #[test]
870    fn range_bounds_normalize_scalar_index_bounds() {
871        let (lower, upper) = indexeddb_range_bounds(
872            Some(&KeyRange {
873                lower: Some(json!("active")),
874                upper: Some(json!(["done"])),
875                lower_open: false,
876                upper_open: false,
877            }),
878            true,
879        );
880
881        assert_eq!(lower, Some(json!(["active"])));
882        assert_eq!(upper, Some(json!(["done"])));
883    }
884
885    #[test]
886    fn compare_values_orders_composite_keys() {
887        assert_eq!(
888            compare_indexeddb_values(&json!(["active", 1]), &json!(["active", 2])),
889            Ordering::Less
890        );
891        assert_eq!(
892            compare_indexeddb_values(&json!(["active", 2]), &json!(["active", 2])),
893            Ordering::Equal
894        );
895        assert_eq!(
896            compare_indexeddb_values(&json!(["active", 3]), &json!(["active", 2])),
897            Ordering::Greater
898        );
899    }
900
901    #[test]
902    fn compare_values_orders_large_integer_keys_exactly() {
903        assert_eq!(
904            compare_indexeddb_values(
905                &json!(9_007_199_254_740_993u64),
906                &json!(9_007_199_254_740_992u64)
907            ),
908            Ordering::Greater
909        );
910        assert_eq!(
911            compare_indexeddb_values(&json!(i64::MAX), &json!(u64::MAX)),
912            Ordering::Less
913        );
914    }
915}
916
/// Streaming cursor over object store or secondary index rows.
///
/// Commands are pushed to the provider through `tx` and each reply is read back from
/// `stream`; the most recently received row is cached in `entry`.
pub struct Cursor {
    /// Sending half of the channel that feeds the cursor request stream.
    tx: mpsc::Sender<pb::CursorClientMessage>,
    /// Response stream returned by the provider for this cursor.
    stream: tonic::Streaming<pb::CursorResponse>,
    /// Copied from the open request; when set, `value()` returns `KeysOnly`.
    keys_only: bool,
    /// True when the open request named an index (non-empty `index` field).
    index_cursor: bool,
    /// Row the cursor is currently positioned on, if any.
    entry: Option<pb::CursorEntry>,
    /// Set once the provider reports exhaustion; further navigation is short-circuited.
    done: bool,
}
926
927impl Cursor {
928    /// Returns the current cursor key.
929    pub fn key(&self) -> Option<serde_json::Value> {
930        let entry = self.entry.as_ref()?;
931        match entry.key.len() {
932            0 => None,
933            1 if !self.index_cursor => Some(key_value_to_json(&entry.key[0])),
934            _ => Some(serde_json::Value::Array(
935                entry.key.iter().map(key_value_to_json).collect(),
936            )),
937        }
938    }
939
940    /// Returns the current row's primary key.
941    pub fn primary_key(&self) -> &str {
942        self.entry
943            .as_ref()
944            .map(|e| e.primary_key.as_str())
945            .unwrap_or("")
946    }
947
948    /// Returns the current row value.
949    pub fn value(&self) -> Result<Record, IndexedDBError> {
950        if self.keys_only {
951            return Err(IndexedDBError::KeysOnly);
952        }
953        let entry = self.entry.as_ref().ok_or(IndexedDBError::NotFound)?;
954        Ok(entry
955            .record
956            .as_ref()
957            .map(pb_record_to_record)
958            .unwrap_or_default())
959    }
960
961    /// Advances the cursor by one row.
962    pub async fn continue_next(&mut self) -> Result<bool, IndexedDBError> {
963        let cmd = pb::cursor_command::Command::Next(true);
964        self.send_and_recv(cmd).await
965    }
966
967    /// Advances the cursor to key, or exhausts it if key does not exist.
968    pub async fn continue_to_key(
969        &mut self,
970        key: serde_json::Value,
971    ) -> Result<bool, IndexedDBError> {
972        let cmd = pb::cursor_command::Command::ContinueToKey(pb::CursorKeyTarget {
973            key: cursor_key_to_proto(&key, self.index_cursor),
974        });
975        self.send_and_recv(cmd).await
976    }
977
978    /// Skips count rows ahead.
979    pub async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError> {
980        let cmd = pb::cursor_command::Command::Advance(count);
981        self.send_and_recv(cmd).await
982    }
983
984    /// Deletes the current row and keeps the cursor open.
985    pub async fn delete(&mut self) -> Result<(), IndexedDBError> {
986        if self.done {
987            return Err(IndexedDBError::NotFound);
988        }
989        let cmd = pb::cursor_command::Command::Delete(true);
990        self.send_mutation(cmd).await
991    }
992
993    /// Replaces the current row and keeps the cursor open.
994    pub async fn update(&mut self, value: Record) -> Result<(), IndexedDBError> {
995        if self.done {
996            return Err(IndexedDBError::NotFound);
997        }
998        let cmd = pb::cursor_command::Command::Update(record_to_pb_record(value));
999        self.send_mutation(cmd).await
1000    }
1001
1002    /// Closes the cursor stream and releases its transport resources.
1003    pub async fn close(self) -> Result<(), IndexedDBError> {
1004        let msg = pb::CursorClientMessage {
1005            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
1006                command: Some(pb::cursor_command::Command::Close(true)),
1007            })),
1008        };
1009        self.tx
1010            .send(msg)
1011            .await
1012            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1013        Ok(())
1014    }
1015
1016    async fn send_mutation(
1017        &mut self,
1018        cmd: pb::cursor_command::Command,
1019    ) -> Result<(), IndexedDBError> {
1020        let msg = pb::CursorClientMessage {
1021            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
1022                command: Some(cmd),
1023            })),
1024        };
1025        self.tx
1026            .send(msg)
1027            .await
1028            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1029        // Read ack -- if it contains an entry, update cursor state.
1030        let resp = self
1031            .stream
1032            .message()
1033            .await
1034            .map_err(map_status)?
1035            .ok_or_else(|| {
1036                IndexedDBError::Status(tonic::Status::internal(
1037                    "cursor stream ended during mutation",
1038                ))
1039            })?;
1040        match resp.result {
1041            Some(pb::cursor_response::Result::Entry(entry)) => {
1042                self.entry = Some(entry);
1043            }
1044            Some(pb::cursor_response::Result::Done(_)) => {}
1045            None => {
1046                return Err(IndexedDBError::Status(tonic::Status::internal(
1047                    "unexpected cursor mutation ack",
1048                )));
1049            }
1050        }
1051        Ok(())
1052    }
1053
1054    async fn send_and_recv(
1055        &mut self,
1056        cmd: pb::cursor_command::Command,
1057    ) -> Result<bool, IndexedDBError> {
1058        if self.done {
1059            return Ok(false);
1060        }
1061        let msg = pb::CursorClientMessage {
1062            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
1063                command: Some(cmd),
1064            })),
1065        };
1066        self.tx
1067            .send(msg)
1068            .await
1069            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1070
1071        let resp = self
1072            .stream
1073            .message()
1074            .await
1075            .map_err(map_status)?
1076            .ok_or_else(|| {
1077                IndexedDBError::Status(tonic::Status::internal("cursor stream ended"))
1078            })?;
1079
1080        match resp.result {
1081            Some(pb::cursor_response::Result::Entry(entry)) => {
1082                self.entry = Some(entry);
1083                self.done = false;
1084                Ok(true)
1085            }
1086            Some(pb::cursor_response::Result::Done(exhausted)) => {
1087                if exhausted {
1088                    self.done = true;
1089                }
1090                self.entry = None;
1091                Ok(false)
1092            }
1093            None => {
1094                self.entry = None;
1095                self.done = true;
1096                Ok(false)
1097            }
1098        }
1099    }
1100}
1101
1102async fn open_cursor_inner(
1103    client: &mut IndexedDbClient<IndexedDbTransport>,
1104    req: pb::OpenCursorRequest,
1105) -> Result<Cursor, IndexedDBError> {
1106    let keys_only = req.keys_only;
1107    let is_index = !req.index.is_empty();
1108    let (tx, rx) = mpsc::channel::<pb::CursorClientMessage>(CURSOR_CHANNEL_BUFFER);
1109
1110    let open_msg = pb::CursorClientMessage {
1111        msg: Some(pb::cursor_client_message::Msg::Open(req)),
1112    };
1113    tx.send(open_msg)
1114        .await
1115        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1116
1117    let receiver_stream = ReceiverStream::new(rx);
1118    let mut stream = client
1119        .open_cursor(receiver_stream)
1120        .await
1121        .map_err(map_status)?
1122        .into_inner();
1123
1124    // Read the open ack to surface creation errors synchronously.
1125    let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
1126        IndexedDBError::Status(tonic::Status::internal("cursor stream ended during open"))
1127    })?;
1128    match ack.result {
1129        Some(pb::cursor_response::Result::Done(false)) => {}
1130        Some(pb::cursor_response::Result::Done(true)) => {
1131            return Err(IndexedDBError::Status(tonic::Status::internal(
1132                "unexpected exhausted cursor open ack",
1133            )));
1134        }
1135        _ => {
1136            return Err(IndexedDBError::Status(tonic::Status::internal(
1137                "unexpected cursor open ack",
1138            )));
1139        }
1140    }
1141
1142    Ok(Cursor {
1143        tx,
1144        stream,
1145        keys_only,
1146        entry: None,
1147        done: false,
1148        index_cursor: is_index,
1149    })
1150}
1151
/// Client for a running IndexedDB provider.
///
/// Wraps the generated gRPC client; object-store handles and transactions created from
/// this value each work on a clone of that client.
pub struct IndexedDB {
    /// Generated gRPC client, wrapped with the relay-token interceptor.
    client: IndexedDbClient<IndexedDbTransport>,
}
1156
1157impl IndexedDB {
1158    /// Connects to the default IndexedDB transport socket.
1159    pub async fn connect() -> Result<Self, IndexedDBError> {
1160        Self::connect_named("").await
1161    }
1162
1163    /// Connects to a named IndexedDB transport socket.
1164    pub async fn connect_named(name: &str) -> Result<Self, IndexedDBError> {
1165        let target = std::env::var(ENV_HOST_SERVICE_SOCKET)
1166            .map_err(|_| IndexedDBError::Env(format!("{ENV_HOST_SERVICE_SOCKET} is not set")))?;
1167        let token = std::env::var(ENV_HOST_SERVICE_TOKEN).unwrap_or_default();
1168        let channel = match parse_indexeddb_target(&target)? {
1169            IndexedDBTarget::Unix(path) => {
1170                Endpoint::try_from("http://[::]:50051")?
1171                    .connect_with_connector(service_fn(move |_: Uri| {
1172                        let path = path.clone();
1173                        async move {
1174                            tokio::net::UnixStream::connect(path)
1175                                .await
1176                                .map(TokioIo::new)
1177                        }
1178                    }))
1179                    .await?
1180            }
1181            IndexedDBTarget::Tcp(address) => {
1182                Endpoint::from_shared(format!("http://{address}"))?
1183                    .connect()
1184                    .await?
1185            }
1186            IndexedDBTarget::Tls(address) => {
1187                Endpoint::from_shared(format!("https://{address}"))?
1188                    .tls_config(ClientTlsConfig::new().with_native_roots())?
1189                    .connect()
1190                    .await?
1191            }
1192        };
1193
1194        let client = IndexedDbClient::with_interceptor(
1195            channel,
1196            relay_token_interceptor(token.trim(), name)?,
1197        );
1198
1199        Ok(Self { client })
1200    }
1201
1202    /// Creates a named object store.
1203    pub async fn create_object_store(
1204        &mut self,
1205        name: &str,
1206        schema: ObjectStoreSchema,
1207    ) -> Result<ObjectStore, IndexedDBError> {
1208        let indexes = schema
1209            .indexes
1210            .into_iter()
1211            .map(|idx| pb::IndexSchema {
1212                name: idx.name,
1213                key_path: idx.key_path,
1214                unique: idx.unique,
1215            })
1216            .collect();
1217        self.client
1218            .create_object_store(pb::CreateObjectStoreRequest {
1219                name: name.to_string(),
1220                schema: Some(pb::ObjectStoreSchema {
1221                    indexes,
1222                    columns: vec![],
1223                }),
1224            })
1225            .await
1226            .map_err(map_status)?;
1227        Ok(self.object_store(name))
1228    }
1229
1230    /// Deletes a named object store.
1231    pub async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError> {
1232        self.client
1233            .delete_object_store(pb::DeleteObjectStoreRequest {
1234                name: name.to_string(),
1235            })
1236            .await
1237            .map_err(map_status)?;
1238        Ok(())
1239    }
1240
1241    /// Returns a typed handle for one object store.
1242    pub fn object_store(&self, name: &str) -> ObjectStore {
1243        ObjectStore {
1244            client: self.client.clone(),
1245            store: name.to_string(),
1246        }
1247    }
1248
1249    /// Opens an explicit transaction over a fixed object-store scope.
1250    pub async fn transaction(
1251        &self,
1252        stores: &[&str],
1253        mode: TransactionMode,
1254        options: TransactionOptions,
1255    ) -> Result<Transaction, IndexedDBError> {
1256        let (tx, rx) = mpsc::channel::<pb::TransactionClientMessage>(TRANSACTION_CHANNEL_BUFFER);
1257        tx.send(pb::TransactionClientMessage {
1258            msg: Some(pb::transaction_client_message::Msg::Begin(
1259                pb::BeginTransactionRequest {
1260                    stores: stores.iter().map(|store| store.to_string()).collect(),
1261                    mode: mode.to_proto(),
1262                    durability_hint: options.durability_hint.to_proto(),
1263                },
1264            )),
1265        })
1266        .await
1267        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1268
1269        let receiver_stream = ReceiverStream::new(rx);
1270        let mut client = self.client.clone();
1271        let mut stream = client
1272            .transaction(receiver_stream)
1273            .await
1274            .map_err(map_status)?
1275            .into_inner();
1276
1277        let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
1278            IndexedDBError::Transaction("transaction stream ended during begin".to_string())
1279        })?;
1280        match ack.msg {
1281            Some(pb::transaction_server_message::Msg::Begin(_)) => {}
1282            _ => {
1283                return Err(IndexedDBError::Transaction(
1284                    "expected transaction begin response".to_string(),
1285                ));
1286            }
1287        }
1288
1289        Ok(Transaction {
1290            tx: Some(tx),
1291            stream,
1292            request_id: 0,
1293            closed: false,
1294        })
1295    }
1296}
1297
/// Explicit transaction over one or more object stores.
///
/// Operations are exchanged one at a time: each request is sent on `tx` and its response
/// is read from `stream` before the next request is issued.
pub struct Transaction {
    /// Sending half of the request channel; cleared once the transaction is finished.
    tx: Option<mpsc::Sender<pb::TransactionClientMessage>>,
    /// Server-to-client message stream for this transaction.
    stream: tonic::Streaming<pb::TransactionServerMessage>,
    /// Id of the most recently issued operation; incremented before each send.
    request_id: u64,
    /// True once the transaction was committed, aborted, or closed after a failure.
    closed: bool,
}
1305
1306impl Transaction {
1307    /// Returns a transaction-scoped object store.
1308    pub fn object_store<'a>(&'a mut self, name: &str) -> TransactionObjectStore<'a> {
1309        TransactionObjectStore {
1310            tx: self,
1311            store: name.to_string(),
1312        }
1313    }
1314
1315    /// Commits the transaction.
1316    pub async fn commit(&mut self) -> Result<(), IndexedDBError> {
1317        self.ensure_open()?;
1318        let tx = self.tx.as_ref().ok_or_else(|| {
1319            IndexedDBError::Transaction("transaction is already finished".to_string())
1320        })?;
1321        tx.send(pb::TransactionClientMessage {
1322            msg: Some(pb::transaction_client_message::Msg::Commit(
1323                pb::TransactionCommitRequest {},
1324            )),
1325        })
1326        .await
1327        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1328        self.closed = true;
1329        self.tx.take();
1330
1331        let resp = self
1332            .stream
1333            .message()
1334            .await
1335            .map_err(map_status)?
1336            .ok_or_else(|| {
1337                IndexedDBError::Transaction("transaction stream ended during commit".to_string())
1338            })?;
1339        match resp.msg {
1340            Some(pb::transaction_server_message::Msg::Commit(commit)) => {
1341                map_rpc_status(commit.error)
1342            }
1343            _ => Err(IndexedDBError::Transaction(
1344                "expected transaction commit response".to_string(),
1345            )),
1346        }
1347    }
1348
1349    /// Aborts the transaction. Aborting an already finished transaction is a no-op.
1350    pub async fn abort(&mut self, reason: &str) -> Result<(), IndexedDBError> {
1351        if self.closed {
1352            return Ok(());
1353        }
1354        let tx = self.tx.as_ref().ok_or_else(|| {
1355            IndexedDBError::Transaction("transaction is already finished".to_string())
1356        })?;
1357        tx.send(pb::TransactionClientMessage {
1358            msg: Some(pb::transaction_client_message::Msg::Abort(
1359                pb::TransactionAbortRequest {
1360                    reason: reason.to_string(),
1361                },
1362            )),
1363        })
1364        .await
1365        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1366        self.closed = true;
1367        self.tx.take();
1368
1369        let resp = self
1370            .stream
1371            .message()
1372            .await
1373            .map_err(map_status)?
1374            .ok_or_else(|| {
1375                IndexedDBError::Transaction("transaction stream ended during abort".to_string())
1376            })?;
1377        match resp.msg {
1378            Some(pb::transaction_server_message::Msg::Abort(abort)) => map_rpc_status(abort.error),
1379            _ => Err(IndexedDBError::Transaction(
1380                "expected transaction abort response".to_string(),
1381            )),
1382        }
1383    }
1384
1385    async fn send_operation(
1386        &mut self,
1387        operation: pb::transaction_operation::Operation,
1388    ) -> Result<pb::TransactionOperationResponse, IndexedDBError> {
1389        self.ensure_open()?;
1390        self.request_id += 1;
1391        let request_id = self.request_id;
1392        let tx = self.tx.as_ref().ok_or_else(|| {
1393            IndexedDBError::Transaction("transaction is already finished".to_string())
1394        })?;
1395        tx.send(pb::TransactionClientMessage {
1396            msg: Some(pb::transaction_client_message::Msg::Operation(
1397                pb::TransactionOperation {
1398                    request_id,
1399                    operation: Some(operation),
1400                },
1401            )),
1402        })
1403        .await
1404        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1405
1406        let resp = self
1407            .stream
1408            .message()
1409            .await
1410            .map_err(map_status)?
1411            .ok_or_else(|| {
1412                IndexedDBError::Transaction("transaction stream ended during operation".to_string())
1413            })?;
1414        let op = match resp.msg {
1415            Some(pb::transaction_server_message::Msg::Operation(op)) => op,
1416            _ => {
1417                self.close_locally();
1418                return Err(IndexedDBError::Transaction(
1419                    "expected transaction operation response".to_string(),
1420                ));
1421            }
1422        };
1423        if op.request_id != request_id {
1424            self.close_locally();
1425            return Err(IndexedDBError::Transaction(
1426                "transaction response request id mismatch".to_string(),
1427            ));
1428        }
1429        if let Err(err) = map_rpc_status(op.error.clone()) {
1430            self.close_locally();
1431            return Err(err);
1432        }
1433        Ok(op)
1434    }
1435
1436    fn ensure_open(&self) -> Result<(), IndexedDBError> {
1437        if self.closed {
1438            return Err(IndexedDBError::Transaction(
1439                "transaction is already finished".to_string(),
1440            ));
1441        }
1442        Ok(())
1443    }
1444
1445    fn close_locally(&mut self) {
1446        self.closed = true;
1447        self.tx.take();
1448    }
1449}
1450
/// Object-store operations scoped to an explicit transaction.
///
/// Every method sends one operation through the borrowed transaction and waits for its
/// response.
pub struct TransactionObjectStore<'a> {
    /// Transaction that carries the operations.
    tx: &'a mut Transaction,
    /// Name of the object store, sent with every request.
    store: String,
}
1456
1457impl TransactionObjectStore<'_> {
1458    /// Loads one record by primary key inside the transaction.
1459    pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
1460        let resp = self
1461            .tx
1462            .send_operation(pb::transaction_operation::Operation::Get(
1463                pb::ObjectStoreRequest {
1464                    store: self.store.clone(),
1465                    id: id.to_string(),
1466                },
1467            ))
1468            .await?;
1469        match resp.result {
1470            Some(pb::transaction_operation_response::Result::Record(record)) => Ok(record
1471                .record
1472                .as_ref()
1473                .map(pb_record_to_record)
1474                .unwrap_or_default()),
1475            _ => Err(unexpected_transaction_result()),
1476        }
1477    }
1478
1479    /// Resolves the primary key for id inside the transaction.
1480    pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
1481        let resp = self
1482            .tx
1483            .send_operation(pb::transaction_operation::Operation::GetKey(
1484                pb::ObjectStoreRequest {
1485                    store: self.store.clone(),
1486                    id: id.to_string(),
1487                },
1488            ))
1489            .await?;
1490        match resp.result {
1491            Some(pb::transaction_operation_response::Result::Key(key)) => Ok(key.key),
1492            _ => Err(unexpected_transaction_result()),
1493        }
1494    }
1495
1496    /// Inserts a new row inside the transaction.
1497    pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
1498        self.tx
1499            .send_operation(pb::transaction_operation::Operation::Add(
1500                pb::RecordRequest {
1501                    store: self.store.clone(),
1502                    record: Some(record_to_pb_record(record)),
1503                },
1504            ))
1505            .await?;
1506        Ok(())
1507    }
1508
1509    /// Upserts a row inside the transaction.
1510    pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
1511        self.tx
1512            .send_operation(pb::transaction_operation::Operation::Put(
1513                pb::RecordRequest {
1514                    store: self.store.clone(),
1515                    record: Some(record_to_pb_record(record)),
1516                },
1517            ))
1518            .await?;
1519        Ok(())
1520    }
1521
1522    /// Deletes one row inside the transaction.
1523    pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
1524        self.tx
1525            .send_operation(pb::transaction_operation::Operation::Delete(
1526                pb::ObjectStoreRequest {
1527                    store: self.store.clone(),
1528                    id: id.to_string(),
1529                },
1530            ))
1531            .await?;
1532        Ok(())
1533    }
1534
1535    /// Deletes every row in the object store inside the transaction.
1536    pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
1537        self.tx
1538            .send_operation(pb::transaction_operation::Operation::Clear(
1539                pb::ObjectStoreNameRequest {
1540                    store: self.store.clone(),
1541                },
1542            ))
1543            .await?;
1544        Ok(())
1545    }
1546
1547    /// Loads every row that matches range inside the transaction.
1548    pub async fn get_all(
1549        &mut self,
1550        range: Option<KeyRange>,
1551    ) -> Result<Vec<Record>, IndexedDBError> {
1552        let resp = self
1553            .tx
1554            .send_operation(pb::transaction_operation::Operation::GetAll(
1555                pb::ObjectStoreRangeRequest {
1556                    store: self.store.clone(),
1557                    range: range.map(key_range_to_pb),
1558                },
1559            ))
1560            .await?;
1561        match resp.result {
1562            Some(pb::transaction_operation_response::Result::Records(records)) => {
1563                Ok(records.records.iter().map(pb_record_to_record).collect())
1564            }
1565            _ => Err(unexpected_transaction_result()),
1566        }
1567    }
1568
1569    /// Loads every primary key that matches range inside the transaction.
1570    pub async fn get_all_keys(
1571        &mut self,
1572        range: Option<KeyRange>,
1573    ) -> Result<Vec<String>, IndexedDBError> {
1574        let resp = self
1575            .tx
1576            .send_operation(pb::transaction_operation::Operation::GetAllKeys(
1577                pb::ObjectStoreRangeRequest {
1578                    store: self.store.clone(),
1579                    range: range.map(key_range_to_pb),
1580                },
1581            ))
1582            .await?;
1583        match resp.result {
1584            Some(pb::transaction_operation_response::Result::Keys(keys)) => Ok(keys.keys),
1585            _ => Err(unexpected_transaction_result()),
1586        }
1587    }
1588
1589    /// Counts rows that match range inside the transaction.
1590    pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
1591        let resp = self
1592            .tx
1593            .send_operation(pb::transaction_operation::Operation::Count(
1594                pb::ObjectStoreRangeRequest {
1595                    store: self.store.clone(),
1596                    range: range.map(key_range_to_pb),
1597                },
1598            ))
1599            .await?;
1600        match resp.result {
1601            Some(pb::transaction_operation_response::Result::Count(count)) => Ok(count.count),
1602            _ => Err(unexpected_transaction_result()),
1603        }
1604    }
1605
1606    /// Deletes rows that match range inside the transaction.
1607    pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
1608        let resp = self
1609            .tx
1610            .send_operation(pb::transaction_operation::Operation::DeleteRange(
1611                pb::ObjectStoreRangeRequest {
1612                    store: self.store.clone(),
1613                    range: Some(key_range_to_pb(range)),
1614                },
1615            ))
1616            .await?;
1617        match resp.result {
1618            Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
1619                Ok(deleted.deleted)
1620            }
1621            _ => Err(unexpected_transaction_result()),
1622        }
1623    }
1624
1625    /// Returns a transaction-scoped secondary index.
1626    pub fn index<'a>(&'a mut self, name: &str) -> TransactionIndex<'a> {
1627        TransactionIndex {
1628            tx: &mut *self.tx,
1629            store: self.store.clone(),
1630            index: name.to_string(),
1631        }
1632    }
1633}
1634
/// Secondary-index operations scoped to an explicit transaction.
///
/// Every method sends one operation through the borrowed transaction and waits for its
/// response.
pub struct TransactionIndex<'a> {
    /// Transaction that carries the operations.
    tx: &'a mut Transaction,
    /// Name of the object store that owns the index.
    store: String,
    /// Name of the index.
    index: String,
}
1641
1642impl TransactionIndex<'_> {
1643    /// Loads the first row that matches values inside the transaction.
1644    pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
1645        let resp = self
1646            .tx
1647            .send_operation(pb::transaction_operation::Operation::IndexGet(
1648                self.index_request(values, None),
1649            ))
1650            .await?;
1651        match resp.result {
1652            Some(pb::transaction_operation_response::Result::Record(record)) => Ok(record
1653                .record
1654                .as_ref()
1655                .map(pb_record_to_record)
1656                .unwrap_or_default()),
1657            _ => Err(unexpected_transaction_result()),
1658        }
1659    }
1660
1661    /// Resolves the primary key for the first matching row inside the transaction.
1662    pub async fn get_key(
1663        &mut self,
1664        values: &[serde_json::Value],
1665    ) -> Result<String, IndexedDBError> {
1666        let resp = self
1667            .tx
1668            .send_operation(pb::transaction_operation::Operation::IndexGetKey(
1669                self.index_request(values, None),
1670            ))
1671            .await?;
1672        match resp.result {
1673            Some(pb::transaction_operation_response::Result::Key(key)) => Ok(key.key),
1674            _ => Err(unexpected_transaction_result()),
1675        }
1676    }
1677
1678    /// Loads every row that matches values and range inside the transaction.
1679    pub async fn get_all(
1680        &mut self,
1681        values: &[serde_json::Value],
1682        range: Option<KeyRange>,
1683    ) -> Result<Vec<Record>, IndexedDBError> {
1684        let resp = self
1685            .tx
1686            .send_operation(pb::transaction_operation::Operation::IndexGetAll(
1687                self.index_request(values, range),
1688            ))
1689            .await?;
1690        match resp.result {
1691            Some(pb::transaction_operation_response::Result::Records(records)) => {
1692                Ok(records.records.iter().map(pb_record_to_record).collect())
1693            }
1694            _ => Err(unexpected_transaction_result()),
1695        }
1696    }
1697
1698    /// Loads every primary key that matches values and range inside the transaction.
1699    pub async fn get_all_keys(
1700        &mut self,
1701        values: &[serde_json::Value],
1702        range: Option<KeyRange>,
1703    ) -> Result<Vec<String>, IndexedDBError> {
1704        let resp = self
1705            .tx
1706            .send_operation(pb::transaction_operation::Operation::IndexGetAllKeys(
1707                self.index_request(values, range),
1708            ))
1709            .await?;
1710        match resp.result {
1711            Some(pb::transaction_operation_response::Result::Keys(keys)) => Ok(keys.keys),
1712            _ => Err(unexpected_transaction_result()),
1713        }
1714    }
1715
1716    /// Counts rows that match values and range inside the transaction.
1717    pub async fn count(
1718        &mut self,
1719        values: &[serde_json::Value],
1720        range: Option<KeyRange>,
1721    ) -> Result<i64, IndexedDBError> {
1722        let resp = self
1723            .tx
1724            .send_operation(pb::transaction_operation::Operation::IndexCount(
1725                self.index_request(values, range),
1726            ))
1727            .await?;
1728        match resp.result {
1729            Some(pb::transaction_operation_response::Result::Count(count)) => Ok(count.count),
1730            _ => Err(unexpected_transaction_result()),
1731        }
1732    }
1733
1734    /// Deletes rows that match values inside the transaction.
1735    pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
1736        let resp = self
1737            .tx
1738            .send_operation(pb::transaction_operation::Operation::IndexDelete(
1739                self.index_request(values, None),
1740            ))
1741            .await?;
1742        match resp.result {
1743            Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
1744                Ok(deleted.deleted)
1745            }
1746            _ => Err(unexpected_transaction_result()),
1747        }
1748    }
1749
1750    /// Deletes rows that match values and range inside the transaction.
1751    pub async fn delete_range(
1752        &mut self,
1753        values: &[serde_json::Value],
1754        range: KeyRange,
1755    ) -> Result<i64, IndexedDBError> {
1756        let resp = self
1757            .tx
1758            .send_operation(pb::transaction_operation::Operation::IndexDelete(
1759                self.index_request(values, Some(range)),
1760            ))
1761            .await?;
1762        match resp.result {
1763            Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
1764                Ok(deleted.deleted)
1765            }
1766            _ => Err(unexpected_transaction_result()),
1767        }
1768    }
1769
1770    fn index_request(
1771        &self,
1772        values: &[serde_json::Value],
1773        range: Option<KeyRange>,
1774    ) -> pb::IndexQueryRequest {
1775        pb::IndexQueryRequest {
1776            store: self.store.clone(),
1777            index: self.index.clone(),
1778            values: values.iter().map(json_to_typed_value).collect(),
1779            range: range.map(key_range_to_pb),
1780        }
1781    }
1782}
1783
1784enum IndexedDBTarget {
1785    Unix(String),
1786    Tcp(String),
1787    Tls(String),
1788}
1789
1790fn parse_indexeddb_target(raw_target: &str) -> Result<IndexedDBTarget, IndexedDBError> {
1791    let target = raw_target.trim();
1792    if target.is_empty() {
1793        return Err(IndexedDBError::Env(
1794            "IndexedDB transport target is required".to_string(),
1795        ));
1796    }
1797    if let Some(address) = target.strip_prefix("tcp://") {
1798        let address = address.trim();
1799        if address.is_empty() {
1800            return Err(IndexedDBError::Env(format!(
1801                "IndexedDB tcp target {raw_target:?} is missing host:port"
1802            )));
1803        }
1804        return Ok(IndexedDBTarget::Tcp(address.to_string()));
1805    }
1806    if let Some(address) = target.strip_prefix("tls://") {
1807        let address = address.trim();
1808        if address.is_empty() {
1809            return Err(IndexedDBError::Env(format!(
1810                "IndexedDB tls target {raw_target:?} is missing host:port"
1811            )));
1812        }
1813        return Ok(IndexedDBTarget::Tls(address.to_string()));
1814    }
1815    if let Some(path) = target.strip_prefix("unix://") {
1816        let path = path.trim();
1817        if path.is_empty() {
1818            return Err(IndexedDBError::Env(format!(
1819                "IndexedDB unix target {raw_target:?} is missing a socket path"
1820            )));
1821        }
1822        return Ok(IndexedDBTarget::Unix(path.to_string()));
1823    }
1824    if target.contains("://") {
1825        let scheme = target.split("://").next().unwrap_or_default();
1826        return Err(IndexedDBError::Env(format!(
1827            "unsupported IndexedDB target scheme {scheme:?}"
1828        )));
1829    }
1830    Ok(IndexedDBTarget::Unix(target.to_string()))
1831}
1832
1833/// CRUD, range-query, and cursor access for one object store.
1834pub struct ObjectStore {
1835    client: IndexedDbClient<IndexedDbTransport>,
1836    store: String,
1837}
1838
1839impl ObjectStore {
1840    /// Loads one record by primary key.
1841    pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
1842        let resp = self
1843            .client
1844            .get(pb::ObjectStoreRequest {
1845                store: self.store.clone(),
1846                id: id.to_string(),
1847            })
1848            .await
1849            .map_err(map_status)?;
1850        Ok(resp
1851            .into_inner()
1852            .record
1853            .as_ref()
1854            .map(pb_record_to_record)
1855            .unwrap_or_default())
1856    }
1857
1858    /// Resolves the primary key for id.
1859    pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
1860        let resp = self
1861            .client
1862            .get_key(pb::ObjectStoreRequest {
1863                store: self.store.clone(),
1864                id: id.to_string(),
1865            })
1866            .await
1867            .map_err(map_status)?;
1868        Ok(resp.into_inner().key)
1869    }
1870
1871    /// Inserts a new row and fails if the key already exists.
1872    pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
1873        self.client
1874            .add(pb::RecordRequest {
1875                store: self.store.clone(),
1876                record: Some(record_to_pb_record(record)),
1877            })
1878            .await
1879            .map_err(map_status)?;
1880        Ok(())
1881    }
1882
1883    /// Upserts a row by primary key.
1884    pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
1885        self.client
1886            .put(pb::RecordRequest {
1887                store: self.store.clone(),
1888                record: Some(record_to_pb_record(record)),
1889            })
1890            .await
1891            .map_err(map_status)?;
1892        Ok(())
1893    }
1894
1895    /// Deletes one row by primary key.
1896    pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
1897        self.client
1898            .delete(pb::ObjectStoreRequest {
1899                store: self.store.clone(),
1900                id: id.to_string(),
1901            })
1902            .await
1903            .map_err(map_status)?;
1904        Ok(())
1905    }
1906
1907    /// Deletes every row in the object store.
1908    pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
1909        self.client
1910            .clear(pb::ObjectStoreNameRequest {
1911                store: self.store.clone(),
1912            })
1913            .await
1914            .map_err(map_status)?;
1915        Ok(())
1916    }
1917
1918    /// Loads every row that matches range.
1919    pub async fn get_all(
1920        &mut self,
1921        range: Option<KeyRange>,
1922    ) -> Result<Vec<Record>, IndexedDBError> {
1923        let resp = self
1924            .client
1925            .get_all(pb::ObjectStoreRangeRequest {
1926                store: self.store.clone(),
1927                range: range.map(key_range_to_pb),
1928            })
1929            .await
1930            .map_err(map_status)?;
1931        Ok(resp
1932            .into_inner()
1933            .records
1934            .iter()
1935            .map(pb_record_to_record)
1936            .collect())
1937    }
1938
1939    /// Loads every primary key that matches range.
1940    pub async fn get_all_keys(
1941        &mut self,
1942        range: Option<KeyRange>,
1943    ) -> Result<Vec<String>, IndexedDBError> {
1944        let resp = self
1945            .client
1946            .get_all_keys(pb::ObjectStoreRangeRequest {
1947                store: self.store.clone(),
1948                range: range.map(key_range_to_pb),
1949            })
1950            .await
1951            .map_err(map_status)?;
1952        Ok(resp.into_inner().keys)
1953    }
1954
1955    /// Counts rows that match range.
1956    pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
1957        let resp = self
1958            .client
1959            .count(pb::ObjectStoreRangeRequest {
1960                store: self.store.clone(),
1961                range: range.map(key_range_to_pb),
1962            })
1963            .await
1964            .map_err(map_status)?;
1965        Ok(resp.into_inner().count)
1966    }
1967
1968    /// Deletes rows that match range and returns the delete count.
1969    pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
1970        let resp = self
1971            .client
1972            .delete_range(pb::ObjectStoreRangeRequest {
1973                store: self.store.clone(),
1974                range: Some(key_range_to_pb(range)),
1975            })
1976            .await
1977            .map_err(map_status)?;
1978        Ok(resp.into_inner().deleted)
1979    }
1980
1981    /// Returns a typed handle for one secondary index.
1982    pub fn index(&self, name: &str) -> Index {
1983        Index {
1984            client: self.client.clone(),
1985            store: self.store.clone(),
1986            index: name.to_string(),
1987        }
1988    }
1989
1990    /// Opens a full-value cursor over the object store.
1991    pub async fn open_cursor(
1992        &mut self,
1993        range: Option<KeyRange>,
1994        direction: CursorDirection,
1995    ) -> Result<Cursor, IndexedDBError> {
1996        let req = pb::OpenCursorRequest {
1997            store: self.store.clone(),
1998            range: range.map(key_range_to_pb),
1999            direction: direction.to_proto(),
2000            keys_only: false,
2001            index: String::new(),
2002            values: vec![],
2003        };
2004        open_cursor_inner(&mut self.client, req).await
2005    }
2006
2007    /// Opens a key-only cursor over the object store.
2008    pub async fn open_key_cursor(
2009        &mut self,
2010        range: Option<KeyRange>,
2011        direction: CursorDirection,
2012    ) -> Result<Cursor, IndexedDBError> {
2013        let req = pb::OpenCursorRequest {
2014            store: self.store.clone(),
2015            range: range.map(key_range_to_pb),
2016            direction: direction.to_proto(),
2017            keys_only: true,
2018            index: String::new(),
2019            values: vec![],
2020        };
2021        open_cursor_inner(&mut self.client, req).await
2022    }
2023}
2024
2025/// Lookup and cursor access through one secondary index.
2026pub struct Index {
2027    client: IndexedDbClient<IndexedDbTransport>,
2028    store: String,
2029    index: String,
2030}
2031
2032impl Index {
2033    /// Loads the first row that matches values.
2034    pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
2035        let resp = self
2036            .client
2037            .index_get(pb::IndexQueryRequest {
2038                store: self.store.clone(),
2039                index: self.index.clone(),
2040                values: values.iter().map(json_to_typed_value).collect(),
2041                range: None,
2042            })
2043            .await
2044            .map_err(map_status)?;
2045        Ok(resp
2046            .into_inner()
2047            .record
2048            .as_ref()
2049            .map(pb_record_to_record)
2050            .unwrap_or_default())
2051    }
2052
2053    /// Resolves the primary key for the first row that matches values.
2054    pub async fn get_key(
2055        &mut self,
2056        values: &[serde_json::Value],
2057    ) -> Result<String, IndexedDBError> {
2058        let resp = self
2059            .client
2060            .index_get_key(pb::IndexQueryRequest {
2061                store: self.store.clone(),
2062                index: self.index.clone(),
2063                values: values.iter().map(json_to_typed_value).collect(),
2064                range: None,
2065            })
2066            .await
2067            .map_err(map_status)?;
2068        Ok(resp.into_inner().key)
2069    }
2070
2071    /// Loads every row that matches values and range.
2072    pub async fn get_all(
2073        &mut self,
2074        values: &[serde_json::Value],
2075        range: Option<KeyRange>,
2076    ) -> Result<Vec<Record>, IndexedDBError> {
2077        let resp = self
2078            .client
2079            .index_get_all(pb::IndexQueryRequest {
2080                store: self.store.clone(),
2081                index: self.index.clone(),
2082                values: values.iter().map(json_to_typed_value).collect(),
2083                range: range.map(key_range_to_pb),
2084            })
2085            .await
2086            .map_err(map_status)?;
2087        Ok(resp
2088            .into_inner()
2089            .records
2090            .iter()
2091            .map(pb_record_to_record)
2092            .collect())
2093    }
2094
2095    /// Loads every primary key that matches values and range.
2096    pub async fn get_all_keys(
2097        &mut self,
2098        values: &[serde_json::Value],
2099        range: Option<KeyRange>,
2100    ) -> Result<Vec<String>, IndexedDBError> {
2101        let resp = self
2102            .client
2103            .index_get_all_keys(pb::IndexQueryRequest {
2104                store: self.store.clone(),
2105                index: self.index.clone(),
2106                values: values.iter().map(json_to_typed_value).collect(),
2107                range: range.map(key_range_to_pb),
2108            })
2109            .await
2110            .map_err(map_status)?;
2111        Ok(resp.into_inner().keys)
2112    }
2113
2114    /// Counts rows that match values and range.
2115    pub async fn count(
2116        &mut self,
2117        values: &[serde_json::Value],
2118        range: Option<KeyRange>,
2119    ) -> Result<i64, IndexedDBError> {
2120        let resp = self
2121            .client
2122            .index_count(pb::IndexQueryRequest {
2123                store: self.store.clone(),
2124                index: self.index.clone(),
2125                values: values.iter().map(json_to_typed_value).collect(),
2126                range: range.map(key_range_to_pb),
2127            })
2128            .await
2129            .map_err(map_status)?;
2130        Ok(resp.into_inner().count)
2131    }
2132
2133    /// Deletes rows that match values and returns the delete count.
2134    pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
2135        let resp = self
2136            .client
2137            .index_delete(pb::IndexQueryRequest {
2138                store: self.store.clone(),
2139                index: self.index.clone(),
2140                values: values.iter().map(json_to_typed_value).collect(),
2141                range: None,
2142            })
2143            .await
2144            .map_err(map_status)?;
2145        Ok(resp.into_inner().deleted)
2146    }
2147
2148    /// Deletes rows that match values and range and returns the delete count.
2149    pub async fn delete_range(
2150        &mut self,
2151        values: &[serde_json::Value],
2152        range: KeyRange,
2153    ) -> Result<i64, IndexedDBError> {
2154        let resp = self
2155            .client
2156            .index_delete(pb::IndexQueryRequest {
2157                store: self.store.clone(),
2158                index: self.index.clone(),
2159                values: values.iter().map(json_to_typed_value).collect(),
2160                range: Some(key_range_to_pb(range)),
2161            })
2162            .await
2163            .map_err(map_status)?;
2164        Ok(resp.into_inner().deleted)
2165    }
2166
2167    /// Opens a full-value cursor over the secondary index.
2168    pub async fn open_cursor(
2169        &mut self,
2170        values: &[serde_json::Value],
2171        range: Option<KeyRange>,
2172        direction: CursorDirection,
2173    ) -> Result<Cursor, IndexedDBError> {
2174        let req = pb::OpenCursorRequest {
2175            store: self.store.clone(),
2176            range: range.map(key_range_to_pb),
2177            direction: direction.to_proto(),
2178            keys_only: false,
2179            index: self.index.clone(),
2180            values: values.iter().map(json_to_typed_value).collect(),
2181        };
2182        open_cursor_inner(&mut self.client, req).await
2183    }
2184
2185    /// Opens a key-only cursor over the secondary index.
2186    pub async fn open_key_cursor(
2187        &mut self,
2188        values: &[serde_json::Value],
2189        range: Option<KeyRange>,
2190        direction: CursorDirection,
2191    ) -> Result<Cursor, IndexedDBError> {
2192        let req = pb::OpenCursorRequest {
2193            store: self.store.clone(),
2194            range: range.map(key_range_to_pb),
2195            direction: direction.to_proto(),
2196            keys_only: true,
2197            index: self.index.clone(),
2198            values: values.iter().map(json_to_typed_value).collect(),
2199        };
2200        open_cursor_inner(&mut self.client, req).await
2201    }
2202}
2203
2204#[async_trait]
2205impl IndexedDBApi for IndexedDB {
2206    type ObjectStore = ObjectStore;
2207    type Transaction = Transaction;
2208
2209    async fn create_object_store(
2210        &mut self,
2211        name: &str,
2212        schema: ObjectStoreSchema,
2213    ) -> Result<ObjectStore, IndexedDBError> {
2214        IndexedDB::create_object_store(self, name, schema).await
2215    }
2216
2217    async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError> {
2218        IndexedDB::delete_object_store(self, name).await
2219    }
2220
2221    fn object_store(&self, name: &str) -> ObjectStore {
2222        IndexedDB::object_store(self, name)
2223    }
2224
2225    async fn transaction(
2226        &self,
2227        stores: &[&str],
2228        mode: TransactionMode,
2229        options: TransactionOptions,
2230    ) -> Result<Transaction, IndexedDBError> {
2231        IndexedDB::transaction(self, stores, mode, options).await
2232    }
2233}
2234
2235#[async_trait]
2236impl ObjectStoreApi for ObjectStore {
2237    type Index = Index;
2238    type Cursor = Cursor;
2239
2240    async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
2241        ObjectStore::get(self, id).await
2242    }
2243
2244    async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
2245        ObjectStore::get_key(self, id).await
2246    }
2247
2248    async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
2249        ObjectStore::add(self, record).await
2250    }
2251
2252    async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
2253        ObjectStore::put(self, record).await
2254    }
2255
2256    async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
2257        ObjectStore::delete(self, id).await
2258    }
2259
2260    async fn clear(&mut self) -> Result<(), IndexedDBError> {
2261        ObjectStore::clear(self).await
2262    }
2263
2264    async fn get_all(&mut self, range: Option<KeyRange>) -> Result<Vec<Record>, IndexedDBError> {
2265        ObjectStore::get_all(self, range).await
2266    }
2267
2268    async fn get_all_keys(
2269        &mut self,
2270        range: Option<KeyRange>,
2271    ) -> Result<Vec<String>, IndexedDBError> {
2272        ObjectStore::get_all_keys(self, range).await
2273    }
2274
2275    async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
2276        ObjectStore::count(self, range).await
2277    }
2278
2279    async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
2280        ObjectStore::delete_range(self, range).await
2281    }
2282
2283    fn index(&self, name: &str) -> Index {
2284        ObjectStore::index(self, name)
2285    }
2286
2287    async fn open_cursor(
2288        &mut self,
2289        range: Option<KeyRange>,
2290        direction: CursorDirection,
2291    ) -> Result<Cursor, IndexedDBError> {
2292        ObjectStore::open_cursor(self, range, direction).await
2293    }
2294
2295    async fn open_key_cursor(
2296        &mut self,
2297        range: Option<KeyRange>,
2298        direction: CursorDirection,
2299    ) -> Result<Cursor, IndexedDBError> {
2300        ObjectStore::open_key_cursor(self, range, direction).await
2301    }
2302}
2303
2304#[async_trait]
2305impl IndexApi for Index {
2306    type Cursor = Cursor;
2307
2308    async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
2309        Index::get(self, values).await
2310    }
2311
2312    async fn get_key(&mut self, values: &[serde_json::Value]) -> Result<String, IndexedDBError> {
2313        Index::get_key(self, values).await
2314    }
2315
2316    async fn get_all(
2317        &mut self,
2318        values: &[serde_json::Value],
2319        range: Option<KeyRange>,
2320    ) -> Result<Vec<Record>, IndexedDBError> {
2321        Index::get_all(self, values, range).await
2322    }
2323
2324    async fn get_all_keys(
2325        &mut self,
2326        values: &[serde_json::Value],
2327        range: Option<KeyRange>,
2328    ) -> Result<Vec<String>, IndexedDBError> {
2329        Index::get_all_keys(self, values, range).await
2330    }
2331
2332    async fn count(
2333        &mut self,
2334        values: &[serde_json::Value],
2335        range: Option<KeyRange>,
2336    ) -> Result<i64, IndexedDBError> {
2337        Index::count(self, values, range).await
2338    }
2339
2340    async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
2341        Index::delete(self, values).await
2342    }
2343
2344    async fn delete_range(
2345        &mut self,
2346        values: &[serde_json::Value],
2347        range: KeyRange,
2348    ) -> Result<i64, IndexedDBError> {
2349        Index::delete_range(self, values, range).await
2350    }
2351
2352    async fn open_cursor(
2353        &mut self,
2354        values: &[serde_json::Value],
2355        range: Option<KeyRange>,
2356        direction: CursorDirection,
2357    ) -> Result<Cursor, IndexedDBError> {
2358        Index::open_cursor(self, values, range, direction).await
2359    }
2360
2361    async fn open_key_cursor(
2362        &mut self,
2363        values: &[serde_json::Value],
2364        range: Option<KeyRange>,
2365        direction: CursorDirection,
2366    ) -> Result<Cursor, IndexedDBError> {
2367        Index::open_key_cursor(self, values, range, direction).await
2368    }
2369}
2370
2371#[async_trait]
2372impl TransactionApi for Transaction {
2373    type ObjectStore<'a> = TransactionObjectStore<'a>;
2374
2375    fn object_store<'a>(&'a mut self, name: &str) -> TransactionObjectStore<'a> {
2376        Transaction::object_store(self, name)
2377    }
2378
2379    async fn commit(&mut self) -> Result<(), IndexedDBError> {
2380        Transaction::commit(self).await
2381    }
2382
2383    async fn abort(&mut self, reason: &str) -> Result<(), IndexedDBError> {
2384        Transaction::abort(self, reason).await
2385    }
2386}
2387
2388#[async_trait]
2389impl<'tx> TransactionObjectStoreApi for TransactionObjectStore<'tx> {
2390    type Index<'a>
2391        = TransactionIndex<'a>
2392    where
2393        Self: 'a;
2394
2395    async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
2396        TransactionObjectStore::get(self, id).await
2397    }
2398
2399    async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
2400        TransactionObjectStore::get_key(self, id).await
2401    }
2402
2403    async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
2404        TransactionObjectStore::add(self, record).await
2405    }
2406
2407    async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
2408        TransactionObjectStore::put(self, record).await
2409    }
2410
2411    async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
2412        TransactionObjectStore::delete(self, id).await
2413    }
2414
2415    async fn clear(&mut self) -> Result<(), IndexedDBError> {
2416        TransactionObjectStore::clear(self).await
2417    }
2418
2419    async fn get_all(&mut self, range: Option<KeyRange>) -> Result<Vec<Record>, IndexedDBError> {
2420        TransactionObjectStore::get_all(self, range).await
2421    }
2422
2423    async fn get_all_keys(
2424        &mut self,
2425        range: Option<KeyRange>,
2426    ) -> Result<Vec<String>, IndexedDBError> {
2427        TransactionObjectStore::get_all_keys(self, range).await
2428    }
2429
2430    async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
2431        TransactionObjectStore::count(self, range).await
2432    }
2433
2434    async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
2435        TransactionObjectStore::delete_range(self, range).await
2436    }
2437
2438    fn index<'a>(&'a mut self, name: &str) -> TransactionIndex<'a> {
2439        TransactionObjectStore::index(self, name)
2440    }
2441}
2442
2443#[async_trait]
2444impl TransactionIndexApi for TransactionIndex<'_> {
2445    async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
2446        TransactionIndex::get(self, values).await
2447    }
2448
2449    async fn get_key(&mut self, values: &[serde_json::Value]) -> Result<String, IndexedDBError> {
2450        TransactionIndex::get_key(self, values).await
2451    }
2452
2453    async fn get_all(
2454        &mut self,
2455        values: &[serde_json::Value],
2456        range: Option<KeyRange>,
2457    ) -> Result<Vec<Record>, IndexedDBError> {
2458        TransactionIndex::get_all(self, values, range).await
2459    }
2460
2461    async fn get_all_keys(
2462        &mut self,
2463        values: &[serde_json::Value],
2464        range: Option<KeyRange>,
2465    ) -> Result<Vec<String>, IndexedDBError> {
2466        TransactionIndex::get_all_keys(self, values, range).await
2467    }
2468
2469    async fn count(
2470        &mut self,
2471        values: &[serde_json::Value],
2472        range: Option<KeyRange>,
2473    ) -> Result<i64, IndexedDBError> {
2474        TransactionIndex::count(self, values, range).await
2475    }
2476
2477    async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
2478        TransactionIndex::delete(self, values).await
2479    }
2480
2481    async fn delete_range(
2482        &mut self,
2483        values: &[serde_json::Value],
2484        range: KeyRange,
2485    ) -> Result<i64, IndexedDBError> {
2486        TransactionIndex::delete_range(self, values, range).await
2487    }
2488}
2489
2490#[async_trait]
2491impl CursorApi for Cursor {
2492    fn key(&self) -> Option<serde_json::Value> {
2493        Cursor::key(self)
2494    }
2495
2496    fn primary_key(&self) -> &str {
2497        Cursor::primary_key(self)
2498    }
2499
2500    fn value(&self) -> Result<Record, IndexedDBError> {
2501        Cursor::value(self)
2502    }
2503
2504    async fn continue_next(&mut self) -> Result<bool, IndexedDBError> {
2505        Cursor::continue_next(self).await
2506    }
2507
2508    async fn continue_to_key(&mut self, key: serde_json::Value) -> Result<bool, IndexedDBError> {
2509        Cursor::continue_to_key(self, key).await
2510    }
2511
2512    async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError> {
2513        Cursor::advance(self, count).await
2514    }
2515
2516    async fn delete(&mut self) -> Result<(), IndexedDBError> {
2517        Cursor::delete(self).await
2518    }
2519
2520    async fn update(&mut self, value: Record) -> Result<(), IndexedDBError> {
2521        Cursor::update(self, value).await
2522    }
2523
2524    async fn close(self) -> Result<(), IndexedDBError> {
2525        Cursor::close(self).await
2526    }
2527}
2528
2529fn map_status(err: tonic::Status) -> IndexedDBError {
2530    match err.code() {
2531        tonic::Code::NotFound => IndexedDBError::NotFound,
2532        tonic::Code::AlreadyExists => IndexedDBError::AlreadyExists,
2533        tonic::Code::InvalidArgument => IndexedDBError::InvalidArgument(err.message().to_string()),
2534        tonic::Code::FailedPrecondition => IndexedDBError::Transaction(err.message().to_string()),
2535        _ => IndexedDBError::Status(err),
2536    }
2537}
2538
2539fn map_rpc_status(
2540    status: Option<crate::generated::google::rpc::Status>,
2541) -> Result<(), IndexedDBError> {
2542    let Some(status) = status else {
2543        return Ok(());
2544    };
2545    match status.code {
2546        0 => Ok(()),
2547        5 => Err(IndexedDBError::NotFound),
2548        6 => Err(IndexedDBError::AlreadyExists),
2549        3 => Err(IndexedDBError::InvalidArgument(status.message)),
2550        9 => Err(IndexedDBError::Transaction(status.message)),
2551        _ => Err(IndexedDBError::Transaction(status.message)),
2552    }
2553}
2554
2555fn unexpected_transaction_result() -> IndexedDBError {
2556    IndexedDBError::Transaction("unexpected transaction operation result".to_string())
2557}
2558
2559fn record_to_pb_record(record: Record) -> pb::Record {
2560    pb::Record {
2561        fields: record
2562            .into_iter()
2563            .map(|(k, v)| (k, json_to_typed_value(&v)))
2564            .collect(),
2565    }
2566}
2567
2568fn pb_record_to_record(r: &pb::Record) -> Record {
2569    r.fields
2570        .iter()
2571        .map(|(k, v)| (k.clone(), typed_value_to_json(v)))
2572        .collect()
2573}
2574
2575fn json_to_typed_value(v: &serde_json::Value) -> pb::TypedValue {
2576    use pb::typed_value::Kind;
2577    let kind = match v {
2578        serde_json::Value::Null => Kind::NullValue(0),
2579        serde_json::Value::Bool(b) => Kind::BoolValue(*b),
2580        serde_json::Value::Number(n) => {
2581            if let Some(i) = n.as_i64() {
2582                Kind::IntValue(i)
2583            } else {
2584                Kind::FloatValue(n.as_f64().unwrap_or(0.0))
2585            }
2586        }
2587        serde_json::Value::String(s) => Kind::StringValue(s.clone()),
2588        serde_json::Value::Array(arr) => {
2589            let values = arr.iter().map(json_to_prost_value).collect();
2590            Kind::JsonValue(prost_types::Value {
2591                kind: Some(prost_types::value::Kind::ListValue(
2592                    prost_types::ListValue { values },
2593                )),
2594            })
2595        }
2596        serde_json::Value::Object(obj) => {
2597            let fields = obj
2598                .iter()
2599                .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
2600                .collect();
2601            Kind::JsonValue(prost_types::Value {
2602                kind: Some(prost_types::value::Kind::StructValue(prost_types::Struct {
2603                    fields,
2604                })),
2605            })
2606        }
2607    };
2608    pb::TypedValue { kind: Some(kind) }
2609}
2610
2611fn prost_value_to_json(v: &prost_types::Value) -> serde_json::Value {
2612    use prost_types::value::Kind;
2613    match &v.kind {
2614        Some(Kind::NullValue(_)) => serde_json::Value::Null,
2615        Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
2616        Some(Kind::NumberValue(n)) => serde_json::json!(*n),
2617        Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
2618        Some(Kind::ListValue(list)) => {
2619            serde_json::Value::Array(list.values.iter().map(prost_value_to_json).collect())
2620        }
2621        Some(Kind::StructValue(st)) => {
2622            let obj: serde_json::Map<String, serde_json::Value> = st
2623                .fields
2624                .iter()
2625                .map(|(k, v)| (k.clone(), prost_value_to_json(v)))
2626                .collect();
2627            serde_json::Value::Object(obj)
2628        }
2629        None => serde_json::Value::Null,
2630    }
2631}
2632
2633fn json_to_prost_value(v: &serde_json::Value) -> prost_types::Value {
2634    use prost_types::value::Kind;
2635    let kind = match v {
2636        serde_json::Value::Null => Kind::NullValue(0),
2637        serde_json::Value::Bool(b) => Kind::BoolValue(*b),
2638        serde_json::Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)),
2639        serde_json::Value::String(s) => Kind::StringValue(s.clone()),
2640        serde_json::Value::Array(arr) => {
2641            let values = arr.iter().map(json_to_prost_value).collect();
2642            Kind::ListValue(prost_types::ListValue { values })
2643        }
2644        serde_json::Value::Object(obj) => {
2645            let fields = obj
2646                .iter()
2647                .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
2648                .collect();
2649            Kind::StructValue(prost_types::Struct { fields })
2650        }
2651    };
2652    prost_types::Value { kind: Some(kind) }
2653}
2654
2655fn key_value_to_json(kv: &pb::KeyValue) -> serde_json::Value {
2656    match &kv.kind {
2657        Some(pb::key_value::Kind::Scalar(tv)) => typed_value_to_json(tv),
2658        Some(pb::key_value::Kind::Array(arr)) => {
2659            serde_json::Value::Array(arr.elements.iter().map(key_value_to_json).collect())
2660        }
2661        None => serde_json::Value::Null,
2662    }
2663}
2664
2665fn json_to_key_value(v: &serde_json::Value) -> pb::KeyValue {
2666    if let serde_json::Value::Array(arr) = v {
2667        pb::KeyValue {
2668            kind: Some(pb::key_value::Kind::Array(pb::KeyValueArray {
2669                elements: arr.iter().map(json_to_key_value).collect(),
2670            })),
2671        }
2672    } else {
2673        pb::KeyValue {
2674            kind: Some(pb::key_value::Kind::Scalar(json_to_typed_value(v))),
2675        }
2676    }
2677}
2678
2679fn cursor_key_to_proto(key: &serde_json::Value, index_cursor: bool) -> Vec<pb::KeyValue> {
2680    if index_cursor {
2681        if let serde_json::Value::Array(parts) = key {
2682            return parts.iter().map(json_to_key_value).collect();
2683        }
2684    }
2685    vec![json_to_key_value(key)]
2686}
2687
2688fn typed_value_to_json(v: &pb::TypedValue) -> serde_json::Value {
2689    use pb::typed_value::Kind;
2690    match &v.kind {
2691        Some(Kind::NullValue(_)) => serde_json::Value::Null,
2692        Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
2693        Some(Kind::IntValue(i)) => serde_json::json!(*i),
2694        Some(Kind::FloatValue(f)) => serde_json::json!(*f),
2695        Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
2696        Some(Kind::BytesValue(b)) => serde_json::json!(b),
2697        Some(Kind::JsonValue(pv)) => prost_value_to_json(pv),
2698        Some(Kind::TimeValue(ts)) => {
2699            serde_json::Value::String(format!("{}.{}", ts.seconds, ts.nanos))
2700        }
2701        None => serde_json::Value::Null,
2702    }
2703}
2704
2705fn key_range_to_pb(kr: KeyRange) -> pb::KeyRange {
2706    pb::KeyRange {
2707        lower: kr.lower.map(|v| json_to_typed_value(&v)),
2708        upper: kr.upper.map(|v| json_to_typed_value(&v)),
2709        lower_open: kr.lower_open,
2710        upper_open: kr.upper_open,
2711    }
2712}
2713fn relay_token_interceptor(
2714    token: &str,
2715    binding: &str,
2716) -> Result<RelayTokenInterceptor, IndexedDBError> {
2717    let relay_token = if token.trim().is_empty() {
2718        None
2719    } else {
2720        Some(MetadataValue::try_from(token.to_string()).map_err(|err| {
2721            IndexedDBError::Env(format!("invalid IndexedDB relay token metadata: {err}"))
2722        })?)
2723    };
2724    let binding = if binding.trim().is_empty() {
2725        None
2726    } else {
2727        Some(
2728            MetadataValue::try_from(binding.trim().to_string()).map_err(|err| {
2729                IndexedDBError::Env(format!("invalid IndexedDB binding metadata: {err}"))
2730            })?,
2731        )
2732    };
2733    Ok(RelayTokenInterceptor {
2734        relay_token,
2735        binding,
2736    })
2737}
2738
2739#[derive(Clone)]
2740struct RelayTokenInterceptor {
2741    relay_token: Option<MetadataValue<tonic::metadata::Ascii>>,
2742    binding: Option<MetadataValue<tonic::metadata::Ascii>>,
2743}
2744
2745impl Interceptor for RelayTokenInterceptor {
2746    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, tonic::Status> {
2747        if let Some(header) = self.relay_token.clone() {
2748            request
2749                .metadata_mut()
2750                .insert(INDEXEDDB_RELAY_TOKEN_HEADER, header);
2751        }
2752        if let Some(header) = self.binding.clone() {
2753            request
2754                .metadata_mut()
2755                .insert(HOST_SERVICE_BINDING_HEADER, header);
2756        }
2757        Ok(request)
2758    }
2759}