Skip to main content

gestalt/
indexeddb.rs

1use std::collections::BTreeMap;
2
3use hyper_util::rt::TokioIo;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6use tonic::transport::{Channel, Endpoint, Uri};
7use tower::service_fn;
8
9use crate::generated::v1::{self as pb, indexed_db_client::IndexedDbClient};
10
11pub const ENV_INDEXEDDB_SOCKET: &str = "GESTALT_INDEXEDDB_SOCKET";
12
13const CURSOR_CHANNEL_BUFFER: usize = 1;
14
15#[derive(Debug, thiserror::Error)]
16pub enum IndexedDBError {
17    #[error("not found")]
18    NotFound,
19    #[error("already exists")]
20    AlreadyExists,
21    #[error("cursor is keys-only; value not available")]
22    KeysOnly,
23    #[error("{0}")]
24    Transport(#[from] tonic::transport::Error),
25    #[error("{0}")]
26    Status(#[from] tonic::Status),
27    #[error("{0}")]
28    Env(String),
29}
30
31pub type Record = BTreeMap<String, serde_json::Value>;
32
33pub struct KeyRange {
34    pub lower: Option<serde_json::Value>,
35    pub upper: Option<serde_json::Value>,
36    pub lower_open: bool,
37    pub upper_open: bool,
38}
39
40pub struct IndexSchema {
41    pub name: String,
42    pub key_path: Vec<String>,
43    pub unique: bool,
44}
45
46pub struct ObjectStoreSchema {
47    pub indexes: Vec<IndexSchema>,
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum CursorDirection {
52    Next,
53    NextUnique,
54    Prev,
55    PrevUnique,
56}
57
58impl CursorDirection {
59    fn to_proto(self) -> i32 {
60        match self {
61            Self::Next => pb::CursorDirection::CursorNext as i32,
62            Self::NextUnique => pb::CursorDirection::CursorNextUnique as i32,
63            Self::Prev => pb::CursorDirection::CursorPrev as i32,
64            Self::PrevUnique => pb::CursorDirection::CursorPrevUnique as i32,
65        }
66    }
67}
68
69pub struct Cursor {
70    tx: mpsc::Sender<pb::CursorClientMessage>,
71    stream: tonic::Streaming<pb::CursorResponse>,
72    keys_only: bool,
73    index_cursor: bool,
74    entry: Option<pb::CursorEntry>,
75    done: bool,
76}
77
78impl Cursor {
79    pub fn key(&self) -> Option<serde_json::Value> {
80        let entry = self.entry.as_ref()?;
81        match entry.key.len() {
82            0 => None,
83            1 if !self.index_cursor => Some(key_value_to_json(&entry.key[0])),
84            _ => Some(serde_json::Value::Array(
85                entry.key.iter().map(key_value_to_json).collect(),
86            )),
87        }
88    }
89
90    pub fn primary_key(&self) -> &str {
91        self.entry
92            .as_ref()
93            .map(|e| e.primary_key.as_str())
94            .unwrap_or("")
95    }
96
97    pub fn value(&self) -> Result<Record, IndexedDBError> {
98        if self.keys_only {
99            return Err(IndexedDBError::KeysOnly);
100        }
101        let entry = self.entry.as_ref().ok_or(IndexedDBError::NotFound)?;
102        Ok(entry
103            .record
104            .as_ref()
105            .map(pb_record_to_record)
106            .unwrap_or_default())
107    }
108
109    pub async fn continue_next(&mut self) -> Result<bool, IndexedDBError> {
110        let cmd = pb::cursor_command::Command::Next(true);
111        self.send_and_recv(cmd).await
112    }
113
114    pub async fn continue_to_key(
115        &mut self,
116        key: serde_json::Value,
117    ) -> Result<bool, IndexedDBError> {
118        let cmd = pb::cursor_command::Command::ContinueToKey(pb::CursorKeyTarget {
119            key: cursor_key_to_proto(&key, self.index_cursor),
120        });
121        self.send_and_recv(cmd).await
122    }
123
124    pub async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError> {
125        let cmd = pb::cursor_command::Command::Advance(count);
126        self.send_and_recv(cmd).await
127    }
128
129    pub async fn delete(&mut self) -> Result<(), IndexedDBError> {
130        if self.done {
131            return Err(IndexedDBError::NotFound);
132        }
133        let cmd = pb::cursor_command::Command::Delete(true);
134        self.send_mutation(cmd).await
135    }
136
137    pub async fn update(&mut self, value: Record) -> Result<(), IndexedDBError> {
138        if self.done {
139            return Err(IndexedDBError::NotFound);
140        }
141        let cmd = pb::cursor_command::Command::Update(record_to_pb_record(value));
142        self.send_mutation(cmd).await
143    }
144
145    pub async fn close(self) -> Result<(), IndexedDBError> {
146        let msg = pb::CursorClientMessage {
147            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
148                command: Some(pb::cursor_command::Command::Close(true)),
149            })),
150        };
151        self.tx
152            .send(msg)
153            .await
154            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
155        Ok(())
156    }
157
158    async fn send_mutation(
159        &mut self,
160        cmd: pb::cursor_command::Command,
161    ) -> Result<(), IndexedDBError> {
162        let msg = pb::CursorClientMessage {
163            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
164                command: Some(cmd),
165            })),
166        };
167        self.tx
168            .send(msg)
169            .await
170            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
171        // Read ack -- if it contains an entry, update cursor state.
172        let resp = self
173            .stream
174            .message()
175            .await
176            .map_err(map_status)?
177            .ok_or_else(|| {
178                IndexedDBError::Status(tonic::Status::internal(
179                    "cursor stream ended during mutation",
180                ))
181            })?;
182        match resp.result {
183            Some(pb::cursor_response::Result::Entry(entry)) => {
184                self.entry = Some(entry);
185            }
186            Some(pb::cursor_response::Result::Done(_)) => {}
187            None => {
188                return Err(IndexedDBError::Status(tonic::Status::internal(
189                    "unexpected cursor mutation ack",
190                )));
191            }
192        }
193        Ok(())
194    }
195
196    async fn send_and_recv(
197        &mut self,
198        cmd: pb::cursor_command::Command,
199    ) -> Result<bool, IndexedDBError> {
200        if self.done {
201            return Ok(false);
202        }
203        let msg = pb::CursorClientMessage {
204            msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
205                command: Some(cmd),
206            })),
207        };
208        self.tx
209            .send(msg)
210            .await
211            .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
212
213        let resp = self
214            .stream
215            .message()
216            .await
217            .map_err(map_status)?
218            .ok_or_else(|| {
219                IndexedDBError::Status(tonic::Status::internal("cursor stream ended"))
220            })?;
221
222        match resp.result {
223            Some(pb::cursor_response::Result::Entry(entry)) => {
224                self.entry = Some(entry);
225                self.done = false;
226                Ok(true)
227            }
228            Some(pb::cursor_response::Result::Done(exhausted)) => {
229                if exhausted {
230                    self.done = true;
231                }
232                self.entry = None;
233                Ok(false)
234            }
235            None => {
236                self.entry = None;
237                self.done = true;
238                Ok(false)
239            }
240        }
241    }
242}
243
244async fn open_cursor_inner(
245    client: &mut IndexedDbClient<Channel>,
246    req: pb::OpenCursorRequest,
247) -> Result<Cursor, IndexedDBError> {
248    let keys_only = req.keys_only;
249    let is_index = !req.index.is_empty();
250    let (tx, rx) = mpsc::channel::<pb::CursorClientMessage>(CURSOR_CHANNEL_BUFFER);
251
252    let open_msg = pb::CursorClientMessage {
253        msg: Some(pb::cursor_client_message::Msg::Open(req)),
254    };
255    tx.send(open_msg)
256        .await
257        .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
258
259    let receiver_stream = ReceiverStream::new(rx);
260    let mut stream = client
261        .open_cursor(receiver_stream)
262        .await
263        .map_err(map_status)?
264        .into_inner();
265
266    // Read the open ack to surface creation errors synchronously.
267    let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
268        IndexedDBError::Status(tonic::Status::internal("cursor stream ended during open"))
269    })?;
270    match ack.result {
271        Some(pb::cursor_response::Result::Done(false)) => {}
272        Some(pb::cursor_response::Result::Done(true)) => {
273            return Err(IndexedDBError::Status(tonic::Status::internal(
274                "unexpected exhausted cursor open ack",
275            )));
276        }
277        _ => {
278            return Err(IndexedDBError::Status(tonic::Status::internal(
279                "unexpected cursor open ack",
280            )));
281        }
282    }
283
284    Ok(Cursor {
285        tx,
286        stream,
287        keys_only,
288        entry: None,
289        done: false,
290        index_cursor: is_index,
291    })
292}
293
294pub struct IndexedDB {
295    client: IndexedDbClient<Channel>,
296}
297
298impl IndexedDB {
299    pub async fn connect() -> Result<Self, IndexedDBError> {
300        Self::connect_named("").await
301    }
302
303    pub async fn connect_named(name: &str) -> Result<Self, IndexedDBError> {
304        let env_name = indexeddb_socket_env(name);
305        let socket_path = std::env::var(&env_name)
306            .map_err(|_| IndexedDBError::Env(format!("{env_name} is not set")))?;
307
308        let channel = Endpoint::try_from("http://[::]:50051")?
309            .connect_with_connector(service_fn(move |_: Uri| {
310                let path = socket_path.clone();
311                async move {
312                    tokio::net::UnixStream::connect(path)
313                        .await
314                        .map(TokioIo::new)
315                }
316            }))
317            .await?;
318
319        Ok(Self {
320            client: IndexedDbClient::new(channel),
321        })
322    }
323
324    pub async fn create_object_store(
325        &mut self,
326        name: &str,
327        schema: ObjectStoreSchema,
328    ) -> Result<(), IndexedDBError> {
329        let indexes = schema
330            .indexes
331            .into_iter()
332            .map(|idx| pb::IndexSchema {
333                name: idx.name,
334                key_path: idx.key_path,
335                unique: idx.unique,
336            })
337            .collect();
338        self.client
339            .create_object_store(pb::CreateObjectStoreRequest {
340                name: name.to_string(),
341                schema: Some(pb::ObjectStoreSchema {
342                    indexes,
343                    columns: vec![],
344                }),
345            })
346            .await
347            .map_err(map_status)?;
348        Ok(())
349    }
350
351    pub async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError> {
352        self.client
353            .delete_object_store(pb::DeleteObjectStoreRequest {
354                name: name.to_string(),
355            })
356            .await
357            .map_err(map_status)?;
358        Ok(())
359    }
360
361    pub fn object_store(&self, name: &str) -> ObjectStore {
362        ObjectStore {
363            client: self.client.clone(),
364            store: name.to_string(),
365        }
366    }
367}
368
369pub struct ObjectStore {
370    client: IndexedDbClient<Channel>,
371    store: String,
372}
373
374impl ObjectStore {
375    pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
376        let resp = self
377            .client
378            .get(pb::ObjectStoreRequest {
379                store: self.store.clone(),
380                id: id.to_string(),
381            })
382            .await
383            .map_err(map_status)?;
384        Ok(resp
385            .into_inner()
386            .record
387            .as_ref()
388            .map(pb_record_to_record)
389            .unwrap_or_default())
390    }
391
392    pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
393        let resp = self
394            .client
395            .get_key(pb::ObjectStoreRequest {
396                store: self.store.clone(),
397                id: id.to_string(),
398            })
399            .await
400            .map_err(map_status)?;
401        Ok(resp.into_inner().key)
402    }
403
404    pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
405        self.client
406            .add(pb::RecordRequest {
407                store: self.store.clone(),
408                record: Some(record_to_pb_record(record)),
409            })
410            .await
411            .map_err(map_status)?;
412        Ok(())
413    }
414
415    pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
416        self.client
417            .put(pb::RecordRequest {
418                store: self.store.clone(),
419                record: Some(record_to_pb_record(record)),
420            })
421            .await
422            .map_err(map_status)?;
423        Ok(())
424    }
425
426    pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
427        self.client
428            .delete(pb::ObjectStoreRequest {
429                store: self.store.clone(),
430                id: id.to_string(),
431            })
432            .await
433            .map_err(map_status)?;
434        Ok(())
435    }
436
437    pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
438        self.client
439            .clear(pb::ObjectStoreNameRequest {
440                store: self.store.clone(),
441            })
442            .await
443            .map_err(map_status)?;
444        Ok(())
445    }
446
447    pub async fn get_all(
448        &mut self,
449        range: Option<KeyRange>,
450    ) -> Result<Vec<Record>, IndexedDBError> {
451        let resp = self
452            .client
453            .get_all(pb::ObjectStoreRangeRequest {
454                store: self.store.clone(),
455                range: range.map(key_range_to_pb),
456            })
457            .await
458            .map_err(map_status)?;
459        Ok(resp
460            .into_inner()
461            .records
462            .iter()
463            .map(pb_record_to_record)
464            .collect())
465    }
466
467    pub async fn get_all_keys(
468        &mut self,
469        range: Option<KeyRange>,
470    ) -> Result<Vec<String>, IndexedDBError> {
471        let resp = self
472            .client
473            .get_all_keys(pb::ObjectStoreRangeRequest {
474                store: self.store.clone(),
475                range: range.map(key_range_to_pb),
476            })
477            .await
478            .map_err(map_status)?;
479        Ok(resp.into_inner().keys)
480    }
481
482    pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
483        let resp = self
484            .client
485            .count(pb::ObjectStoreRangeRequest {
486                store: self.store.clone(),
487                range: range.map(key_range_to_pb),
488            })
489            .await
490            .map_err(map_status)?;
491        Ok(resp.into_inner().count)
492    }
493
494    pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
495        let resp = self
496            .client
497            .delete_range(pb::ObjectStoreRangeRequest {
498                store: self.store.clone(),
499                range: Some(key_range_to_pb(range)),
500            })
501            .await
502            .map_err(map_status)?;
503        Ok(resp.into_inner().deleted)
504    }
505
506    pub fn index(&self, name: &str) -> IndexClient {
507        IndexClient {
508            client: self.client.clone(),
509            store: self.store.clone(),
510            index: name.to_string(),
511        }
512    }
513
514    pub async fn open_cursor(
515        &mut self,
516        range: Option<KeyRange>,
517        direction: CursorDirection,
518    ) -> Result<Cursor, IndexedDBError> {
519        let req = pb::OpenCursorRequest {
520            store: self.store.clone(),
521            range: range.map(key_range_to_pb),
522            direction: direction.to_proto(),
523            keys_only: false,
524            index: String::new(),
525            values: vec![],
526        };
527        open_cursor_inner(&mut self.client, req).await
528    }
529
530    pub async fn open_key_cursor(
531        &mut self,
532        range: Option<KeyRange>,
533        direction: CursorDirection,
534    ) -> Result<Cursor, IndexedDBError> {
535        let req = pb::OpenCursorRequest {
536            store: self.store.clone(),
537            range: range.map(key_range_to_pb),
538            direction: direction.to_proto(),
539            keys_only: true,
540            index: String::new(),
541            values: vec![],
542        };
543        open_cursor_inner(&mut self.client, req).await
544    }
545}
546
547pub struct IndexClient {
548    client: IndexedDbClient<Channel>,
549    store: String,
550    index: String,
551}
552
553impl IndexClient {
554    pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
555        let resp = self
556            .client
557            .index_get(pb::IndexQueryRequest {
558                store: self.store.clone(),
559                index: self.index.clone(),
560                values: values.iter().map(json_to_typed_value).collect(),
561                range: None,
562            })
563            .await
564            .map_err(map_status)?;
565        Ok(resp
566            .into_inner()
567            .record
568            .as_ref()
569            .map(pb_record_to_record)
570            .unwrap_or_default())
571    }
572
573    pub async fn get_key(
574        &mut self,
575        values: &[serde_json::Value],
576    ) -> Result<String, IndexedDBError> {
577        let resp = self
578            .client
579            .index_get_key(pb::IndexQueryRequest {
580                store: self.store.clone(),
581                index: self.index.clone(),
582                values: values.iter().map(json_to_typed_value).collect(),
583                range: None,
584            })
585            .await
586            .map_err(map_status)?;
587        Ok(resp.into_inner().key)
588    }
589
590    pub async fn get_all(
591        &mut self,
592        values: &[serde_json::Value],
593        range: Option<KeyRange>,
594    ) -> Result<Vec<Record>, IndexedDBError> {
595        let resp = self
596            .client
597            .index_get_all(pb::IndexQueryRequest {
598                store: self.store.clone(),
599                index: self.index.clone(),
600                values: values.iter().map(json_to_typed_value).collect(),
601                range: range.map(key_range_to_pb),
602            })
603            .await
604            .map_err(map_status)?;
605        Ok(resp
606            .into_inner()
607            .records
608            .iter()
609            .map(pb_record_to_record)
610            .collect())
611    }
612
613    pub async fn get_all_keys(
614        &mut self,
615        values: &[serde_json::Value],
616        range: Option<KeyRange>,
617    ) -> Result<Vec<String>, IndexedDBError> {
618        let resp = self
619            .client
620            .index_get_all_keys(pb::IndexQueryRequest {
621                store: self.store.clone(),
622                index: self.index.clone(),
623                values: values.iter().map(json_to_typed_value).collect(),
624                range: range.map(key_range_to_pb),
625            })
626            .await
627            .map_err(map_status)?;
628        Ok(resp.into_inner().keys)
629    }
630
631    pub async fn count(
632        &mut self,
633        values: &[serde_json::Value],
634        range: Option<KeyRange>,
635    ) -> Result<i64, IndexedDBError> {
636        let resp = self
637            .client
638            .index_count(pb::IndexQueryRequest {
639                store: self.store.clone(),
640                index: self.index.clone(),
641                values: values.iter().map(json_to_typed_value).collect(),
642                range: range.map(key_range_to_pb),
643            })
644            .await
645            .map_err(map_status)?;
646        Ok(resp.into_inner().count)
647    }
648
649    pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
650        let resp = self
651            .client
652            .index_delete(pb::IndexQueryRequest {
653                store: self.store.clone(),
654                index: self.index.clone(),
655                values: values.iter().map(json_to_typed_value).collect(),
656                range: None,
657            })
658            .await
659            .map_err(map_status)?;
660        Ok(resp.into_inner().deleted)
661    }
662
663    pub async fn open_cursor(
664        &mut self,
665        values: &[serde_json::Value],
666        range: Option<KeyRange>,
667        direction: CursorDirection,
668    ) -> Result<Cursor, IndexedDBError> {
669        let req = pb::OpenCursorRequest {
670            store: self.store.clone(),
671            range: range.map(key_range_to_pb),
672            direction: direction.to_proto(),
673            keys_only: false,
674            index: self.index.clone(),
675            values: values.iter().map(json_to_typed_value).collect(),
676        };
677        open_cursor_inner(&mut self.client, req).await
678    }
679
680    pub async fn open_key_cursor(
681        &mut self,
682        values: &[serde_json::Value],
683        range: Option<KeyRange>,
684        direction: CursorDirection,
685    ) -> Result<Cursor, IndexedDBError> {
686        let req = pb::OpenCursorRequest {
687            store: self.store.clone(),
688            range: range.map(key_range_to_pb),
689            direction: direction.to_proto(),
690            keys_only: true,
691            index: self.index.clone(),
692            values: values.iter().map(json_to_typed_value).collect(),
693        };
694        open_cursor_inner(&mut self.client, req).await
695    }
696}
697
698fn map_status(err: tonic::Status) -> IndexedDBError {
699    match err.code() {
700        tonic::Code::NotFound => IndexedDBError::NotFound,
701        tonic::Code::AlreadyExists => IndexedDBError::AlreadyExists,
702        _ => IndexedDBError::Status(err),
703    }
704}
705
706fn record_to_pb_record(record: Record) -> pb::Record {
707    pb::Record {
708        fields: record
709            .into_iter()
710            .map(|(k, v)| (k, json_to_typed_value(&v)))
711            .collect(),
712    }
713}
714
715fn pb_record_to_record(r: &pb::Record) -> Record {
716    r.fields
717        .iter()
718        .map(|(k, v)| (k.clone(), typed_value_to_json(v)))
719        .collect()
720}
721
722fn json_to_typed_value(v: &serde_json::Value) -> pb::TypedValue {
723    use pb::typed_value::Kind;
724    let kind = match v {
725        serde_json::Value::Null => Kind::NullValue(0),
726        serde_json::Value::Bool(b) => Kind::BoolValue(*b),
727        serde_json::Value::Number(n) => {
728            if let Some(i) = n.as_i64() {
729                Kind::IntValue(i)
730            } else {
731                Kind::FloatValue(n.as_f64().unwrap_or(0.0))
732            }
733        }
734        serde_json::Value::String(s) => Kind::StringValue(s.clone()),
735        serde_json::Value::Array(arr) => {
736            let values = arr.iter().map(json_to_prost_value).collect();
737            Kind::JsonValue(prost_types::Value {
738                kind: Some(prost_types::value::Kind::ListValue(
739                    prost_types::ListValue { values },
740                )),
741            })
742        }
743        serde_json::Value::Object(obj) => {
744            let fields = obj
745                .iter()
746                .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
747                .collect();
748            Kind::JsonValue(prost_types::Value {
749                kind: Some(prost_types::value::Kind::StructValue(prost_types::Struct {
750                    fields,
751                })),
752            })
753        }
754    };
755    pb::TypedValue { kind: Some(kind) }
756}
757
758fn prost_value_to_json(v: &prost_types::Value) -> serde_json::Value {
759    use prost_types::value::Kind;
760    match &v.kind {
761        Some(Kind::NullValue(_)) => serde_json::Value::Null,
762        Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
763        Some(Kind::NumberValue(n)) => serde_json::json!(*n),
764        Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
765        Some(Kind::ListValue(list)) => {
766            serde_json::Value::Array(list.values.iter().map(prost_value_to_json).collect())
767        }
768        Some(Kind::StructValue(st)) => {
769            let obj: serde_json::Map<String, serde_json::Value> = st
770                .fields
771                .iter()
772                .map(|(k, v)| (k.clone(), prost_value_to_json(v)))
773                .collect();
774            serde_json::Value::Object(obj)
775        }
776        None => serde_json::Value::Null,
777    }
778}
779
780fn json_to_prost_value(v: &serde_json::Value) -> prost_types::Value {
781    use prost_types::value::Kind;
782    let kind = match v {
783        serde_json::Value::Null => Kind::NullValue(0),
784        serde_json::Value::Bool(b) => Kind::BoolValue(*b),
785        serde_json::Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)),
786        serde_json::Value::String(s) => Kind::StringValue(s.clone()),
787        serde_json::Value::Array(arr) => {
788            let values = arr.iter().map(json_to_prost_value).collect();
789            Kind::ListValue(prost_types::ListValue { values })
790        }
791        serde_json::Value::Object(obj) => {
792            let fields = obj
793                .iter()
794                .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
795                .collect();
796            Kind::StructValue(prost_types::Struct { fields })
797        }
798    };
799    prost_types::Value { kind: Some(kind) }
800}
801
802fn key_value_to_json(kv: &pb::KeyValue) -> serde_json::Value {
803    match &kv.kind {
804        Some(pb::key_value::Kind::Scalar(tv)) => typed_value_to_json(tv),
805        Some(pb::key_value::Kind::Array(arr)) => {
806            serde_json::Value::Array(arr.elements.iter().map(key_value_to_json).collect())
807        }
808        None => serde_json::Value::Null,
809    }
810}
811
812fn json_to_key_value(v: &serde_json::Value) -> pb::KeyValue {
813    if let serde_json::Value::Array(arr) = v {
814        pb::KeyValue {
815            kind: Some(pb::key_value::Kind::Array(pb::KeyValueArray {
816                elements: arr.iter().map(json_to_key_value).collect(),
817            })),
818        }
819    } else {
820        pb::KeyValue {
821            kind: Some(pb::key_value::Kind::Scalar(json_to_typed_value(v))),
822        }
823    }
824}
825
826fn cursor_key_to_proto(key: &serde_json::Value, index_cursor: bool) -> Vec<pb::KeyValue> {
827    if index_cursor {
828        if let serde_json::Value::Array(parts) = key {
829            return parts.iter().map(json_to_key_value).collect();
830        }
831    }
832    vec![json_to_key_value(key)]
833}
834
835fn typed_value_to_json(v: &pb::TypedValue) -> serde_json::Value {
836    use pb::typed_value::Kind;
837    match &v.kind {
838        Some(Kind::NullValue(_)) => serde_json::Value::Null,
839        Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
840        Some(Kind::IntValue(i)) => serde_json::json!(*i),
841        Some(Kind::FloatValue(f)) => serde_json::json!(*f),
842        Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
843        Some(Kind::BytesValue(b)) => serde_json::json!(b),
844        Some(Kind::JsonValue(pv)) => prost_value_to_json(pv),
845        Some(Kind::TimeValue(ts)) => {
846            serde_json::Value::String(format!("{}.{}", ts.seconds, ts.nanos))
847        }
848        None => serde_json::Value::Null,
849    }
850}
851
852fn key_range_to_pb(kr: KeyRange) -> pb::KeyRange {
853    pb::KeyRange {
854        lower: kr.lower.map(|v| json_to_typed_value(&v)),
855        upper: kr.upper.map(|v| json_to_typed_value(&v)),
856        lower_open: kr.lower_open,
857        upper_open: kr.upper_open,
858    }
859}
860pub fn indexeddb_socket_env(name: &str) -> String {
861    let trimmed = name.trim();
862    if trimmed.is_empty() {
863        return ENV_INDEXEDDB_SOCKET.to_string();
864    }
865    let mut env = String::from(ENV_INDEXEDDB_SOCKET);
866    env.push('_');
867    for ch in trimmed.chars() {
868        if ch.is_ascii_alphanumeric() {
869            env.push(ch.to_ascii_uppercase());
870        } else {
871            env.push('_');
872        }
873    }
874    env
875}