Skip to main content

gestalt/
indexeddb.rs

1use std::collections::BTreeMap;
2
3use hyper_util::rt::TokioIo;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6use tonic::Request;
7use tonic::metadata::MetadataValue;
8use tonic::service::Interceptor;
9use tonic::service::interceptor::InterceptedService;
10use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
11use tower::service_fn;
12
13use crate::generated::v1::{self as pb, indexed_db_client::IndexedDbClient};
14
/// gRPC transport shared by the clients in this module: a tonic [`Channel`]
/// wrapped in an [`InterceptedService`] driven by `RelayTokenInterceptor`.
type IndexedDbTransport = InterceptedService<Channel, RelayTokenInterceptor>;

/// Default Unix-socket environment variable used by [`IndexedDB::connect`].
pub const ENV_INDEXEDDB_SOCKET: &str = "GESTALT_INDEXEDDB_SOCKET";
/// Suffix added to named IndexedDB socket variables for relay-token variables.
pub const ENV_INDEXEDDB_SOCKET_TOKEN_SUFFIX: &str = "_TOKEN";
/// Request-metadata header name for the relay token.
///
/// NOTE(review): presumably attached by the relay-token interceptor, which is
/// defined outside this part of the file -- confirm.
const INDEXEDDB_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";

/// Capacity of the mpsc channel that feeds client messages into a cursor
/// stream. The open message is queued before the RPC is started, so the
/// capacity must be at least 1.
const CURSOR_CHANNEL_BUFFER: usize = 1;
/// Capacity of the mpsc channel that feeds client messages into a transaction
/// stream. The begin message is queued before the RPC is started, so the
/// capacity must be at least 1.
const TRANSACTION_CHANNEL_BUFFER: usize = 1;
25
#[derive(Debug, thiserror::Error)]
/// Errors returned by the IndexedDB transport client.
///
/// `Transport` and `Status` are produced automatically by `?` through their
/// `From` conversions; the remaining variants are constructed explicitly.
pub enum IndexedDBError {
    /// The requested record, object store, index, or cursor entry was missing.
    #[error("not found")]
    NotFound,
    /// A create operation conflicted with an existing value.
    #[error("already exists")]
    AlreadyExists,
    /// A cursor was opened in key-only mode and a value was requested.
    #[error("cursor is keys-only; value not available")]
    KeysOnly,
    /// An explicit transaction failed or was already closed.
    ///
    /// The payload is the human-readable reason and is displayed verbatim.
    #[error("{0}")]
    Transaction(String),
    /// The host-service transport could not be created.
    #[error("{0}")]
    Transport(#[from] tonic::transport::Error),
    /// The host-service RPC returned a gRPC status.
    #[error("{0}")]
    Status(#[from] tonic::Status),
    /// Required environment or target configuration was invalid.
    ///
    /// Returned, for example, when the socket environment variable is not set.
    #[error("{0}")]
    Env(String),
}
51
/// JSON-like value stored in an object store row: a map from field name to
/// JSON value, kept in field-name order by the underlying [`BTreeMap`].
pub type Record = BTreeMap<String, serde_json::Value>;
54
/// Constrains a query or cursor by lower and upper bounds.
///
/// Either bound may be omitted (`None`), leaving that side of the range
/// without a bound value.
pub struct KeyRange {
    /// Lower bound, inclusive unless `lower_open` is true.
    pub lower: Option<serde_json::Value>,
    /// Upper bound, inclusive unless `upper_open` is true.
    pub upper: Option<serde_json::Value>,
    /// Whether the lower bound is exclusive.
    pub lower_open: bool,
    /// Whether the upper bound is exclusive.
    pub upper_open: bool,
}
66
/// Describes one secondary index on an object store.
///
/// Passed to the host as part of [`ObjectStoreSchema`] when an object store
/// is created.
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Record path used as the index key.
    pub key_path: Vec<String>,
    /// Whether the index enforces uniqueness.
    pub unique: bool,
}
76
/// Describes the indexes attached to an object store.
///
/// Consumed by [`IndexedDB::create_object_store`].
pub struct ObjectStoreSchema {
    /// Secondary indexes to create with the object store.
    pub indexes: Vec<IndexSchema>,
}
82
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Controls cursor traversal order.
///
/// Each variant maps one-to-one onto a `pb::CursorDirection` wire value.
pub enum CursorDirection {
    /// Iterate in ascending key order.
    Next,
    /// Iterate in ascending key order while collapsing duplicate index keys.
    NextUnique,
    /// Iterate in descending key order.
    Prev,
    /// Iterate in descending key order while collapsing duplicate index keys.
    PrevUnique,
}
95
96impl CursorDirection {
97    fn to_proto(self) -> i32 {
98        match self {
99            Self::Next => pb::CursorDirection::CursorNext as i32,
100            Self::NextUnique => pb::CursorDirection::CursorNextUnique as i32,
101            Self::Prev => pb::CursorDirection::CursorPrev as i32,
102            Self::PrevUnique => pb::CursorDirection::CursorPrevUnique as i32,
103        }
104    }
105}
106
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Controls whether an explicit transaction may mutate scoped stores.
///
/// Each variant maps one-to-one onto a `pb::TransactionMode` wire value.
pub enum TransactionMode {
    /// Transaction may only read from scoped object stores.
    Readonly,
    /// Transaction may read and write scoped object stores.
    Readwrite,
}
115
116impl TransactionMode {
117    fn to_proto(self) -> i32 {
118        match self {
119            Self::Readonly => pb::TransactionMode::TransactionReadonly as i32,
120            Self::Readwrite => pb::TransactionMode::TransactionReadwrite as i32,
121        }
122    }
123}
124
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
/// Provider durability hint for explicit transactions.
///
/// Each variant maps one-to-one onto a `pb::TransactionDurabilityHint` wire
/// value; [`TransactionDurabilityHint::Default`] is the `Default` value.
pub enum TransactionDurabilityHint {
    /// Let the host choose its default durability behavior.
    #[default]
    Default,
    /// Prefer stricter durability.
    Strict,
    /// Prefer relaxed durability.
    Relaxed,
}
136
137impl TransactionDurabilityHint {
138    fn to_proto(self) -> i32 {
139        match self {
140            Self::Default => pb::TransactionDurabilityHint::TransactionDurabilityDefault as i32,
141            Self::Strict => pb::TransactionDurabilityHint::TransactionDurabilityStrict as i32,
142            Self::Relaxed => pb::TransactionDurabilityHint::TransactionDurabilityRelaxed as i32,
143        }
144    }
145}
146
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
/// Options for an explicit transaction.
///
/// `TransactionOptions::default()` selects
/// [`TransactionDurabilityHint::Default`].
pub struct TransactionOptions {
    /// Durability hint for explicit transactions.
    pub durability_hint: TransactionDurabilityHint,
}
153
/// Streaming cursor over object store or secondary index rows.
///
/// Every cursor command is sent over `tx` and answered by exactly one message
/// read from `stream`.
pub struct Cursor {
    // Sending half of the channel that feeds the request stream.
    tx: mpsc::Sender<pb::CursorClientMessage>,
    // Response stream returned by the open-cursor RPC.
    stream: tonic::Streaming<pb::CursorResponse>,
    // Copied from the open request; when set, `value()` is refused.
    keys_only: bool,
    // True when the open request named an index (non-empty `index` field).
    index_cursor: bool,
    // Entry at the current position, if the cursor is positioned on a row.
    entry: Option<pb::CursorEntry>,
    // Set once the host reports exhaustion; further navigation is short-circuited.
    done: bool,
}
163
164impl Cursor {
165    /// Returns the current cursor key.
166    pub fn key(&self) -> Option<serde_json::Value> {
167        let entry = self.entry.as_ref()?;
168        match entry.key.len() {
169            0 => None,
170            1 if !self.index_cursor => Some(key_value_to_json(&entry.key[0])),
171            _ => Some(serde_json::Value::Array(
172                entry.key.iter().map(key_value_to_json).collect(),
173            )),
174        }
175    }
176
177    /// Returns the current row's primary key.
178    pub fn primary_key(&self) -> &str {
179        self.entry
180            .as_ref()
181            .map(|e| e.primary_key.as_str())
182            .unwrap_or("")
183    }
184
185    /// Returns the current row value.
186    pub fn value(&self) -> Result<Record, IndexedDBError> {
187        if self.keys_only {
188            return Err(IndexedDBError::KeysOnly);
189        }
190        let entry = self.entry.as_ref().ok_or(IndexedDBError::NotFound)?;
191        Ok(entry
192            .record
193            .as_ref()
194            .map(pb_record_to_record)
195            .unwrap_or_default())
196    }
197
198    /// Advances the cursor by one row.
199    pub async fn continue_next(&mut self) -> Result<bool, IndexedDBError> {
200        let cmd = pb::cursor_command::Command::Next(true);
201        self.send_and_recv(cmd).await
202    }
203
204    /// Advances the cursor to key, or exhausts it if key does not exist.
205    pub async fn continue_to_key(
206        &mut self,
207        key: serde_json::Value,
208    ) -> Result<bool, IndexedDBError> {
209        let cmd = pb::cursor_command::Command::ContinueToKey(pb::CursorKeyTarget {
210            key: cursor_key_to_proto(&key, self.index_cursor),
211        });
212        self.send_and_recv(cmd).await
213    }
214
215    /// Skips count rows ahead.
216    pub async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError> {
217        let cmd = pb::cursor_command::Command::Advance(count);
218        self.send_and_recv(cmd).await
219    }
220
221    /// Deletes the current row and keeps the cursor open.
222    pub async fn delete(&mut self) -> Result<(), IndexedDBError> {
223        if self.done {
224            return Err(IndexedDBError::NotFound);
225        }
226        let cmd = pb::cursor_command::Command::Delete(true);
227        self.send_mutation(cmd).await
228    }
229
230    /// Replaces the current row and keeps the cursor open.
231    pub async fn update(&mut self, value: Record) -> Result<(), IndexedDBError> {
232        if self.done {
233            return Err(IndexedDBError::NotFound);
234        }
235        let cmd = pb::cursor_command::Command::Update(record_to_pb_record(value));
236        self.send_mutation(cmd).await
237    }
238
239    /// Closes the cursor stream and releases its transport resources.
240    pub async fn close(self) -> Result<(), IndexedDBError> {
241        let msg = pb::CursorClientMessage {
242            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
243                command: Some(pb::cursor_command::Command::Close(true)),
244            })),
245        };
246        self.tx
247            .send(msg)
248            .await
249            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
250        Ok(())
251    }
252
253    async fn send_mutation(
254        &mut self,
255        cmd: pb::cursor_command::Command,
256    ) -> Result<(), IndexedDBError> {
257        let msg = pb::CursorClientMessage {
258            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
259                command: Some(cmd),
260            })),
261        };
262        self.tx
263            .send(msg)
264            .await
265            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
266        // Read ack -- if it contains an entry, update cursor state.
267        let resp = self
268            .stream
269            .message()
270            .await
271            .map_err(map_status)?
272            .ok_or_else(|| {
273                IndexedDBError::Status(tonic::Status::internal(
274                    "cursor stream ended during mutation",
275                ))
276            })?;
277        match resp.result {
278            Some(pb::cursor_response::Result::Entry(entry)) => {
279                self.entry = Some(entry);
280            }
281            Some(pb::cursor_response::Result::Done(_)) => {}
282            None => {
283                return Err(IndexedDBError::Status(tonic::Status::internal(
284                    "unexpected cursor mutation ack",
285                )));
286            }
287        }
288        Ok(())
289    }
290
291    async fn send_and_recv(
292        &mut self,
293        cmd: pb::cursor_command::Command,
294    ) -> Result<bool, IndexedDBError> {
295        if self.done {
296            return Ok(false);
297        }
298        let msg = pb::CursorClientMessage {
299            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
300                command: Some(cmd),
301            })),
302        };
303        self.tx
304            .send(msg)
305            .await
306            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
307
308        let resp = self
309            .stream
310            .message()
311            .await
312            .map_err(map_status)?
313            .ok_or_else(|| {
314                IndexedDBError::Status(tonic::Status::internal("cursor stream ended"))
315            })?;
316
317        match resp.result {
318            Some(pb::cursor_response::Result::Entry(entry)) => {
319                self.entry = Some(entry);
320                self.done = false;
321                Ok(true)
322            }
323            Some(pb::cursor_response::Result::Done(exhausted)) => {
324                if exhausted {
325                    self.done = true;
326                }
327                self.entry = None;
328                Ok(false)
329            }
330            None => {
331                self.entry = None;
332                self.done = true;
333                Ok(false)
334            }
335        }
336    }
337}
338
339async fn open_cursor_inner(
340    client: &mut IndexedDbClient<IndexedDbTransport>,
341    req: pb::OpenCursorRequest,
342) -> Result<Cursor, IndexedDBError> {
343    let keys_only = req.keys_only;
344    let is_index = !req.index.is_empty();
345    let (tx, rx) = mpsc::channel::<pb::CursorClientMessage>(CURSOR_CHANNEL_BUFFER);
346
347    let open_msg = pb::CursorClientMessage {
348        msg: Some(pb::cursor_client_message::Msg::Open(req)),
349    };
350    tx.send(open_msg)
351        .await
352        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
353
354    let receiver_stream = ReceiverStream::new(rx);
355    let mut stream = client
356        .open_cursor(receiver_stream)
357        .await
358        .map_err(map_status)?
359        .into_inner();
360
361    // Read the open ack to surface creation errors synchronously.
362    let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
363        IndexedDBError::Status(tonic::Status::internal("cursor stream ended during open"))
364    })?;
365    match ack.result {
366        Some(pb::cursor_response::Result::Done(false)) => {}
367        Some(pb::cursor_response::Result::Done(true)) => {
368            return Err(IndexedDBError::Status(tonic::Status::internal(
369                "unexpected exhausted cursor open ack",
370            )));
371        }
372        _ => {
373            return Err(IndexedDBError::Status(tonic::Status::internal(
374                "unexpected cursor open ack",
375            )));
376        }
377    }
378
379    Ok(Cursor {
380        tx,
381        stream,
382        keys_only,
383        entry: None,
384        done: false,
385        index_cursor: is_index,
386    })
387}
388
/// Client for a running IndexedDB provider.
///
/// Wraps the generated gRPC client; object-store handles and transactions
/// are created from clones of that client.
pub struct IndexedDB {
    // Generated gRPC client over the intercepted channel.
    client: IndexedDbClient<IndexedDbTransport>,
}
393
394impl IndexedDB {
395    /// Connects to the default IndexedDB transport socket.
396    pub async fn connect() -> Result<Self, IndexedDBError> {
397        Self::connect_named("").await
398    }
399
400    /// Connects to a named IndexedDB transport socket.
401    pub async fn connect_named(name: &str) -> Result<Self, IndexedDBError> {
402        let env_name = indexeddb_socket_env(name);
403        let target = std::env::var(&env_name)
404            .map_err(|_| IndexedDBError::Env(format!("{env_name} is not set")))?;
405        let token = std::env::var(indexeddb_socket_token_env(name)).unwrap_or_default();
406        let channel = match parse_indexeddb_target(&target)? {
407            IndexedDBTarget::Unix(path) => {
408                Endpoint::try_from("http://[::]:50051")?
409                    .connect_with_connector(service_fn(move |_: Uri| {
410                        let path = path.clone();
411                        async move {
412                            tokio::net::UnixStream::connect(path)
413                                .await
414                                .map(TokioIo::new)
415                        }
416                    }))
417                    .await?
418            }
419            IndexedDBTarget::Tcp(address) => {
420                Endpoint::from_shared(format!("http://{address}"))?
421                    .connect()
422                    .await?
423            }
424            IndexedDBTarget::Tls(address) => {
425                Endpoint::from_shared(format!("https://{address}"))?
426                    .tls_config(ClientTlsConfig::new().with_native_roots())?
427                    .connect()
428                    .await?
429            }
430        };
431
432        let client =
433            IndexedDbClient::with_interceptor(channel, relay_token_interceptor(token.trim())?);
434
435        Ok(Self { client })
436    }
437
438    /// Creates a named object store.
439    pub async fn create_object_store(
440        &mut self,
441        name: &str,
442        schema: ObjectStoreSchema,
443    ) -> Result<(), IndexedDBError> {
444        let indexes = schema
445            .indexes
446            .into_iter()
447            .map(|idx| pb::IndexSchema {
448                name: idx.name,
449                key_path: idx.key_path,
450                unique: idx.unique,
451            })
452            .collect();
453        self.client
454            .create_object_store(pb::CreateObjectStoreRequest {
455                name: name.to_string(),
456                schema: Some(pb::ObjectStoreSchema {
457                    indexes,
458                    columns: vec![],
459                }),
460            })
461            .await
462            .map_err(map_status)?;
463        Ok(())
464    }
465
466    /// Deletes a named object store.
467    pub async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError> {
468        self.client
469            .delete_object_store(pb::DeleteObjectStoreRequest {
470                name: name.to_string(),
471            })
472            .await
473            .map_err(map_status)?;
474        Ok(())
475    }
476
477    /// Returns a typed handle for one object store.
478    pub fn object_store(&self, name: &str) -> ObjectStore {
479        ObjectStore {
480            client: self.client.clone(),
481            store: name.to_string(),
482        }
483    }
484
485    /// Opens an explicit transaction over a fixed object-store scope.
486    pub async fn transaction(
487        &self,
488        stores: &[&str],
489        mode: TransactionMode,
490        options: TransactionOptions,
491    ) -> Result<Transaction, IndexedDBError> {
492        let (tx, rx) = mpsc::channel::<pb::TransactionClientMessage>(TRANSACTION_CHANNEL_BUFFER);
493        tx.send(pb::TransactionClientMessage {
494            msg: Some(pb::transaction_client_message::Msg::Begin(
495                pb::BeginTransactionRequest {
496                    stores: stores.iter().map(|store| store.to_string()).collect(),
497                    mode: mode.to_proto(),
498                    durability_hint: options.durability_hint.to_proto(),
499                },
500            )),
501        })
502        .await
503        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
504
505        let receiver_stream = ReceiverStream::new(rx);
506        let mut client = self.client.clone();
507        let mut stream = client
508            .transaction(receiver_stream)
509            .await
510            .map_err(map_status)?
511            .into_inner();
512
513        let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
514            IndexedDBError::Transaction("transaction stream ended during begin".to_string())
515        })?;
516        match ack.msg {
517            Some(pb::transaction_server_message::Msg::Begin(_)) => {}
518            _ => {
519                return Err(IndexedDBError::Transaction(
520                    "expected transaction begin response".to_string(),
521                ));
522            }
523        }
524
525        Ok(Transaction {
526            tx: Some(tx),
527            stream,
528            request_id: 0,
529            closed: false,
530        })
531    }
532}
533
/// Explicit transaction over one or more object stores.
///
/// Each request is sent over `tx` and answered by one message read from
/// `stream`.
pub struct Transaction {
    // Sending half of the request channel; taken (set to `None`) once the
    // transaction is finished.
    tx: Option<mpsc::Sender<pb::TransactionClientMessage>>,
    // Response stream returned by the transaction RPC.
    stream: tonic::Streaming<pb::TransactionServerMessage>,
    // Id of the most recently issued operation; incremented before each send.
    request_id: u64,
    // True once the transaction was committed, aborted, or closed locally
    // after a failure.
    closed: bool,
}
541
542impl Transaction {
543    /// Returns a transaction-scoped object store.
544    pub fn object_store<'a>(&'a mut self, name: &str) -> TransactionObjectStore<'a> {
545        TransactionObjectStore {
546            tx: self,
547            store: name.to_string(),
548        }
549    }
550
551    /// Commits the transaction.
552    pub async fn commit(&mut self) -> Result<(), IndexedDBError> {
553        self.ensure_open()?;
554        let tx = self.tx.as_ref().ok_or_else(|| {
555            IndexedDBError::Transaction("transaction is already finished".to_string())
556        })?;
557        tx.send(pb::TransactionClientMessage {
558            msg: Some(pb::transaction_client_message::Msg::Commit(
559                pb::TransactionCommitRequest {},
560            )),
561        })
562        .await
563        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
564        self.closed = true;
565        self.tx.take();
566
567        let resp = self
568            .stream
569            .message()
570            .await
571            .map_err(map_status)?
572            .ok_or_else(|| {
573                IndexedDBError::Transaction("transaction stream ended during commit".to_string())
574            })?;
575        match resp.msg {
576            Some(pb::transaction_server_message::Msg::Commit(commit)) => {
577                map_rpc_status(commit.error)
578            }
579            _ => Err(IndexedDBError::Transaction(
580                "expected transaction commit response".to_string(),
581            )),
582        }
583    }
584
585    /// Aborts the transaction. Aborting an already finished transaction is a no-op.
586    pub async fn abort(&mut self, reason: &str) -> Result<(), IndexedDBError> {
587        if self.closed {
588            return Ok(());
589        }
590        let tx = self.tx.as_ref().ok_or_else(|| {
591            IndexedDBError::Transaction("transaction is already finished".to_string())
592        })?;
593        tx.send(pb::TransactionClientMessage {
594            msg: Some(pb::transaction_client_message::Msg::Abort(
595                pb::TransactionAbortRequest {
596                    reason: reason.to_string(),
597                },
598            )),
599        })
600        .await
601        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
602        self.closed = true;
603        self.tx.take();
604
605        let resp = self
606            .stream
607            .message()
608            .await
609            .map_err(map_status)?
610            .ok_or_else(|| {
611                IndexedDBError::Transaction("transaction stream ended during abort".to_string())
612            })?;
613        match resp.msg {
614            Some(pb::transaction_server_message::Msg::Abort(abort)) => map_rpc_status(abort.error),
615            _ => Err(IndexedDBError::Transaction(
616                "expected transaction abort response".to_string(),
617            )),
618        }
619    }
620
621    async fn send_operation(
622        &mut self,
623        operation: pb::transaction_operation::Operation,
624    ) -> Result<pb::TransactionOperationResponse, IndexedDBError> {
625        self.ensure_open()?;
626        self.request_id += 1;
627        let request_id = self.request_id;
628        let tx = self.tx.as_ref().ok_or_else(|| {
629            IndexedDBError::Transaction("transaction is already finished".to_string())
630        })?;
631        tx.send(pb::TransactionClientMessage {
632            msg: Some(pb::transaction_client_message::Msg::Operation(
633                pb::TransactionOperation {
634                    request_id,
635                    operation: Some(operation),
636                },
637            )),
638        })
639        .await
640        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
641
642        let resp = self
643            .stream
644            .message()
645            .await
646            .map_err(map_status)?
647            .ok_or_else(|| {
648                IndexedDBError::Transaction("transaction stream ended during operation".to_string())
649            })?;
650        let op = match resp.msg {
651            Some(pb::transaction_server_message::Msg::Operation(op)) => op,
652            _ => {
653                self.close_locally();
654                return Err(IndexedDBError::Transaction(
655                    "expected transaction operation response".to_string(),
656                ));
657            }
658        };
659        if op.request_id != request_id {
660            self.close_locally();
661            return Err(IndexedDBError::Transaction(
662                "transaction response request id mismatch".to_string(),
663            ));
664        }
665        if let Err(err) = map_rpc_status(op.error.clone()) {
666            self.close_locally();
667            return Err(err);
668        }
669        Ok(op)
670    }
671
672    fn ensure_open(&self) -> Result<(), IndexedDBError> {
673        if self.closed {
674            return Err(IndexedDBError::Transaction(
675                "transaction is already finished".to_string(),
676            ));
677        }
678        Ok(())
679    }
680
681    fn close_locally(&mut self) {
682        self.closed = true;
683        self.tx.take();
684    }
685}
686
/// Object-store operations scoped to an explicit transaction.
///
/// Holds a mutable borrow of the [`Transaction`] for its whole lifetime.
pub struct TransactionObjectStore<'a> {
    // Transaction every operation is sent through.
    tx: &'a mut Transaction,
    // Name of the object store placed in each request.
    store: String,
}
692
693impl TransactionObjectStore<'_> {
694    /// Loads one record by primary key inside the transaction.
695    pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
696        let resp = self
697            .tx
698            .send_operation(pb::transaction_operation::Operation::Get(
699                pb::ObjectStoreRequest {
700                    store: self.store.clone(),
701                    id: id.to_string(),
702                },
703            ))
704            .await?;
705        match resp.result {
706            Some(pb::transaction_operation_response::Result::Record(record)) => Ok(record
707                .record
708                .as_ref()
709                .map(pb_record_to_record)
710                .unwrap_or_default()),
711            _ => Err(unexpected_transaction_result()),
712        }
713    }
714
715    /// Resolves the primary key for id inside the transaction.
716    pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
717        let resp = self
718            .tx
719            .send_operation(pb::transaction_operation::Operation::GetKey(
720                pb::ObjectStoreRequest {
721                    store: self.store.clone(),
722                    id: id.to_string(),
723                },
724            ))
725            .await?;
726        match resp.result {
727            Some(pb::transaction_operation_response::Result::Key(key)) => Ok(key.key),
728            _ => Err(unexpected_transaction_result()),
729        }
730    }
731
732    /// Inserts a new row inside the transaction.
733    pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
734        self.tx
735            .send_operation(pb::transaction_operation::Operation::Add(
736                pb::RecordRequest {
737                    store: self.store.clone(),
738                    record: Some(record_to_pb_record(record)),
739                },
740            ))
741            .await?;
742        Ok(())
743    }
744
745    /// Upserts a row inside the transaction.
746    pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
747        self.tx
748            .send_operation(pb::transaction_operation::Operation::Put(
749                pb::RecordRequest {
750                    store: self.store.clone(),
751                    record: Some(record_to_pb_record(record)),
752                },
753            ))
754            .await?;
755        Ok(())
756    }
757
758    /// Deletes one row inside the transaction.
759    pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
760        self.tx
761            .send_operation(pb::transaction_operation::Operation::Delete(
762                pb::ObjectStoreRequest {
763                    store: self.store.clone(),
764                    id: id.to_string(),
765                },
766            ))
767            .await?;
768        Ok(())
769    }
770
771    /// Deletes every row in the object store inside the transaction.
772    pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
773        self.tx
774            .send_operation(pb::transaction_operation::Operation::Clear(
775                pb::ObjectStoreNameRequest {
776                    store: self.store.clone(),
777                },
778            ))
779            .await?;
780        Ok(())
781    }
782
783    /// Loads every row that matches range inside the transaction.
784    pub async fn get_all(
785        &mut self,
786        range: Option<KeyRange>,
787    ) -> Result<Vec<Record>, IndexedDBError> {
788        let resp = self
789            .tx
790            .send_operation(pb::transaction_operation::Operation::GetAll(
791                pb::ObjectStoreRangeRequest {
792                    store: self.store.clone(),
793                    range: range.map(key_range_to_pb),
794                },
795            ))
796            .await?;
797        match resp.result {
798            Some(pb::transaction_operation_response::Result::Records(records)) => {
799                Ok(records.records.iter().map(pb_record_to_record).collect())
800            }
801            _ => Err(unexpected_transaction_result()),
802        }
803    }
804
805    /// Loads every primary key that matches range inside the transaction.
806    pub async fn get_all_keys(
807        &mut self,
808        range: Option<KeyRange>,
809    ) -> Result<Vec<String>, IndexedDBError> {
810        let resp = self
811            .tx
812            .send_operation(pb::transaction_operation::Operation::GetAllKeys(
813                pb::ObjectStoreRangeRequest {
814                    store: self.store.clone(),
815                    range: range.map(key_range_to_pb),
816                },
817            ))
818            .await?;
819        match resp.result {
820            Some(pb::transaction_operation_response::Result::Keys(keys)) => Ok(keys.keys),
821            _ => Err(unexpected_transaction_result()),
822        }
823    }
824
825    /// Counts rows that match range inside the transaction.
826    pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
827        let resp = self
828            .tx
829            .send_operation(pb::transaction_operation::Operation::Count(
830                pb::ObjectStoreRangeRequest {
831                    store: self.store.clone(),
832                    range: range.map(key_range_to_pb),
833                },
834            ))
835            .await?;
836        match resp.result {
837            Some(pb::transaction_operation_response::Result::Count(count)) => Ok(count.count),
838            _ => Err(unexpected_transaction_result()),
839        }
840    }
841
842    /// Deletes rows that match range inside the transaction.
843    pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
844        let resp = self
845            .tx
846            .send_operation(pb::transaction_operation::Operation::DeleteRange(
847                pb::ObjectStoreRangeRequest {
848                    store: self.store.clone(),
849                    range: Some(key_range_to_pb(range)),
850                },
851            ))
852            .await?;
853        match resp.result {
854            Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
855                Ok(deleted.deleted)
856            }
857            _ => Err(unexpected_transaction_result()),
858        }
859    }
860
861    /// Returns a transaction-scoped secondary index.
862    pub fn index<'a>(&'a mut self, name: &str) -> TransactionIndexClient<'a> {
863        TransactionIndexClient {
864            tx: &mut *self.tx,
865            store: self.store.clone(),
866            index: name.to_string(),
867        }
868    }
869}
870
/// Secondary-index operations scoped to an explicit transaction.
///
/// Holds a mutable borrow of the [`Transaction`] for its whole lifetime.
pub struct TransactionIndexClient<'a> {
    // Transaction every operation is sent through.
    tx: &'a mut Transaction,
    // Name of the object store that owns the index.
    store: String,
    // Name of the secondary index.
    index: String,
}
877
878impl TransactionIndexClient<'_> {
879    /// Loads the first row that matches values inside the transaction.
880    pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
881        let resp = self
882            .tx
883            .send_operation(pb::transaction_operation::Operation::IndexGet(
884                self.index_request(values, None),
885            ))
886            .await?;
887        match resp.result {
888            Some(pb::transaction_operation_response::Result::Record(record)) => Ok(record
889                .record
890                .as_ref()
891                .map(pb_record_to_record)
892                .unwrap_or_default()),
893            _ => Err(unexpected_transaction_result()),
894        }
895    }
896
897    /// Resolves the primary key for the first matching row inside the transaction.
898    pub async fn get_key(
899        &mut self,
900        values: &[serde_json::Value],
901    ) -> Result<String, IndexedDBError> {
902        let resp = self
903            .tx
904            .send_operation(pb::transaction_operation::Operation::IndexGetKey(
905                self.index_request(values, None),
906            ))
907            .await?;
908        match resp.result {
909            Some(pb::transaction_operation_response::Result::Key(key)) => Ok(key.key),
910            _ => Err(unexpected_transaction_result()),
911        }
912    }
913
914    /// Loads every row that matches values and range inside the transaction.
915    pub async fn get_all(
916        &mut self,
917        values: &[serde_json::Value],
918        range: Option<KeyRange>,
919    ) -> Result<Vec<Record>, IndexedDBError> {
920        let resp = self
921            .tx
922            .send_operation(pb::transaction_operation::Operation::IndexGetAll(
923                self.index_request(values, range),
924            ))
925            .await?;
926        match resp.result {
927            Some(pb::transaction_operation_response::Result::Records(records)) => {
928                Ok(records.records.iter().map(pb_record_to_record).collect())
929            }
930            _ => Err(unexpected_transaction_result()),
931        }
932    }
933
934    /// Loads every primary key that matches values and range inside the transaction.
935    pub async fn get_all_keys(
936        &mut self,
937        values: &[serde_json::Value],
938        range: Option<KeyRange>,
939    ) -> Result<Vec<String>, IndexedDBError> {
940        let resp = self
941            .tx
942            .send_operation(pb::transaction_operation::Operation::IndexGetAllKeys(
943                self.index_request(values, range),
944            ))
945            .await?;
946        match resp.result {
947            Some(pb::transaction_operation_response::Result::Keys(keys)) => Ok(keys.keys),
948            _ => Err(unexpected_transaction_result()),
949        }
950    }
951
952    /// Counts rows that match values and range inside the transaction.
953    pub async fn count(
954        &mut self,
955        values: &[serde_json::Value],
956        range: Option<KeyRange>,
957    ) -> Result<i64, IndexedDBError> {
958        let resp = self
959            .tx
960            .send_operation(pb::transaction_operation::Operation::IndexCount(
961                self.index_request(values, range),
962            ))
963            .await?;
964        match resp.result {
965            Some(pb::transaction_operation_response::Result::Count(count)) => Ok(count.count),
966            _ => Err(unexpected_transaction_result()),
967        }
968    }
969
970    /// Deletes rows that match values inside the transaction.
971    pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
972        let resp = self
973            .tx
974            .send_operation(pb::transaction_operation::Operation::IndexDelete(
975                self.index_request(values, None),
976            ))
977            .await?;
978        match resp.result {
979            Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
980                Ok(deleted.deleted)
981            }
982            _ => Err(unexpected_transaction_result()),
983        }
984    }
985
986    fn index_request(
987        &self,
988        values: &[serde_json::Value],
989        range: Option<KeyRange>,
990    ) -> pb::IndexQueryRequest {
991        pb::IndexQueryRequest {
992            store: self.store.clone(),
993            index: self.index.clone(),
994            values: values.iter().map(json_to_typed_value).collect(),
995            range: range.map(key_range_to_pb),
996        }
997    }
998}
999
1000enum IndexedDBTarget {
1001    Unix(String),
1002    Tcp(String),
1003    Tls(String),
1004}
1005
1006fn parse_indexeddb_target(raw_target: &str) -> Result<IndexedDBTarget, IndexedDBError> {
1007    let target = raw_target.trim();
1008    if target.is_empty() {
1009        return Err(IndexedDBError::Env(
1010            "IndexedDB transport target is required".to_string(),
1011        ));
1012    }
1013    if let Some(address) = target.strip_prefix("tcp://") {
1014        let address = address.trim();
1015        if address.is_empty() {
1016            return Err(IndexedDBError::Env(format!(
1017                "IndexedDB tcp target {raw_target:?} is missing host:port"
1018            )));
1019        }
1020        return Ok(IndexedDBTarget::Tcp(address.to_string()));
1021    }
1022    if let Some(address) = target.strip_prefix("tls://") {
1023        let address = address.trim();
1024        if address.is_empty() {
1025            return Err(IndexedDBError::Env(format!(
1026                "IndexedDB tls target {raw_target:?} is missing host:port"
1027            )));
1028        }
1029        return Ok(IndexedDBTarget::Tls(address.to_string()));
1030    }
1031    if let Some(path) = target.strip_prefix("unix://") {
1032        let path = path.trim();
1033        if path.is_empty() {
1034            return Err(IndexedDBError::Env(format!(
1035                "IndexedDB unix target {raw_target:?} is missing a socket path"
1036            )));
1037        }
1038        return Ok(IndexedDBTarget::Unix(path.to_string()));
1039    }
1040    if target.contains("://") {
1041        let scheme = target.split("://").next().unwrap_or_default();
1042        return Err(IndexedDBError::Env(format!(
1043            "unsupported IndexedDB target scheme {scheme:?}"
1044        )));
1045    }
1046    Ok(IndexedDBTarget::Unix(target.to_string()))
1047}
1048
1049/// CRUD, range-query, and cursor access for one object store.
1050pub struct ObjectStore {
1051    client: IndexedDbClient<IndexedDbTransport>,
1052    store: String,
1053}
1054
1055impl ObjectStore {
1056    /// Loads one record by primary key.
1057    pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
1058        let resp = self
1059            .client
1060            .get(pb::ObjectStoreRequest {
1061                store: self.store.clone(),
1062                id: id.to_string(),
1063            })
1064            .await
1065            .map_err(map_status)?;
1066        Ok(resp
1067            .into_inner()
1068            .record
1069            .as_ref()
1070            .map(pb_record_to_record)
1071            .unwrap_or_default())
1072    }
1073
1074    /// Resolves the primary key for id.
1075    pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
1076        let resp = self
1077            .client
1078            .get_key(pb::ObjectStoreRequest {
1079                store: self.store.clone(),
1080                id: id.to_string(),
1081            })
1082            .await
1083            .map_err(map_status)?;
1084        Ok(resp.into_inner().key)
1085    }
1086
1087    /// Inserts a new row and fails if the key already exists.
1088    pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
1089        self.client
1090            .add(pb::RecordRequest {
1091                store: self.store.clone(),
1092                record: Some(record_to_pb_record(record)),
1093            })
1094            .await
1095            .map_err(map_status)?;
1096        Ok(())
1097    }
1098
1099    /// Upserts a row by primary key.
1100    pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
1101        self.client
1102            .put(pb::RecordRequest {
1103                store: self.store.clone(),
1104                record: Some(record_to_pb_record(record)),
1105            })
1106            .await
1107            .map_err(map_status)?;
1108        Ok(())
1109    }
1110
1111    /// Deletes one row by primary key.
1112    pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
1113        self.client
1114            .delete(pb::ObjectStoreRequest {
1115                store: self.store.clone(),
1116                id: id.to_string(),
1117            })
1118            .await
1119            .map_err(map_status)?;
1120        Ok(())
1121    }
1122
1123    /// Deletes every row in the object store.
1124    pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
1125        self.client
1126            .clear(pb::ObjectStoreNameRequest {
1127                store: self.store.clone(),
1128            })
1129            .await
1130            .map_err(map_status)?;
1131        Ok(())
1132    }
1133
1134    /// Loads every row that matches range.
1135    pub async fn get_all(
1136        &mut self,
1137        range: Option<KeyRange>,
1138    ) -> Result<Vec<Record>, IndexedDBError> {
1139        let resp = self
1140            .client
1141            .get_all(pb::ObjectStoreRangeRequest {
1142                store: self.store.clone(),
1143                range: range.map(key_range_to_pb),
1144            })
1145            .await
1146            .map_err(map_status)?;
1147        Ok(resp
1148            .into_inner()
1149            .records
1150            .iter()
1151            .map(pb_record_to_record)
1152            .collect())
1153    }
1154
1155    /// Loads every primary key that matches range.
1156    pub async fn get_all_keys(
1157        &mut self,
1158        range: Option<KeyRange>,
1159    ) -> Result<Vec<String>, IndexedDBError> {
1160        let resp = self
1161            .client
1162            .get_all_keys(pb::ObjectStoreRangeRequest {
1163                store: self.store.clone(),
1164                range: range.map(key_range_to_pb),
1165            })
1166            .await
1167            .map_err(map_status)?;
1168        Ok(resp.into_inner().keys)
1169    }
1170
1171    /// Counts rows that match range.
1172    pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
1173        let resp = self
1174            .client
1175            .count(pb::ObjectStoreRangeRequest {
1176                store: self.store.clone(),
1177                range: range.map(key_range_to_pb),
1178            })
1179            .await
1180            .map_err(map_status)?;
1181        Ok(resp.into_inner().count)
1182    }
1183
1184    /// Deletes rows that match range and returns the delete count.
1185    pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
1186        let resp = self
1187            .client
1188            .delete_range(pb::ObjectStoreRangeRequest {
1189                store: self.store.clone(),
1190                range: Some(key_range_to_pb(range)),
1191            })
1192            .await
1193            .map_err(map_status)?;
1194        Ok(resp.into_inner().deleted)
1195    }
1196
1197    /// Returns a typed handle for one secondary index.
1198    pub fn index(&self, name: &str) -> IndexClient {
1199        IndexClient {
1200            client: self.client.clone(),
1201            store: self.store.clone(),
1202            index: name.to_string(),
1203        }
1204    }
1205
1206    /// Opens a full-value cursor over the object store.
1207    pub async fn open_cursor(
1208        &mut self,
1209        range: Option<KeyRange>,
1210        direction: CursorDirection,
1211    ) -> Result<Cursor, IndexedDBError> {
1212        let req = pb::OpenCursorRequest {
1213            store: self.store.clone(),
1214            range: range.map(key_range_to_pb),
1215            direction: direction.to_proto(),
1216            keys_only: false,
1217            index: String::new(),
1218            values: vec![],
1219        };
1220        open_cursor_inner(&mut self.client, req).await
1221    }
1222
1223    /// Opens a key-only cursor over the object store.
1224    pub async fn open_key_cursor(
1225        &mut self,
1226        range: Option<KeyRange>,
1227        direction: CursorDirection,
1228    ) -> Result<Cursor, IndexedDBError> {
1229        let req = pb::OpenCursorRequest {
1230            store: self.store.clone(),
1231            range: range.map(key_range_to_pb),
1232            direction: direction.to_proto(),
1233            keys_only: true,
1234            index: String::new(),
1235            values: vec![],
1236        };
1237        open_cursor_inner(&mut self.client, req).await
1238    }
1239}
1240
1241/// Lookup and cursor access through one secondary index.
1242pub struct IndexClient {
1243    client: IndexedDbClient<IndexedDbTransport>,
1244    store: String,
1245    index: String,
1246}
1247
1248impl IndexClient {
1249    /// Loads the first row that matches values.
1250    pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
1251        let resp = self
1252            .client
1253            .index_get(pb::IndexQueryRequest {
1254                store: self.store.clone(),
1255                index: self.index.clone(),
1256                values: values.iter().map(json_to_typed_value).collect(),
1257                range: None,
1258            })
1259            .await
1260            .map_err(map_status)?;
1261        Ok(resp
1262            .into_inner()
1263            .record
1264            .as_ref()
1265            .map(pb_record_to_record)
1266            .unwrap_or_default())
1267    }
1268
1269    /// Resolves the primary key for the first row that matches values.
1270    pub async fn get_key(
1271        &mut self,
1272        values: &[serde_json::Value],
1273    ) -> Result<String, IndexedDBError> {
1274        let resp = self
1275            .client
1276            .index_get_key(pb::IndexQueryRequest {
1277                store: self.store.clone(),
1278                index: self.index.clone(),
1279                values: values.iter().map(json_to_typed_value).collect(),
1280                range: None,
1281            })
1282            .await
1283            .map_err(map_status)?;
1284        Ok(resp.into_inner().key)
1285    }
1286
1287    /// Loads every row that matches values and range.
1288    pub async fn get_all(
1289        &mut self,
1290        values: &[serde_json::Value],
1291        range: Option<KeyRange>,
1292    ) -> Result<Vec<Record>, IndexedDBError> {
1293        let resp = self
1294            .client
1295            .index_get_all(pb::IndexQueryRequest {
1296                store: self.store.clone(),
1297                index: self.index.clone(),
1298                values: values.iter().map(json_to_typed_value).collect(),
1299                range: range.map(key_range_to_pb),
1300            })
1301            .await
1302            .map_err(map_status)?;
1303        Ok(resp
1304            .into_inner()
1305            .records
1306            .iter()
1307            .map(pb_record_to_record)
1308            .collect())
1309    }
1310
1311    /// Loads every primary key that matches values and range.
1312    pub async fn get_all_keys(
1313        &mut self,
1314        values: &[serde_json::Value],
1315        range: Option<KeyRange>,
1316    ) -> Result<Vec<String>, IndexedDBError> {
1317        let resp = self
1318            .client
1319            .index_get_all_keys(pb::IndexQueryRequest {
1320                store: self.store.clone(),
1321                index: self.index.clone(),
1322                values: values.iter().map(json_to_typed_value).collect(),
1323                range: range.map(key_range_to_pb),
1324            })
1325            .await
1326            .map_err(map_status)?;
1327        Ok(resp.into_inner().keys)
1328    }
1329
1330    /// Counts rows that match values and range.
1331    pub async fn count(
1332        &mut self,
1333        values: &[serde_json::Value],
1334        range: Option<KeyRange>,
1335    ) -> Result<i64, IndexedDBError> {
1336        let resp = self
1337            .client
1338            .index_count(pb::IndexQueryRequest {
1339                store: self.store.clone(),
1340                index: self.index.clone(),
1341                values: values.iter().map(json_to_typed_value).collect(),
1342                range: range.map(key_range_to_pb),
1343            })
1344            .await
1345            .map_err(map_status)?;
1346        Ok(resp.into_inner().count)
1347    }
1348
1349    /// Deletes rows that match values and returns the delete count.
1350    pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
1351        let resp = self
1352            .client
1353            .index_delete(pb::IndexQueryRequest {
1354                store: self.store.clone(),
1355                index: self.index.clone(),
1356                values: values.iter().map(json_to_typed_value).collect(),
1357                range: None,
1358            })
1359            .await
1360            .map_err(map_status)?;
1361        Ok(resp.into_inner().deleted)
1362    }
1363
1364    /// Opens a full-value cursor over the secondary index.
1365    pub async fn open_cursor(
1366        &mut self,
1367        values: &[serde_json::Value],
1368        range: Option<KeyRange>,
1369        direction: CursorDirection,
1370    ) -> Result<Cursor, IndexedDBError> {
1371        let req = pb::OpenCursorRequest {
1372            store: self.store.clone(),
1373            range: range.map(key_range_to_pb),
1374            direction: direction.to_proto(),
1375            keys_only: false,
1376            index: self.index.clone(),
1377            values: values.iter().map(json_to_typed_value).collect(),
1378        };
1379        open_cursor_inner(&mut self.client, req).await
1380    }
1381
1382    /// Opens a key-only cursor over the secondary index.
1383    pub async fn open_key_cursor(
1384        &mut self,
1385        values: &[serde_json::Value],
1386        range: Option<KeyRange>,
1387        direction: CursorDirection,
1388    ) -> Result<Cursor, IndexedDBError> {
1389        let req = pb::OpenCursorRequest {
1390            store: self.store.clone(),
1391            range: range.map(key_range_to_pb),
1392            direction: direction.to_proto(),
1393            keys_only: true,
1394            index: self.index.clone(),
1395            values: values.iter().map(json_to_typed_value).collect(),
1396        };
1397        open_cursor_inner(&mut self.client, req).await
1398    }
1399}
1400
1401fn map_status(err: tonic::Status) -> IndexedDBError {
1402    match err.code() {
1403        tonic::Code::NotFound => IndexedDBError::NotFound,
1404        tonic::Code::AlreadyExists => IndexedDBError::AlreadyExists,
1405        _ => IndexedDBError::Status(err),
1406    }
1407}
1408
1409fn map_rpc_status(
1410    status: Option<crate::generated::google::rpc::Status>,
1411) -> Result<(), IndexedDBError> {
1412    let Some(status) = status else {
1413        return Ok(());
1414    };
1415    match status.code {
1416        0 => Ok(()),
1417        5 => Err(IndexedDBError::NotFound),
1418        6 => Err(IndexedDBError::AlreadyExists),
1419        3 | 9 => Err(IndexedDBError::Transaction(status.message)),
1420        _ => Err(IndexedDBError::Transaction(status.message)),
1421    }
1422}
1423
1424fn unexpected_transaction_result() -> IndexedDBError {
1425    IndexedDBError::Transaction("unexpected transaction operation result".to_string())
1426}
1427
1428fn record_to_pb_record(record: Record) -> pb::Record {
1429    pb::Record {
1430        fields: record
1431            .into_iter()
1432            .map(|(k, v)| (k, json_to_typed_value(&v)))
1433            .collect(),
1434    }
1435}
1436
1437fn pb_record_to_record(r: &pb::Record) -> Record {
1438    r.fields
1439        .iter()
1440        .map(|(k, v)| (k.clone(), typed_value_to_json(v)))
1441        .collect()
1442}
1443
1444fn json_to_typed_value(v: &serde_json::Value) -> pb::TypedValue {
1445    use pb::typed_value::Kind;
1446    let kind = match v {
1447        serde_json::Value::Null => Kind::NullValue(0),
1448        serde_json::Value::Bool(b) => Kind::BoolValue(*b),
1449        serde_json::Value::Number(n) => {
1450            if let Some(i) = n.as_i64() {
1451                Kind::IntValue(i)
1452            } else {
1453                Kind::FloatValue(n.as_f64().unwrap_or(0.0))
1454            }
1455        }
1456        serde_json::Value::String(s) => Kind::StringValue(s.clone()),
1457        serde_json::Value::Array(arr) => {
1458            let values = arr.iter().map(json_to_prost_value).collect();
1459            Kind::JsonValue(prost_types::Value {
1460                kind: Some(prost_types::value::Kind::ListValue(
1461                    prost_types::ListValue { values },
1462                )),
1463            })
1464        }
1465        serde_json::Value::Object(obj) => {
1466            let fields = obj
1467                .iter()
1468                .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
1469                .collect();
1470            Kind::JsonValue(prost_types::Value {
1471                kind: Some(prost_types::value::Kind::StructValue(prost_types::Struct {
1472                    fields,
1473                })),
1474            })
1475        }
1476    };
1477    pb::TypedValue { kind: Some(kind) }
1478}
1479
1480fn prost_value_to_json(v: &prost_types::Value) -> serde_json::Value {
1481    use prost_types::value::Kind;
1482    match &v.kind {
1483        Some(Kind::NullValue(_)) => serde_json::Value::Null,
1484        Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
1485        Some(Kind::NumberValue(n)) => serde_json::json!(*n),
1486        Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
1487        Some(Kind::ListValue(list)) => {
1488            serde_json::Value::Array(list.values.iter().map(prost_value_to_json).collect())
1489        }
1490        Some(Kind::StructValue(st)) => {
1491            let obj: serde_json::Map<String, serde_json::Value> = st
1492                .fields
1493                .iter()
1494                .map(|(k, v)| (k.clone(), prost_value_to_json(v)))
1495                .collect();
1496            serde_json::Value::Object(obj)
1497        }
1498        None => serde_json::Value::Null,
1499    }
1500}
1501
1502fn json_to_prost_value(v: &serde_json::Value) -> prost_types::Value {
1503    use prost_types::value::Kind;
1504    let kind = match v {
1505        serde_json::Value::Null => Kind::NullValue(0),
1506        serde_json::Value::Bool(b) => Kind::BoolValue(*b),
1507        serde_json::Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)),
1508        serde_json::Value::String(s) => Kind::StringValue(s.clone()),
1509        serde_json::Value::Array(arr) => {
1510            let values = arr.iter().map(json_to_prost_value).collect();
1511            Kind::ListValue(prost_types::ListValue { values })
1512        }
1513        serde_json::Value::Object(obj) => {
1514            let fields = obj
1515                .iter()
1516                .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
1517                .collect();
1518            Kind::StructValue(prost_types::Struct { fields })
1519        }
1520    };
1521    prost_types::Value { kind: Some(kind) }
1522}
1523
1524fn key_value_to_json(kv: &pb::KeyValue) -> serde_json::Value {
1525    match &kv.kind {
1526        Some(pb::key_value::Kind::Scalar(tv)) => typed_value_to_json(tv),
1527        Some(pb::key_value::Kind::Array(arr)) => {
1528            serde_json::Value::Array(arr.elements.iter().map(key_value_to_json).collect())
1529        }
1530        None => serde_json::Value::Null,
1531    }
1532}
1533
1534fn json_to_key_value(v: &serde_json::Value) -> pb::KeyValue {
1535    if let serde_json::Value::Array(arr) = v {
1536        pb::KeyValue {
1537            kind: Some(pb::key_value::Kind::Array(pb::KeyValueArray {
1538                elements: arr.iter().map(json_to_key_value).collect(),
1539            })),
1540        }
1541    } else {
1542        pb::KeyValue {
1543            kind: Some(pb::key_value::Kind::Scalar(json_to_typed_value(v))),
1544        }
1545    }
1546}
1547
1548fn cursor_key_to_proto(key: &serde_json::Value, index_cursor: bool) -> Vec<pb::KeyValue> {
1549    if index_cursor {
1550        if let serde_json::Value::Array(parts) = key {
1551            return parts.iter().map(json_to_key_value).collect();
1552        }
1553    }
1554    vec![json_to_key_value(key)]
1555}
1556
1557fn typed_value_to_json(v: &pb::TypedValue) -> serde_json::Value {
1558    use pb::typed_value::Kind;
1559    match &v.kind {
1560        Some(Kind::NullValue(_)) => serde_json::Value::Null,
1561        Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
1562        Some(Kind::IntValue(i)) => serde_json::json!(*i),
1563        Some(Kind::FloatValue(f)) => serde_json::json!(*f),
1564        Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
1565        Some(Kind::BytesValue(b)) => serde_json::json!(b),
1566        Some(Kind::JsonValue(pv)) => prost_value_to_json(pv),
1567        Some(Kind::TimeValue(ts)) => {
1568            serde_json::Value::String(format!("{}.{}", ts.seconds, ts.nanos))
1569        }
1570        None => serde_json::Value::Null,
1571    }
1572}
1573
1574fn key_range_to_pb(kr: KeyRange) -> pb::KeyRange {
1575    pb::KeyRange {
1576        lower: kr.lower.map(|v| json_to_typed_value(&v)),
1577        upper: kr.upper.map(|v| json_to_typed_value(&v)),
1578        lower_open: kr.lower_open,
1579        upper_open: kr.upper_open,
1580    }
1581}
1582/// Returns the environment variable used for a named IndexedDB socket.
1583pub fn indexeddb_socket_env(name: &str) -> String {
1584    let trimmed = name.trim();
1585    if trimmed.is_empty() {
1586        return ENV_INDEXEDDB_SOCKET.to_string();
1587    }
1588    let mut env = String::from(ENV_INDEXEDDB_SOCKET);
1589    env.push('_');
1590    for ch in trimmed.chars() {
1591        if ch.is_ascii_alphanumeric() {
1592            env.push(ch.to_ascii_uppercase());
1593        } else {
1594            env.push('_');
1595        }
1596    }
1597    env
1598}
1599
1600/// Returns the environment variable used for a named IndexedDB relay token.
1601pub fn indexeddb_socket_token_env(name: &str) -> String {
1602    format!(
1603        "{}{}",
1604        indexeddb_socket_env(name),
1605        ENV_INDEXEDDB_SOCKET_TOKEN_SUFFIX
1606    )
1607}
1608
1609fn relay_token_interceptor(token: &str) -> Result<RelayTokenInterceptor, IndexedDBError> {
1610    let header = if token.trim().is_empty() {
1611        None
1612    } else {
1613        Some(MetadataValue::try_from(token.to_string()).map_err(|err| {
1614            IndexedDBError::Env(format!("invalid IndexedDB relay token metadata: {err}"))
1615        })?)
1616    };
1617    Ok(RelayTokenInterceptor { header })
1618}
1619
1620#[derive(Clone)]
1621struct RelayTokenInterceptor {
1622    header: Option<MetadataValue<tonic::metadata::Ascii>>,
1623}
1624
1625impl Interceptor for RelayTokenInterceptor {
1626    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, tonic::Status> {
1627        if let Some(header) = self.header.clone() {
1628            request
1629                .metadata_mut()
1630                .insert(INDEXEDDB_RELAY_TOKEN_HEADER, header);
1631        }
1632        Ok(request)
1633    }
1634}