1use std::collections::BTreeMap;
2
3use hyper_util::rt::TokioIo;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6use tonic::transport::{Channel, Endpoint, Uri};
7use tower::service_fn;
8
9use crate::generated::v1::{self as pb, indexed_db_client::IndexedDbClient};
10
11pub const ENV_INDEXEDDB_SOCKET: &str = "GESTALT_INDEXEDDB_SOCKET";
12
13const CURSOR_CHANNEL_BUFFER: usize = 1;
14
15#[derive(Debug, thiserror::Error)]
16pub enum IndexedDBError {
17 #[error("not found")]
18 NotFound,
19 #[error("already exists")]
20 AlreadyExists,
21 #[error("cursor is keys-only; value not available")]
22 KeysOnly,
23 #[error("{0}")]
24 Transport(#[from] tonic::transport::Error),
25 #[error("{0}")]
26 Status(#[from] tonic::Status),
27 #[error("{0}")]
28 Env(String),
29}
30
31pub type Record = BTreeMap<String, serde_json::Value>;
32
33pub struct KeyRange {
34 pub lower: Option<serde_json::Value>,
35 pub upper: Option<serde_json::Value>,
36 pub lower_open: bool,
37 pub upper_open: bool,
38}
39
40pub struct IndexSchema {
41 pub name: String,
42 pub key_path: Vec<String>,
43 pub unique: bool,
44}
45
46pub struct ObjectStoreSchema {
47 pub indexes: Vec<IndexSchema>,
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum CursorDirection {
52 Next,
53 NextUnique,
54 Prev,
55 PrevUnique,
56}
57
58impl CursorDirection {
59 fn to_proto(self) -> i32 {
60 match self {
61 Self::Next => pb::CursorDirection::CursorNext as i32,
62 Self::NextUnique => pb::CursorDirection::CursorNextUnique as i32,
63 Self::Prev => pb::CursorDirection::CursorPrev as i32,
64 Self::PrevUnique => pb::CursorDirection::CursorPrevUnique as i32,
65 }
66 }
67}
68
69pub struct Cursor {
70 tx: mpsc::Sender<pb::CursorClientMessage>,
71 stream: tonic::Streaming<pb::CursorResponse>,
72 keys_only: bool,
73 index_cursor: bool,
74 entry: Option<pb::CursorEntry>,
75 done: bool,
76}
77
78impl Cursor {
79 pub fn key(&self) -> Option<serde_json::Value> {
80 let entry = self.entry.as_ref()?;
81 match entry.key.len() {
82 0 => None,
83 1 if !self.index_cursor => Some(key_value_to_json(&entry.key[0])),
84 _ => Some(serde_json::Value::Array(
85 entry.key.iter().map(key_value_to_json).collect(),
86 )),
87 }
88 }
89
90 pub fn primary_key(&self) -> &str {
91 self.entry
92 .as_ref()
93 .map(|e| e.primary_key.as_str())
94 .unwrap_or("")
95 }
96
97 pub fn value(&self) -> Result<Record, IndexedDBError> {
98 if self.keys_only {
99 return Err(IndexedDBError::KeysOnly);
100 }
101 let entry = self.entry.as_ref().ok_or(IndexedDBError::NotFound)?;
102 Ok(entry
103 .record
104 .as_ref()
105 .map(pb_record_to_record)
106 .unwrap_or_default())
107 }
108
109 pub async fn continue_next(&mut self) -> Result<bool, IndexedDBError> {
110 let cmd = pb::cursor_command::Command::Next(true);
111 self.send_and_recv(cmd).await
112 }
113
114 pub async fn continue_to_key(
115 &mut self,
116 key: serde_json::Value,
117 ) -> Result<bool, IndexedDBError> {
118 let cmd = pb::cursor_command::Command::ContinueToKey(pb::CursorKeyTarget {
119 key: cursor_key_to_proto(&key, self.index_cursor),
120 });
121 self.send_and_recv(cmd).await
122 }
123
124 pub async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError> {
125 let cmd = pb::cursor_command::Command::Advance(count);
126 self.send_and_recv(cmd).await
127 }
128
129 pub async fn delete(&mut self) -> Result<(), IndexedDBError> {
130 if self.done {
131 return Err(IndexedDBError::NotFound);
132 }
133 let cmd = pb::cursor_command::Command::Delete(true);
134 self.send_mutation(cmd).await
135 }
136
137 pub async fn update(&mut self, value: Record) -> Result<(), IndexedDBError> {
138 if self.done {
139 return Err(IndexedDBError::NotFound);
140 }
141 let cmd = pb::cursor_command::Command::Update(record_to_pb_record(value));
142 self.send_mutation(cmd).await
143 }
144
145 pub async fn close(self) -> Result<(), IndexedDBError> {
146 let msg = pb::CursorClientMessage {
147 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
148 command: Some(pb::cursor_command::Command::Close(true)),
149 })),
150 };
151 self.tx
152 .send(msg)
153 .await
154 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
155 Ok(())
156 }
157
158 async fn send_mutation(
159 &mut self,
160 cmd: pb::cursor_command::Command,
161 ) -> Result<(), IndexedDBError> {
162 let msg = pb::CursorClientMessage {
163 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
164 command: Some(cmd),
165 })),
166 };
167 self.tx
168 .send(msg)
169 .await
170 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
171 let resp = self
173 .stream
174 .message()
175 .await
176 .map_err(map_status)?
177 .ok_or_else(|| {
178 IndexedDBError::Status(tonic::Status::internal(
179 "cursor stream ended during mutation",
180 ))
181 })?;
182 match resp.result {
183 Some(pb::cursor_response::Result::Entry(entry)) => {
184 self.entry = Some(entry);
185 }
186 Some(pb::cursor_response::Result::Done(_)) => {}
187 None => {
188 return Err(IndexedDBError::Status(tonic::Status::internal(
189 "unexpected cursor mutation ack",
190 )));
191 }
192 }
193 Ok(())
194 }
195
196 async fn send_and_recv(
197 &mut self,
198 cmd: pb::cursor_command::Command,
199 ) -> Result<bool, IndexedDBError> {
200 if self.done {
201 return Ok(false);
202 }
203 let msg = pb::CursorClientMessage {
204 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
205 command: Some(cmd),
206 })),
207 };
208 self.tx
209 .send(msg)
210 .await
211 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
212
213 let resp = self
214 .stream
215 .message()
216 .await
217 .map_err(map_status)?
218 .ok_or_else(|| {
219 IndexedDBError::Status(tonic::Status::internal("cursor stream ended"))
220 })?;
221
222 match resp.result {
223 Some(pb::cursor_response::Result::Entry(entry)) => {
224 self.entry = Some(entry);
225 self.done = false;
226 Ok(true)
227 }
228 Some(pb::cursor_response::Result::Done(exhausted)) => {
229 if exhausted {
230 self.done = true;
231 }
232 self.entry = None;
233 Ok(false)
234 }
235 None => {
236 self.entry = None;
237 self.done = true;
238 Ok(false)
239 }
240 }
241 }
242}
243
244async fn open_cursor_inner(
245 client: &mut IndexedDbClient<Channel>,
246 req: pb::OpenCursorRequest,
247) -> Result<Cursor, IndexedDBError> {
248 let keys_only = req.keys_only;
249 let is_index = !req.index.is_empty();
250 let (tx, rx) = mpsc::channel::<pb::CursorClientMessage>(CURSOR_CHANNEL_BUFFER);
251
252 let open_msg = pb::CursorClientMessage {
253 msg: Some(pb::cursor_client_message::Msg::Open(req)),
254 };
255 tx.send(open_msg)
256 .await
257 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
258
259 let receiver_stream = ReceiverStream::new(rx);
260 let mut stream = client
261 .open_cursor(receiver_stream)
262 .await
263 .map_err(map_status)?
264 .into_inner();
265
266 let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
268 IndexedDBError::Status(tonic::Status::internal("cursor stream ended during open"))
269 })?;
270 match ack.result {
271 Some(pb::cursor_response::Result::Done(false)) => {}
272 Some(pb::cursor_response::Result::Done(true)) => {
273 return Err(IndexedDBError::Status(tonic::Status::internal(
274 "unexpected exhausted cursor open ack",
275 )));
276 }
277 _ => {
278 return Err(IndexedDBError::Status(tonic::Status::internal(
279 "unexpected cursor open ack",
280 )));
281 }
282 }
283
284 Ok(Cursor {
285 tx,
286 stream,
287 keys_only,
288 entry: None,
289 done: false,
290 index_cursor: is_index,
291 })
292}
293
294pub struct IndexedDB {
295 client: IndexedDbClient<Channel>,
296}
297
298impl IndexedDB {
299 pub async fn connect() -> Result<Self, IndexedDBError> {
300 Self::connect_named("").await
301 }
302
303 pub async fn connect_named(name: &str) -> Result<Self, IndexedDBError> {
304 let env_name = indexeddb_socket_env(name);
305 let socket_path = std::env::var(&env_name)
306 .map_err(|_| IndexedDBError::Env(format!("{env_name} is not set")))?;
307
308 let channel = Endpoint::try_from("http://[::]:50051")?
309 .connect_with_connector(service_fn(move |_: Uri| {
310 let path = socket_path.clone();
311 async move {
312 tokio::net::UnixStream::connect(path)
313 .await
314 .map(TokioIo::new)
315 }
316 }))
317 .await?;
318
319 Ok(Self {
320 client: IndexedDbClient::new(channel),
321 })
322 }
323
324 pub async fn create_object_store(
325 &mut self,
326 name: &str,
327 schema: ObjectStoreSchema,
328 ) -> Result<(), IndexedDBError> {
329 let indexes = schema
330 .indexes
331 .into_iter()
332 .map(|idx| pb::IndexSchema {
333 name: idx.name,
334 key_path: idx.key_path,
335 unique: idx.unique,
336 })
337 .collect();
338 self.client
339 .create_object_store(pb::CreateObjectStoreRequest {
340 name: name.to_string(),
341 schema: Some(pb::ObjectStoreSchema {
342 indexes,
343 columns: vec![],
344 }),
345 })
346 .await
347 .map_err(map_status)?;
348 Ok(())
349 }
350
351 pub async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError> {
352 self.client
353 .delete_object_store(pb::DeleteObjectStoreRequest {
354 name: name.to_string(),
355 })
356 .await
357 .map_err(map_status)?;
358 Ok(())
359 }
360
361 pub fn object_store(&self, name: &str) -> ObjectStore {
362 ObjectStore {
363 client: self.client.clone(),
364 store: name.to_string(),
365 }
366 }
367}
368
369pub struct ObjectStore {
370 client: IndexedDbClient<Channel>,
371 store: String,
372}
373
374impl ObjectStore {
375 pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
376 let resp = self
377 .client
378 .get(pb::ObjectStoreRequest {
379 store: self.store.clone(),
380 id: id.to_string(),
381 })
382 .await
383 .map_err(map_status)?;
384 Ok(resp
385 .into_inner()
386 .record
387 .as_ref()
388 .map(pb_record_to_record)
389 .unwrap_or_default())
390 }
391
392 pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
393 let resp = self
394 .client
395 .get_key(pb::ObjectStoreRequest {
396 store: self.store.clone(),
397 id: id.to_string(),
398 })
399 .await
400 .map_err(map_status)?;
401 Ok(resp.into_inner().key)
402 }
403
404 pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
405 self.client
406 .add(pb::RecordRequest {
407 store: self.store.clone(),
408 record: Some(record_to_pb_record(record)),
409 })
410 .await
411 .map_err(map_status)?;
412 Ok(())
413 }
414
415 pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
416 self.client
417 .put(pb::RecordRequest {
418 store: self.store.clone(),
419 record: Some(record_to_pb_record(record)),
420 })
421 .await
422 .map_err(map_status)?;
423 Ok(())
424 }
425
426 pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
427 self.client
428 .delete(pb::ObjectStoreRequest {
429 store: self.store.clone(),
430 id: id.to_string(),
431 })
432 .await
433 .map_err(map_status)?;
434 Ok(())
435 }
436
437 pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
438 self.client
439 .clear(pb::ObjectStoreNameRequest {
440 store: self.store.clone(),
441 })
442 .await
443 .map_err(map_status)?;
444 Ok(())
445 }
446
447 pub async fn get_all(
448 &mut self,
449 range: Option<KeyRange>,
450 ) -> Result<Vec<Record>, IndexedDBError> {
451 let resp = self
452 .client
453 .get_all(pb::ObjectStoreRangeRequest {
454 store: self.store.clone(),
455 range: range.map(key_range_to_pb),
456 })
457 .await
458 .map_err(map_status)?;
459 Ok(resp
460 .into_inner()
461 .records
462 .iter()
463 .map(pb_record_to_record)
464 .collect())
465 }
466
467 pub async fn get_all_keys(
468 &mut self,
469 range: Option<KeyRange>,
470 ) -> Result<Vec<String>, IndexedDBError> {
471 let resp = self
472 .client
473 .get_all_keys(pb::ObjectStoreRangeRequest {
474 store: self.store.clone(),
475 range: range.map(key_range_to_pb),
476 })
477 .await
478 .map_err(map_status)?;
479 Ok(resp.into_inner().keys)
480 }
481
482 pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
483 let resp = self
484 .client
485 .count(pb::ObjectStoreRangeRequest {
486 store: self.store.clone(),
487 range: range.map(key_range_to_pb),
488 })
489 .await
490 .map_err(map_status)?;
491 Ok(resp.into_inner().count)
492 }
493
494 pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
495 let resp = self
496 .client
497 .delete_range(pb::ObjectStoreRangeRequest {
498 store: self.store.clone(),
499 range: Some(key_range_to_pb(range)),
500 })
501 .await
502 .map_err(map_status)?;
503 Ok(resp.into_inner().deleted)
504 }
505
506 pub fn index(&self, name: &str) -> IndexClient {
507 IndexClient {
508 client: self.client.clone(),
509 store: self.store.clone(),
510 index: name.to_string(),
511 }
512 }
513
514 pub async fn open_cursor(
515 &mut self,
516 range: Option<KeyRange>,
517 direction: CursorDirection,
518 ) -> Result<Cursor, IndexedDBError> {
519 let req = pb::OpenCursorRequest {
520 store: self.store.clone(),
521 range: range.map(key_range_to_pb),
522 direction: direction.to_proto(),
523 keys_only: false,
524 index: String::new(),
525 values: vec![],
526 };
527 open_cursor_inner(&mut self.client, req).await
528 }
529
530 pub async fn open_key_cursor(
531 &mut self,
532 range: Option<KeyRange>,
533 direction: CursorDirection,
534 ) -> Result<Cursor, IndexedDBError> {
535 let req = pb::OpenCursorRequest {
536 store: self.store.clone(),
537 range: range.map(key_range_to_pb),
538 direction: direction.to_proto(),
539 keys_only: true,
540 index: String::new(),
541 values: vec![],
542 };
543 open_cursor_inner(&mut self.client, req).await
544 }
545}
546
547pub struct IndexClient {
548 client: IndexedDbClient<Channel>,
549 store: String,
550 index: String,
551}
552
553impl IndexClient {
554 pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
555 let resp = self
556 .client
557 .index_get(pb::IndexQueryRequest {
558 store: self.store.clone(),
559 index: self.index.clone(),
560 values: values.iter().map(json_to_typed_value).collect(),
561 range: None,
562 })
563 .await
564 .map_err(map_status)?;
565 Ok(resp
566 .into_inner()
567 .record
568 .as_ref()
569 .map(pb_record_to_record)
570 .unwrap_or_default())
571 }
572
573 pub async fn get_key(
574 &mut self,
575 values: &[serde_json::Value],
576 ) -> Result<String, IndexedDBError> {
577 let resp = self
578 .client
579 .index_get_key(pb::IndexQueryRequest {
580 store: self.store.clone(),
581 index: self.index.clone(),
582 values: values.iter().map(json_to_typed_value).collect(),
583 range: None,
584 })
585 .await
586 .map_err(map_status)?;
587 Ok(resp.into_inner().key)
588 }
589
590 pub async fn get_all(
591 &mut self,
592 values: &[serde_json::Value],
593 range: Option<KeyRange>,
594 ) -> Result<Vec<Record>, IndexedDBError> {
595 let resp = self
596 .client
597 .index_get_all(pb::IndexQueryRequest {
598 store: self.store.clone(),
599 index: self.index.clone(),
600 values: values.iter().map(json_to_typed_value).collect(),
601 range: range.map(key_range_to_pb),
602 })
603 .await
604 .map_err(map_status)?;
605 Ok(resp
606 .into_inner()
607 .records
608 .iter()
609 .map(pb_record_to_record)
610 .collect())
611 }
612
613 pub async fn get_all_keys(
614 &mut self,
615 values: &[serde_json::Value],
616 range: Option<KeyRange>,
617 ) -> Result<Vec<String>, IndexedDBError> {
618 let resp = self
619 .client
620 .index_get_all_keys(pb::IndexQueryRequest {
621 store: self.store.clone(),
622 index: self.index.clone(),
623 values: values.iter().map(json_to_typed_value).collect(),
624 range: range.map(key_range_to_pb),
625 })
626 .await
627 .map_err(map_status)?;
628 Ok(resp.into_inner().keys)
629 }
630
631 pub async fn count(
632 &mut self,
633 values: &[serde_json::Value],
634 range: Option<KeyRange>,
635 ) -> Result<i64, IndexedDBError> {
636 let resp = self
637 .client
638 .index_count(pb::IndexQueryRequest {
639 store: self.store.clone(),
640 index: self.index.clone(),
641 values: values.iter().map(json_to_typed_value).collect(),
642 range: range.map(key_range_to_pb),
643 })
644 .await
645 .map_err(map_status)?;
646 Ok(resp.into_inner().count)
647 }
648
649 pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
650 let resp = self
651 .client
652 .index_delete(pb::IndexQueryRequest {
653 store: self.store.clone(),
654 index: self.index.clone(),
655 values: values.iter().map(json_to_typed_value).collect(),
656 range: None,
657 })
658 .await
659 .map_err(map_status)?;
660 Ok(resp.into_inner().deleted)
661 }
662
663 pub async fn open_cursor(
664 &mut self,
665 values: &[serde_json::Value],
666 range: Option<KeyRange>,
667 direction: CursorDirection,
668 ) -> Result<Cursor, IndexedDBError> {
669 let req = pb::OpenCursorRequest {
670 store: self.store.clone(),
671 range: range.map(key_range_to_pb),
672 direction: direction.to_proto(),
673 keys_only: false,
674 index: self.index.clone(),
675 values: values.iter().map(json_to_typed_value).collect(),
676 };
677 open_cursor_inner(&mut self.client, req).await
678 }
679
680 pub async fn open_key_cursor(
681 &mut self,
682 values: &[serde_json::Value],
683 range: Option<KeyRange>,
684 direction: CursorDirection,
685 ) -> Result<Cursor, IndexedDBError> {
686 let req = pb::OpenCursorRequest {
687 store: self.store.clone(),
688 range: range.map(key_range_to_pb),
689 direction: direction.to_proto(),
690 keys_only: true,
691 index: self.index.clone(),
692 values: values.iter().map(json_to_typed_value).collect(),
693 };
694 open_cursor_inner(&mut self.client, req).await
695 }
696}
697
698fn map_status(err: tonic::Status) -> IndexedDBError {
699 match err.code() {
700 tonic::Code::NotFound => IndexedDBError::NotFound,
701 tonic::Code::AlreadyExists => IndexedDBError::AlreadyExists,
702 _ => IndexedDBError::Status(err),
703 }
704}
705
706fn record_to_pb_record(record: Record) -> pb::Record {
707 pb::Record {
708 fields: record
709 .into_iter()
710 .map(|(k, v)| (k, json_to_typed_value(&v)))
711 .collect(),
712 }
713}
714
715fn pb_record_to_record(r: &pb::Record) -> Record {
716 r.fields
717 .iter()
718 .map(|(k, v)| (k.clone(), typed_value_to_json(v)))
719 .collect()
720}
721
722fn json_to_typed_value(v: &serde_json::Value) -> pb::TypedValue {
723 use pb::typed_value::Kind;
724 let kind = match v {
725 serde_json::Value::Null => Kind::NullValue(0),
726 serde_json::Value::Bool(b) => Kind::BoolValue(*b),
727 serde_json::Value::Number(n) => {
728 if let Some(i) = n.as_i64() {
729 Kind::IntValue(i)
730 } else {
731 Kind::FloatValue(n.as_f64().unwrap_or(0.0))
732 }
733 }
734 serde_json::Value::String(s) => Kind::StringValue(s.clone()),
735 serde_json::Value::Array(arr) => {
736 let values = arr.iter().map(json_to_prost_value).collect();
737 Kind::JsonValue(prost_types::Value {
738 kind: Some(prost_types::value::Kind::ListValue(
739 prost_types::ListValue { values },
740 )),
741 })
742 }
743 serde_json::Value::Object(obj) => {
744 let fields = obj
745 .iter()
746 .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
747 .collect();
748 Kind::JsonValue(prost_types::Value {
749 kind: Some(prost_types::value::Kind::StructValue(prost_types::Struct {
750 fields,
751 })),
752 })
753 }
754 };
755 pb::TypedValue { kind: Some(kind) }
756}
757
758fn prost_value_to_json(v: &prost_types::Value) -> serde_json::Value {
759 use prost_types::value::Kind;
760 match &v.kind {
761 Some(Kind::NullValue(_)) => serde_json::Value::Null,
762 Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
763 Some(Kind::NumberValue(n)) => serde_json::json!(*n),
764 Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
765 Some(Kind::ListValue(list)) => {
766 serde_json::Value::Array(list.values.iter().map(prost_value_to_json).collect())
767 }
768 Some(Kind::StructValue(st)) => {
769 let obj: serde_json::Map<String, serde_json::Value> = st
770 .fields
771 .iter()
772 .map(|(k, v)| (k.clone(), prost_value_to_json(v)))
773 .collect();
774 serde_json::Value::Object(obj)
775 }
776 None => serde_json::Value::Null,
777 }
778}
779
780fn json_to_prost_value(v: &serde_json::Value) -> prost_types::Value {
781 use prost_types::value::Kind;
782 let kind = match v {
783 serde_json::Value::Null => Kind::NullValue(0),
784 serde_json::Value::Bool(b) => Kind::BoolValue(*b),
785 serde_json::Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)),
786 serde_json::Value::String(s) => Kind::StringValue(s.clone()),
787 serde_json::Value::Array(arr) => {
788 let values = arr.iter().map(json_to_prost_value).collect();
789 Kind::ListValue(prost_types::ListValue { values })
790 }
791 serde_json::Value::Object(obj) => {
792 let fields = obj
793 .iter()
794 .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
795 .collect();
796 Kind::StructValue(prost_types::Struct { fields })
797 }
798 };
799 prost_types::Value { kind: Some(kind) }
800}
801
802fn key_value_to_json(kv: &pb::KeyValue) -> serde_json::Value {
803 match &kv.kind {
804 Some(pb::key_value::Kind::Scalar(tv)) => typed_value_to_json(tv),
805 Some(pb::key_value::Kind::Array(arr)) => {
806 serde_json::Value::Array(arr.elements.iter().map(key_value_to_json).collect())
807 }
808 None => serde_json::Value::Null,
809 }
810}
811
812fn json_to_key_value(v: &serde_json::Value) -> pb::KeyValue {
813 if let serde_json::Value::Array(arr) = v {
814 pb::KeyValue {
815 kind: Some(pb::key_value::Kind::Array(pb::KeyValueArray {
816 elements: arr.iter().map(json_to_key_value).collect(),
817 })),
818 }
819 } else {
820 pb::KeyValue {
821 kind: Some(pb::key_value::Kind::Scalar(json_to_typed_value(v))),
822 }
823 }
824}
825
826fn cursor_key_to_proto(key: &serde_json::Value, index_cursor: bool) -> Vec<pb::KeyValue> {
827 if index_cursor {
828 if let serde_json::Value::Array(parts) = key {
829 return parts.iter().map(json_to_key_value).collect();
830 }
831 }
832 vec![json_to_key_value(key)]
833}
834
835fn typed_value_to_json(v: &pb::TypedValue) -> serde_json::Value {
836 use pb::typed_value::Kind;
837 match &v.kind {
838 Some(Kind::NullValue(_)) => serde_json::Value::Null,
839 Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
840 Some(Kind::IntValue(i)) => serde_json::json!(*i),
841 Some(Kind::FloatValue(f)) => serde_json::json!(*f),
842 Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
843 Some(Kind::BytesValue(b)) => serde_json::json!(b),
844 Some(Kind::JsonValue(pv)) => prost_value_to_json(pv),
845 Some(Kind::TimeValue(ts)) => {
846 serde_json::Value::String(format!("{}.{}", ts.seconds, ts.nanos))
847 }
848 None => serde_json::Value::Null,
849 }
850}
851
852fn key_range_to_pb(kr: KeyRange) -> pb::KeyRange {
853 pb::KeyRange {
854 lower: kr.lower.map(|v| json_to_typed_value(&v)),
855 upper: kr.upper.map(|v| json_to_typed_value(&v)),
856 lower_open: kr.lower_open,
857 upper_open: kr.upper_open,
858 }
859}
860pub fn indexeddb_socket_env(name: &str) -> String {
861 let trimmed = name.trim();
862 if trimmed.is_empty() {
863 return ENV_INDEXEDDB_SOCKET.to_string();
864 }
865 let mut env = String::from(ENV_INDEXEDDB_SOCKET);
866 env.push('_');
867 for ch in trimmed.chars() {
868 if ch.is_ascii_alphanumeric() {
869 env.push(ch.to_ascii_uppercase());
870 } else {
871 env.push('_');
872 }
873 }
874 env
875}